In [None]:
!pip install pylangacq pydub moviepy ffmpeg

In [None]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import pylangacq
import re
import ast
from pydub import AudioSegment
from pydub.silence import detect_silence
from moviepy import VideoFileClip
import ffmpeg
import os
import tempfile
import sys
from tqdm import tqdm
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, roc_auc_score, confusion_matrix
import scipy.stats as stats

In [None]:
def _extract_mp4_audio(video_path):
    """Load the audio track of an ``.mp4`` file as a pydub ``AudioSegment``.

    The track is written to a temporary WAV file, read back into memory and
    the temporary file is removed again before returning.
    """
    # Mute anything VideoFileClip prints while opening the file, then put back
    # the stream that was installed before the call. Restoring sys.__stdout__
    # instead would drop a replacement stream (such as the one a notebook
    # kernel installs), and the try/finally also restores it on failure.
    previous_stdout = sys.stdout
    with open(os.devnull, 'w') as devnull:
        sys.stdout = devnull
        try:
            video = VideoFileClip(video_path)
        finally:
            sys.stdout = previous_stdout

    temp_path = None
    try:
        temp_dir = tempfile.gettempdir()
        if not os.access(temp_dir, os.W_OK):
            raise PermissionError(f"Write access is not allowed for temp directory: {temp_dir}")

        try:
            # Reserve a unique file name, then close our handle before the
            # writer opens the same path (an open handle blocks it on Windows).
            with tempfile.NamedTemporaryFile(suffix=".wav", dir=temp_dir, delete=False) as temp_audio_file:
                temp_path = temp_audio_file.name
            audio = video.audio
            audio.write_audiofile(temp_path, codec='pcm_s16le')
            return AudioSegment.from_file(temp_path, format="wav")
        except Exception as e:
            # Same exception type and message as before, now with the cause chained.
            raise OSError(f"Error processing audio: {str(e)}") from e
    finally:
        video.close()
        # The file was created with delete=False, so remove it ourselves.
        if temp_path is not None:
            try:
                os.remove(temp_path)
            except OSError:
                pass  # best-effort cleanup; never mask the real outcome


def process_audio(audio_file, min_silence_len, silence_thresh):
    """Detect silences in a ``.wav`` file or in the audio track of an ``.mp4``.

    Parameters
    ----------
    audio_file : str
        Path to the input file. The extension is matched case-insensitively.
    min_silence_len : int or None
        Forwarded to ``detect_silence``; ``None`` falls back to 2000
        (2 seconds).
    silence_thresh : int or None
        Forwarded to ``detect_silence``; ``None`` falls back to -45.

    Returns
    -------
    The value returned by ``pydub.silence.detect_silence`` for the loaded audio.

    Raises
    ------
    ValueError
        If the file is neither ``.wav`` nor ``.mp4``.
    PermissionError
        If the temp directory is not writable (``.mp4`` input only).
    OSError
        If extracting or re-reading the audio track fails (``.mp4`` input only).
    """
    lowered_name = audio_file.lower()
    if lowered_name.endswith(".wav"):
        audio_segment = AudioSegment.from_file(audio_file, format="wav")
    elif lowered_name.endswith(".mp4"):
        audio_segment = _extract_mp4_audio(audio_file)
    else:
        raise ValueError("Use a file type that contains audio.")

    silences = detect_silence(
        audio_segment,
        min_silence_len=min_silence_len if min_silence_len is not None else 2000,  # 2 seconds
        silence_thresh=silence_thresh if silence_thresh is not None else -45  # default silence threshold
    )

    return silences

In [None]:
# Experimenting with different parameters.
# The two lists are paired element-wise (zip), not crossed: entry k of
# `min_silence_lens` is run together with entry k of `silence_threshes`.
min_silence_lens = [10] # list of integers
silence_threshes = [-55] # list of integers
# TODO: fill in the paths to the .wav / .mp4 files to analyse. The loop used
# to iterate over a bare `tqdm()`, which has nothing to iterate and fails.
audio_paths = []
f_df = pd.DataFrame()

