## Dependencies

## Pre-processing

### Data Leakage Removal

In [None]:
# NOTE: This cell relies on Google Cloud. To run it, connect to the Google APIs first so that the Google Sheets used below can be opened.

import os
import numpy as np
import pandas as pd
import librosa
import matplotlib.pyplot as plt
from scipy import signal
from scipy.signal import find_peaks
from moviepy.editor import VideoFileClip

# Function to find timestamps with high correlation to a certain sound
def find_offset(episode, jingle, window):
    """Locate the moments at which a jingle is heard inside an episode.

    The first ``window`` seconds of the jingle are cross-correlated with the
    whole episode audio. The five strongest correlation peaks are kept,
    ordered chronologically, and a peak is dropped when it follows the
    immediately preceding candidate by 30 seconds or less.

    Args:
        episode: Path of the episode file, passed to ``librosa.load``.
        jingle: Path of the audio file containing the sound to look for.
        window: Number of seconds of the jingle used as the template.

    Returns:
        A list of timestamps in seconds, rounded to two decimals. The list
        is empty when no correlation peak passes the prominence filter.
    """
    # Keep the episode's native sample rate and resample the jingle to it.
    y_within, sr_within = librosa.load(episode, sr=None)
    y_find, _ = librosa.load(jingle, sr=sr_within)

    # int() keeps the slice valid when ``window`` is given as a float.
    template = y_find[:int(sr_within * window)]

    # Apply cross-correlation model
    c = signal.correlate(y_within, template, mode='valid', method='fft')

    # Find the peaks (sample positions where the correlation is the highest)
    peaks, _ = find_peaks(c, prominence=0.1, distance=20)
    if len(peaks) == 0:
        # Nothing resembling the jingle was found; previously this case
        # crashed with an IndexError on ``offset[0]``.
        return []

    # Select the five peaks with the highest correlation, then restore
    # chronological order.
    top_peaks = np.argsort(c[peaks])[-5:]
    peak_indices_sorted = np.sort(peaks[top_peaks])

    # Convert sample positions to seconds
    offset = [round(peak / sr_within, 2) for peak in peak_indices_sorted]

    # Remove peaks that are within 30 seconds of the previous candidate
    filtered_offset = [offset[0]]
    for previous, current in zip(offset, offset[1:]):
        if current - previous > 30:
            filtered_offset.append(current)

    return filtered_offset

# Function to find timestamps with high correlation to a certain sound after a given offset
def find_offset_end(episode, jingle, window, offset, duration):
    """Search an excerpt of an episode for the strongest matches of a jingle.

    Only ``duration`` seconds of the episode, starting ``offset`` seconds in,
    are loaded. The first ``window`` seconds of the jingle are
    cross-correlated with that excerpt.

    Returns:
        Up to five timestamps in seconds (rounded to two decimals, ascending)
        of the strongest correlation peaks. They are measured from the start
        of the loaded excerpt, not from the start of the episode.
    """
    excerpt, sample_rate = librosa.load(episode, sr=None, offset=offset, duration=duration)
    template, _ = librosa.load(jingle, sr=sample_rate)

    # Cross-correlate the excerpt with the leading part of the jingle.
    correlation = signal.correlate(
        excerpt, template[:sample_rate * window], mode='valid', method='fft'
    )

    # Candidate positions are the prominent local maxima of the correlation.
    candidates, _ = find_peaks(correlation, prominence=0.1, distance=20)

    # Keep the five strongest candidates and put them back in time order.
    strongest = candidates[np.argsort(correlation[candidates])[-5:]]
    chronological = np.sort(strongest)

    return [round(sample_index / sample_rate, 2) for sample_index in chronological]

# Load the Google Sheets where we will store split times.
# NOTE(review): `gc` is not defined in this cell; the `gc.open(...).sheet1`
# calls look like a gspread client -- confirm it is created and authorised
# before this cell runs.
worksheet = gc.open('splitTimes').sheet1
worksheet2 = gc.open('PitchCount').sheet1

# Retrieve expected number of pitches for each episode from the sheet
rows = worksheet2.get_all_values()
df = pd.DataFrame.from_records(rows, columns=['Episode', 'PitchCount'])

# Loop over each episode in the raw video folder
raw_videos_path = 'INSERT YOUR GOOGLE PATH HERE'
for filename in os.listdir(raw_videos_path):
    file_path = os.path.join(raw_videos_path, filename)

    # Call the offset function to get a list of timestamps where the starting sound occurs
    # (second argument: path of the jingle file, third: seconds of jingle to match)
    offsets = find_offset(file_path, "INSERT YOUR GOOGLE PATH HERE", 10)

    # Check if the number of pitches matches the expected count.
    # `filename[:-4]` drops the last four characters, i.e. it assumes a
    # three-letter extension such as ".mp4".
    # flag values written to the sheet:
    #   0 = detected count equals the expected count
    #   1 = detected count differs from the expected count
    #   2 = episode not listed in the PitchCount sheet
    row = df[df['Episode'] == filename[:-4]]
    flag = 0
    if not row.empty:
        value = row.iloc[0]['PitchCount']
        if int(value) != len(offsets):
            flag = 1
    else:
        flag = 2

    # One sheet row per detected start time. Note that `row` and `value`
    # are reused here with a different meaning than above.
    for index, value in enumerate(offsets):
        # Find the first empty row in column A.
        # NOTE(review): this issues one `cell` request per scanned row for
        # every timestamp, and the loop only stops on a value equal to '' --
        # confirm the client returns '' (not None) for empty cells.
        row = 1
        while worksheet.cell(row, 1).value != '':
            row += 1

        # Fill in timestamps and flag:
        # column 1 = file name, 2 = start time, 4 = flag, 5 = pitch ID
        worksheet.update_cell(row, 1, filename)
        worksheet.update_cell(row, 2, value)
        worksheet.update_cell(row, 4, flag)
        worksheet.update_cell(row, 5, f"{filename[:-4]}P{index}")

        # Column 3 (end time) is the next pitch's start time; the last pitch
        # of an episode has no successor and gets the string "Null".
        try:
            worksheet.update_cell(row, 3, offsets[index + 1])
        except IndexError:
            worksheet.update_cell(row, 3, "Null")

# Cut one clip per pitch: read the start times back from the sheet, detect
# the ending sound after each start and write the resulting sub-clip to disk.
rows = worksheet.get_all_values()
df = pd.DataFrame.from_records(rows, columns=['Episode', 'StartTime', 'EndTime', 'Flag', 'Pitch'])

for filename in os.listdir(raw_videos_path):
    file_path = os.path.join(raw_videos_path, filename)

    # Select the rows in the sheet relevant for this episode
    filtered_df = df[df['Episode'] == filename].reset_index(drop=True)
    if filtered_df.empty:
        # No split times recorded for this file: nothing to cut, so do not
        # open it at all.
        continue

    video = VideoFileClip(file_path)
    try:
        # Loop over the timesplits
        for index, row in filtered_df.iterrows():
            start_time = row['StartTime']
            # The search for the ending sound begins 15 s after the start sound.
            start = float(start_time) + 15

            # Get offset for the ending sound (searched in the next 150 s);
            # the returned times are relative to `start`.
            offsets = find_offset_end(file_path, "INSERT YOUR GOOGLE PATH HERE", 10, start, 150)
            if not offsets:
                # Previously this crashed with an IndexError on offsets[0].
                print(f"No ending sound found for {filename} pitch {index}; skipping.")
                continue

            # The clip runs from 4 s before `start` to the first detected ending.
            subclip = video.subclip(start - 4, start + offsets[0])

            # Define filepath where we output new file
            output_filename_mp4 = f"{filename[:-4]}P{index}.mp4"
            output_path_mp4 = f"INSERT YOUR GOOGLE PATH HERE/{output_filename_mp4}"

            # Save subclip
            subclip.write_videofile(output_path_mp4)
    finally:
        # Release the reader held by the clip, even when a write fails.
        video.close()


In [None]:
import os
import cv2
import numpy as np
import face_recognition
import csv
from tqdm import tqdm

# Define general paths (placeholders -- replace before running the cell)
input_videos_path = "path/to/input_videos"
output_videos_path = "path/to/output_videos"
csv_file_path = "path/to/output_csv/processed_videos_summary.csv"
shark_images_folder = "path/to/shark_images"

# Ensure output directories exist: the folder for the filtered videos and
# the parent folder of the summary CSV.
os.makedirs(output_videos_path, exist_ok=True)
os.makedirs(os.path.dirname(csv_file_path), exist_ok=True)

# Column names of the summary CSV; the order must match the row written
# for each video further down in this cell.
csv_header = ['ID', 'Shark_ratio', 'Entrepreneur_ratio', 'Noise_ratio', 'Frames', 'Sharks_total', 'Entrepreneurs_total', 'Noise_total', 'Input_video_length', 'Output_video_length']

# Function to load shark face encodings from images
def load_shark_encodings(shark_images_folder):
    """Build the list of reference face encodings from a folder of images.

    Every entry of ``shark_images_folder`` is loaded as an image. When at
    least one face encoding is found in it, the first one is kept; images
    without a detectable face are skipped.

    Returns:
        A list with one face encoding per usable image.
    """
    known_faces = []
    for entry in os.listdir(shark_images_folder):
        picture = face_recognition.load_image_file(os.path.join(shark_images_folder, entry))
        faces_found = face_recognition.face_encodings(picture)
        if not faces_found:
            continue
        # Only the first face of each reference image is used.
        known_faces.append(faces_found[0])
    return known_faces

# Load shark encodings
shark_encodings = load_shark_encodings(shark_images_folder)
print(f"Loaded shark encodings: {len(shark_encodings)}")

# Only every `frame_skip`-th frame is analysed and considered for output.
frame_skip = 2
output_fps_option = 15  # Option to switch between 30fps and 15fps for output

# Process each video file in the input directory
video_files = [f for f in os.listdir(input_videos_path) if f.endswith('.mp4')]
for idx, video_file in enumerate(video_files):
    video_path = os.path.join(input_videos_path, video_file)
    cap = cv2.VideoCapture(video_path)

    print(f"\nProcessing video {idx+1}/{len(video_files)}: {video_file}")

    fps = cap.get(cv2.CAP_PROP_FPS)
    # If the capture failed to open or reports a non-positive frame rate,
    # skip the file; otherwise the length computation below divides by zero
    # and aborts the whole run.
    if not cap.isOpened() or fps <= 0:
        print(f"Skipping {video_file}: unable to open the video or read its frame rate.")
        cap.release()
        continue

    frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    input_video_length = cap.get(cv2.CAP_PROP_FRAME_COUNT)
    # NOTE: since only every `frame_skip`-th frame is written, halving the
    # frame rate keeps the playback speed only while frame_skip == 2.
    output_fps = fps if output_fps_option == 30 else fps / 2

    entrepreneur_video_path = os.path.join(output_videos_path, f'{video_file[:-4]}_entrepreneurs.mp4')
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    out = cv2.VideoWriter(entrepreneur_video_path, fourcc, output_fps, (frame_width, frame_height))

    total_frames_processed = 0
    shark_frames_count = 0
    entrepreneur_frames_count = 0
    noise_frames_count = 0  # Frames without any detected face

    try:
        # Adding tqdm for progress bar
        for frame_id in tqdm(range(int(input_video_length)), desc=f"Processing video {idx+1}/{len(video_files)}"):
            ret, frame = cap.read()
            if not ret:
                break

            # Frames in between are read (to advance the stream) but ignored.
            if frame_id % frame_skip != 0:
                continue

            total_frames_processed += 1
            frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            frame_encodings = face_recognition.face_encodings(frame_rgb)

            # A frame counts as a shark frame as soon as one of its faces
            # matches any reference encoding.
            frame_contains_shark = False
            for encoding in frame_encodings:
                match_results = face_recognition.compare_faces(shark_encodings, encoding, tolerance=0.4)
                if True in match_results:
                    frame_contains_shark = True
                    shark_frames_count += 1
                    break  # Shark detected, skip further processing for this frame

            # Write frame to output video if it does not contain a shark
            if not frame_contains_shark:
                out.write(frame)
                if frame_encodings:
                    entrepreneur_frames_count += 1
                else:
                    noise_frames_count += 1  # Count as noise if no faces are detected
    finally:
        # Always free the reader and finalise the output file.
        cap.release()
        out.release()

    # Calculating ratios (relative to the analysed frames only)
    shark_ratio = shark_frames_count / total_frames_processed if total_frames_processed else 0
    entrepreneur_ratio = entrepreneur_frames_count / total_frames_processed if total_frames_processed else 0
    noise_ratio = noise_frames_count / total_frames_processed if total_frames_processed else 0
    input_video_length_seconds = input_video_length / fps
    output_video_length_seconds = (entrepreneur_frames_count + noise_frames_count) / output_fps

    # Opening the CSV file in append mode and writing data including noise information
    with open(csv_file_path, 'a', newline='') as csvfile:
        csvwriter = csv.writer(csvfile)
        if os.stat(csv_file_path).st_size == 0:  # Check if the file is empty and write the header
            csvwriter.writerow(csv_header)
        csvwriter.writerow([video_file.split('.')[0], shark_ratio, entrepreneur_ratio, noise_ratio, total_frames_processed, shark_frames_count, entrepreneur_frames_count, noise_frames_count, input_video_length_seconds, output_video_length_seconds])

    print(f"\nFinished processing {video_file}. Total frames processed: {total_frames_processed}. Shark frames: {shark_frames_count}, Entrepreneur frames: {entrepreneur_frames_count}, Noise frames: {noise_frames_count}")

print("\nPipeline complete.")


## Face Analysis Pipeline

### Feature extraction

In [None]:
import os
import cv2
import numpy as np
from pathlib import Path
from facenet_pytorch import MTCNN
from deepface import DeepFace
import csv
import fnmatch

