# Commalla Acoustic Feature 
REMEMBER TO TURN ON SAVING CSVs!

In [2]:
import os 
import pandas as pd
import matplotlib.pyplot as plt
import numpy as np
from scipy.io import wavfile
import parselmouth as pm
from parselmouth.praat import call

# Feature Extraction Functions
The path tree produces a list of tuples. The tuples are len = 3, and the survey files folders we need have structure (folder pathname, [], list of .wav filenames). To find the relevant folders, we're using two conditions: long folder name in 0th index and empty list in 1st index. Next, we figure out which emotion/function this folder is. For each file in the folder, create a praat.parselmouth Sound object and find the pitch statistics. 

Basic functions: https://www.fon.hum.uva.nl/praat/manual/Script_for_listing_F0_statistics.html
Shimmer, Jitter: https://osf.io/qe9k4

In [3]:

def pitch_stats(x_mono):
    x_pitch = x_mono.to_pitch()
    mean_pitch_row = call(x_pitch, "Get mean", 0, 0, "Hertz")
    min_pitch_row = call(x_pitch, "Get minimum", 0, 0, "Hertz", "Parabolic")
    max_pitch_row = call(x_pitch, "Get maximum", 0, 0, "Hertz", "Parabolic")
    stdev_pitch_row = call(x_pitch, "Get standard deviation", 0, 0, "Hertz")

    # start_pitch = call(x_pitch, "Get mean", 0, 0.05, "Hertz")
    # end_pitch = call(x_pitch, "Get mean", duration_row-0.05, duration_row, "Hertz")

    pitch_stats_row = {
        "mean_pitch":[mean_pitch_row],
        "pitch_range":[max_pitch_row-min_pitch_row],
        "min_pitch":[min_pitch_row],
        "max_pitch":[max_pitch_row],
        "stdev_pitch":[stdev_pitch_row],
        # "overall_pitch_change":[end_pitch-start_pitch]
    }
    return pitch_stats_row

def intensity_stats(x_mono):
    x_intensity = x_mono.to_intensity(minimum_pitch=64) # based on min(min_pitch)
    mean_intensity_row = x_intensity.get_average()
    min_intensity_row = x_intensity.get_minimum()
    max_intensity_row = x_intensity.get_maximum()

    intensity_stats_row = {
        "mean_intensity":[mean_intensity_row],
        "intensity_range:":[max_intensity_row-min_intensity_row], 
        "min_intensity":[min_intensity_row],
        "max_intensity":[max_intensity_row]
    }
    return intensity_stats_row

def formant_stats(x_mono):
    x_formant = x_mono.to_formant_burg()
    formant1_mean_row = call(x_formant, "Get mean", 1, 0, 0, "Hertz")
    formant2_mean_row = call(x_formant, "Get mean", 2, 0, 0, "Hertz")
    formant3_mean_row = call(x_formant, "Get mean", 3, 0, 0, "Hertz")
    formant1_stdev_row = call(x_formant, "Get standard deviation", 1, 0, 0, "Hertz")
    formant2_stdev_row = call(x_formant, "Get standard deviation", 2, 0, 0, "Hertz")
    formant3_stdev_row = call(x_formant, "Get standard deviation", 3, 0, 0, "Hertz")

    formant_stats_row = {
        "formant1_mean":[formant1_mean_row],
        "formant2_mean":[formant2_mean_row],
        "formant3_mean":[formant3_mean_row],
        "formant1_stdev":[formant1_stdev_row],
        "formant2_stdev":[formant2_stdev_row],
        "formant3_stdev":[formant3_stdev_row]
    }
    return formant_stats_row