for e, (min_sil_len, sil_thresh) in enumerate(zip(min_silence_lens, silence_threshes)):
    print(f"{e+1}/{len(min_silence_lens)}")
    # One entry per audio file: the silences detected with this parameter pair.
    silences = [
        process_audio(aud, min_silence_len=min_sil_len, silence_thresh=sil_thresh)
        for aud in tqdm(audio_paths)
    ]

    f_df[f"min_silence_len_{min_sil_len}_silence_thresh_{sil_thresh}"] = silences

In [None]:
# len of audio for just the patient
def compute_patient_speaking_time(transcripts):
    """Sum the duration of the rows spoken by the patient, per patient.

    Parameters
    ----------
    transcripts : pandas.DataFrame
        Must contain the columns ``patient_id``, ``Speaker``, ``T_start_ms``
        and ``T_end_ms``.

    Returns
    -------
    list
        One total per unique ``patient_id``, in order of first appearance.
        Each total is the sum of ``T_end_ms - T_start_ms`` over the rows whose
        ``Speaker`` is ``'Patient'``; a patient without such rows gets 0.
    """
    is_patient_row = transcripts["Speaker"] == "Patient"
    durations = transcripts["T_end_ms"] - transcripts["T_start_ms"]

    audio_lens = []
    for pt in transcripts["patient_id"].unique():
        # Compare by value instead of building a query string: the quoted
        # f-string only matched string IDs and broke on IDs containing quotes.
        selected = is_patient_row & (transcripts["patient_id"] == pt)
        # skipna=False keeps a missing timestamp visible as NaN in the total,
        # as plain accumulation would.
        audio_lens.append(durations[selected].sum(skipna=False))

    return audio_lens

In [None]:
# len of audio for full session
def compute_full_session_time(transcripts):
    """Return the largest ``T_end_ms`` of every patient's rows.

    Parameters
    ----------
    transcripts : pandas.DataFrame
        Must contain the columns ``patient_id`` and ``T_end_ms``.

    Returns
    -------
    list
        One value per unique ``patient_id``, in order of first appearance:
        the maximum ``T_end_ms`` over all rows of that patient (any speaker).
    """
    audio_lens = []
    for pt in transcripts["patient_id"].unique():
        # Value comparison works for any ID type; the quoted query string
        # only matched string IDs and broke on IDs containing quotes.
        patient_ends = transcripts.loc[transcripts["patient_id"] == pt, "T_end_ms"]
        audio_lens.append(patient_ends.max())

    return audio_lens

In [None]:
def calculate_metrics(y_pred_binary, y_true_binary):
    """Compute binary classification metrics for 0/1 predictions.

    Parameters
    ----------
    y_pred_binary : array-like
        Predicted labels (0 or 1).
    y_true_binary : array-like
        Ground-truth labels (0 or 1).

    Returns
    -------
    tuple
        ``(accuracy, precision, recall, f1, auc, fpr)``. Precision, recall and
        F1 are 0 when undefined (``zero_division=0``); ``fpr`` is 0 when there
        are no true negatives or false positives; ``auc`` is NaN when the
        ground truth contains fewer than two distinct classes.
    """
    accuracy = accuracy_score(y_true_binary, y_pred_binary)
    precision = precision_score(y_true_binary, y_pred_binary, zero_division=0)
    recall = recall_score(y_true_binary, y_pred_binary, zero_division=0)
    f1 = f1_score(y_true_binary, y_pred_binary, zero_division=0)

    # ROC AUC is undefined with a single ground-truth class; report NaN rather
    # than letting the whole metric computation fail.
    if len(set(y_true_binary)) < 2:
        auc = float("nan")
    else:
        auc = roc_auc_score(y_true_binary, y_pred_binary)

    # Calculate False Positive Rate (FPR).
    # labels=[0, 1] forces a 2x2 matrix even when only one class occurs, so
    # the four-way unpacking below always succeeds.
    tn, fp, fn, tp = confusion_matrix(y_true_binary, y_pred_binary, labels=[0, 1]).ravel()
    fpr = fp / (fp + tn) if (fp + tn) > 0 else 0

    return accuracy, precision, recall, f1, auc, fpr

