In [1]:
# The paths of the signal and the data set.
AUDIO_FILE = "/home/yaron/Office/Home_Excercise/Tirza/data/SBC001.wav"
DATA_SET = "/home/yaron/Office/Home_Excercise/Tirza/data/SBC001_labels.csv"

In [2]:
# Aplication parameters.
import torch
import pandas as pd
import numpy as np
from scipy.io import wavfile
import math
import torchaudio
import torchaudio.functional as F
import torchaudio.transforms as T
import time

ADDITIVE_INTENSITY = 96
ID_FIELD = "ID"
START_TIME_FIELD = "xmin"
END_TIME_FIELD = "xmax"
LABEL_FIELD = "label"
PREV_DIST_FIELD = "prev_dist"
NEXT_DIST_FIELD = "next_dist"
DURATION_FIELD = "duration"
INTENSITY_FIELD = "intensity"
START_PITCH_FIELD = "start_pitch"
MIDDLE_PITCH_FIELD = "middle_pitch"
END_PITCH_FIELD = "end_pitch"
NAIVE_PREDICTION = "naive_prediction"
Y_FIELD = "y"

LOWEST_FREQUENCY = 60
HIGHEST_FREQUENCY = 300
MIN_SIGNAL_LENGTH = 0.2
NAIVE_GAP_THRESHOLD = 0.1

In [3]:
# sci-kit objects
from sklearn import metrics
from sklearn.tree import DecisionTreeClassifier

In [4]:
# Data preparation functions

# Return the data set as a data frame along with columns
# that are used for the training:
# (1) Word duration
# (2) Distance from previous word
# (3) Distance to next word
# (4) Intensity (loudness)
# (5) Average pitch of the first third of a word
# (6) Average pitch of the middle third of a word
# (7) Average pitch of the last third of a word
def generate_train_set(raw_set_file: str, audio_file: str) -> pd.DataFrame:
    start_time = time.time()
    df = pd.read_csv(raw_set_file)
    fs, audio_signal = wavfile.read(audio_file)
    
    df["prev_prototype_id"] = df["prototype_id"].shift().fillna(-1)
    df["start_segment"] = df["prev_prototype_id"] != df["prototype_id"]

    df[DURATION_FIELD] = df[END_TIME_FIELD] - df[START_TIME_FIELD]
    df[PREV_DIST_FIELD] = df[START_TIME_FIELD] - df[END_TIME_FIELD].shift().fillna(0)
    df[NEXT_DIST_FIELD] = df[START_TIME_FIELD].shift(periods=-1).fillna(df[END_TIME_FIELD].max()) - df[END_TIME_FIELD]
    df[INTENSITY_FIELD] = df.apply(lambda r: compute_intensity(r[START_TIME_FIELD], r[END_TIME_FIELD], fs, audio_signal), axis=1)

    audio_signal, fs = torchaudio.load(audio_file)
    pitch_df = df.apply(lambda r: compute_pitch(r[START_TIME_FIELD], r[END_TIME_FIELD], fs, audio_signal), axis=1)
    pitch_df = pd.DataFrame({"pitches": pitch_df.values})
    df[START_PITCH_FIELD] = pitch_df.pitches.apply(lambda r: r[START_PITCH_FIELD])
    df[MIDDLE_PITCH_FIELD] = pitch_df.pitches.apply(lambda r: r[MIDDLE_PITCH_FIELD])
    df[END_PITCH_FIELD] = pitch_df.pitches.apply(lambda r: r[END_PITCH_FIELD])
    
    df[Y_FIELD] = df[LABEL_FIELD].apply(lambda x: 1 if x else 0)
    df[NAIVE_PREDICTION] = df[PREV_DIST_FIELD].apply(lambda x: True if x > NAIVE_GAP_THRESHOLD else False)

    print("generate train set run time: {:.2f}".format(time.time() - start_time))

    return df

def compute_intensity(start_time: float, end_time: float, fs: float, audio_signal: np.ndarray) -> float:
    sub_signal = audio_signal[int(start_time*fs):int(end_time*fs)]
    sub_signal = sub_signal / np.max(np.abs(sub_signal))
    sub_signal = sub_signal**2
    rms = 10 * math.log10(sub_signal.sum() / sub_signal.shape[0])
    return rms + ADDITIVE_INTENSITY