def jitter_stats(x_mono):
    x_pointprocess = call(x_mono,"To PointProcess (periodic, cc)...", 75, 600) # CHECK VAL
    localJitter = call(x_pointprocess, "Get jitter (local)", 0, 0, 0.0001, 0.02, 1.3)
    localabsoluteJitter = call(x_pointprocess, "Get jitter (local, absolute)", 0, 0, 0.0001, 0.02, 1.3)
    rapJitter = call(x_pointprocess, "Get jitter (rap)", 0, 0, 0.0001, 0.02, 1.3)
    ppq5Jitter = call(x_pointprocess, "Get jitter (ppq5)", 0, 0, 0.0001, 0.02, 1.3)
    ddpJitter = call(x_pointprocess, "Get jitter (ddp)", 0, 0, 0.0001, 0.02, 1.3)

    jitter_stats_row = {
        "local_jitter":[localJitter],
        "local_abs_jitter":[localabsoluteJitter],
        "rap_jitter":[rapJitter],
        "ppq5_jitter":[ppq5Jitter],
        "ddp_jitter":[ddpJitter]
    }
    return jitter_stats_row

def shimmer_stats(x_mono):
    x_pointprocess = call(x_mono,"To PointProcess (periodic, cc)...", 75, 600) # CHECK VAL
    localShimmer =  call([x_mono, x_pointprocess], "Get shimmer (local)", 0, 0, 0.0001, 0.02, 1.3, 1.6)
    localdbShimmer = call([x_mono, x_pointprocess], "Get shimmer (local_dB)", 0, 0, 0.0001, 0.02, 1.3, 1.6)
    apq3Shimmer = call([x_mono, x_pointprocess], "Get shimmer (apq3)", 0, 0, 0.0001, 0.02, 1.3, 1.6)
    aqpq5Shimmer = call([x_mono, x_pointprocess], "Get shimmer (apq5)", 0, 0, 0.0001, 0.02, 1.3, 1.6)
    apq11Shimmer =  call([x_mono, x_pointprocess], "Get shimmer (apq11)", 0, 0, 0.0001, 0.02, 1.3, 1.6)
    ddaShimmer = call([x_mono, x_pointprocess], "Get shimmer (dda)", 0, 0, 0.0001, 0.02, 1.3, 1.6)

    shimmer_stats_row = {
        "local_shimmer":[localShimmer],
        "local_db_shimer":[localdbShimmer],
        "apq3_shimmer":[apq3Shimmer],
        "apq5_shimmer":[aqpq5Shimmer],
        "apq11_shimmer":[apq11Shimmer],
        "dda_shimmer":[ddaShimmer]
    }
    return shimmer_stats_row

def harmonicity_stats(x_mono):
    x_harmonicity = call(x_mono, "To Harmonicity (cc)", 0.01, 75, 0.1, 1.0) # CHECK VAL
    return {"hnr":[call(x_harmonicity, "Get mean", 0, 0)]}

# SAUTER FEATURE SET 
def pitch_stats_2(x_mono):
    x_pitch = x_mono.to_pitch()
    mean_pitch_row = call(x_pitch, "Get mean", 0, 0, "Hertz")
    min_pitch_row = call(x_pitch, "Get minimum", 0, 0, "Hertz", "Parabolic")
    max_pitch_row = call(x_pitch, "Get maximum", 0, 0, "Hertz", "Parabolic")
    stdev_pitch_row = call(x_pitch, "Get standard deviation", 0, 0, "Hertz")

    pitch_stats_row = {
        "mean_pitch":[mean_pitch_row],
        "min_pitch":[min_pitch_row],
        "max_pitch":[max_pitch_row],
        "stdev_pitch":[stdev_pitch_row],
    }
    return pitch_stats_row

def intensity_stats_2(x_mono):
    x_intensity = x_mono.to_intensity(minimum_pitch=64) # based on min(min_pitch)
    mean_intensity_row = x_intensity.get_average()
    sd_intensity_row = call(x_intensity, "Get standard deviation", 0.0, 0.0)

    intensity_stats_row = {
        "mean_intensity":[mean_intensity_row],
        "sd_intensity":[sd_intensity_row]
    }
    return intensity_stats_row

def spectrum_stats(x_mono):
    x_spectrum = x_mono.to_spectrum()
    x_cog = call(x_spectrum, "Get centre of gravity", 2.0) # 2.0 = power
    x_sd = call(x_spectrum, "Get standard deviation", 2.0)
    return {"spectral_cog":[x_cog], 
            "spectral_sd":[x_sd]}

def amplitude_stats(x_mono):
    x_amp = call(x_mono, "Get root-mean-square", 0.0, 0.0)
    return {"amp_rms":[x_amp]}