In [None]:
def calculate_scores(transcript, silences, col):
    """Score detected silences against the ``Speech delays`` labels.

    Every transcript row is flagged 1 when at least one detected silence
    overlaps its ``[T_start_ms, T_end_ms]`` range, else 0. Within one patient
    each silence is credited only to the first row it overlaps. The flags are
    then compared with the ``Speech delays`` column via ``calculate_metrics``.

    Parameters
    ----------
    transcript : pandas.DataFrame
        Must contain ``patient_id``, ``T_start_ms``, ``T_end_ms`` and
        ``Speech delays``. It is not modified.
    silences : pandas.Series
        Entry ``k`` (by position) holds the silences of the ``k``-th unique
        ``patient_id``: either a list of ``[start, end]`` pairs or the string
        form of such a list.
    col : object
        Stored unchanged in the ``Parameters`` column of the result.

    Returns
    -------
    pandas.DataFrame
        A single row with the columns ``Accuracy``, ``Precision``, ``Recall``,
        ``F1``, ``AUC``, ``FPR`` and ``Parameters``.
    """
    # Copy before adding the working column so the caller's frame stays untouched.
    transcripts_df = transcript.copy()
    transcripts_df['pydub_silences'] = 0

    for e, p_id in enumerate(transcripts_df.patient_id.unique()):
        raw_silences = silences.iloc[e]
        # Accept both stringified lists and already-parsed lists.
        s = ast.literal_eval(raw_silences) if isinstance(raw_silences, str) else raw_silences

        # Silences already credited to an earlier row of this patient.
        appended_silences = set()
        # Filter rows for the current patient
        temp_df = transcripts_df[transcripts_df['patient_id'] == p_id]

        for row_label, start_ms, end_ms in zip(temp_df.index, temp_df['T_start_ms'], temp_df['T_end_ms']):
            silence_count = 0
            # Count silences within or partially overlapping the time range
            for sil in s:
                silence_tuple = tuple(sil)
                if silence_tuple in appended_silences:
                    continue
                # Check for any overlap between silence and time range
                if sil[0] <= end_ms and sil[1] >= start_ms:
                    appended_silences.add(silence_tuple)
                    silence_count += 1

            transcripts_df.loc[row_label, 'pydub_silences'] = silence_count

    # Convert to binary classification: silence > 0 -> 1, else 0
    transcripts_df["pydub_silences"] = (transcripts_df["pydub_silences"] > 0).astype(int)
    accuracy, precision, recall, f1, auc, fpr = calculate_metrics(transcripts_df["pydub_silences"], transcripts_df["Speech delays"])

    return pd.DataFrame({'Accuracy': [accuracy], 'Precision': [precision], 'Recall': [recall], 'F1': [f1], 'AUC': [auc], 'FPR': [fpr], 'Parameters': [col]})

In [None]:
# Create dataframe with features for different silence lengths
def min_silence_len_sets(df, sil_len_thresh_ms, min_silence_col):
    """Keep only the silences that last at least ``sil_len_thresh_ms``.

    Parameters
    ----------
    df : pandas.DataFrame
        Frame holding the detected silences in column ``min_silence_col``.
    sil_len_thresh_ms : int
        Minimum ``end - start`` a silence needs in order to be kept.
    min_silence_col : str
        Name of the source column. Each cell is a list of ``[start, end]``
        pairs, or the string form of such a list.

    Returns
    -------
    pandas.DataFrame
        One column named
        ``min_silence_len_{sil_len_thresh_ms}_silence_thresh_-55`` that shares
        the index of ``df``. Each cell is the string form of the filtered list.
    """
    out_col = f"min_silence_len_{sil_len_thresh_ms}_silence_thresh_-55"

    filtered = []
    for sil_ts in df[min_silence_col]:
        sil_ts_list = ast.literal_eval(sil_ts) if isinstance(sil_ts, str) else sil_ts
        val_sils = [[start, end] for start, end in sil_ts_list if end - start >= sil_len_thresh_ms]
        filtered.append(str(val_sils))

    # Build the column in one go on df's own index. Writing cell by cell with
    # the loop position as a label appends rows whenever the index is not
    # 0..n-1, and forces strings into an integer-initialised column.
    return pd.DataFrame({out_col: filtered}, index=df.index)