def compute_pitch(start_time: float, end_time: float, fs: int, audio_signal: torch.Tensor) -> dict:
    signal_length = end_time - start_time
    actual_end_time = (end_time if signal_length >= MIN_SIGNAL_LENGTH else (start_time + MIN_SIGNAL_LENGTH))
    pitch = F.detect_pitch_frequency(torch.unsqueeze(audio_signal[0,int(start_time*fs):int(actual_end_time*fs)], 0), fs, freq_low=LOWEST_FREQUENCY, freq_high=HIGHEST_FREQUENCY)
    num_items = pitch.shape[1]
    one_third = int(num_items/3 + 0.5)
    return {START_PITCH_FIELD:(pitch[0,:one_third].sum()/one_third).item(),
            MIDDLE_PITCH_FIELD:(pitch[0,one_third:2*one_third].sum()/one_third).item(),
            END_PITCH_FIELD:(pitch[0,2*one_third:].sum()/(num_items - 2*one_third)).item()}


In [5]:
# Get the data set.
# This may take take a couple of minutes
import time

start_time = time.time()
data_set_df = generate_train_set(DATA_SET, AUDIO_FILE)

print("Data preparation time: {:.2f}".format(time.time() - start_time))
print("A look at the data set:")
data_set_df.head(15)

generate train set run time: 73.63
Data preparation time: 73.64
A look at the data set:


Unnamed: 0,id,xmin,xmax,text,label,duration,prev_dist,next_dist,intensity,start_pitch,middle_pitch,end_pitch,y,naive_prediction
0,0,16.48,16.76,well,True,0.28,16.48,0.02,87.852139,115.287125,153.125,153.339157,1,True
1,1,16.78,16.89,we're,True,0.11,0.02,0.0,87.733373,156.38298,156.38298,156.38298,1,False
2,2,16.89,17.06,gonna,False,0.17,0.0,0.0,85.452439,63.395573,67.95488,71.129028,0,False
3,3,17.06,17.32,have,False,0.26,0.0,0.0,87.853923,91.875008,91.875008,91.875008,0,False
4,4,17.32,17.4,to,False,0.08,0.0,0.0,82.760282,116.666664,116.666664,116.666664,0,False
5,5,17.4,17.76,find,False,0.36,0.0,0.0,85.761319,120.938057,143.506226,173.519745,0,False
6,6,17.76,18.3,somewhere,False,0.54,0.0,0.03,87.937255,78.307564,173.376617,136.215363,0,False
7,7,18.33,18.53,to,True,0.2,0.03,0.0,85.861823,67.638039,70.325638,74.242424,1,False
8,8,18.53,18.8,get,False,0.27,0.0,0.89,87.135183,232.10527,232.10527,211.755768,0,False
9,9,19.69,20.23,something,True,0.54,0.89,1.51,83.652595,210.444901,97.670273,97.238335,1,True


In [14]:
# Split the data set into train & test
TEST_SET_SPLIT = 0.2
conds = [np.random.random() < TEST_SET_SPLIT for i in range(len(data_set_df))]
test_df = data_set_df[conds]
train_df = data_set_df[[not x for x in conds]]
print("#data set: " + str(len(data_set_df)))
print("#train set: " + str(len(train_df)))
print("#test set: " + str(len(test_df)))

#data set: 3047
#train set: 2434
#test set: 613


In [15]:
# Extract the features from each set
features_columns = [DURATION_FIELD, PREV_DIST_FIELD,
                    NEXT_DIST_FIELD, INTENSITY_FIELD,
                    START_PITCH_FIELD, MIDDLE_PITCH_FIELD,
                    END_PITCH_FIELD, Y_FIELD]

train_features = train_df[features_columns].values
test_features = test_df[features_columns].values

train_x = train_features[:,:-1]
train_y = train_features[:,-1]
test_x = test_features[:,:-1]
test_y = test_features[:,-1]

print("Train: x: " + str(train_x.shape) + ", y: " + str(train_y.shape))
print("Test: x: " + str(test_x.shape) + ", y: " + str(test_y.shape))

Train: x: (2434, 7), y: (2434,)
Test: x: (613, 7), y: (613,)


In [16]:
# Train a decision tree model
dtc = DecisionTreeClassifier(random_state=0, max_depth=4, min_samples_split=3)
dtc = dtc.fit(train_x, train_y)

In [17]:
# Evaluate & compare
print("Baseline method (gap from previous word):")
print(metrics.cohen_kappa_score(test_df.label, test_df[NAIVE_PREDICTION]))

print("\nFeatures method:")
predictions = dtc.predict(test_x)
print(metrics.cohen_kappa_score(test_y, predictions))

Baseline method (gap from previous word):
0.5076461840650821

Features method:
0.8920359293155296