# Sauter Feature Set

In [4]:

path_tree = []
for node in os.walk("/Users/simonrdkn/Dropbox (MIT)/commalla survey audio"):
    path_tree.append(node)

audio_stats = pd.DataFrame({
    "full_pathname":[],
    "filename":[],
    "recanvo_id":[],
    "function":[],
    "duration":[],
    "mean_pitch":[],
    "min_pitch":[],
    "max_pitch":[],
    "stdev_pitch":[],
    "mean_intensity":[],
    "spectral_cog":[],
    "spectral_sd":[],
    "amp_rms":[],
})
print(audio_stats.columns)

Index(['full_pathname', 'filename', 'recanvo_id', 'function', 'duration',
       'mean_pitch', 'min_pitch', 'max_pitch', 'stdev_pitch', 'mean_intensity',
       'spectral_cog', 'spectral_sd', 'amp_rms'],
      dtype='object')


In [5]:
for folder_tup in path_tree:
    # structure = (pathname of folder, empty [], .wav filenames)
    foldername = folder_tup[0]
    if len(foldername)>67 and len(folder_tup[1])==0:
        for possible_fx in ['delighted', 'frustrated', "request", "self talk", "dysregulated"]:
            if possible_fx in foldername:
                fx_row = possible_fx # other words are formatted identically
                
        for filename in folder_tup[2]:
            if fx_row == "self talk": # making it match the qualtrics_key format
                fx_row = "selftalk"

            full_path_row = "/".join([foldername,filename])
            x = pm.Sound(full_path_row)
            x_mono_row = x.convert_to_mono()

            new_row = {
                "full_pathname":[full_path_row],
                "filename":[filename],
                "recanvo_id":[foldername[53:56]],
                "function":[fx_row],
                "duration":[call(x_mono_row, "Get total duration")],
            }
            
            for audio_stat_fx in [pitch_stats_2, intensity_stats_2, spectrum_stats, amplitude_stats]:
                new_row.update(audio_stat_fx(x_mono_row))

            audio_stats = pd.concat([audio_stats, pd.DataFrame(new_row)])


In [9]:
# Adding arousal and valence columns back in
av_stats_melt = pd.read_csv("/Users/simonrdkn/Documents/commalla_acoustic_analysis/av_stats_melt.csv")
arousal_unmelt = av_stats_melt.groupby(by="filename")["mean_arousal"].mean()
valence_unmelt = av_stats_melt.groupby(by="filename")["mean_valence"].mean()

sauter_feature_set = audio_stats # 
sauter_feature_set = pd.merge(sauter_feature_set, arousal_unmelt, how="left", on="filename")
sauter_feature_set = pd.merge(sauter_feature_set, valence_unmelt, how="left", on="filename")

mean_arousal = sauter_feature_set.pop("mean_arousal")
mean_valence = sauter_feature_set.pop("mean_valence")

sauter_feature_set.insert(4, 'mean_arousal', mean_arousal)
sauter_feature_set.insert(4, 'mean_valence', mean_valence)

# sauter_feature_set.to_csv("/Users/simonrdkn/Documents/commalla_acoustic_analysis/feature sets/sauter_feature_set.csv")

# Original Feature Set

In [None]:
path_tree = []
for node in os.walk("/Users/simonrdkn/Dropbox (MIT)/commalla survey audio"):
    path_tree.append(node)
# print(len("/Users/simonrdkn/Dropbox (MIT)/commalla survey audio/p11/p11 survey")) # used this len to know when we're in the right folder type 
# print(len("/Users/simonrdkn/Dropbox (MIT)/commalla survey audio/")) # used this len to get recanvo id