In [None]:
# Get detected silences that overlap with at least one patient utterance
def process_silences(df, all_sil_df, silence_lengths=None):
    """Keep, per patient, the detected silences overlapping a patient utterance.

    A silence ``[s_start, s_end]`` is kept when it overlaps (boundaries
    included) any row of that patient whose ``Speaker`` is ``'Patient'``.
    NOTE(review): no ``Speech delays`` column is consulted here, although the
    previous header comment mentioned labeled speech delays - confirm intent.

    Parameters
    ----------
    df : pandas.DataFrame
        Transcript rows with ``patient_id``, ``Speaker``, ``T_start_ms`` and
        ``T_end_ms``.
    all_sil_df : pandas.DataFrame
        One column ``min_silence_len_{L}_silence_thresh_-55`` per length ``L``.
        Row ``k`` (by position) belongs to the ``k``-th unique ``patient_id``
        of ``df``; each cell is a list of ``[start, end]`` pairs or its string
        form. Patients beyond the last row of ``all_sil_df`` are skipped.
    silence_lengths : iterable of int, optional
        Lengths to process. Defaults to ``[10] + list(range(100, 10_100, 50))``.

    Returns
    -------
    pandas.DataFrame
        Same column names as read from ``all_sil_df``; each cell is the list
        of kept silences for that patient, in their original order.
    """
    if silence_lengths is None:
        silence_lengths = [10] + list(range(100, 10_100, 50))

    # The patient utterance windows do not depend on the silence length, so
    # collect them once instead of re-filtering df for every length.
    patient_rows = df[df["Speaker"] == 'Patient']
    windows_per_patient = []
    for patient in df.patient_id.unique():
        rows = patient_rows[patient_rows["patient_id"] == patient]
        windows_per_patient.append(list(zip(rows["T_start_ms"], rows["T_end_ms"])))
    # Patients without a matching row in all_sil_df are left out.
    windows_per_patient = windows_per_patient[:len(all_sil_df)]

    columns = {}
    for length in silence_lengths:
        column = f"min_silence_len_{length}_silence_thresh_-55"
        valid_silences = []

        for position, windows in enumerate(windows_per_patient):
            raw = all_sil_df[column].iloc[position]
            test_silences = ast.literal_eval(raw) if isinstance(raw, str) else raw
            valid_silences.append([
                silence
                for silence in test_silences
                if any(silence[0] <= t_end and silence[1] >= t_start for t_start, t_end in windows)
            ])

        columns[column] = valid_silences

    # Build the frame once; adding ~200 columns one at a time fragments it.
    return pd.DataFrame(columns)

In [None]:
SPLIT_TYPE = 'test'

# Result frames filled inside the loop: r_df and model_data collect one column
# per feature and silence length; t_df0 / t_df1 hold the same features for the
# rows with AD_dx == 0 and AD_dx == 1 respectively.
r_df, model_data = pd.DataFrame(), pd.DataFrame()
t_df0, t_df1 = pd.DataFrame(), pd.DataFrame()

# Accumulators; nothing in this part of the cell appends to them.
all_norm_sums, norm_silence_counts, lng_short_rats = [], [], []
all_norm_sums_p, norm_silence_counts_p, lng_short_rats_p = [], [], []
corrs, p_values = [], []
silence_lengths = [10] + list(range(100, 10_100, 50))