def align_face(image, left_eye_pos, right_eye_pos):
    """Rotate ``image`` so that the line through both eyes becomes horizontal.

    Args:
        image: Image array to rotate; the output keeps its width and height.
        left_eye_pos: ``(x, y)`` position of the left eye.
        right_eye_pos: ``(x, y)`` position of the right eye.

    Returns:
        A tuple ``(aligned_image, rotation_matrix)`` where ``rotation_matrix``
        is the affine matrix produced by ``cv2.getRotationMatrix2D`` for a
        rotation about the midpoint between the eyes.
    """
    delta_x = right_eye_pos[0] - left_eye_pos[0]
    delta_y = right_eye_pos[1] - left_eye_pos[1]

    # arctan2 is defined for delta_x == 0 (vertically stacked eyes) and keeps
    # the correct quadrant when delta_x < 0; arctan(delta_y / delta_x) did
    # neither.
    angle = float(np.arctan2(delta_y, delta_x) * 180 / np.pi)

    # Built-in floats, so the centre is a plain (x, y) tuple whatever the
    # dtype of the incoming landmark coordinates is.
    eyes_center = (
        float((left_eye_pos[0] + right_eye_pos[0]) // 2),
        float((left_eye_pos[1] + right_eye_pos[1]) // 2),
    )

    rotation_matrix = cv2.getRotationMatrix2D(eyes_center, angle, scale=1)
    aligned_image = cv2.warpAffine(image, rotation_matrix, (image.shape[1], image.shape[0]))
    return aligned_image, rotation_matrix

def round_dict_values(data, decimal_places=8):
    """Return a copy of ``data`` with every floating-point value rounded.

    Both built-in floats and NumPy floating scalars (for example
    ``np.float32``, which is not a subclass of ``float``) are rounded to
    ``decimal_places``; all other values are passed through unchanged.
    """
    return {
        key: round(value, decimal_places) if isinstance(value, (float, np.floating)) else value
        for key, value in data.items()
    }

def detect_and_analyze_faces(video_path: str, emotion_output_folder: str, race_output_folder: str, no_face_tracker_path: str):
    """Detect faces in every frame of a video and analyse each of them.

    For each frame, MTCNN detects faces and their landmarks. Every face is
    aligned on its eyes, cropped, and passed to ``DeepFace.analyze`` for age,
    emotion and race. Results are collected per face under the ID
    ``<video_info>F<frame index>P<person number>``, where ``video_info`` is
    the part of the file stem before the first underscore and the person
    number is the 1-based position of the face among that frame's detections
    (it is not a tracked identity).

    After the video has been read, the emotion rows and the age/race rows are
    written to ``<video_info>.csv`` in their respective folders, and the
    no-face statistics are appended to ``no_face_tracker_path``.

    Args:
        video_path: Path of the video to analyse.
        emotion_output_folder: Folder receiving the emotion CSV.
        race_output_folder: Folder receiving the age/race CSV.
        no_face_tracker_path: CSV file tracking frames without faces.

    Returns:
        None. Nothing is written when the video cannot be opened.
    """
    detector = MTCNN(keep_all=True, device='cpu')
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        print(f"Failed to open video: {video_path}")
        return
    video_stem = Path(video_path).stem
    video_info = video_stem.split('_')[0]

    emotions_data = []
    races_data = []
    no_face_frames = 0

    frame_count = 0

    try:
        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                break

            frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            boxes, _, landmarks = detector.detect(frame_rgb, landmarks=True)

            if boxes is not None and len(boxes) > 0:
                for person_number, (box, landmark) in enumerate(zip(boxes, landmarks), start=1):
                    if landmark is None:
                        continue

                    left_eye = landmark[0]
                    right_eye = landmark[1]

                    # Rotate the whole frame, then map the detection box
                    # through the same rotation to find the crop region.
                    aligned_frame, rotation_matrix = align_face(frame, left_eye, right_eye)
                    polygon = np.array([[box[0], box[1]], [box[2], box[1]], [box[2], box[3]], [box[0], box[3]]])
                    transformed_polygon = cv2.transform(np.array([polygon]), rotation_matrix)[0]
                    x1, y1 = np.min(transformed_polygon, axis=0)
                    x2, y2 = np.max(transformed_polygon, axis=0)
                    x1, y1, x2, y2 = map(int, [x1, y1, x2, y2])

                    # Clamp to the image: a negative start would otherwise be
                    # interpreted as an index counted from the far edge and
                    # yield an empty or wrong crop.
                    frame_height, frame_width = aligned_frame.shape[:2]
                    x1, y1 = max(x1, 0), max(y1, 0)
                    x2, y2 = min(x2, frame_width), min(y2, frame_height)
                    crop_img = aligned_frame[y1:y2, x1:x2]

                    if crop_img.size == 0:
                        continue

                    try:
                        results = DeepFace.analyze(crop_img, actions=['age', 'emotion', 'race'], enforce_detection=False)
                        # DeepFace may return a list of results or a single one.
                        result = results[0] if isinstance(results, list) and len(results) > 0 else results

                        emotions = round_dict_values(result['emotion'])
                        races = round_dict_values(result['race'])
                        age = result['age']  # Capture the age data

                        frame_id = f"{video_info}F{frame_count}P{person_number}"

                        emotions_data.append({'ID': frame_id, **emotions})
                        races_data.append({'ID': frame_id, 'age': age, **races})
                    except Exception as e:
                        # Best effort: a failing face must not stop the video.
                        print(f"An error occurred when processing frame {frame_count} for person {person_number}: {e}")
            else:
                no_face_frames += 1

            frame_count += 1
    finally:
        # Free the video reader even if detection raised.
        cap.release()

    save_data_to_csv(emotions_data, os.path.join(emotion_output_folder, f"{video_info}.csv"))
    save_data_to_csv(races_data, os.path.join(race_output_folder, f"{video_info}.csv"))
    save_no_face_data(video_info, no_face_frames, frame_count, no_face_tracker_path)

def save_data_to_csv(data_list, file_path):
    """Write a list of row dictionaries to ``file_path`` as CSV.

    The header starts with ``ID`` followed by every other key found in any
    row, in sorted order. Nothing is written when ``data_list`` is empty.
    """
    if not data_list:
        return

    # Collect the union of all keys so rows with differing keys still fit.
    other_keys = set()
    for record in data_list:
        other_keys.update(record)
    other_keys.discard('ID')
    header = ['ID', *sorted(other_keys)]

    with open(file_path, 'w', newline='') as handle:
        table = csv.DictWriter(handle, fieldnames=header)
        table.writeheader()
        table.writerows(data_list)
    print(f"Data saved to {file_path}.")

def save_no_face_data(video_info, no_face_frames, frame_count, no_face_tracker_path, output_fps=15):
    """Append the no-face statistics of one video to the tracker CSV.

    The tracker file (and its parent folder) is created on first use, with a
    header row. Each call appends one row:
    ``ID, NoFaceFrames, Total Input Frames, Total Output Frames, Total Length``.

    Args:
        video_info: Identifier of the video.
        no_face_frames: Number of frames in which no face was detected.
        frame_count: Total number of frames read from the video.
        no_face_tracker_path: Path of the tracker CSV.
        output_fps: Frame rate used to convert the remaining frames into
            seconds. Defaults to 15, the value that was previously
            hard-coded.
    """
    parent_folder = os.path.dirname(no_face_tracker_path)
    if parent_folder:
        # Without this, the first write fails when the folder is missing.
        os.makedirs(parent_folder, exist_ok=True)

    # The header is written only when the file does not exist yet.
    needs_header = not os.path.exists(no_face_tracker_path)

    with open(no_face_tracker_path, 'a', newline='') as file:
        writer = csv.writer(file)
        if needs_header:
            writer.writerow(['ID', 'NoFaceFrames', 'Total Input Frames', 'Total Output Frames', 'Total Length'])
        output_frames = frame_count - no_face_frames
        output_length = output_frames / output_fps
        writer.writerow([video_info, no_face_frames, frame_count, output_frames, output_length])
    print(f"No face data updated for {video_info}.")

def is_video_processed(video_id, no_face_tracker_path):
    """Tell whether ``video_id`` already has a row in the tracker CSV.

    Returns ``False`` when the tracker file does not exist or when no row
    has ``video_id`` in its ``ID`` column, ``True`` otherwise.
    """
    try:
        handle = open(no_face_tracker_path, mode='r')
    except FileNotFoundError:
        # No tracker yet means nothing has been processed.
        return False

    with handle:
        return any(entry['ID'] == video_id for entry in csv.DictReader(handle))

def process_videos_and_save_data(input_folder: str, emotion_output_folder: str, race_output_folder: str, no_face_tracker_path: str):
    """Analyse every not-yet-processed video found in ``input_folder``.

    Both output folders are created if needed. Entries whose name does not
    match ``*.mp4``, ``*.MP4`` or ``*.mpeg4`` are reported and skipped.
    A video is also skipped, silently, when its ID (the file stem up to the
    first underscore) is already listed in the tracker CSV.
    """
    for folder in (emotion_output_folder, race_output_folder):
        os.makedirs(folder, exist_ok=True)

    accepted_patterns = ('*.mp4', '*.MP4', '*.mpeg4')
    for candidate in Path(input_folder).iterdir():
        if not any(fnmatch.fnmatch(candidate.name, pattern) for pattern in accepted_patterns):
            print(f"Skipping {candidate.name} due to incompatible file extension.")
            continue

        if is_video_processed(candidate.stem.split('_')[0], no_face_tracker_path):
            continue

        detect_and_analyze_faces(str(candidate), emotion_output_folder, race_output_folder, no_face_tracker_path)

# Entry point of the feature-extraction cell: analyse all videos of one
# folder and write the emotion, age/race and no-face outputs.
if __name__ == "__main__":
    # Placeholder locations -- replace them before running.
    video_input_folder = "path/to/input_videos"
    emotion_output_folder = "path/to/emotion_output"
    race_output_folder = "path/to/race_output"
    no_face_tracker_path = "path/to/no_face_tracker.csv"

    process_videos_and_save_data(video_input_folder, emotion_output_folder, race_output_folder, no_face_tracker_path)

    print("All videos processed.")


### Averaging to get the data

Emotion

In [None]:
import pandas as pd
import numpy as np
from pathlib import Path

def calculate_averages(df_sorted):
    """Average every value column overall and per third of the rows.

    The first and the last column of ``df_sorted`` are left out (the caller
    passes ``ID`` first and ``FrameNumber`` last). Rows are split by position
    into three segments of ``len(df_sorted) // 3`` rows; the last segment
    also receives the remainder.

    Returns:
        ``(overall, start, middle, end)``: four lists of column means.
    """
    values = df_sorted.iloc[:, 1:-1]
    third = len(df_sorted) // 3

    def column_means(rows):
        return rows.mean().tolist()

    return (
        column_means(values),
        column_means(values.iloc[:third]),
        column_means(values.iloc[third:2 * third]),
        column_means(values.iloc[2 * third:]),
    )

def process_files(source_directory, target_directory):
    """Summarise every per-video emotion CSV into one row of averages.

    For each ``*.csv`` in ``source_directory`` the frame number is parsed
    from the ``ID`` column (digits following ``F``), the rows are sorted by
    it, and each value column is averaged overall and per third of the video.
    All rows are written to ``averages_segmented.csv`` in
    ``target_directory``, which is created if needed.

    Output columns are ``ID`` followed by ``<column>_Overall``,
    ``<column>_Start``, ``<column>_Middle`` and ``<column>_End`` for each
    value column.
    """
    source_path = Path(source_directory)
    target_path = Path(target_directory)

    target_path.mkdir(parents=True, exist_ok=True)
    results = []
    value_columns = None  # value column names, taken from the last file read

    for file_path in source_path.glob('*.csv'):
        df = pd.read_csv(file_path)
        # Raw string: '\d' in a normal string literal is an invalid escape
        # sequence and triggers a warning on current Python versions.
        df['FrameNumber'] = df['ID'].str.extract(r'F(\d+)', expand=False).astype(int)
        df_sorted = df.sort_values(by='FrameNumber', ascending=True)

        # Calculate averages
        overall_avg, start_avg, middle_avg, end_avg = calculate_averages(df_sorted)

        # Construct result row: the four averages of a column sit side by side
        result_row = [file_path.stem]
        for per_column in zip(overall_avg, start_avg, middle_avg, end_avg):
            result_row.extend(per_column)
        results.append(result_row)

        # Exclude 'ID' (first) and 'FrameNumber' (last) columns for naming
        value_columns = list(df_sorted.columns[1:-1])

    if not results:
        print("No CSV files found in the source directory.")
        return

    # Create column names for the output file
    segments = ['Overall', 'Start', 'Middle', 'End']
    columns = ['ID'] + [f"{col}_{segment}" for col in value_columns for segment in segments]

    results_df = pd.DataFrame(results, columns=columns)
    results_df.to_csv(target_path / 'averages_segmented.csv', index=False)
    print(f"Averages file created at: {target_path / 'averages_segmented.csv'}")

# Example usage: replace both placeholder paths before running the cell.
# NOTE: `process_files` is redefined by later cells of this notebook, so this
# call uses the emotion version only when this cell's definition is the
# most recent one.
source_directory = "path/to/source_directory"
target_directory = "path/to/target_directory"
process_files(source_directory, target_directory)


Age

In [None]:
import pandas as pd
from pathlib import Path

def calculate_average_of_column(df):
    """Return the mean of the second column of ``df`` (position 1)."""
    second_column = df.iloc[:, 1]
    return second_column.mean()

def process_files(source_directory, target_directory):
    """Write the average of the second column of every CSV to one file.

    Each ``*.csv`` in ``source_directory`` contributes one row (file stem and
    average). The rows are saved as ``column_averages.csv`` with the columns
    ``ID`` and ``Age_average`` in ``target_directory``, which is created if
    needed. When no CSV is found, only a message is printed.
    """
    source_path = Path(source_directory)
    target_path = Path(target_directory)
    target_path.mkdir(parents=True, exist_ok=True)

    # One [ID, average] pair per input file.
    results = [
        [csv_file.stem, calculate_average_of_column(pd.read_csv(csv_file))]
        for csv_file in source_path.glob('*.csv')
    ]

    if not results:
        print("No CSV files found in the source directory.")
        return

    output_file = target_path / 'column_averages.csv'
    pd.DataFrame(results, columns=['ID', 'Age_average']).to_csv(output_file, index=False)
    print(f"Averages file created at: {output_file}")

# Example usage: replace both placeholder paths before running the cell.
# The source directory is expected to hold the CSVs whose second column is
# the age (see `calculate_average_of_column` above).
source_directory = "path/to/source_directory"
target_directory = "path/to/target_directory"
process_files(source_directory, target_directory)


Race

In [None]:
import pandas as pd
from pathlib import Path

def one_hot_encode_race(df):
    """Replace the race scores of each row by 0/1 indicators, in place.

    A column receives 1 in a row when its score equals the row maximum over
    the six race columns, otherwise 0 (tied maxima all receive 1). The
    DataFrame passed in is modified and also returned.
    """
    race_columns = ['asian', 'black', 'indian', 'latino hispanic', 'middle eastern', 'white']

    # The row maxima are computed before any column is overwritten.
    row_maximum = df[race_columns].max(axis=1)
    indicators = {name: df[name].eq(row_maximum).astype(int) for name in race_columns}

    for name, flags in indicators.items():
        df[name] = flags
    return df

def calculate_ratios(df, race_columns):
    """Return, per race column, its column sum divided by the row count.

    With 0/1 indicator columns this is the share of rows flagged for each
    race. The result is a list ordered like ``race_columns``.
    """
    column_totals = df[race_columns].sum()
    shares = column_totals / len(df)
    return shares.tolist()

def process_files(source_directory, target_directory):
    """Compute, per CSV file, how often each race is the dominant one.

    For every ``*.csv`` in ``source_directory`` the ``age`` column is
    dropped, the race scores are one-hot encoded on the row maximum, and the
    share of rows flagged for each race is computed. One row per file is
    written to ``race_ratios.csv`` in ``target_directory``, which is created
    if needed. When no CSV is found, only a message is printed.
    """
    source_path, target_path = Path(source_directory), Path(target_directory)
    target_path.mkdir(parents=True, exist_ok=True)

    race_columns = ['asian', 'black', 'indian', 'latino hispanic', 'middle eastern', 'white']
    results = []

    for csv_file in source_path.glob('*.csv'):
        scores = pd.read_csv(csv_file)
        scores.drop('age', axis=1, inplace=True)
        scores = one_hot_encode_race(scores)

        # File stem first, then one ratio per race column.
        results.append([csv_file.stem, *calculate_ratios(scores, race_columns)])

    if not results:
        print("No CSV files found in the source directory.")
        return

    output_file = target_path / 'race_ratios.csv'
    pd.DataFrame(results, columns=['ID'] + race_columns).to_csv(output_file, index=False)
    print(f"Ratios file created at: {output_file}")

# Example usage: replace both placeholder paths before running the cell.
# Every CSV in the source directory must contain an 'age' column and the six
# race columns used by `process_files` above.
source_directory = "path/to/source_directory"
target_directory = "path/to/target_directory"
process_files(source_directory, target_directory)


### Number of faces extraction

In [None]:
import os
import pandas as pd
from pathlib import Path

def count_frequent_people(df, threshold):
    """Count the person identifiers that occur at least ``threshold`` times.

    The identifier is the run of digits after the final ``P`` at the end of
    each value in the ``ID`` column.

    Returns:
        The number of distinct identifiers whose occurrence count is greater
        than or equal to ``threshold``.
    """
    occurrences = df['ID'].str.extract(r'P(\d+)$')[0].value_counts()
    return int((occurrences >= threshold).sum())

def process_files(input_directory_path, output_file_path):
    """Write, per CSV file, the number of frequently appearing people.

    For every ``*.csv`` in ``input_directory_path`` a person counts as
    frequent when their identifier appears in at least 15% of the file's
    rows. The result is saved to ``output_file_path`` with the columns
    ``ID`` (file stem) and ``Number of people``; the file is written even
    when no input CSV exists.
    """
    people_count_data = []

    for csv_file in Path(input_directory_path).glob('*.csv'):
        detections = pd.read_csv(csv_file)

        # Minimum number of rows a person needs: 15% of all rows.
        minimum_rows = len(detections) * 0.15
        people_count_data.append([csv_file.stem, count_frequent_people(detections, minimum_rows)])

    summary = pd.DataFrame(people_count_data, columns=['ID', 'Number of people'])
    summary.to_csv(output_file_path, index=False)
    print(f"Output file created at: {output_file_path}")

# Example usage: replace both placeholder paths before running the cell.
# The output path is a file, not a folder; it is handed to `to_csv` as is.
input_directory_path = "path/to/input_directory"
output_file_path = "path/to/output_file.csv"
process_files(input_directory_path, output_file_path)


## Verbal Analysis Pipeline

### Transcript extraction

In [None]:
from pydub import AudioSegment
import os
import csv
from pathlib import Path

def process_audio_files(folder_path, target_length_ms, csv_file_path):
    """Record audio durations and trim over-long files in place.

    Every ``.mp3`` or ``.wav`` file directly inside ``folder_path`` is
    loaded. Its name and duration in seconds, measured before trimming, are
    collected. A file longer than ``target_length_ms`` is cut to that length
    and written back over the original. Finally the collected durations are
    saved to ``csv_file_path``.
    """
    audio_data = []

    for entry in Path(folder_path).glob('*'):
        if entry.suffix not in ('.mp3', '.wav'):
            continue

        clip = AudioSegment.from_file(entry)
        clip_ms = len(clip)  # the length of a segment is given in milliseconds
        audio_data.append([entry.name, clip_ms / 1000.0])

        if clip_ms <= target_length_ms:
            continue

        print(f"Found a file exceeding target length: {entry.name}")
        # Overwrite the original with its first `target_length_ms` ms,
        # keeping the original format (suffix without the dot).
        clip[:target_length_ms].export(entry, format=entry.suffix[1:])

    print("Processing complete.")

    with open(csv_file_path, mode='w', newline='') as file:
        table = csv.writer(file)
        table.writerow(['Filename', 'Duration (seconds)'])
        table.writerows(audio_data)

    print(f"CSV export complete. File saved at: {csv_file_path}")

# Example usage: replace the placeholder paths before running the cell.
# WARNING: files longer than the target length are overwritten in place by
# `process_audio_files`; keep a copy of the originals if they are needed.
folder_path = "path/to/audio_files"
target_length_ms = 2 * 60 * 1000 + 15 * 1000  # 2 minutes and 15 seconds
csv_file_path = "path/to/output_file.csv"
process_audio_files(folder_path, target_length_ms, csv_file_path)


In [None]:
import os
import csv
from openai import OpenAI

# Initialize the OpenAI client. The key is read from the OPENAI_API_KEY
# environment variable when it is set, so the real secret does not have to
# be stored in the notebook; the placeholder is only a fallback.
client = OpenAI(api_key=os.environ.get("OPENAI_API_KEY", "your_api_key_here"))

def transcribe_audio_files(input_folder, output_folder):
    """Transcribe every audio file of a folder and save one CSV per file.

    Files of ``input_folder`` ending in ``.mp3`` or ``.wav`` are sent to the
    ``whisper-1`` transcription model through the module-level ``client``.
    The transcript is printed and written to
    ``<output_folder>/<name without extension>.csv`` with the header
    ``Filename, Transcript``. The output folder is created if needed.
    """
    os.makedirs(output_folder, exist_ok=True)

    for audio_name in os.listdir(input_folder):
        if not audio_name.endswith(('.mp3', '.wav')):
            continue
        print(f"Processing file: {audio_name}")

        # The file only needs to stay open while the request is running.
        with open(os.path.join(input_folder, audio_name), "rb") as audio_file:
            transcription = client.audio.transcriptions.create(
                model="whisper-1",
                file=audio_file
            )

        transcript_text = transcription.text
        print(transcript_text)

        # Both accepted extensions have four characters including the dot.
        base_name = audio_name[:-4]
        with open(os.path.join(output_folder, f"{base_name}.csv"), 'w', newline='') as file:
            table = csv.writer(file)
            table.writerow(["Filename", "Transcript"])
            table.writerow([base_name, transcript_text])

    print("Processing and transcription complete.")

# Example usage: replace both placeholder paths before running the cell.
# Each matching audio file triggers one transcription request through the
# `client` created above.
input_folder = "path/to/input_folder"
output_folder = "path/to/output_folder"
transcribe_audio_files(input_folder, output_folder)


### Dictionaries

Psychological Dictionary

In [None]:
import os
import pandas as pd
import string
import nltk
from nltk.corpus import stopwords
from nltk.stem import WordNetLemmatizer
from nltk.tokenize import word_tokenize

# Make sure to have these downloaded before running the script
# nltk.download('punkt')
# nltk.download('stopwords')
# nltk.download('wordnet')

def preprocess_text(text):
    """Turn raw text into lemmatized, stopword-free tokens.

    The text is lower-cased, stripped of punctuation and of every character
    that is neither alphanumeric nor whitespace, tokenized, filtered against
    the English stopword list and lemmatized.

    Returns:
        A tuple ``(tokens, token_count)``.
    """
    lowered = text.lower()
    without_punctuation = lowered.translate(str.maketrans('', '', string.punctuation))
    cleaned = ''.join(ch for ch in without_punctuation if ch.isalnum() or ch.isspace())

    english_stopwords = set(stopwords.words('english'))
    lemmatizer = WordNetLemmatizer()

    tokens = []
    for token in word_tokenize(cleaned):
        if token in english_stopwords:
            continue
        tokens.append(lemmatizer.lemmatize(token))

    return tokens, len(tokens)

def analyze_transcripts(folder_path, dict_path, output_path):
    """Score each transcript against the psychological dictionary.

    The dictionary CSV must provide the column
    ``preprocessed word 3 (lemmatized)`` plus, for each of the categories
    Sociability, Morality, Ability and Agency, a ``<category> dictionary``
    column (1 when the word belongs to the category) and a
    ``<category> direction`` column (1 positive, -1 negative).

    For every ``.csv`` in ``folder_path`` the text in the first row, second
    column is preprocessed. Each token adds one to ``Positive <category>`` or
    ``Negative <category>`` for every matching dictionary row. Counts are
    divided by the number of tokens, and one row per transcript (ratios plus
    ``ID``) is written to ``output_path``. When the folder holds no CSV, a
    message is printed and nothing is written.
    """
    word_column = 'preprocessed word 3 (lemmatized)'
    categories = ['Sociability', 'Morality', 'Ability', 'Agency']

    # Load the dictionary CSV and normalise the lookup column
    dict_df = pd.read_csv(dict_path)
    dict_df[word_column] = dict_df[word_column].str.lower()

    # Build the word -> counters lookup once instead of filtering the whole
    # dictionary for every single token. A word appearing in several rows or
    # categories contributes once per matching row and category.
    hits_by_word = {}
    for _, row in dict_df.iterrows():
        for category in categories:
            if row[category + ' dictionary'] != 1:
                continue
            direction = row[category + ' direction']
            if direction == 1:
                hits_by_word.setdefault(row[word_column], []).append(f'Positive {category}')
            elif direction == -1:
                hits_by_word.setdefault(row[word_column], []).append(f'Negative {category}')

    result_rows = []

    # Loop through all files in the specified directory
    for filename in os.listdir(folder_path):
        if not filename.endswith(".csv"):
            continue

        text_df = pd.read_csv(os.path.join(folder_path, filename))
        text_to_check = str(text_df.iloc[0, 1])
        preprocessed_words, total_words = preprocess_text(text_to_check)

        # Initialize counters
        counters = {f'Positive {c}': 0 for c in categories}
        counters.update({f'Negative {c}': 0 for c in categories})

        # Count matches and directions
        for word in preprocessed_words:
            for counter_name in hits_by_word.get(word, ()):
                counters[counter_name] += 1

        # Calculate ratios
        ratios = {k: v / total_words if total_words > 0 else 0 for k, v in counters.items()}
        ratios['ID'] = os.path.splitext(filename)[0]
        result_rows.append(ratios)

    if not result_rows:
        # pd.concat raised "No objects to concatenate" here before.
        print("No CSV files found in the transcripts directory.")
        return

    results_df = pd.DataFrame(result_rows)
    results_df.to_csv(output_path, index=False)
    print(f"Results saved to: {output_path}")

# Example usage: replace the placeholder paths before running the cell.
# `folder_path` holds the transcript CSVs, `dict_path` the dictionary CSV and
# `output_path` is the CSV file that receives the ratios.
folder_path = "path/to/transcripts"
dict_path = "path/to/dictionary.csv"
output_path = "path/to/results_ratios.csv"
analyze_transcripts(folder_path, dict_path, output_path)


McDonald and Hedonometer

In [None]:
import os
import pandas as pd
import string
import nltk
from nltk.corpus import stopwords
from nltk.stem import WordNetLemmatizer
from nltk.tokenize import word_tokenize

# nltk.download('punkt')
# nltk.download('stopwords')
# nltk.download('wordnet')

def load_words(file_path):
    """Load words from a file into a set.

    The file is read with one word per line. Words are lower-cased,
    surrounding whitespace is removed, and blank lines are ignored so that
    stray spaces or empty lines cannot prevent a match.
    """
    with open(file_path, 'r') as file:
        stripped_lines = (line.strip() for line in file.read().lower().splitlines())
        return {word for word in stripped_lines if word}

def preprocess_text(text):
    """Turn raw text into a list of lemmatized, stopword-free tokens.

    The text is lower-cased, stripped of punctuation and of every character
    that is neither alphanumeric nor whitespace, tokenized, filtered against
    the English stopword list and lemmatized.
    """
    lowered = text.lower()
    without_punctuation = lowered.translate(str.maketrans('', '', string.punctuation))
    cleaned = ''.join(ch for ch in without_punctuation if ch.isalnum() or ch.isspace())

    tokens = word_tokenize(cleaned)
    english_stopwords = set(stopwords.words('english'))
    lemmatizer = WordNetLemmatizer()

    kept = []
    for token in tokens:
        if token not in english_stopwords:
            kept.append(lemmatizer.lemmatize(token))
    return kept

def analyze_transcripts(category_files_path, transcripts_folder, happiness_dict_path, output_csv_path):
    """Compute per-transcript word-category ratios and write them to a CSV.

    Args:
        category_files_path: Folder holding one ``<category>.txt`` word list for
            each of the four categories counted below.
        transcripts_folder: Folder whose ``.csv`` files are read as raw text.
        happiness_dict_path: CSV with ``word`` and ``happiness_score`` columns.
        output_csv_path: Destination of the resulting table (one row per
            transcript, ``ID`` = file name without extension).
    """
    categories = ['uncertainty', 'positive_finance', 'negative_finance', 'certainty']

    # One set of words per category so membership tests are cheap.
    category_words = {}
    for category in categories:
        word_file = os.path.join(category_files_path, f'{category}.txt')
        category_words[category] = load_words(word_file)

    # word -> happiness score lookup.
    happiness_table = pd.read_csv(happiness_dict_path)
    happiness_dict = happiness_table.set_index('word')['happiness_score'].to_dict()

    results_list = []

    for filename in os.listdir(transcripts_folder):
        if not filename.endswith(".csv"):
            continue

        file_path = os.path.join(transcripts_folder, filename)
        with open(file_path, 'r') as file:
            text = file.read()
        preprocessed_words = preprocess_text(text)

        counters = dict.fromkeys(categories, 0)
        happiness_positive_count = 0
        happiness_negative_count = 0

        for word in preprocessed_words:
            for category in categories:
                if word in category_words[category]:
                    counters[category] += 1

            if word not in happiness_dict:
                continue
            score = happiness_dict[word]
            # A score of exactly 5 counts as neither positive nor negative.
            if score > 5:
                happiness_positive_count += 1
            elif score < 5:
                happiness_negative_count += 1

        total_words = len(preprocessed_words)

        def share(count):
            # Ratio of `count` to all preprocessed words; 0 for an empty transcript.
            return count / total_words if total_words > 0 else 0

        ratios = {name: share(count) for name, count in counters.items()}
        ratios['Happiness_Positive'] = share(happiness_positive_count)
        ratios['Happiness_Negative'] = share(happiness_negative_count)
        ratios['ID'] = os.path.splitext(filename)[0]

        results_list.append(ratios)

    # One row per transcript.
    results_df = pd.DataFrame(results_list)
    results_df.to_csv(output_csv_path, index=False)
    print(f'Results saved to {output_csv_path}')

# Example usage
# All four paths are placeholders; replace them before running this cell.
# category_files_path must contain uncertainty.txt, positive_finance.txt,
# negative_finance.txt and certainty.txt (the names analyze_transcripts builds).
category_files_path = "path/to/category_files"
transcripts_folder = "path/to/transcripts"
happiness_dict_path = "path/to/happiness_dictionary.csv"
output_csv_path = "path/to/results_ratios.csv"
analyze_transcripts(category_files_path, transcripts_folder, happiness_dict_path, output_csv_path)


### WPM, Total Words, Total Sentences

In [None]:
import os
import pandas as pd
import re

def calculate_words_per_minute(input_folder, durations_file, output_file):
    """Compute words per minute, word count and sentence count per transcript.

    Args:
        input_folder: Folder containing one CSV per transcript; the text is read
            from the first row of its ``Transcript`` column.
        durations_file: Semicolon-delimited CSV whose first column is the audio
            file name (with or without ``.wav``) and whose second column is the
            duration in seconds.
        output_file: Destination CSV with the columns ``Filename``,
            ``Words per Minute``, ``Word Count`` and ``Sentence Count``.

    Transcripts that are empty, lack a ``Transcript`` column, or have no
    positive duration are reported with a warning and left out of the output.
    """
    durations_df = pd.read_csv(durations_file, delimiter=';')

    # Strip the literal '.wav' so the keys match the transcript base names.
    # regex=False is required: on pandas < 2.0 the pattern is treated as a
    # regular expression by default, where '.' matches any character.
    clip_names = durations_df.iloc[:, 0].str.replace('.wav', '', regex=False)
    durations_dict = pd.Series(durations_df.iloc[:, 1].values, index=clip_names).to_dict()

    results = []

    # sorted() makes the row order of the output independent of the file system.
    for filename in sorted(os.listdir(input_folder)):
        if not filename.endswith(".csv"):
            continue

        base_filename = os.path.splitext(filename)[0]
        data = pd.read_csv(os.path.join(input_folder, filename))

        if 'Transcript' not in data.columns:
            print(f"Warning: 'Transcript' column not found in {filename}.")
            continue
        if data.empty:
            # Header-only file: there is no first row to read.
            print(f"Warning: no transcript rows in {filename}. Skipping.")
            continue

        words_string = str(data['Transcript'].iloc[0])
        word_count = len(words_string.split())
        # Counts runs of terminal punctuation; text after the last terminator
        # is not counted as a sentence.
        sentence_count = len(re.split(r'[.?!]+', words_string)) - 1

        duration_in_seconds = durations_dict.get(base_filename)

        # Missing, zero, negative and NaN durations all fail this check.
        if duration_in_seconds is None or not duration_in_seconds > 0:
            print(f"Warning: Duration for {base_filename} is zero or missing. Skipping.")
            continue

        total_time_in_minutes = duration_in_seconds / 60
        words_per_minute = word_count / total_time_in_minutes
        results.append([base_filename, words_per_minute, word_count, sentence_count])

    results_df = pd.DataFrame(results, columns=["Filename", "Words per Minute", "Word Count", "Sentence Count"])
    results_df.to_csv(output_file, index=False)
    print(f"Results written to {output_file}")

# Example usage
# Placeholder paths; replace them before running this cell.
# durations_file is read with ';' as delimiter (file name first, seconds second).
input_folder = "path/to/transcripts"
durations_file = "path/to/durations.csv"
output_file = "path/to/results.csv"
calculate_words_per_minute(input_folder, durations_file, output_file)


## Body Analysis Pipeline

In [None]:
#Make sure following dependencies are installed
#!pip install -q mediapipe
#!wget -q https://storage.googleapis.com/mediapipe-models/gesture_recognizer/gesture_recognizer/float16/1/gesture_recognizer.task
#!wget -q -O efficientdet.tflite -q https://storage.googleapis.com/mediapipe-models/object_detector/efficientdet_lite0/int8/1/efficientdet_lite0.tflite
#!wget -q https://storage.googleapis.com/mediapipe-models/hand_landmarker/hand_landmarker/float16/1/hand_landmarker.task
#!wget -O face_landmarker_v2_with_blendshapes.task -q https://storage.googleapis.com/mediapipe-models/face_landmarker/face_landmarker/float16/1/face_landmarker.task
#!pip install ultralytics


In [None]:
import os
import csv
import cv2
import numpy as np

# Function to compare two hand coordinates: True when the right-hand value is the larger one
def cross_hand_check(left_hand, right_hand):
    """Return True when ``right_hand`` is strictly greater than ``left_hand``."""
    return left_hand < right_hand

# Function to calculate an enlarged pixel bounding box around a set of face landmarks
def calculate_face_bbox(face_landmarks, rgb_image):
    """Return ``[(left, top), (right, bottom)]`` in pixels for the given landmarks.

    Landmark ``x``/``y`` values are multiplied by the image width/height. The
    tight box is then grown by 25% of its width on each side, 150% of its
    height upwards and 50% of its height downwards, clamped to the image.
    """
    img_h, img_w, _channels = rgb_image.shape

    xs = [point.x for point in face_landmarks]
    ys = [point.y for point in face_landmarks]

    # Tight box around the landmarks, in pixels.
    left, top = int(min(xs) * img_w), int(min(ys) * img_h)
    right, bottom = int(max(xs) * img_w), int(max(ys) * img_h)

    box_w = right - left
    box_h = bottom - top
    side_margin = int(box_w * 0.25)

    top_left = (
        max(0, left - side_margin),
        max(0, top - int(box_h * 1.5)),
    )
    bottom_right = (
        min(img_w, right + side_margin),
        min(img_h, bottom + int(box_h * 0.5)),
    )
    return [top_left, bottom_right]

# Function to check if any of the given points lies within a bounding box
def is_within_bounding_box(points, top_left, bottom_right):
    """Return True if at least one point lies inside the box (edges included).

    Args:
        points: Iterable of ``(x, y)`` pairs, in the same units as the box.
        top_left: ``(x, y)`` of one corner of the box.
        bottom_right: ``(x, y)`` of the opposite corner.

    The corners are normalised with min/max, so the test works both for image
    coordinates (y grows downwards, top_left_y <= bottom_right_y) and for a
    y-up convention. The previous check required
    ``bottom_right_y <= y <= top_left_y``, which is never satisfied for a
    non-degenerate box in image coordinates.
    """
    (x_a, y_a), (x_b, y_b) = top_left, bottom_right
    x_min, x_max = min(x_a, x_b), max(x_a, x_b)
    y_min, y_max = min(y_a, y_b), max(y_a, y_b)

    return any(x_min <= x <= x_max and y_min <= y <= y_max for x, y in points)

# Directory to save cropped images
# NOTE(review): Google Drive path as mounted in Colab; adjust when running elsewhere.
cropped_images_dir = '/content/drive/My Drive/Master thesis/Code/BodyDataCNN/OUTPUT'



# Headers for CSV
# Column names, in output order, of the per-video body-feature CSV.
# create_csv passes this list to csv.DictWriter as `fieldnames`, so every row
# dictionary written through it must use exactly these keys.
headers = [
    "id", "left_wrist_X", "left_wrist_Y", "right_wrist_X", "right_wrist_Y",
    "crossing_arms", "hand_open", "touching_face", "hand_width",
    "left_hand_height", "right_hand_height", "open_pose"
]

def create_csv(filename, data_list):
    """
    Write ``data_list`` to ``<filename>.csv`` in the BodyDataCNN Drive folder.

    Args:
        filename: Base name of the CSV file (without the ``.csv`` extension).
        data_list: Iterable of row dictionaries keyed by the module-level
            ``headers`` list; an existing file with the same name is overwritten.
    """
    csv_file_path = f'/content/drive/My Drive/Master thesis/Code/BodyDataCNN/{filename}.csv'

    with open(csv_file_path, 'w', newline='') as out_handle:
        dict_writer = csv.DictWriter(out_handle, fieldnames=headers)
        dict_writer.writeheader()
        dict_writer.writerows(data_list)

    print(f"CSV file '{csv_file_path}' created with headers and data.")


In [None]:
import os
import cv2
import math as m
import numpy as np
import mediapipe as mp  # Import MediaPipe
from mediapipe.tasks import python
from mediapipe.tasks.python import vision

# Define the directory to save cropped images
# NOTE(review): Google Drive path as mounted in Colab; adjust when running elsewhere.
cropped_images_dir = '/content/drive/My Drive/Master thesis/Code/BodyDataCNN/OUTPUT'

# Initialize MediaPipe drawing and pose utilities
# `pose` is the module-level estimator that map_body_pose calls for every frame.
# NOTE(review): static_image_mode=True presumably makes MediaPipe run detection
# on each image independently instead of tracking across frames — confirm
# against the MediaPipe Pose documentation.
mp_drawing = mp.solutions.drawing_utils
mp_pose = mp.solutions.pose
pose = mp_pose.Pose(static_image_mode=True, model_complexity=1, min_detection_confidence=0.5)

# Initialize Gesture Recognizer
# Requires gesture_recognizer.task in the working directory (see the wget
# commands listed in the dependency cell).
base_options = python.BaseOptions(model_asset_path='gesture_recognizer.task')
options = vision.GestureRecognizerOptions(base_options=base_options)
recognizer = vision.GestureRecognizer.create_from_options(options)

# Calculate the angle (in degrees) between the segment p1->p2 and the vertical through p1
def find_angle(x1, y1, x2, y2):
    """Return the angle, in degrees, between the line (x1, y1)->(x2, y2) and the vertical.

    The vertical reference is the vector from (x1, y1) to (x1, 0). The result
    lies in [0, 180]: 0 when point 2 is straight above point 1 in image
    coordinates (smaller y), 90 when both points share the same y.

    Raises:
        ZeroDivisionError: if ``y1`` is 0 or both points coincide.
    """
    cos_theta = (y2 - y1) * (-y1) / (m.sqrt((x2 - x1)**2 + (y2 - y1)**2) * y1)
    # Floating-point rounding can push the cosine marginally outside [-1, 1],
    # which would make acos raise a ValueError.
    cos_theta = max(-1.0, min(1.0, cos_theta))
    # m.degrees uses the exact 180/pi factor; the previous int(180 / m.pi)
    # truncated it to 57 and under-reported every angle by about 0.5%.
    return m.degrees(m.acos(cos_theta))

# Calculate the Euclidean distance between two points
def find_distance(x1, y1, x2, y2):
    """Return the straight-line distance between (x1, y1) and (x2, y2)."""
    delta_x = x2 - x1
    delta_y = y2 - y1
    return m.sqrt(delta_x**2 + delta_y**2)

# Map body pose landmarks from an image to a row of body-language features
def map_body_pose(img, filename):
    """Extract body-language features from one RGB image.

    Args:
        img: RGB image as a NumPy array (height, width, channels).
        filename: Identifier stored in the ``id`` field of the returned row.

    Returns:
        A dict with the keys ``id``, ``left_wrist_X``, ``left_wrist_Y``,
        ``right_wrist_X``, ``right_wrist_Y``, ``crossing_arms``, ``hand_open``,
        ``touching_face``, ``hand_width``, ``left_hand_height``,
        ``right_hand_height`` and ``open_pose``, or ``None`` when no pose is
        detected. Features that cannot be computed (landmark visibility not
        above 0.1, or a zero reference distance) are set to ``-999``.

    Landmark indices follow the MediaPipe Pose numbering: 0-10 face,
    11/12 shoulders, 15/16 wrists, 19/20 index fingers, 23/24 hips
    (odd = left, even = right).
    """
    missing = -999
    min_visibility = 0.1

    # Mark the frame read-only while MediaPipe processes it.
    img.flags.writeable = False

    # Get joints from pose
    results = pose.process(img)

    # Get gesture recognition result
    mp_image_2 = mp.Image(image_format=mp.ImageFormat.SRGB, data=img)
    recognition_result = recognizer.recognize(mp_image_2)

    img.flags.writeable = True

    if not results.pose_landmarks:
        return None

    landmarks = results.pose_landmarks.landmark
    img_height, img_width = img.shape[:2]

    def visible(*indices):
        # True when every listed landmark is above the visibility threshold.
        return all(landmarks[i].visibility > min_visibility for i in indices)

    def hand_height_ratio(shoulder, hip, wrist):
        # Wrist drop below the shoulder relative to the shoulder-hip distance.
        if not visible(hip, shoulder, wrist):
            return missing
        torso = landmarks[hip].y - landmarks[shoulder].y
        if torso == 0:
            return missing
        return (landmarks[wrist].y - landmarks[shoulder].y) / torso

    # Check if one of the hands is touching the face.
    # The face box is in pixels while landmark coordinates are normalised to
    # [0, 1], so the index-finger points are converted to pixels first.
    face_box = calculate_face_bbox(results.pose_landmarks.landmark[:11], img)
    index_fingers_px = [
        (landmarks[i].x * img_width, landmarks[i].y * img_height) for i in (19, 20)
    ]
    touching_face = is_within_bounding_box(index_fingers_px, face_box[0], face_box[1])

    # Check if hands are crossed
    cross_hand = cross_hand_check(landmarks[15].x, landmarks[16].x)

    # Compute hand width to shoulder width ratio
    handWidth_to_shoulderWidth = missing
    if visible(15, 16, 11, 12):
        hand_distance = landmarks[15].x - landmarks[16].x
        shoulder_distance = landmarks[11].x - landmarks[12].x
        if shoulder_distance != 0:
            handWidth_to_shoulderWidth = hand_distance / shoulder_distance

    # Compute hand height ratios (left: 11/23/15, right: 12/24/16)
    left_hand_height_ratio = hand_height_ratio(11, 23, 15)
    right_hand_height_ratio = hand_height_ratio(12, 24, 16)

    # Compute hand gesture: top-ranked gesture of the first detected hand
    gestures = recognition_result.gestures
    if gestures and gestures[0]:
        top_gesture = gestures[0][0].category_name
    else:
        top_gesture = 'none'

    # Check if the body language is open or not
    if missing in (handWidth_to_shoulderWidth, left_hand_height_ratio, right_hand_height_ratio):
        open_pose = missing
    elif handWidth_to_shoulderWidth > 1 and left_hand_height_ratio < 1 and right_hand_height_ratio < 1:
        open_pose = 1
    else:
        open_pose = 0

    # To save the processed image, convert back to BGR on demand:
    # output_path = os.path.join(cropped_images_dir, filename)
    # cv2.imwrite(output_path, cv2.cvtColor(img, cv2.COLOR_RGB2BGR))

    return {
        "id": filename,
        "left_wrist_X": landmarks[15].x,
        "left_wrist_Y": landmarks[15].y,
        "right_wrist_X": landmarks[16].x,
        "right_wrist_Y": landmarks[16].y,
        "crossing_arms": cross_hand,
        "hand_open": top_gesture,
        "touching_face": touching_face,
        "hand_width": handWidth_to_shoulderWidth,
        "left_hand_height": left_hand_height_ratio,
        "right_hand_height": right_hand_height_ratio,
        "open_pose": open_pose
    }


In [None]:
import os
import cv2
import pandas as pd
import numpy as np
from ultralytics import YOLO

# Load the Haar cascade for face detection
# Uses the frontal-face cascade file bundled with OpenCV (cv2.data.haarcascades).
face_cascade = cv2.CascadeClassifier(cv2.data.haarcascades + "haarcascade_frontalface_default.xml")

# Define the directory to save cropped images
# NOTE(review): Google Drive path as mounted in Colab; adjust when running elsewhere.
save_dir = "/content/drive/My Drive/Master thesis/Code/BodyDataCNN/OUTPUT"

# Initialize the YOLO model
# `model` is used by process_image_yolo to detect objects in each frame.
model = YOLO('yolov8m.pt')

# Load class list from coco.txt
# One class name per line; process_image_yolo indexes this list with the class
# id returned by the detector. If the file ends with a newline, the last list
# element is an empty string.
with open("/content/drive/My Drive/Master thesis/Code/BodyExtraction/coco/coco.txt", "r") as my_file:
    class_list = my_file.read().split("\n")

# Function to save the cropped image
def save_cropped_image(img, save_dir, filename):
    """Write ``img`` to ``save_dir/filename`` using OpenCV.

    Args:
        img: Image array accepted by ``cv2.imwrite``.
        save_dir: Target directory, created if missing. ``None`` means
            ``filename`` is used as the path as-is (previously ``None`` passed
            the guard and then failed inside ``os.path.join``).
        filename: File name including the extension that selects the format.
    """
    if save_dir is None:
        img_file_name = filename
    else:
        os.makedirs(save_dir, exist_ok=True)
        img_file_name = os.path.join(save_dir, filename)

    # cv2.imwrite reports failure through its return value rather than raising.
    if not cv2.imwrite(img_file_name, img):
        print(f"Warning: could not write image to {img_file_name}")

# Function to process image using YOLO model
def process_image_yolo(img, file_name, frame_num, scale_factor=0.5):
    """Detect people in a frame and extract body features for the main person.

    Every detection whose class name contains 'person' and whose confidence is
    at least 0.8 is cropped (box enlarged by 10% per side). Among these crops,
    the one whose first Haar-cascade face has the largest area is passed to
    ``map_body_pose``; when no crop contains a face, the first person crop is
    used.

    Args:
        img: BGR frame as read by OpenCV.
        file_name: Prefix of the row id.
        frame_num: Frame index, appended to the row id as ``_F<frame_num>.png``.
        scale_factor: Resize factor applied before detection.

    Returns:
        The row dictionary produced by ``map_body_pose``, or ``None`` when no
        person is detected (or no pose is found in the selected crop).
    """
    confidence_threshold = 0.8
    expand_by = 0.1

    # Resize the frame for faster processing
    frame = cv2.resize(img, (0, 0), fx=scale_factor, fy=scale_factor)
    results = model.predict(frame, verbose=False)

    # One row per detection: x1, y1, x2, y2, confidence, class id.
    boxes = pd.DataFrame(results[0].boxes.data).astype("float")

    crop_list = []
    best_crop = None
    best_face_area = -1

    for _, row in boxes.iterrows():
        x1, y1, x2, y2, conf, d = row
        if 'person' not in class_list[int(d)] or conf < confidence_threshold:
            continue

        # Expand the bounding box slightly, clamped to the frame
        width = x2 - x1
        height = y2 - y1
        x1_new = max(int(x1 - expand_by * width), 0)
        y1_new = max(int(y1 - expand_by * height), 0)
        x2_new = min(int(x2 + expand_by * width), frame.shape[1])
        y2_new = min(int(y2 + expand_by * height), frame.shape[0])

        crop = frame[y1_new:y2_new, x1_new:x2_new]
        if crop.size == 0:
            # Degenerate box: nothing to analyse.
            continue
        crop_list.append(crop)

        # Convert crop to grayscale and detect faces
        gray = cv2.cvtColor(crop, cv2.COLOR_BGR2GRAY)
        faces = face_cascade.detectMultiScale(gray, 1.3, 5)

        # len() works for both the empty tuple and the array that
        # detectMultiScale can return; `is not ()` relied on object identity.
        if len(faces) > 0:
            _, _, face_w, face_h = faces[0]
            face_area = int(face_w) * int(face_h)
            # Keep the crop together with its face so that the selected crop
            # is really the one that contains the largest face.
            if face_area > best_face_area:
                best_face_area = face_area
                best_crop = crop

    if not crop_list:
        return None

    # Select the YOLO crop: largest face, otherwise the first person detected.
    selected_crop = best_crop if best_crop is not None else crop_list[0]
    frame_rgb = cv2.cvtColor(selected_crop, cv2.COLOR_BGR2RGB)
    return map_body_pose(frame_rgb, file_name + "_F" + str(frame_num) + ".png")


In [None]:
import os
import cv2
import numpy as np
import pandas as pd
import csv
import mediapipe as mp
from mediapipe.tasks import python
from mediapipe.tasks.python import vision
from google.colab.patches import cv2_imshow

# Initialize an empty DataFrame to hold the data
# NOTE(review): `df` is not referenced again in this cell; the per-video rows
# are collected in `data_array` and written by create_csv.
df = pd.DataFrame(columns=[
    "id", "left_wrist_X", "left_wrist_Y", "right_wrist_X", "right_wrist_Y",
    "crossing_arms", "hand_open", "touching_face", "hand_width",
    "left_hand_height", "right_hand_height"
])

# Path to the input frames directory
input_dir = '/content/drive/My Drive/Master thesis/Code/face_and_body_extraction/pitches_without_sharks'

# CSV listing the files that were already processed (file names in the first column)
blocklist_path = '/content/drive/My Drive/Master thesis/Code/BodyDataCNN/blocklist.csv'

# Process each file in the input directory
for filename in os.listdir(input_dir):
    # The blocklist is re-read for every file so that entries appended since
    # the loop started are taken into account.
    blocklist_df = pd.read_csv(blocklist_path, header=None)
    existing_filenames = blocklist_df.iloc[:, 0].tolist()  # Assuming filenames are in the first column

    # Skip the file if it is in the blocklist
    if filename in existing_filenames:
        continue

    video_path = os.path.join(input_dir, filename)
    cap = cv2.VideoCapture(video_path)

    # Do not write an empty CSV or blocklist a file that cannot be read.
    if not cap.isOpened():
        cap.release()
        print(f"Warning: could not open {video_path}. Skipping.")
        continue

    # Extract pitch name from the filename: everything before the first '_'.
    # Without an underscore, fall back to the name without its extension
    # (slicing with find() == -1 would silently drop the last character).
    name_prefix, separator, _ = filename.partition('_')
    pitch_name = name_prefix if separator else os.path.splitext(filename)[0]

    # Initialize list to hold data for the current video
    data_array = []

    frame_index = 0
    try:
        while True:
            ret, frame = cap.read()
            if not ret:
                break  # Exit loop if no frames are left

            # Process the frame with YOLO and map body pose
            row_to_add = process_image_yolo(img=frame, file_name=pitch_name, frame_num=frame_index)

            if row_to_add is not None:
                data_array.append(row_to_add)

            frame_index += 1  # Increment the frame index
    finally:
        # Release the video capture object even if a frame fails to process
        cap.release()

    # Create a new CSV file for the current video data
    create_csv(pitch_name, data_array)

    # Append the processed filename to the blocklist
    with open(blocklist_path, 'a', newline='') as csvfile:
        writer = csv.writer(csvfile)
        writer.writerow([filename])


## Vocal Analysis Pipeline

In [None]:
import os
import pandas as pd
import parselmouth
from parselmouth.praat import call  # pylint: disable=no-name-in-module, import-error
from config import Config
import glob
import numpy as np
import sys
from sklearn.preprocessing import minmax_scale

def extract_features(filenames,
                     features_csv="/content/drive/MyDrive/Master thesis/Code/praat5.csv",
                     ipu_csv="/content/drive/MyDrive/Master thesis/Code/ipu5.csv"):
    """
    Extracts audio features from a list of files and saves the results to CSV files.

    For every file, Praat (via parselmouth) splits the sound into "sounding"
    intervals (inter-pausal units, IPUs). Pitch, intensity, jitter, shimmer,
    HNR and energy are measured per interval; undefined (NaN) measurements are
    dropped and the remaining values are averaged per file.

    Args:
        filenames (list): List of file paths to process.
        features_csv (str): Destination of the per-file feature table.
        ipu_csv (str): Destination of the per-file IPU table.

    Returns:
        output_df (DataFrame): DataFrame containing extracted features.
        ipu_df (DataFrame): DataFrame containing IPU (inter-pausal unit) information.

    A file that raises during processing is reported and skipped; if the error
    occurs after its intervals were found, its IPU row is still kept.
    """
    feature_names = [
        'Min Pitch', 'Max Pitch', 'Mean Pitch', 'Sd Pitch',
        'Min Intensity', 'Max Intensity', 'Mean Intensity', 'Sd Intensity',
        'Jitter', 'Shimmer', 'HNR', 'Energy'
    ]
    ipu_columns = ['filename', 'start_time', 'end_time']

    # Rows are collected in lists and turned into DataFrames once at the end;
    # concatenating inside the loop copies the whole table for every file.
    feature_rows = []
    ipu_rows = []

    for count, fn in enumerate(filenames, 1):
        if count % 100 == 0:
            print('Completed:', count)

        try:
            sound = parselmouth.Sound(fn)
            pitch = call(sound, 'To Pitch', 0.0, 75.0, 600.0)
            intensity = call(sound, 'To Intensity', 75.0, 0.0, True)
            point_process = call(sound, 'To PointProcess (periodic, cc)', 75.0, 600.0)
            harmonicity = call(sound, 'To Harmonicity (cc)', 0.01, 75, 0.1, 1.0)
            silence = call(sound, "To TextGrid (silences)", 100, 0.0, -25.0, 0.05, 0.1, "silent", "sounding")

            # Indices of the intervals labelled "sounding" on tier 1.
            num_intervals = call(silence, "Get number of intervals", 1)
            sounding = [
                i for i in range(1, num_intervals + 1)
                if call(silence, "Get label of interval", 1, i) == "sounding"
            ]
            start_times = [call(silence, "Get start time of interval", 1, i) for i in sounding]
            end_times = [call(silence, "Get end time of interval", 1, i) for i in sounding]

            ipu_rows.append([os.path.basename(fn), start_times, end_times])

            features = {name: [] for name in feature_names}

            for t1, t2 in zip(start_times, end_times):
                # Extract pitch features
                features['Min Pitch'].append(call(pitch, 'Get minimum', t1, t2, 'Hertz', 'Parabolic'))
                features['Max Pitch'].append(call(pitch, 'Get maximum', t1, t2, 'Hertz', 'Parabolic'))
                features['Mean Pitch'].append(call(pitch, 'Get mean', t1, t2, 'Hertz'))
                features['Sd Pitch'].append(call(pitch, 'Get standard deviation', t1, t2, 'Hertz'))

                # Extract intensity features
                features['Min Intensity'].append(call(intensity, 'Get minimum', t1, t2, 'Parabolic'))
                features['Max Intensity'].append(call(intensity, 'Get maximum', t1, t2, 'Parabolic'))
                features['Mean Intensity'].append(call(intensity, 'Get mean', t1, t2, 'energy'))
                features['Sd Intensity'].append(call(intensity, 'Get standard deviation', t1, t2))

                # Extract jitter and shimmer
                features['Jitter'].append(call(point_process, 'Get jitter (local)', t1, t2, 0.0001, 0.02, 1.3))
                features['Shimmer'].append(call([sound, point_process], 'Get shimmer (local)', t1, t2, 0.0001, 0.02, 1.3, 1.6))

                # Extract HNR
                features['HNR'].append(call(harmonicity, "Get mean", t1, t2))

                # Extract energy
                features['Energy'].append(call(sound, 'Get energy', t1, t2))

            # Mean over the intervals with a defined value; NaN if none remain.
            row = {'filename': os.path.basename(fn)}
            for name, values in features.items():
                values = np.array(values)
                values = values[~np.isnan(values)]
                row[name] = values.mean() if len(values) > 0 else np.nan
            feature_rows.append(row)

        except Exception:
            # `except Exception` (not a bare except) so that Ctrl-C and
            # interpreter shutdown are not swallowed by the per-file handler.
            print('Failed:', fn, sys.exc_info())
            continue

    output_df = pd.DataFrame(feature_rows, columns=['filename'] + feature_names)
    ipu_df = pd.DataFrame(ipu_rows, columns=ipu_columns)

    # Save the results to CSV files
    output_df.to_csv(features_csv, index=False, header=True)
    ipu_df.to_csv(ipu_csv, index=False, header=True)

    return output_df, ipu_df

# Example usage:
# myargs = glob.glob("drive/My Drive/align_speech/*.wav")
# df, ipu_df = extract_features(myargs)


## Descriptive analysis plots:

In [None]:
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns

# Load the data from the specified Excel file and sheet
file_path = 'path/to/Data.xlsx'
sheet_name = 'Cleaned'

# Read the data into a pandas DataFrame
data = pd.read_excel(file_path, sheet_name=sheet_name)

# Calculate summary statistics
summary_stats = data.describe()

# Plot settings
sns.set_theme(style="whitegrid")

# Plots 1-3: density of Age, Valuation Requested and Original Ask Amount,
# each with a dashed red line at the column mean.
for column, x_label in (
    ('Age', 'Age of Entrepreneur'),
    ('Valuation Requested', 'Valuation Requested'),
    ('Original Ask Amount', 'Original Ask Amount'),
):
    plt.figure(figsize=(10, 6))
    sns.kdeplot(data=data, x=column, fill=True, color='skyblue', edgecolor='steelblue')
    plt.axvline(data[column].mean(), color='red', linestyle='--', linewidth=1)
    plt.xlabel(x_label)
    plt.ylabel('Density')
    plt.title(f'{column} Distribution')
    plt.show()

# Plot 4: Ethnicity
# Each pitch is assigned the ethnicity column with the highest value, then the
# DEAL outcomes are counted per ethnicity and sorted by total count.
ethnicity_columns = ['Asian', 'black', 'indian', 'latino hispanic', 'middle eastern', 'white']
data['Ethnicity'] = data[ethnicity_columns].idxmax(axis=1)
ethnicity_deal_counts = data.groupby(['Ethnicity', 'DEAL']).size().unstack().fillna(0)
ethnicity_deal_counts['Total'] = ethnicity_deal_counts.sum(axis=1)
ethnicity_deal_counts = ethnicity_deal_counts.sort_values(by='Total', ascending=False).drop(columns='Total')

# DataFrame.plot opens a figure of its own unless it is given an axes object;
# passing `ax` draws the bars on the 10x6 figure instead of leaving it blank.
_, ax = plt.subplots(figsize=(10, 6))
ethnicity_deal_counts.plot(kind='barh', stacked=True, color=['skyblue', 'steelblue'], ax=ax)
plt.xlabel('Count')
plt.ylabel('Ethnicity')
plt.title('Ethnicity Deal Counts')
plt.show()

# Plot 5: Gender Distribution
def determine_gender(row):
    """Label a row 'Male', 'Female' or 'Mixed Gender' from its Male/Female flags.

    'Male' requires Male == 1 and Female == 0, 'Female' requires Male == 0 and
    Female == 1; every other combination is reported as 'Mixed Gender'.
    """
    male_flag = row['Male']
    if male_flag == 1:
        return 'Male' if row['Female'] == 0 else 'Mixed Gender'
    if male_flag == 0:
        return 'Female' if row['Female'] == 1 else 'Mixed Gender'
    return 'Mixed Gender'

# Count DEAL outcomes per gender composition, sorted by total count.
data['Gender Distribution'] = data.apply(determine_gender, axis=1)
gender_deal_counts = data.groupby(['Gender Distribution', 'DEAL']).size().unstack().fillna(0)
gender_deal_counts['Total'] = gender_deal_counts.sum(axis=1)
gender_deal_counts = gender_deal_counts.sort_values(by='Total', ascending=False).drop(columns='Total')

# DataFrame.plot opens a figure of its own unless it is given an axes object;
# passing `ax` draws the bars on the 10x6 figure instead of leaving it blank.
_, ax = plt.subplots(figsize=(10, 6))
gender_deal_counts.plot(kind='barh', stacked=True, color=['skyblue', 'steelblue'], ax=ax)
plt.xlabel('Count')
plt.ylabel('Gender Distribution')
plt.title('Gender Distribution Deal Counts')
plt.show()

# Plot 6: Industry
# Numeric industry codes -> readable labels.
industry_mapping = {
    1: 'Lifestyle/Home',
    2: 'Electronics',
    3: 'Food and Beverage',
    4: 'Children/Education',
    5: 'Fashion/Beauty',
    6: 'Fitness/Sports/Outdoors',
    7: 'Uncertain/Other',
    8: 'Software/Tech',
    9: 'Health/Wellness',
    10: 'Automotive',
    11: 'Pet Products',
    12: 'Media/Entertainment',
    13: 'Business Services',
    14: 'Liquor/Alcohol',
    15: 'Travel',
    16: 'Green/CleanTech'
}

# Codes that are not in the mapping become NaN and drop out of the groupby.
data['Industry'] = data['Industry'].map(industry_mapping)
industry_deal_counts = data.groupby(['Industry', 'DEAL']).size().unstack().fillna(0)
industry_deal_counts['Total'] = industry_deal_counts.sum(axis=1)
industry_deal_counts = industry_deal_counts.sort_values(by='Total', ascending=False).drop(columns='Total')

_, ax = plt.subplots(figsize=(10, 6))
industry_deal_counts.plot(kind='barh', stacked=True, color=['skyblue', 'steelblue'], ax=ax)
plt.xlabel('Count')
plt.ylabel('Industry')
plt.title('Industry Deal Counts')
plt.show()


## Feature Engineering

In [None]:
from sklearn.ensemble import RandomForestClassifier
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.preprocessing import StandardScaler
import numpy as np
import seaborn as sns

# Load the preprocessed data from the Excel file
file_path = '/content/drive/My Drive/Master thesis/Code/FINAL_DATA_FOR_THESIS/VIF.xlsx'
sheet_name = 'VIF Data'

# Load the data. No dtype conversion is applied here.
# NOTE(review): the columns passed to the scaler below are presumably all
# numeric in the sheet — confirm.
df = pd.read_excel(file_path, sheet_name=sheet_name)



# FEATURE ENGINEERING


# Mean-center the continuous features. with_std=False means the scaler only
# subtracts the column mean; it does not divide by the standard deviation.
# The target, the ID, the gender/ethnicity dummies, Industry and the number of
# entrepreneurs are left untouched and re-attached afterwards.
features_to_center = df.columns.drop(["Deal", 'ID', 'Male', 'Female', 'Ethnicity Asian', 'Ethnicity Black', 'Ethnicity Indian', 'Ethnicity Latino Hispanic', 'Ethnicity Middle Eastern', 'Ethnicity White', 'Industry', '# Of Entrepreneurs'])
scaler = StandardScaler(with_std=False)
df_centered = pd.DataFrame(scaler.fit_transform(df[features_to_center]), columns=(features_to_center))
# The concat aligns on the row index: df_centered gets a fresh 0..n-1 index,
# which matches df only while df keeps its default index.
df_centered = pd.concat([df_centered, df[["Deal", 'ID', 'Male', 'Female', 'Ethnicity Asian', 'Ethnicity Black', 'Ethnicity Indian', 'Ethnicity Latino Hispanic', 'Ethnicity Middle Eastern', 'Ethnicity White', 'Industry', '# Of Entrepreneurs']]], axis=1)


# Aggregation for dimensionality reduction and interpretability
# Left- and right-hand velocity statistics are averaged into single features.
df_centered['Hand avg velocity Y axis'] = (df_centered['Avg Velocity Right Hand Y'] +  df_centered['Avg Velocity Left Hand Y'] ) / 2
df_centered['Hand avg velocity X axis'] = (df_centered['Avg Velocity Right Hand X'] +  df_centered['Avg Velocity Left Hand X'] ) / 2
df_centered['Hand avg std velocity'] = (df_centered['STD Velocity Right Hand Y'] + df_centered['STD Velocity Left Hand Y'] + df_centered['STD Velocity Right Hand X'] + df_centered['STD Velocity Left Hand X']) / 4
# NOTE(review): the magnitude is built from the left+right sums, not from the
# two averaged features above, so it is twice the magnitude of the averaged
# velocity vector — confirm this is intended.
df_centered['Hand velocity magnitude'] = np.sqrt((df_centered['Avg Velocity Right Hand Y'] + df_centered['Avg Velocity Left Hand Y'])**2 + (df_centered['Avg Velocity Right Hand X'] + df_centered['Avg Velocity Left Hand X'])**2)


# Sum of the five gesture columns into one overall gesture feature.
df_centered['Gesture'] = df_centered['Open Palm'] + df_centered['Pointing Up'] + df_centered['Thumb Down'] + df_centered['Thumb Up'] + df_centered['Closed Fist']

#df_centered['hand_height'] = (df_centered['right_hand_height'] + df_centered['left_hand_height'] ) / 2
#df_centered['hand*height'] = (df_centered['right_hand_height'] * df_centered['left_hand_height'] )


# Aggregation for improving performance
# Facial-emotion columns are summed into a negative and a positive group.
df_centered['Face Negative'] = df_centered['Face Angry'] + df_centered['Face Disgust'] + df_centered['Face Fear'] + df_centered['Face Sad']
df_centered['Face Positive'] = df_centered['Face Happy'] + df_centered['Face Surprise']


# Valence = positive minus negative share for each verbal dictionary.
# The 'Hapiness Valence' spelling is a column name used at runtime; keep it as is.
df_centered['Finance Valence'] = df_centered['Verbal Pos-Finance'] - df_centered['Verbal Neg-Finance']
df_centered['Hapiness Valence'] = df_centered['Verbal Pos-Happiness'] - df_centered['Verbal Neg-Happiness']
df_centered['Sociability Valence'] = df_centered['Verbal Pos-Sociability'] - df_centered['Verbal Neg-Sociability']
df_centered['Morality Valence'] = df_centered['Verbal Pos-Morality'] - df_centered['Verbal Neg-Morality']
df_centered['Ability Valence'] = df_centered['Verbal Pos-Ability'] - df_centered['Verbal Neg-Ability']
df_centered['Agency Valence'] = df_centered['Verbal Pos-Agency'] - df_centered['Verbal Neg-Agency']









# Interaction terms (White becomes a dummy by dropping every other race).
# Each entry is (new column name, left factor, right factor); the new column
# is the element-wise product of the two factor columns. Entries are applied
# in order, so the resulting column order matches the list order.
interaction_terms = [
    # Within-modality interactions
    ('Jitter * Shimmer', 'Jitter', 'Shimmer'),
    ('Face Positive * Open Pose', 'Face Positive', 'Open Pose'),
    ('Jitter * WPM', 'Jitter', 'WPM'),

    # Interplay between modalities
    ('Hand velocity magnitude * HNR', 'Hand velocity magnitude', 'HNR'),
    # NOTE(review): the column is named after 'Face Happy' but is computed
    # from 'Face Positive' -- confirm which one is intended.
    ('Mean Intensity * Face Happy', 'Mean Intensity', 'Face Positive'),
    ('Open Pose * HNR', 'Open Pose', 'HNR'),
    ('Female * Mean Intensity', 'Female', 'Mean Intensity'),
    ('Face Positive * WPM', 'Face Positive', 'WPM'),
    ('Face Positive * Sociability Valence', 'Face Positive', 'Sociability Valence'),
    # Flagged as important by the author (reference: Mehrabian 1971).
    ('Hapiness Valence * Face Positive', 'Hapiness Valence', 'Face Positive'),
    ('Sociability Valence * Open Pose', 'Sociability Valence', 'Open Pose'),
    ('Finance Valence * Energy', 'Finance Valence', 'Energy'),
    ('WPM * Sentence Count', 'WPM', 'Sentence Count'),
    ('Face Negative * Verbal Certainty', 'Face Negative', 'Verbal Certainty'),

    # Gender interactions
    ('Female * Face Positive', 'Female', 'Face Positive'),
    ('Female * Gesture', 'Female', 'Gesture'),
]

for term_name, left_factor, right_factor in interaction_terms:
    df_centered[term_name] = df_centered[left_factor] * df_centered[right_factor]

#Body, Face, Verbal, Vocal, Metadata




# Drop columns that have been aggregated above or that are not needed.
aggregated_or_unneeded_columns = [
    'ID',

    # Per-hand velocity statistics (replaced by the hand aggregates)
    'Avg Velocity Right Hand Y', 'Avg Velocity Left Hand Y', 'Avg Velocity Right Hand X', 'Avg Velocity Left Hand X',
    'STD Velocity Right Hand Y', 'STD Velocity Left Hand Y', 'STD Velocity Right Hand X', 'STD Velocity Left Hand X',

    # Individual gestures (replaced by 'Gesture')
    'Open Palm', 'Pointing Up', 'Thumb Down', 'Thumb Up', 'Closed Fist',

    # Positive/negative verbal columns (replaced by the valence columns)
    'Verbal Pos-Finance', 'Verbal Neg-Finance', 'Verbal Pos-Happiness', 'Verbal Neg-Happiness',
    'Verbal Pos-Sociability', 'Verbal Neg-Sociability', 'Verbal Pos-Morality', 'Verbal Neg-Morality',
    'Verbal Pos-Ability', 'Verbal Neg-Ability', 'Verbal Pos-Agency', 'Verbal Neg-Agency',

    # Every ethnicity column except 'Ethnicity White', which stays
    'Ethnicity Asian', 'Ethnicity Black', 'Ethnicity Indian', 'Ethnicity Latino Hispanic', 'Ethnicity Middle Eastern',

    'Ask Amount', 'Offered Equity',
]
df_centered.drop(columns=aggregated_or_unneeded_columns, inplace=True)

# Second pass: the raw face columns (removed because they were aggregated,
# not because of multicollinearity) followed by the intensity/pitch extremes
# that were dropped due to multicollinearity.
face_source_columns = ['Face Neutral', 'Face Angry', 'Face Disgust', 'Face Fear', 'Face Sad', 'Face Happy', 'Face Surprise']
multicollinear_columns = ['Min Intensity', 'Max Intensity', 'Max Pitch', 'Min Pitch']
df_centered.drop(columns=face_source_columns + multicollinear_columns, inplace=True)

# Print every row that still contains at least one missing value.
has_missing_value = df_centered.isna().any(axis=1)
rows = df_centered[has_missing_value]
print(rows)

# Feature matrix and target vector used by the cells below.
y = df_centered['Deal']
X = df_centered.drop('Deal', axis=1)

## VIF Test

In [None]:
import pandas as pd
import statsmodels.api as sm
from statsmodels.stats.outliers_influence import variance_inflation_factor

def calculate_vif(X, output_csv_path=None):
    """Compute, print and optionally save the variance inflation factors of ``X``.

    An intercept column is added to ``X`` before the VIFs are computed, so the
    printed/saved table contains one row per column of that design matrix
    (including the intercept row when one was added).

    Args:
        X: DataFrame of independent variables. Every column must be
            convertible to float.
        output_csv_path: Destination of the VIF table (CSV, no index column).
            Pass ``None`` to skip writing the file.

    Returns:
        A DataFrame holding the independent variables without the ``const``
        intercept column.
    """
    # With its default settings add_constant leaves the frame unchanged when
    # it already contains a constant column, so 'const' may be absent below.
    design = sm.add_constant(X)

    # Convert once, up front: a frame with mixed dtypes (e.g. bool dummies
    # next to floats) would otherwise be handed over as an object array.
    design_values = design.to_numpy(dtype=float)

    vif_data = pd.DataFrame({
        'Feature': design.columns,
        'VIF': [
            variance_inflation_factor(design_values, column_position)
            for column_position in range(design_values.shape[1])
        ],
    })
    vif_data['Above 10'] = vif_data['VIF'] > 10

    # Save the VIF values to a CSV file when a destination was given
    if output_csv_path is not None:
        vif_data.to_csv(output_csv_path, index=False)

    # Display the VIF values in a table format
    print("\nVariance Inflation Factor (VIF) Table:")
    print(vif_data)

    # errors='ignore' avoids a KeyError when no 'const' column was added.
    return design.drop(columns='const', errors='ignore')

# Example usage of calculate_vif with placeholder paths.
# NOTE: this rebinds the module-level names `data` and `X`; run in the same
# session as the feature-engineering cell it replaces that cell's `X`.
data_path = 'path/to/your/data.csv'
output_csv_path = 'path/to/output_vif_values.csv'

# Load your data
data = pd.read_csv(data_path)

# Build the matrix of independent variables by removing the dependent variable
X = data.drop(columns=['target_variable'])  # Replace 'target_variable' with your dependent variable

# Calculate VIF: prints the table, writes it to output_csv_path, and rebinds X
# to the frame returned by calculate_vif
X = calculate_vif(X, output_csv_path)


## Correlation Matrix

### With deal

In [None]:
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns

def plot_correlation_with_deal(df, target_column, output_figure_path=None):
    """Bar-plot the correlation of every column of ``df`` with ``target_column``.

    Bars are ordered from the highest to the lowest correlation; the target
    column itself is left out.

    Args:
        df: DataFrame containing the features and the target column.
        target_column: Name of the column to correlate against.
        output_figure_path: If truthy, the figure is saved to this path;
            otherwise it is shown interactively.
    """
    # Column of the correlation matrix that belongs to the target, without
    # the target's correlation with itself, sorted descending.
    target_correlations = df.corr()[target_column]
    target_correlations = target_correlations.drop(labels=[target_column])
    target_correlations = target_correlations.sort_values(ascending=False)

    plt.figure(figsize=(12, 8))
    sns.barplot(
        x=target_correlations.index,
        y=target_correlations.values,
        palette="coolwarm",
        edgecolor='k',
    )

    # Title and y-axis share the same caption.
    caption = 'Correlation with DEAL (Y/N)'
    plt.title(caption)
    plt.xlabel('Columns')
    plt.ylabel(caption)
    plt.xticks(rotation=90, ha='right')
    plt.tight_layout()

    # Show the plot unless a destination path was supplied.
    if not output_figure_path:
        plt.show()
        return
    plt.savefig(output_figure_path)

# Example usage of plot_correlation_with_deal with placeholder paths.
# NOTE: this rebinds the module-level name `df`.
data_path = 'path/to/your/data.csv'
output_figure_path = 'path/to/output_figure.png'  # Optional, set to None if you don't want to save the plot

# Load your data
df = pd.read_csv(data_path)

# Column every other column is correlated against
target_column = 'Deal'

# Plot correlation with Deal (saved to output_figure_path because it is set)
plot_correlation_with_deal(df, target_column, output_figure_path)


### All features

In [None]:
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
import numpy as np

def plot_custom_correlation_matrix(df, custom_order, output_figure_path=None):
    """Draw a lower-triangle correlation heatmap in a caller-defined order.

    Interaction-term columns (names containing ``*`` or ``:``) are excluded.
    Names in ``custom_order`` that are not present among the remaining
    columns are silently skipped.

    Args:
        df: DataFrame whose pairwise column correlations are plotted.
        custom_order: Column names in the order they should appear on both axes.
        output_figure_path: If truthy, the figure is also saved to this path
            (300 dpi, tight bounding box) before it is shown.
    """
    full_matrix = df.corr()

    # Keep only plain features, i.e. drop interaction terms.
    plain_columns = [
        name for name in full_matrix.columns
        if '*' not in name and ':' not in name
    ]
    plain_matrix = full_matrix.loc[plain_columns, plain_columns]

    # Apply the requested ordering, restricted to columns that exist.
    ordered_columns = [name for name in custom_order if name in plain_columns]
    ordered_matrix = plain_matrix.loc[ordered_columns, ordered_columns]

    # Hide the upper triangle (diagonal included) so each pair appears once.
    upper_triangle = np.triu(np.ones_like(ordered_matrix, dtype=bool))

    plt.figure(figsize=(20, 16))
    diverging_cmap = sns.diverging_palette(220, 20, as_cmap=True)
    sns.heatmap(
        ordered_matrix,
        mask=upper_triangle,
        cmap=diverging_cmap,
        annot=True,
        fmt=".2f",
        square=True,
        linewidths=.5,
        cbar_kws={"shrink": .8},
        vmin=-1,
        vmax=1,
        annot_kws={"size": 9},
    )

    plt.title('Correlation Matrix', fontsize=20)
    plt.xticks(fontsize=10)
    plt.yticks(fontsize=10)
    plt.tight_layout()

    # Saving is optional; the plot is shown in either case.
    if output_figure_path:
        plt.savefig(output_figure_path, dpi=300, bbox_inches='tight')
    plt.show()

# Example usage of plot_custom_correlation_matrix with placeholder paths.
# NOTE: this rebinds the module-level name `df`.
data_path = 'path/to/your/data.csv'

# Axis order of the heatmap, grouped by modality. Names that are absent from
# the loaded data (or that are interaction terms) are skipped by
# plot_custom_correlation_matrix.
custom_order = [
    # Verbal features
    'Verbal Uncertainty', 'Verbal Certainty', 'Finance Valence',
    'Verbal Pos-Ability', 'Verbal Pos-Agency', 'Word Count', 'Sentence Count', 'Hapiness Valence', 'Sociability Valence', 'Morality Valence', 'Ability Valence', 'Agency Valence',
    # Face-related features
    'Face Positive', 'Face Negative',
    # Body-related features
    'Crossing Arms', 'Hand Distance', 'Left Hand Height', 'Right Hand Height', 'Open Pose',
    'Hand avg velocity Y axis', 'Hand avg velocity X axis', 'Hand avg std velocity', 'Hand velocity magnitude', 'Gesture',
    # Vocal features
    'Mean Pitch', 'STD Pitch', 'Mean Intensity', 'STD Intensity', 'Jitter', 'Shimmer', 'HNR', 'Energy', 'WPM',
    # Metadata
    'Male', 'Female', 'Deal', 'Industry', '# Of Entrepreneurs', 'Valuation Requested', 'Age', "Ethnicity White"
]

# Load your data
df = pd.read_csv(data_path)

# Plot the custom correlation matrix (also saved, because a path is given)
output_figure_path = 'path/to/output_figure.png'  # Optional, set to None if you don't want to save the plot
plot_custom_correlation_matrix(df, custom_order, output_figure_path)

## Feature Selection

In [None]:
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.ensemble import RandomForestClassifier

def _plot_horizontal_bars(values, title, xlabel, color):
    """Show ``values`` (one entry per feature) as a sorted horizontal bar chart."""
    plt.figure(figsize=(10, 12))
    values.sort_values().plot(kind='barh', color=color)
    plt.title(title)
    plt.xlabel(xlabel)
    plt.ylabel('Features')
    plt.xticks(rotation=45)
    plt.tight_layout()
    plt.show()


def train_random_forest_and_plot_importance(X, y, top_n=12, correlation_threshold=0.022):
    """Select features by random-forest importance and by correlation with ``y``.

    A feature is selected when it is among the ``top_n`` most important
    features of a random forest fitted on ``(X, y)``, or when the absolute
    value of its correlation with ``y`` exceeds ``correlation_threshold``.
    Both rankings are plotted as horizontal bar charts.

    NOTE(review): selection uses every row passed in. If ``X``/``y`` include
    rows that later serve as a test set, the selection has seen them.

    Args:
        X: DataFrame of candidate features.
        y: Target values aligned with ``X``.
        top_n: Number of top-importance features to keep.
        correlation_threshold: Minimum absolute correlation with ``y``.

    Returns:
        List of selected column names, in the order they appear in ``X``.
    """
    # Train a Random Forest model (fixed seed, so importances are repeatable)
    rf = RandomForestClassifier(n_estimators=100, random_state=42)
    rf.fit(X, y)

    feature_importances = pd.Series(rf.feature_importances_, index=X.columns)
    _plot_horizontal_bars(feature_importances, 'Feature Importances', 'Importance', 'skyblue')

    # Correlation of each feature with the dependent variable
    correlations = X.corrwith(y)
    _plot_horizontal_bars(correlations, 'Feature Correlations with Target Variable', 'Correlation', 'salmon')

    correlation_selected_features = correlations[correlations.abs() > correlation_threshold].index
    top_features = feature_importances.nlargest(top_n).index

    # Union of both criteria, emitted in X's column order. Converting the set
    # straight to a list would yield a hash-dependent order that can change
    # between interpreter sessions and, with it, the column order of
    # X[selected_features] seen by downstream models.
    chosen = set(correlation_selected_features) | set(top_features)
    selected_features = [name for name in X.columns if name in chosen]

    print(f"Selected features: {selected_features}")
    return selected_features

# Example usage of train_random_forest_and_plot_importance with a
# placeholder path.
# NOTE: this rebinds the module-level names `data`, `X` and `y`, which the
# modelling cell below reads.
data_path = 'path/to/your/data.csv'

# Load your data
data = pd.read_csv(data_path)

# 'target' is the dependent variable; every other column is a candidate feature
X = data.drop(columns=['target'])
y = data['target']

# Train Random Forest, plot both rankings, and keep the selected column names
selected_features = train_random_forest_and_plot_importance(X, y)


## Final ML Model

In [None]:
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split, GridSearchCV, StratifiedKFold
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
from sklearn.tree import DecisionTreeClassifier
from sklearn.linear_model import LogisticRegression
import xgboost as xgb
from sklearn.metrics import make_scorer, f1_score, classification_report, accuracy_score, roc_auc_score, confusion_matrix, roc_curve, precision_score, recall_score
from imblearn.over_sampling import SMOTE
import random

from collections import Counter
from sklearn.datasets import make_classification
from matplotlib import pyplot
from numpy import where

# Keep only the columns chosen by the feature-selection step.
X_selected = X[selected_features]

# Per-modality views of the selected features.
# NOTE: each list is hard-coded, so every name must be present in
# `selected_features`; otherwise the indexing below raises a KeyError.
X_audio = X_selected[['HNR', 'Mean Intensity', 'STD Pitch', 'Jitter', 'Energy', 'Shimmer', 'Mean Pitch', 'WPM', 'Jitter * Shimmer', 'STD Intensity']]
X_verbal = X_selected[['Finance Valence', 'Verbal Uncertainty', 'Agency Valence', 'Word Count', 'Hapiness Valence', 'Ability Valence', 'Sociability Valence', 'Sentence Count']]
X_visual = X_selected[['Hand avg velocity X axis', 'Face Negative', 'Hand Distance', 'Right Hand Height', 'Hand avg std velocity', 'Gesture', 'Face Positive']]

############################################################ Combined ############################################################

# 70/30 train/test split, stratified on the target, with a fixed seed.
X_train, X_test, y_train, y_test = train_test_split(X_selected, y, test_size=0.3, random_state=42, stratify=y)

# Hyperparameter search space per model; keys match the keys of `models`.
param_grids = {
    'xgboost': {
        'n_estimators': [50, 150, 250],
        'max_depth': [3, 5, 7, 9],
        'learning_rate': [0.01, 0.1, 0.3],
        'subsample': [0.7, 0.8, 0.9],
        'colsample_bytree': [0.5, 0.7, 0.9]
    },
    'random_forest': {
        'n_estimators': [25, 50, 100, 200, 300],
        'max_depth': [None, 2, 5, 10, 20],
        'min_samples_split': [2, 4, 6, 8],
        'min_samples_leaf': [1, 2, 4]
    },
    'logistic_regression': {
        'C': [0.01, 0.1, 1, 10, 100, 200, 300, 400, 500],
        'penalty': ['l1', 'l2'],
        'solver': ['liblinear'],
        'class_weight': ['balanced']
    },
    'gradient_boosting': {
        'n_estimators': [50, 100, 200, 300],
        'max_depth': [3, 5, 7],
        'learning_rate': [0.005, 0.01, 0.1, 0.3],
        'subsample': [0.5, 0.7, 0.8, 0.9]
    },
    'decision_tree': {
        'max_depth': [None, 10, 20, 30],
        'min_samples_split': [2, 5, 10],
        'min_samples_leaf': [1, 2, 4]
    }
}

# Estimators to compare. Every estimator gets a fixed random_state so that
# repeated runs report the same metrics; the split above is already seeded.
# NOTE(review): `use_label_encoder`, `tree_method='gpu_hist'` and `predictor`
# are flagged as deprecated by recent XGBoost releases -- confirm against the
# installed version.
models = {
    'xgboost': xgb.XGBClassifier(objective='binary:logistic', use_label_encoder=False, tree_method='gpu_hist', predictor='gpu_predictor', random_state=42),
    'random_forest': RandomForestClassifier(random_state=42),
    'logistic_regression': LogisticRegression(random_state=42),
    'gradient_boosting': GradientBoostingClassifier(random_state=42),
    'decision_tree': DecisionTreeClassifier(random_state=42),
}

# Best estimator per model name, filled in by the grid search.
best_models = {}

# One row per (model, stage) evaluation.
results = pd.DataFrame(columns=['Model', 'Stage', 'Accuracy', 'F1 Score', 'AUC/ROC', 'Precision', 'Recall'])

def evaluate_model(model, X_test, y_test):
    """Score a fitted binary classifier on a hold-out set.

    Args:
        model: Fitted estimator exposing ``predict`` and either
            ``predict_proba`` or ``decision_function``.
        X_test: Hold-out features.
        y_test: Hold-out labels.

    Returns:
        Dict with the keys 'Accuracy', 'F1 Score', 'AUC/ROC', 'Precision',
        'Recall' and 'Confusion Matrix'.
    """
    predicted_labels = model.predict(X_test)

    # AUC needs a continuous score: use the second predict_proba column when
    # available, otherwise fall back to the decision function.
    if hasattr(model, 'predict_proba'):
        positive_scores = model.predict_proba(X_test)[:, 1]
    else:
        positive_scores = model.decision_function(X_test)

    f1_value = f1_score(y_test, predicted_labels)
    accuracy_value = accuracy_score(y_test, predicted_labels)
    auc_value = roc_auc_score(y_test, positive_scores)
    precision_value = precision_score(y_test, predicted_labels)
    recall_value = recall_score(y_test, predicted_labels)
    matrix = confusion_matrix(y_test, predicted_labels)

    return {
        'Accuracy': accuracy_value,
        'F1 Score': f1_value,
        'AUC/ROC': auc_value,
        'Precision': precision_value,
        'Recall': recall_value,
        'Confusion Matrix': matrix,
    }

# Baseline: fit every estimator as configured in `models` and score it on
# the hold-out set before any tuning.
for model_name in models:
    model = models[model_name]
    print(f"\nEvaluating {model_name} before tuning...")
    model.fit(X_train, y_train)
    metrics = evaluate_model(model, X_test, y_test)
    metrics['Model'] = model_name
    metrics['Stage'] = 'Before Tuning'
    baseline_row = pd.DataFrame([metrics])
    results = pd.concat([results, baseline_row], ignore_index=True)
    print(f"Confusion Matrix for {model_name} before tuning:\n{metrics['Confusion Matrix']}")

# Seeded, shuffled 3-fold stratified cross-validation for the grid search.
cv = StratifiedKFold(n_splits=3, shuffle=True, random_state=42)

# Grid search (accuracy as selection metric) over each model's parameter grid.
for model_name in models:
    model = models[model_name]
    print(f"\nTraining {model_name} with GridSearchCV...")
    grid_search = GridSearchCV(
        estimator=model,
        param_grid=param_grids[model_name],
        scoring="accuracy",
        cv=cv,
        n_jobs=-1,
        verbose=2,
    )
    grid_search.fit(X_train, y_train)
    best_models[model_name] = grid_search.best_estimator_
    print(f"Best parameters for {model_name}: {grid_search.best_params_}")

# Score each tuned estimator on the same hold-out set.
for model_name in best_models:
    best_model = best_models[model_name]
    print(f"\nEvaluating {model_name} after tuning...")
    metrics = evaluate_model(best_model, X_test, y_test)
    metrics['Model'] = model_name
    metrics['Stage'] = 'After Tuning'
    tuned_row = pd.DataFrame([metrics])
    results = pd.concat([results, tuned_row], ignore_index=True)
    print(f"Confusion Matrix for {model_name} after tuning:\n{metrics['Confusion Matrix']}")

# Display the final results
print(results)


############################################################ VISUAL ############################################################

# 80/20 train/test split on the visual features. stratify=y keeps the class
# ratio of the target equal in both partitions, consistent with the split
# used for the combined-feature model.
X_train, X_test, y_train, y_test = train_test_split(X_visual, y, test_size=0.2, random_state=42, stratify=y)

# Create a custom scorer for F1-Macro
f1_macro_scorer = make_scorer(f1_score, average='macro')

# Hyperparameter search space per model; keys match the keys of `models`.
param_grids = {
    'xgboost': {
        'n_estimators': [100, 200],
        'max_depth': [3, 5, 7],
        'learning_rate': [0.01, 0.1, 0.3],
        'subsample': [0.7, 0.8, 0.9],
        'colsample_bytree': [0.7, 0.8, 0.9]
    },
    'random_forest': {
        'n_estimators': [100, 200],
        'max_depth': [None, 10, 20],
        'min_samples_split': [2, 5, 10],
        'min_samples_leaf': [1, 2, 4]
    },
    'decision_tree': {
        'max_depth': [None, 10, 20, 30],
        'min_samples_split': [2, 5, 10],
        'min_samples_leaf': [1, 2, 4]
    },
    'logistic_regression': {
        'C': [0.01, 0.1, 1, 10, 100],
        'penalty': ['l1', 'l2'],
        'solver': ['liblinear']
    },
    'gradient_boosting': {
        'n_estimators': [100, 200],
        'max_depth': [3, 5, 7],
        'learning_rate': [0.01, 0.1, 0.3],
        'subsample': [0.7, 0.8, 0.9]
    },
    # NOTE: `models` below has no 'svm' entry, so this grid is currently unused.
    'svm': {
        'C': [0.1, 1, 10],
        'gamma': ['scale', 'auto'],
        'kernel': ['rbf', 'linear']
    }
}

# Estimators to compare. Every estimator gets a fixed random_state so that
# repeated runs report the same metrics.
# NOTE(review): `use_label_encoder`, `tree_method='gpu_hist'` and `predictor`
# are flagged as deprecated by recent XGBoost releases -- confirm against the
# installed version.
models = {
    'xgboost': xgb.XGBClassifier(objective='binary:logistic', use_label_encoder=False, tree_method='gpu_hist', predictor='gpu_predictor', random_state=42),
    'random_forest': RandomForestClassifier(random_state=42),
    'decision_tree': DecisionTreeClassifier(random_state=42),
    'logistic_regression': LogisticRegression(random_state=42),
    'gradient_boosting': GradientBoostingClassifier(random_state=42),
}

# Best estimator per model name, filled in by the grid search.
best_models = {}

# One row per (model, stage) evaluation.
results = pd.DataFrame(columns=['Model', 'Stage', 'Accuracy', 'F1 Score', 'AUC/ROC', 'Precision', 'Recall'])

def evaluate_model(model, X_test, y_test):
    """Score a fitted binary classifier on a hold-out set.

    Args:
        model: Fitted estimator exposing ``predict`` and either
            ``predict_proba`` or ``decision_function``.
        X_test: Hold-out features.
        y_test: Hold-out labels.

    Returns:
        Dict with the keys 'Accuracy', 'F1 Score', 'AUC/ROC', 'Precision',
        'Recall' and 'Confusion Matrix'.
    """
    predicted_labels = model.predict(X_test)

    # AUC needs a continuous score: use the second predict_proba column when
    # available, otherwise fall back to the decision function.
    if hasattr(model, 'predict_proba'):
        positive_scores = model.predict_proba(X_test)[:, 1]
    else:
        positive_scores = model.decision_function(X_test)

    f1_value = f1_score(y_test, predicted_labels)
    accuracy_value = accuracy_score(y_test, predicted_labels)
    auc_value = roc_auc_score(y_test, positive_scores)
    precision_value = precision_score(y_test, predicted_labels)
    recall_value = recall_score(y_test, predicted_labels)
    matrix = confusion_matrix(y_test, predicted_labels)

    return {
        'Accuracy': accuracy_value,
        'F1 Score': f1_value,
        'AUC/ROC': auc_value,
        'Precision': precision_value,
        'Recall': recall_value,
        'Confusion Matrix': matrix,
    }



# Baseline: fit every estimator as configured in `models` and score it on
# the hold-out set before any tuning.
for model_name in models:
    model = models[model_name]
    print(f"\nEvaluating {model_name} before tuning...")
    model.fit(X_train, y_train)
    metrics = evaluate_model(model, X_test, y_test)
    metrics['Model'] = model_name
    metrics['Stage'] = 'Before Tuning'
    baseline_row = pd.DataFrame([metrics])
    results = pd.concat([results, baseline_row], ignore_index=True)
    print(f"Confusion Matrix for {model_name} before tuning:\n{metrics['Confusion Matrix']}")

# Grid search (macro-F1 as selection metric, 5-fold CV) over each model's grid.
for model_name in models:
    model = models[model_name]
    print(f"\nTraining {model_name} with GridSearchCV...")
    grid_search = GridSearchCV(
        estimator=model,
        param_grid=param_grids[model_name],
        scoring=f1_macro_scorer,
        cv=5,
        n_jobs=-1,
        verbose=2,
    )
    grid_search.fit(X_train, y_train)
    best_models[model_name] = grid_search.best_estimator_
    print(f"Best parameters for {model_name}: {grid_search.best_params_}")

# Score each tuned estimator on the same hold-out set.
for model_name in best_models:
    best_model = best_models[model_name]
    print(f"\nEvaluating {model_name} after tuning...")
    metrics = evaluate_model(best_model, X_test, y_test)
    metrics['Model'] = model_name
    metrics['Stage'] = 'After Tuning'
    tuned_row = pd.DataFrame([metrics])
    results = pd.concat([results, tuned_row], ignore_index=True)
    print(f"Confusion Matrix for {model_name} after tuning:\n{metrics['Confusion Matrix']}")

# Display the results
print(results)


############################################################ VOCAL ############################################################

# 80/20 train/test split on the vocal features. stratify=y keeps the class
# ratio of the target equal in both partitions, consistent with the split
# used for the combined-feature model.
X_train, X_test, y_train, y_test = train_test_split(X_audio, y, test_size=0.2, random_state=42, stratify=y)

# Create a custom scorer for F1-Macro
f1_macro_scorer = make_scorer(f1_score, average='macro')

# Hyperparameter search space per model; keys match the keys of `models`.
param_grids = {
    'xgboost': {
        'n_estimators': [100, 200],
        'max_depth': [3, 5, 7],
        'learning_rate': [0.01, 0.1, 0.3],
        'subsample': [0.7, 0.8, 0.9],
        'colsample_bytree': [0.7, 0.8, 0.9]
    },
    'random_forest': {
        'n_estimators': [100, 200],
        'max_depth': [None, 10, 20],
        'min_samples_split': [2, 5, 10],
        'min_samples_leaf': [1, 2, 4]
    },
    'decision_tree': {
        'max_depth': [None, 10, 20, 30],
        'min_samples_split': [2, 5, 10],
        'min_samples_leaf': [1, 2, 4]
    },
    'logistic_regression': {
        'C': [0.01, 0.1, 1, 10, 100],
        'penalty': ['l1', 'l2'],
        'solver': ['liblinear']
    },
    'gradient_boosting': {
        'n_estimators': [100, 200],
        'max_depth': [3, 5, 7],
        'learning_rate': [0.01, 0.1, 0.3],
        'subsample': [0.7, 0.8, 0.9]
    },
    # NOTE: `models` below has no 'svm' entry, so this grid is currently unused.
    'svm': {
        'C': [0.1, 1, 10],
        'gamma': ['scale', 'auto'],
        'kernel': ['rbf', 'linear']
    }
}

# Estimators to compare. Every estimator gets a fixed random_state so that
# repeated runs report the same metrics.
# NOTE(review): `use_label_encoder`, `tree_method='gpu_hist'` and `predictor`
# are flagged as deprecated by recent XGBoost releases -- confirm against the
# installed version.
models = {
    'xgboost': xgb.XGBClassifier(objective='binary:logistic', use_label_encoder=False, tree_method='gpu_hist', predictor='gpu_predictor', random_state=42),
    'random_forest': RandomForestClassifier(random_state=42),
    'decision_tree': DecisionTreeClassifier(random_state=42),
    'logistic_regression': LogisticRegression(random_state=42),
    'gradient_boosting': GradientBoostingClassifier(random_state=42),
}

# Best estimator per model name, filled in by the grid search.
best_models = {}

# One row per (model, stage) evaluation.
results = pd.DataFrame(columns=['Model', 'Stage', 'Accuracy', 'F1 Score', 'AUC/ROC', 'Precision', 'Recall'])

def evaluate_model(model, X_test, y_test):
    """Score a fitted binary classifier on a hold-out set.

    Args:
        model: Fitted estimator exposing ``predict`` and either
            ``predict_proba`` or ``decision_function``.
        X_test: Hold-out features.
        y_test: Hold-out labels.

    Returns:
        Dict with the keys 'Accuracy', 'F1 Score', 'AUC/ROC', 'Precision',
        'Recall' and 'Confusion Matrix'.
    """
    predicted_labels = model.predict(X_test)

    # AUC needs a continuous score: use the second predict_proba column when
    # available, otherwise fall back to the decision function.
    if hasattr(model, 'predict_proba'):
        positive_scores = model.predict_proba(X_test)[:, 1]
    else:
        positive_scores = model.decision_function(X_test)

    f1_value = f1_score(y_test, predicted_labels)
    accuracy_value = accuracy_score(y_test, predicted_labels)
    auc_value = roc_auc_score(y_test, positive_scores)
    precision_value = precision_score(y_test, predicted_labels)
    recall_value = recall_score(y_test, predicted_labels)
    matrix = confusion_matrix(y_test, predicted_labels)

    return {
        'Accuracy': accuracy_value,
        'F1 Score': f1_value,
        'AUC/ROC': auc_value,
        'Precision': precision_value,
        'Recall': recall_value,
        'Confusion Matrix': matrix,
    }



# Baseline: fit every estimator as configured in `models` and score it on
# the hold-out set before any tuning.
for model_name in models:
    model = models[model_name]
    print(f"\nEvaluating {model_name} before tuning...")
    model.fit(X_train, y_train)
    metrics = evaluate_model(model, X_test, y_test)
    metrics['Model'] = model_name
    metrics['Stage'] = 'Before Tuning'
    baseline_row = pd.DataFrame([metrics])
    results = pd.concat([results, baseline_row], ignore_index=True)
    print(f"Confusion Matrix for {model_name} before tuning:\n{metrics['Confusion Matrix']}")

# Grid search (macro-F1 as selection metric, 5-fold CV) over each model's grid.
for model_name in models:
    model = models[model_name]
    print(f"\nTraining {model_name} with GridSearchCV...")
    grid_search = GridSearchCV(
        estimator=model,
        param_grid=param_grids[model_name],
        scoring=f1_macro_scorer,
        cv=5,
        n_jobs=-1,
        verbose=2,
    )
    grid_search.fit(X_train, y_train)
    best_models[model_name] = grid_search.best_estimator_
    print(f"Best parameters for {model_name}: {grid_search.best_params_}")

# Score each tuned estimator on the same hold-out set.
for model_name in best_models:
    best_model = best_models[model_name]
    print(f"\nEvaluating {model_name} after tuning...")
    metrics = evaluate_model(best_model, X_test, y_test)
    metrics['Model'] = model_name
    metrics['Stage'] = 'After Tuning'
    tuned_row = pd.DataFrame([metrics])
    results = pd.concat([results, tuned_row], ignore_index=True)
    print(f"Confusion Matrix for {model_name} after tuning:\n{metrics['Confusion Matrix']}")

# Display the results
print(results)



############################################################ VERBAL ############################################################

# 80/20 train/test split on the verbal features (this split uses seed 50).
# stratify=y keeps the class ratio of the target equal in both partitions,
# consistent with the split used for the combined-feature model.
X_train, X_test, y_train, y_test = train_test_split(X_verbal, y, test_size=0.2, random_state=50, stratify=y)

# Create a custom scorer for F1-Macro
f1_macro_scorer = make_scorer(f1_score, average='macro')

# Hyperparameter search space per model; keys match the keys of `models`.
param_grids = {
    'xgboost': {
        'n_estimators': [100, 200],
        'max_depth': [3, 5, 7],
        'learning_rate': [0.01, 0.1, 0.3],
        'subsample': [0.7, 0.8, 0.9],
        'colsample_bytree': [0.7, 0.8, 0.9]
    },
    'random_forest': {
        'n_estimators': [100, 200],
        'max_depth': [None, 10, 20],
        'min_samples_split': [2, 5, 10],
        'min_samples_leaf': [1, 2, 4]
    },
    'decision_tree': {
        'max_depth': [None, 10, 20, 30],
        'min_samples_split': [2, 5, 10],
        'min_samples_leaf': [1, 2, 4]
    },
    'logistic_regression': {
        'C': [0.01, 0.1, 1, 10, 100],
        'penalty': ['l1', 'l2'],
        'solver': ['liblinear']
    },
    'gradient_boosting': {
        'n_estimators': [100, 200],
        'max_depth': [3, 5, 7],
        'learning_rate': [0.01, 0.1, 0.3],
        'subsample': [0.7, 0.8, 0.9]
    },
    # NOTE: `models` below has no 'svm' entry, so this grid is currently unused.
    'svm': {
        'C': [0.1, 1, 10],
        'gamma': ['scale', 'auto'],
        'kernel': ['rbf', 'linear']
    }
}

# Estimators to compare. Every estimator gets a fixed random_state so that
# repeated runs report the same metrics.
# NOTE(review): `use_label_encoder`, `tree_method='gpu_hist'` and `predictor`
# are flagged as deprecated by recent XGBoost releases -- confirm against the
# installed version.
models = {
    'xgboost': xgb.XGBClassifier(objective='binary:logistic', use_label_encoder=False, tree_method='gpu_hist', predictor='gpu_predictor', random_state=42),
    'random_forest': RandomForestClassifier(random_state=42),
    'decision_tree': DecisionTreeClassifier(random_state=42),
    'logistic_regression': LogisticRegression(random_state=42),
    'gradient_boosting': GradientBoostingClassifier(random_state=42),
}

# Best estimator per model name, filled in by the grid search.
best_models = {}

# One row per (model, stage) evaluation.
results = pd.DataFrame(columns=['Model', 'Stage', 'Accuracy', 'F1 Score', 'AUC/ROC', 'Precision', 'Recall'])

def evaluate_model(model, X_test, y_test):
    """Score a fitted binary classifier on a hold-out set.

    Args:
        model: Fitted estimator exposing ``predict`` and either
            ``predict_proba`` or ``decision_function``.
        X_test: Hold-out features.
        y_test: Hold-out labels.

    Returns:
        Dict with the keys 'Accuracy', 'F1 Score', 'AUC/ROC', 'Precision',
        'Recall' and 'Confusion Matrix'.
    """
    predicted_labels = model.predict(X_test)

    # AUC needs a continuous score: use the second predict_proba column when
    # available, otherwise fall back to the decision function.
    if hasattr(model, 'predict_proba'):
        positive_scores = model.predict_proba(X_test)[:, 1]
    else:
        positive_scores = model.decision_function(X_test)

    f1_value = f1_score(y_test, predicted_labels)
    accuracy_value = accuracy_score(y_test, predicted_labels)
    auc_value = roc_auc_score(y_test, positive_scores)
    precision_value = precision_score(y_test, predicted_labels)
    recall_value = recall_score(y_test, predicted_labels)
    matrix = confusion_matrix(y_test, predicted_labels)

    return {
        'Accuracy': accuracy_value,
        'F1 Score': f1_value,
        'AUC/ROC': auc_value,
        'Precision': precision_value,
        'Recall': recall_value,
        'Confusion Matrix': matrix,
    }



# Baseline: fit every estimator as configured in `models` and score it on
# the hold-out set before any tuning.
for model_name in models:
    model = models[model_name]
    print(f"\nEvaluating {model_name} before tuning...")
    model.fit(X_train, y_train)
    metrics = evaluate_model(model, X_test, y_test)
    metrics['Model'] = model_name
    metrics['Stage'] = 'Before Tuning'
    baseline_row = pd.DataFrame([metrics])
    results = pd.concat([results, baseline_row], ignore_index=True)
    print(f"Confusion Matrix for {model_name} before tuning:\n{metrics['Confusion Matrix']}")

# Grid search (macro-F1 as selection metric, 5-fold CV) over each model's grid.
for model_name in models:
    model = models[model_name]
    print(f"\nTraining {model_name} with GridSearchCV...")
    grid_search = GridSearchCV(
        estimator=model,
        param_grid=param_grids[model_name],
        scoring=f1_macro_scorer,
        cv=5,
        n_jobs=-1,
        verbose=2,
    )
    grid_search.fit(X_train, y_train)
    best_models[model_name] = grid_search.best_estimator_
    print(f"Best parameters for {model_name}: {grid_search.best_params_}")

# Score each tuned estimator on the same hold-out set.
for model_name in best_models:
    best_model = best_models[model_name]
    print(f"\nEvaluating {model_name} after tuning...")
    metrics = evaluate_model(best_model, X_test, y_test)
    metrics['Model'] = model_name
    metrics['Stage'] = 'After Tuning'
    tuned_row = pd.DataFrame([metrics])
    results = pd.concat([results, tuned_row], ignore_index=True)
    print(f"Confusion Matrix for {model_name} after tuning:\n{metrics['Confusion Matrix']}")

# Display the results
print(results)





### Combined

In [None]:
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split, GridSearchCV, StratifiedKFold
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
from sklearn.tree import DecisionTreeClassifier
from sklearn.linear_model import LogisticRegression
import xgboost as xgb
from sklearn.metrics import make_scorer, f1_score, classification_report, accuracy_score, roc_auc_score, confusion_matrix, roc_curve, precision_score, recall_score
from imblearn.over_sampling import SMOTE
import random

from collections import Counter
from sklearn.datasets import make_classification
from matplotlib import pyplot
from numpy import where


# 70/30 train/test split on all selected features, stratified on the target,
# with a fixed seed.
X_train, X_test, y_train, y_test = train_test_split(X_selected, y, test_size=0.3, random_state=42, stratify=y)

# Hyperparameter search space per model; keys match the keys of `models`.
param_grids = {
    'xgboost': {
        'n_estimators': [50, 150, 250],
        'max_depth': [3, 5, 7, 9],
        'learning_rate': [0.01, 0.1, 0.3],
        'subsample': [0.7, 0.8, 0.9],
        'colsample_bytree': [0.5, 0.7, 0.9]
    },
    'random_forest': {
        'n_estimators': [25, 50, 100, 200, 300],
        'max_depth': [None, 2, 5, 10, 20],
        'min_samples_split': [2, 4, 6, 8],
        'min_samples_leaf': [1, 2, 4]
    },
    'logistic_regression': {
        'C': [0.01, 0.1, 1, 10, 100, 200, 300, 400, 500],
        'penalty': ['l1', 'l2'],
        'solver': ['liblinear'],
        'class_weight': ['balanced']
    },
    'gradient_boosting': {
        'n_estimators': [50, 100, 200, 300],
        'max_depth': [3, 5, 7],
        'learning_rate': [0.005, 0.01, 0.1, 0.3],
        'subsample': [0.5, 0.7, 0.8, 0.9]
    },
    'decision_tree': {
        'max_depth': [None, 10, 20, 30],
        'min_samples_split': [2, 5, 10],
        'min_samples_leaf': [1, 2, 4]
    }
}

# Estimators to compare. Every estimator gets a fixed random_state so that
# repeated runs report the same metrics; the split above is already seeded.
# NOTE(review): `use_label_encoder`, `tree_method='gpu_hist'` and `predictor`
# are flagged as deprecated by recent XGBoost releases -- confirm against the
# installed version.
models = {
    'xgboost': xgb.XGBClassifier(objective='binary:logistic', use_label_encoder=False, tree_method='gpu_hist', predictor='gpu_predictor', random_state=42),
    'random_forest': RandomForestClassifier(random_state=42),
    'logistic_regression': LogisticRegression(random_state=42),
    'gradient_boosting': GradientBoostingClassifier(random_state=42),
    'decision_tree': DecisionTreeClassifier(random_state=42),
}

# Best estimator per model name, filled in by the grid search.
best_models = {}

# One row per (model, stage) evaluation.
results = pd.DataFrame(columns=['Model', 'Stage', 'Accuracy', 'F1 Score', 'AUC/ROC', 'Precision', 'Recall'])

def evaluate_model(model, X_test, y_test):
    """Score an already-fitted classifier on held-out data.

    Hard predictions come from ``model.predict``. The ranking score used for
    AUC is column 1 of ``model.predict_proba`` when the model exposes that
    method, otherwise the output of ``model.decision_function``.

    Returns a dict with the keys 'Accuracy', 'F1 Score', 'AUC/ROC',
    'Precision', 'Recall' and 'Confusion Matrix'.
    """
    predicted_labels = model.predict(X_test)

    # Prefer probabilities; fall back to raw decision scores.
    if hasattr(model, 'predict_proba'):
        positive_scores = model.predict_proba(X_test)[:, 1]
    else:
        positive_scores = model.decision_function(X_test)

    return {
        'Accuracy': accuracy_score(y_test, predicted_labels),
        'F1 Score': f1_score(y_test, predicted_labels),
        'AUC/ROC': roc_auc_score(y_test, positive_scores),
        'Precision': precision_score(y_test, predicted_labels),
        'Recall': recall_score(y_test, predicted_labels),
        'Confusion Matrix': confusion_matrix(y_test, predicted_labels),
    }

# Rows of the results table. Each metrics dict is appended here and the
# DataFrame is built once at the end: growing a DataFrame with pd.concat inside
# the loops re-copies the whole table on every iteration, and concatenating
# onto an empty frame raises a FutureWarning in recent pandas.
result_rows = []

# Evaluate each model before hyperparameter tuning (default hyperparameters)
for model_name, model in models.items():
    print(f"\nEvaluating {model_name} before tuning...")
    model.fit(X_train, y_train)
    metrics = evaluate_model(model, X_test, y_test)
    metrics.update({'Model': model_name, 'Stage': 'Before Tuning'})
    result_rows.append(metrics)
    print(f"Confusion Matrix for {model_name} before tuning:\n{metrics['Confusion Matrix']}")

# Shuffled, seeded 3-fold stratified CV shared by every grid search
cv = StratifiedKFold(n_splits=3, shuffle=True, random_state=42)

# Perform grid search for each model (selection metric: accuracy)
for model_name, model in models.items():
    print(f"\nTraining {model_name} with GridSearchCV...")
    grid_search = GridSearchCV(estimator=model, param_grid=param_grids[model_name], scoring="accuracy", cv=cv, n_jobs=-1, verbose=2)
    grid_search.fit(X_train, y_train)
    best_models[model_name] = grid_search.best_estimator_
    print(f"Best parameters for {model_name}: {grid_search.best_params_}")

# Evaluate each best model on the test set after hyperparameter tuning
for model_name, best_model in best_models.items():
    print(f"\nEvaluating {model_name} after tuning...")
    metrics = evaluate_model(best_model, X_test, y_test)
    metrics.update({'Model': model_name, 'Stage': 'After Tuning'})
    result_rows.append(metrics)
    print(f"Confusion Matrix for {model_name} after tuning:\n{metrics['Confusion Matrix']}")

# Build and display the final results (one row per model and stage)
results = pd.DataFrame(
    result_rows,
    columns=['Model', 'Stage', 'Accuracy', 'F1 Score', 'AUC/ROC', 'Precision', 'Recall', 'Confusion Matrix'],
)
print(results)


### Visual

In [None]:
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split, GridSearchCV, StratifiedKFold
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
from sklearn.tree import DecisionTreeClassifier
from sklearn.linear_model import LogisticRegression
import xgboost as xgb
from sklearn.metrics import make_scorer, f1_score, classification_report, accuracy_score, roc_auc_score, confusion_matrix, roc_curve, precision_score, recall_score
from sklearn.svm import SVC
import random

# One seed for the global RNGs, the split and every estimator. Seeding only the
# global RNGs is not enough: the grid search runs with n_jobs=-1 and worker
# processes do not inherit the global NumPy seed, so each estimator also gets
# an explicit random_state.
SEED = 42
np.random.seed(SEED)
random.seed(SEED)

# Split the data into training and testing sets (80/20). Stratified on y, like
# the split used for the selected-feature models, so both partitions keep the
# same class proportions.
X_train, X_test, y_train, y_test = train_test_split(
    X_visual, y, test_size=0.2, random_state=SEED, stratify=y
)

# Create a custom scorer for F1-Macro
f1_macro_scorer = make_scorer(f1_score, average='macro')

# Define parameter grids for each model.
# NOTE: the 'svm' grid is currently unused -- `models` below has no 'svm'
# entry, and grids are only looked up by the names present in `models`.
param_grids = {
    'xgboost': {
        'n_estimators': [100, 200],
        'max_depth': [3, 5, 7],
        'learning_rate': [0.01, 0.1, 0.3],
        'subsample': [0.7, 0.8, 0.9],
        'colsample_bytree': [0.7, 0.8, 0.9]
    },
    'random_forest': {
        'n_estimators': [100, 200],
        'max_depth': [None, 10, 20],
        'min_samples_split': [2, 5, 10],
        'min_samples_leaf': [1, 2, 4]
    },
    'decision_tree': {
        'max_depth': [None, 10, 20, 30],
        'min_samples_split': [2, 5, 10],
        'min_samples_leaf': [1, 2, 4]
    },
    'logistic_regression': {
        'C': [0.01, 0.1, 1, 10, 100],
        'penalty': ['l1', 'l2'],
        'solver': ['liblinear']
    },
    'gradient_boosting': {
        'n_estimators': [100, 200],
        'max_depth': [3, 5, 7],
        'learning_rate': [0.01, 0.1, 0.3],
        'subsample': [0.7, 0.8, 0.9]
    },
    'svm': {
        'C': [0.1, 1, 10],
        'gamma': ['scale', 'auto'],
        'kernel': ['rbf', 'linear']
    }
}

# Initialize models (default hyperparameters; these are the "before tuning"
# baselines and the templates handed to the grid search).
# NOTE(review): tree_method='gpu_hist' / predictor='gpu_predictor' need a
# CUDA-enabled XGBoost build, and newer XGBoost releases replace them with
# device='cuda' -- confirm against the installed version.
models = {
    'xgboost': xgb.XGBClassifier(
        objective='binary:logistic',
        use_label_encoder=False,
        tree_method='gpu_hist',
        predictor='gpu_predictor',
        random_state=SEED,
    ),
    'random_forest': RandomForestClassifier(random_state=SEED),
    'decision_tree': DecisionTreeClassifier(random_state=SEED),
    'logistic_regression': LogisticRegression(random_state=SEED),
    'gradient_boosting': GradientBoostingClassifier(random_state=SEED),
}

# Dictionary to store the best models (model name -> tuned estimator)
best_models = {}

# DataFrame to store the evaluation results
results = pd.DataFrame(columns=['Model', 'Stage', 'Accuracy', 'F1 Score', 'AUC/ROC', 'Precision', 'Recall'])

def evaluate_model(model, X_test, y_test):
    """Score an already-fitted classifier on held-out data.

    Hard predictions come from ``model.predict``. The ranking score used for
    AUC is column 1 of ``model.predict_proba`` when the model exposes that
    method, otherwise the output of ``model.decision_function``.

    Returns a dict with the keys 'Accuracy', 'F1 Score', 'AUC/ROC',
    'Precision', 'Recall' and 'Confusion Matrix'.
    """
    predicted_labels = model.predict(X_test)

    # Prefer probabilities; fall back to raw decision scores.
    if hasattr(model, 'predict_proba'):
        positive_scores = model.predict_proba(X_test)[:, 1]
    else:
        positive_scores = model.decision_function(X_test)

    return {
        'Accuracy': accuracy_score(y_test, predicted_labels),
        'F1 Score': f1_score(y_test, predicted_labels),
        'AUC/ROC': roc_auc_score(y_test, positive_scores),
        'Precision': precision_score(y_test, predicted_labels),
        'Recall': recall_score(y_test, predicted_labels),
        'Confusion Matrix': confusion_matrix(y_test, predicted_labels),
    }



# Rows of the results table. Each metrics dict is appended here and the
# DataFrame is built once at the end: growing a DataFrame with pd.concat inside
# the loops re-copies the whole table on every iteration, and concatenating
# onto an empty frame raises a FutureWarning in recent pandas.
result_rows = []

# Evaluate each model before hyperparameter tuning (default hyperparameters)
for model_name, model in models.items():
    print(f"\nEvaluating {model_name} before tuning...")
    model.fit(X_train, y_train)
    metrics = evaluate_model(model, X_test, y_test)
    metrics.update({'Model': model_name, 'Stage': 'Before Tuning'})
    result_rows.append(metrics)
    print(f"Confusion Matrix for {model_name} before tuning:\n{metrics['Confusion Matrix']}")

# Perform grid search for each model (5-fold CV, selection metric: macro F1)
for model_name, model in models.items():
    print(f"\nTraining {model_name} with GridSearchCV...")
    grid_search = GridSearchCV(estimator=model, param_grid=param_grids[model_name], scoring=f1_macro_scorer, cv=5, n_jobs=-1, verbose=2)
    grid_search.fit(X_train, y_train)
    best_models[model_name] = grid_search.best_estimator_
    print(f"Best parameters for {model_name}: {grid_search.best_params_}")

# Evaluate each best model on the test set after hyperparameter tuning
for model_name, best_model in best_models.items():
    print(f"\nEvaluating {model_name} after tuning...")
    metrics = evaluate_model(best_model, X_test, y_test)
    metrics.update({'Model': model_name, 'Stage': 'After Tuning'})
    result_rows.append(metrics)
    print(f"Confusion Matrix for {model_name} after tuning:\n{metrics['Confusion Matrix']}")

# Build and display the results (one row per model and stage)
results = pd.DataFrame(
    result_rows,
    columns=['Model', 'Stage', 'Accuracy', 'F1 Score', 'AUC/ROC', 'Precision', 'Recall', 'Confusion Matrix'],
)
print(results)



### Vocal

In [None]:
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split, GridSearchCV, StratifiedKFold
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
from sklearn.tree import DecisionTreeClassifier
from sklearn.linear_model import LogisticRegression
import xgboost as xgb
from sklearn.metrics import make_scorer, f1_score, classification_report, accuracy_score, roc_auc_score, confusion_matrix, roc_curve, precision_score, recall_score
from sklearn.svm import SVC
import random

# One seed for the global RNGs, the split and every estimator. Seeding only the
# global RNGs is not enough: the grid search runs with n_jobs=-1 and worker
# processes do not inherit the global NumPy seed, so each estimator also gets
# an explicit random_state.
SEED = 42
np.random.seed(SEED)
random.seed(SEED)

# Split the data into training and testing sets (80/20). Stratified on y, like
# the split used for the selected-feature models, so both partitions keep the
# same class proportions.
X_train, X_test, y_train, y_test = train_test_split(
    X_audio, y, test_size=0.2, random_state=SEED, stratify=y
)

# Create a custom scorer for F1-Macro
f1_macro_scorer = make_scorer(f1_score, average='macro')

# Define parameter grids for each model.
# NOTE: the 'svm' grid is currently unused -- `models` below has no 'svm'
# entry, and grids are only looked up by the names present in `models`.
param_grids = {
    'xgboost': {
        'n_estimators': [100, 200],
        'max_depth': [3, 5, 7],
        'learning_rate': [0.01, 0.1, 0.3],
        'subsample': [0.7, 0.8, 0.9],
        'colsample_bytree': [0.7, 0.8, 0.9]
    },
    'random_forest': {
        'n_estimators': [100, 200],
        'max_depth': [None, 10, 20],
        'min_samples_split': [2, 5, 10],
        'min_samples_leaf': [1, 2, 4]
    },
    'decision_tree': {
        'max_depth': [None, 10, 20, 30],
        'min_samples_split': [2, 5, 10],
        'min_samples_leaf': [1, 2, 4]
    },
    'logistic_regression': {
        'C': [0.01, 0.1, 1, 10, 100],
        'penalty': ['l1', 'l2'],
        'solver': ['liblinear']
    },
    'gradient_boosting': {
        'n_estimators': [100, 200],
        'max_depth': [3, 5, 7],
        'learning_rate': [0.01, 0.1, 0.3],
        'subsample': [0.7, 0.8, 0.9]
    },
    'svm': {
        'C': [0.1, 1, 10],
        'gamma': ['scale', 'auto'],
        'kernel': ['rbf', 'linear']
    }
}

# Initialize models (default hyperparameters; these are the "before tuning"
# baselines and the templates handed to the grid search).
# NOTE(review): tree_method='gpu_hist' / predictor='gpu_predictor' need a
# CUDA-enabled XGBoost build, and newer XGBoost releases replace them with
# device='cuda' -- confirm against the installed version.
models = {
    'xgboost': xgb.XGBClassifier(
        objective='binary:logistic',
        use_label_encoder=False,
        tree_method='gpu_hist',
        predictor='gpu_predictor',
        random_state=SEED,
    ),
    'random_forest': RandomForestClassifier(random_state=SEED),
    'decision_tree': DecisionTreeClassifier(random_state=SEED),
    'logistic_regression': LogisticRegression(random_state=SEED),
    'gradient_boosting': GradientBoostingClassifier(random_state=SEED),
}

# Dictionary to store the best models (model name -> tuned estimator)
best_models = {}

# DataFrame to store the evaluation results
results = pd.DataFrame(columns=['Model', 'Stage', 'Accuracy', 'F1 Score', 'AUC/ROC', 'Precision', 'Recall'])

def evaluate_model(model, X_test, y_test):
    """Score an already-fitted classifier on held-out data.

    Hard predictions come from ``model.predict``. The ranking score used for
    AUC is column 1 of ``model.predict_proba`` when the model exposes that
    method, otherwise the output of ``model.decision_function``.

    Returns a dict with the keys 'Accuracy', 'F1 Score', 'AUC/ROC',
    'Precision', 'Recall' and 'Confusion Matrix'.
    """
    predicted_labels = model.predict(X_test)

    # Prefer probabilities; fall back to raw decision scores.
    if hasattr(model, 'predict_proba'):
        positive_scores = model.predict_proba(X_test)[:, 1]
    else:
        positive_scores = model.decision_function(X_test)

    return {
        'Accuracy': accuracy_score(y_test, predicted_labels),
        'F1 Score': f1_score(y_test, predicted_labels),
        'AUC/ROC': roc_auc_score(y_test, positive_scores),
        'Precision': precision_score(y_test, predicted_labels),
        'Recall': recall_score(y_test, predicted_labels),
        'Confusion Matrix': confusion_matrix(y_test, predicted_labels),
    }



# Rows of the results table. Each metrics dict is appended here and the
# DataFrame is built once at the end: growing a DataFrame with pd.concat inside
# the loops re-copies the whole table on every iteration, and concatenating
# onto an empty frame raises a FutureWarning in recent pandas.
result_rows = []

# Evaluate each model before hyperparameter tuning (default hyperparameters)
for model_name, model in models.items():
    print(f"\nEvaluating {model_name} before tuning...")
    model.fit(X_train, y_train)
    metrics = evaluate_model(model, X_test, y_test)
    metrics.update({'Model': model_name, 'Stage': 'Before Tuning'})
    result_rows.append(metrics)
    print(f"Confusion Matrix for {model_name} before tuning:\n{metrics['Confusion Matrix']}")

# Perform grid search for each model (5-fold CV, selection metric: macro F1)
for model_name, model in models.items():
    print(f"\nTraining {model_name} with GridSearchCV...")
    grid_search = GridSearchCV(estimator=model, param_grid=param_grids[model_name], scoring=f1_macro_scorer, cv=5, n_jobs=-1, verbose=2)
    grid_search.fit(X_train, y_train)
    best_models[model_name] = grid_search.best_estimator_
    print(f"Best parameters for {model_name}: {grid_search.best_params_}")

# Evaluate each best model on the test set after hyperparameter tuning
for model_name, best_model in best_models.items():
    print(f"\nEvaluating {model_name} after tuning...")
    metrics = evaluate_model(best_model, X_test, y_test)
    metrics.update({'Model': model_name, 'Stage': 'After Tuning'})
    result_rows.append(metrics)
    print(f"Confusion Matrix for {model_name} after tuning:\n{metrics['Confusion Matrix']}")

# Build and display the results (one row per model and stage)
results = pd.DataFrame(
    result_rows,
    columns=['Model', 'Stage', 'Accuracy', 'F1 Score', 'AUC/ROC', 'Precision', 'Recall', 'Confusion Matrix'],
)
print(results)



### Verbal

In [None]:
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split, GridSearchCV, StratifiedKFold
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
from sklearn.tree import DecisionTreeClassifier
from sklearn.linear_model import LogisticRegression
import xgboost as xgb
from sklearn.metrics import make_scorer, f1_score, classification_report, accuracy_score, roc_auc_score, confusion_matrix, roc_curve, precision_score, recall_score
from sklearn.svm import SVC
import random

# One seed for the global RNGs, the split and every estimator. Seeding only the
# global RNGs is not enough: the grid search runs with n_jobs=-1 and worker
# processes do not inherit the global NumPy seed, so each estimator also gets
# an explicit random_state.
SEED = 50
np.random.seed(SEED)
random.seed(SEED)

# Split the data into training and testing sets (80/20). Stratified on y, like
# the split used for the selected-feature models, so both partitions keep the
# same class proportions.
X_train, X_test, y_train, y_test = train_test_split(
    X_verbal, y, test_size=0.2, random_state=SEED, stratify=y
)

# Create a custom scorer for F1-Macro
f1_macro_scorer = make_scorer(f1_score, average='macro')

# Define parameter grids for each model.
# NOTE: the 'svm' grid is currently unused -- `models` below has no 'svm'
# entry, and grids are only looked up by the names present in `models`.
param_grids = {
    'xgboost': {
        'n_estimators': [100, 200],
        'max_depth': [3, 5, 7],
        'learning_rate': [0.01, 0.1, 0.3],
        'subsample': [0.7, 0.8, 0.9],
        'colsample_bytree': [0.7, 0.8, 0.9]
    },
    'random_forest': {
        'n_estimators': [100, 200],
        'max_depth': [None, 10, 20],
        'min_samples_split': [2, 5, 10],
        'min_samples_leaf': [1, 2, 4]
    },
    'decision_tree': {
        'max_depth': [None, 10, 20, 30],
        'min_samples_split': [2, 5, 10],
        'min_samples_leaf': [1, 2, 4]
    },
    'logistic_regression': {
        'C': [0.01, 0.1, 1, 10, 100],
        'penalty': ['l1', 'l2'],
        'solver': ['liblinear']
    },
    'gradient_boosting': {
        'n_estimators': [100, 200],
        'max_depth': [3, 5, 7],
        'learning_rate': [0.01, 0.1, 0.3],
        'subsample': [0.7, 0.8, 0.9]
    },
    'svm': {
        'C': [0.1, 1, 10],
        'gamma': ['scale', 'auto'],
        'kernel': ['rbf', 'linear']
    }
}

# Initialize models (default hyperparameters; these are the "before tuning"
# baselines and the templates handed to the grid search).
# NOTE(review): tree_method='gpu_hist' / predictor='gpu_predictor' need a
# CUDA-enabled XGBoost build, and newer XGBoost releases replace them with
# device='cuda' -- confirm against the installed version.
models = {
    'xgboost': xgb.XGBClassifier(
        objective='binary:logistic',
        use_label_encoder=False,
        tree_method='gpu_hist',
        predictor='gpu_predictor',
        random_state=SEED,
    ),
    'random_forest': RandomForestClassifier(random_state=SEED),
    'decision_tree': DecisionTreeClassifier(random_state=SEED),
    'logistic_regression': LogisticRegression(random_state=SEED),
    'gradient_boosting': GradientBoostingClassifier(random_state=SEED),
}

# Dictionary to store the best models (model name -> tuned estimator)
best_models = {}

# DataFrame to store the evaluation results
results = pd.DataFrame(columns=['Model', 'Stage', 'Accuracy', 'F1 Score', 'AUC/ROC', 'Precision', 'Recall'])

def evaluate_model(model, X_test, y_test):
    """Score an already-fitted classifier on held-out data.

    Hard predictions come from ``model.predict``. The ranking score used for
    AUC is column 1 of ``model.predict_proba`` when the model exposes that
    method, otherwise the output of ``model.decision_function``.

    Returns a dict with the keys 'Accuracy', 'F1 Score', 'AUC/ROC',
    'Precision', 'Recall' and 'Confusion Matrix'.
    """
    predicted_labels = model.predict(X_test)

    # Prefer probabilities; fall back to raw decision scores.
    if hasattr(model, 'predict_proba'):
        positive_scores = model.predict_proba(X_test)[:, 1]
    else:
        positive_scores = model.decision_function(X_test)

    return {
        'Accuracy': accuracy_score(y_test, predicted_labels),
        'F1 Score': f1_score(y_test, predicted_labels),
        'AUC/ROC': roc_auc_score(y_test, positive_scores),
        'Precision': precision_score(y_test, predicted_labels),
        'Recall': recall_score(y_test, predicted_labels),
        'Confusion Matrix': confusion_matrix(y_test, predicted_labels),
    }



# Rows of the results table. Each metrics dict is appended here and the
# DataFrame is built once at the end: growing a DataFrame with pd.concat inside
# the loops re-copies the whole table on every iteration, and concatenating
# onto an empty frame raises a FutureWarning in recent pandas.
result_rows = []

# Evaluate each model before hyperparameter tuning (default hyperparameters)
for model_name, model in models.items():
    print(f"\nEvaluating {model_name} before tuning...")
    model.fit(X_train, y_train)
    metrics = evaluate_model(model, X_test, y_test)
    metrics.update({'Model': model_name, 'Stage': 'Before Tuning'})
    result_rows.append(metrics)
    print(f"Confusion Matrix for {model_name} before tuning:\n{metrics['Confusion Matrix']}")

# Perform grid search for each model (5-fold CV, selection metric: macro F1)
for model_name, model in models.items():
    print(f"\nTraining {model_name} with GridSearchCV...")
    grid_search = GridSearchCV(estimator=model, param_grid=param_grids[model_name], scoring=f1_macro_scorer, cv=5, n_jobs=-1, verbose=2)
    grid_search.fit(X_train, y_train)
    best_models[model_name] = grid_search.best_estimator_
    print(f"Best parameters for {model_name}: {grid_search.best_params_}")

# Evaluate each best model on the test set after hyperparameter tuning
for model_name, best_model in best_models.items():
    print(f"\nEvaluating {model_name} after tuning...")
    metrics = evaluate_model(best_model, X_test, y_test)
    metrics.update({'Model': model_name, 'Stage': 'After Tuning'})
    result_rows.append(metrics)
    print(f"Confusion Matrix for {model_name} after tuning:\n{metrics['Confusion Matrix']}")

# Build and display the results (one row per model and stage)
results = pd.DataFrame(
    result_rows,
    columns=['Model', 'Stage', 'Accuracy', 'F1 Score', 'AUC/ROC', 'Precision', 'Recall', 'Confusion Matrix'],
)
print(results)



## SHAP Values

In [None]:
import shap
import pandas as pd
from sklearn.model_selection import train_test_split
from xgboost import XGBClassifier

def plot_shap_summary(best_gb, X_train, X_test):
    """Draw a SHAP summary plot for a fitted model.

    A ``shap.Explainer`` is built from ``best_gb`` and ``X_train``, applied to
    ``X_test``, and the result is passed to ``shap.summary_plot``. The plot is
    allowed to show as many features as ``X_train`` has columns.
    """
    # Build the explainer once, then explain the held-out rows.
    model_explainer = shap.Explainer(best_gb, X_train)
    explanation = model_explainer(X_test)

    # max_display = number of columns, so no feature is cut from the plot.
    feature_count = X_train.shape[1]
    shap.summary_plot(explanation, X_test, max_display=feature_count)

# Example usage -- `data_path` is a placeholder; point it at a real CSV file
data_path = 'path/to/your/data.csv'
data = pd.read_csv(data_path)

# The 'target' column is taken as the dependent variable; every other column
# is used as an independent variable
y = data['target']
X = data.drop(columns=['target'])

# Hold out 20% of the rows for the SHAP explanation (seeded split)
X_train, X_test, y_train, y_test = train_test_split(
    X,
    y,
    random_state=42,
    test_size=0.2,
)

# Fit the model to explain: an XGBoost classifier with log-loss as eval metric
best_gb = XGBClassifier(eval_metric='logloss', use_label_encoder=False)
best_gb.fit(X_train, y_train)

# Render the SHAP summary plot for the fitted model
plot_shap_summary(best_gb, X_train, X_test)