audio_stats = pd.DataFrame({
    "full_pathname":[],
    "filename":[],
    "recanvo_id":[],
    "function":[],
    "duration":[],
    "mean_pitch":[],
    "pitch_range": [],
    "min_pitch":[],
    "max_pitch":[],
    "stdev_pitch":[],
    # "overall_pitch_change":[],
    "mean_intensity":[],
    "intensity_range": [], 
    "min_intensity":[],
    "max_intensity":[],
    "formant1_mean":[],
    "formant2_mean":[],
    "formant3_mean":[],
    "formant1_stdev":[],
    "formant2_stdev":[],
    "formant3_stdev":[],
    "local_jitter":[],
    "local_abs_jitter":[],
    "rap_jitter":[],
    "ppq5_jitter":[],
    "ddp_jitter":[],
    "local_shimmer":[],
    "local_db_shimer":[],
    "apq3_shimmer":[],
    "apq5_shimmer":[],
    "apq11_shimmer":[],
    "dda_shimmer":[],
    "hnr":[],
})
print(audio_stats.columns)

In [None]:
for folder_tup in path_tree:
    # structure = (pathname of folder, empty [], .wav filenames)
    foldername = folder_tup[0]
    if len(foldername)>67 and len(folder_tup[1])==0:
        for possible_fx in ['delighted', 'frustrated', "request", "self talk", "dysregulated"]:
            if possible_fx in foldername:
                fx_row = possible_fx # other words are formatted identically
                
        for filename in folder_tup[2]:
            if fx_row == "self talk": # making it match the qualtrics_key format
                fx_row = "selftalk"

            full_path_row = "/".join([foldername,filename])
            x = pm.Sound(full_path_row)
            x_mono_row = x.convert_to_mono()

            new_row = {
                "full_pathname":[full_path_row],
                "filename":[filename],
                "recanvo_id":[foldername[53:56]],
                "function":[fx_row],
                "duration":[call(x_mono_row, "Get total duration")],
            }
            
            for audio_stat_fx in [pitch_stats, intensity_stats, formant_stats, jitter_stats, shimmer_stats, harmonicity_stats]:
                new_row.update(audio_stat_fx(x_mono_row))

            audio_stats = pd.concat([audio_stats, pd.DataFrame(new_row)])

# audio_stats.to_csv('/Users/simonrdkn/Documents/commalla_acoustic_analysis/commalla_audio_stats.csv')


# Attempted CPPS Code

this did not work

In [3]:
# TAKES 2 HOURS TO RUN 
cpps_df = pd.DataFrame({
    "full_pathname":[],
    "cpps":[]})

for folder_tup in path_tree:
    # structure = (pathname of folder, empty [], .wav filenames)
    foldername = folder_tup[0]
    if len(foldername)>67 and len(folder_tup[1])==0:
        for possible_fx in ['delighted', 'frustrated', "request", "self talk", "dysregulated"]:
            if possible_fx in foldername:
                fx_row = possible_fx # other words are formatted identically
                
        for filename in folder_tup[2]:
            if fx_row == "self talk": # making it match the qualtrics_key format
                fx_row = "selftalk"

            full_path_row = "/".join([foldername,filename])
            x = pm.Sound(full_path_row)
            x_mono_row = x.convert_to_mono()
            x_pc_row = call(x_mono_row, "To PowerCepstrogram", 60.0, 0.002, 5000, 50)
            cpps_row = call(x_pc_row, "Get CPPS...", True, 0.02, 0.0005, 60, 330, 0.05, "parabolic", 0.001, 0.05, "exponential decay", "robust slow")

            new_row = pd.DataFrame({"full_pathname": [full_path_row], "cpps": [cpps_row]})
            cpps_df = pd.concat([cpps_df, new_row])

audio_stats.to_csv('/Users/mayaradhakrishnan/Documents/commalla_acoustic_analysis/cpps.csv') #CHANGE


In [None]:
# Merging the CPPS df with av_stats_by_filename
av_stats_by_filename_v3 = pd.read_csv("/Users/simonrdkn/Documents/commalla_acoustic_analysis/old files/av_stats_by_filename_v3.csv")
av_stats_by_filename = pd.merge(av_stats_by_filename_v3, cpps_df, how="outer", on=["full_pathname"])
av_stats_by_filename = av_stats_by_filename.dropna(subset=["filename"]) # ~15 files with CPPS data that aren't in av_stats_by_filename

# audio_stats.to_csv('/Users/simonrdkn/Documents/commalla_acoustic_analysis/av_stats_by_filename.csv')

# print(av_stats_by_filename_v3.shape, cpps_df.shape, av_stats_by_filename.shape)