# Threshold on the normalised silence duration used to build `lng` / `short`.
t = 0.3
# Defined alongside `t`; not used in this part of the cell.
lt = 0.5
st = 0.1

# Select the requested split by value. Interpolating SPLIT_TYPE unquoted into
# a query string produced `split == test`, which treats `test` as a column
# name instead of the string 'test'.
labels_reset = labels.reset_index()
split_labels = labels_reset[labels_reset["split"] == SPLIT_TYPE].reset_index(drop=True)

for l in silence_lengths:
    df0 = split_labels.copy()
    df0["silences"] = pt_silence_df[f"min_silence_len_{l}_silence_thresh_-55"]
    # One list of [start, end] intervals per row of df0.
    lst = df0["silences"].tolist()

    # Total silence duration per row.
    sums = [sum(interval[1] - interval[0] for interval in intervals) for intervals in lst]

    # Mean silence duration per row (0 when the row has no silences).
    avg_silence_durations = [
        sums[i] / len(lst[i]) if len(lst[i]) > 0 else 0
        for i in range(len(lst))
    ]

    # Normalise by the matching entry of audio_lens (same position as the row).
    norm_sums = [sums[i] / audio_lens[i] for i in range(len(sums))]
    r_df[f"norm_silence_duration_{l}"] = norm_sums

    lng = [i if i > t else 0 for i in norm_sums]
    short = [i if i <= t else 0 for i in norm_sums]
    # NOTE(review): for a single row exactly one of lng/short is non-zero, so
    # this evaluates to the normalised duration when it exceeds t, else 0.
    lng_short_rat = [lng[i] / (short[i] or 1) for i in range(len(lng))]
    r_df[f"lng_short_rat_{l}"] = lng_short_rat
    norm_silence_count = [len(lst[i]) / audio_lens[i] for i in range(len(lst))]
    r_df[f"norm_silence_count_{l}"] = norm_silence_count

    df0["norm_sums"] = norm_sums
    df0["norm_silence_count"] = norm_silence_count
    df0["lng_short_rat"] = lng_short_rat

    # Rows per AD_dx value, reused by the tests and the exports below.
    dx0_rows = df0[df0['AD_dx'] == 0]
    dx1_rows = df0[df0['AD_dx'] == 1]

    # Two-sided Mann-Whitney U tests between the AD_dx == 0 and AD_dx == 1
    # rows. (stats.ttest_ind with the same arguments is the parametric
    # alternative that was previously kept here as commented-out code.)
    norm_sums_stat, norm_sums_p_value = stats.mannwhitneyu(
        dx0_rows['norm_sums'],
        dx1_rows['norm_sums'],
        alternative='two-sided'
    )

    # Mann-Whitney U test for norm_silence_count
    norm_silence_count_stat, norm_silence_count_p_value = stats.mannwhitneyu(
        dx0_rows['norm_silence_count'],
        dx1_rows['norm_silence_count'],
        alternative='two-sided'
    )

    # Mann-Whitney U test for lng_short_rat
    lng_short_rats_stat, lng_short_rats_p_value = stats.mannwhitneyu(
        dx0_rows['lng_short_rat'],
        dx1_rows['lng_short_rat'],
        alternative='two-sided'
    )

    t_df0[f"norm_sums_{l}_dx"] = dx0_rows['norm_sums']
    t_df1[f"norm_sums_{l}_dx"] = dx1_rows['norm_sums']
    t_df0[f"norm_silence_count_{l}_dx"] = dx0_rows['norm_silence_count']
    t_df1[f"norm_silence_count_{l}_dx"] = dx1_rows['norm_silence_count']
    t_df0[f"lng_short_rats_{l}_dx"] = dx0_rows['lng_short_rat']
    t_df1[f"lng_short_rats_{l}_dx"] = dx1_rows['lng_short_rat']

    model_data[f"norm_sums_{l}"] = df0['norm_sums']
    model_data[f"norm_silence_count_{l}"] = df0['norm_silence_count']
    model_data[f"lng_short_rats_{l}"] = df0['lng_short_rat']