In [110]:
import os
import pickle
import sys
import time

import numpy as np
import pandas as pd

from sklearn import mixture, discriminant_analysis
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report


In [2]:
def get_Smean(f0_cntr):
    """Return the arithmetic mean of the f0 contour."""
    contour_mean = np.mean(f0_cntr)
    return contour_mean

def get_Sdmean(f0_cntr):
    """Return the mean of the contour's numerical gradient (np.gradient)."""
    slope = np.gradient(f0_cntr)
    return np.mean(slope)

def get_Sstd(f0_cntr):
    """Return the standard deviation of the f0 contour (np.std defaults)."""
    contour_std = np.std(f0_cntr)
    return contour_std

def get_Sdstd(f0_cntr):
    """Return the standard deviation of the contour's numerical gradient."""
    slope = np.gradient(f0_cntr)
    return np.std(slope)

def get_Srange(f0_cntr):
    """Return the range of the f0 contour, i.e. its maximum minus its minimum.

    Mirrors get_Sdrange, which computes the same quantity on the gradient.
    """
    # Subtract the minimum: subtracting the maximum from itself would always
    # give 0 and make the feature carry no information.
    return np.max(f0_cntr) - np.min(f0_cntr)

def get_Sdrange(f0_cntr):
    """Return the range (max minus min) of the contour's numerical gradient."""
    slope = np.gradient(f0_cntr)
    steepest_rise = np.max(slope)
    steepest_fall = np.min(slope)
    return steepest_rise - steepest_fall

def get_Smax(f0_cntr):
    """Return the largest value of the f0 contour."""
    peak = np.max(f0_cntr)
    return peak

def get_Smin(f0_cntr):
    """Return the smallest value of the f0 contour."""
    trough = np.min(f0_cntr)
    return trough


def get_SQ25(f0_cntr):
    """Return the 25 % quantile (first quartile) of the f0 contour."""
    lower_quartile = np.quantile(f0_cntr, 0.25)
    return lower_quartile

def get_SQ75(f0_cntr):
    """Return the 75 % quantile (third quartile) of the f0 contour."""
    upper_quartile = np.quantile(f0_cntr, 0.75)
    return upper_quartile

In [None]:
# Registry of contour-level statistics: each key is a feature name and each
# value is the function that computes that statistic from one f0 contour.
# Names starting with "Sd" are computed on np.gradient of the contour, the
# others on the contour itself.
# When this dict is passed as `features` to feat_ext, its insertion order
# becomes the column order of the returned feature matrix.
feature_map = {
    "Smean" : get_Smean,
    "Sdmean" : get_Sdmean,
    "Sstd" : get_Sstd,
    "Sdstd" : get_Sdstd,
    "Srange" : get_Srange,
    "Sdrange" : get_Sdrange,
    "Smax" : get_Smax,
    "Smin" : get_Smin,
    "SQ25" : get_SQ25,
    "SQ75" : get_SQ75
}

def feat_ext(corpus, features, save=None):
    """Build a feature matrix from a corpus of contours.

    Parameters
    ----------
    corpus : iterable
        The contours. Each element is handed unchanged to every extractor.
        Any iterable works; it does not need to support integer indexing.
    features : dict
        Maps a feature name to a callable that takes one contour and returns
        one value. Columns follow the dict's iteration order.
    save : str or None, optional
        If given, the resulting array is pickled to this path.

    Returns
    -------
    numpy.ndarray
        Array with one row per contour and one column per feature. An empty
        corpus yields shape ``(0, len(features))`` so that column slicing
        still works downstream.
    """
    extractors = list(features.values())
    rows = [[extract(contour) for extract in extractors] for contour in corpus]

    samples = np.array(rows)
    if samples.ndim == 1:
        # np.array([]) is 1-D; keep the (n_samples, n_features) contract.
        samples = samples.reshape(len(rows), len(extractors))

    if save is not None:
        with open(save, "wb") as f:
            pickle.dump(samples, f)

    return samples

class IFN(object):
    """Iterative feature normalisation (IFN) for neutral-vs-emotional detection.

    ``fit`` repeats the following steps on f0 contours grouped by speaker:

    1. extract features from the current (possibly rescaled) contours and
       train a linear discriminant classifier on the ground-truth labels;
    2. use the classifier's neutral posterior to decide which files of every
       speaker are treated as neutral;
    3. rescale each speaker's raw contours so that the mean f0 of those
       neutral files matches the mean f0 of the reference corpus.

    The loop stops once the fraction of files whose detected label changed is
    at most ``switch_threshold`` or after ``max_iter`` rounds.

    Parameters
    ----------
    ref_corpus : list
        Reference (neutral) f0 contours.
    enable_neutral : bool
        Stored flag indicating that reference models are wanted. The
        constructor itself does not fit anything; call ``fit_ref`` to build
        the per-feature reference GMMs.
    features : dict
        Feature name -> extractor callable, as expected by ``feat_ext``.
    max_iter : int
        Maximum number of normalisation rounds per resampling round.
    ss_iter : int
        Number of random undersampling rounds used when the classes are
        imbalanced (a single round is run when they are balanced).
    neutral_label, emo_label : hashable
        Labels for the neutral and the emotional class. Every label different
        from ``neutral_label`` is treated as emotional.
    analysis : bool
        If true, a stratified part of the data is held out and scored after
        every normalisation round; the best scoring classifier is kept in
        ``best_ldc``.
    analysis_test_size : float
        Held-out fraction used when ``analysis`` is true.
    norm_scheme : str
        ``"ifn"`` estimates scale factors from detected neutral files,
        ``"opt"`` from ground-truth neutral files; any other value (e.g.
        ``"none"``) disables rescaling.
    neu_threshold : float
        Neutral posterior above which a file is detected as neutral.
    spkr_file_threshold : float
        Minimum fraction of a speaker's files that must be detected as
        neutral; below it the most neutral-looking files are forced neutral.
    switch_threshold : float
        Convergence threshold on the fraction of changed labels.
    log_dir : str
        Directory under which per-iteration pickles are written.
    ref_gmms : list or None
        Optional pre-fitted reference GMMs.
    ldc : object or None
        Optional pre-fitted classifier used as initial ``ldc``/``best_ldc``.
    """

    # Relative class-size difference above which undersampling is enabled.
    _IMBALANCE_TOLERANCE = 0.1

    def __init__(self, ref_corpus, enable_neutral, features, max_iter=10,
                 ss_iter=400, neutral_label="01", emo_label="00", analysis=True,
                 analysis_test_size=0.3, norm_scheme="ifn", neu_threshold=0.7,
                 spkr_file_threshold=0.2, switch_threshold=0.05, log_dir="./",
                 ref_gmms=None, ldc=None):
        self.enable_neutral = enable_neutral
        self.ref_corpus = ref_corpus  # should be a list of ref f0 contours
        self.ref_feat = None
        self.features = features
        self.neutral_label = neutral_label
        self.emo_label = emo_label

        self.X_train_raw = None
        self.y_train_raw = None

        # When set, fit() holds out part of the data, scores the classifier on
        # it after every normalisation round and keeps the best one.
        self.analysis = analysis
        self.analysis_test_size = analysis_test_size

        # Directory receiving the per-iteration pickles.
        self.log_dir = log_dir

        self.ref_gmms = list()
        if ref_gmms is not None:
            self.ref_gmms = ref_gmms

        self.ss_iter = ss_iter
        self.max_iter = max_iter
        self.balance = False

        self.norm_constants = dict()
        self.norm_scheme = norm_scheme
        self.neu_threshold = neu_threshold

        self.spkr_file_threshold = spkr_file_threshold
        self.switch_threshold = switch_threshold

        self.best_ldc = ldc
        self.ldc = ldc

    def fit_ref(self):
        """Fit one 2-component Gaussian mixture per feature on ``ref_corpus``.

        Stores the reference feature matrix in ``ref_feat`` and replaces
        ``ref_gmms`` with the freshly fitted models (one per feature column).
        """
        X_ref = feat_ext(self.ref_corpus, self.features)
        self.ref_feat = X_ref.copy()

        # Start from an empty list so repeated calls do not accumulate models.
        self.ref_gmms = list()
        for col in range(X_ref.shape[1]):
            gmm = mixture.GaussianMixture(n_components=2)
            # GaussianMixture is unsupervised; it expects a 2-D (n, 1) input.
            gmm.fit(X_ref[:, col].reshape(X_ref.shape[0], 1))
            self.ref_gmms.append(gmm)

    def fit(self, X_train_raw, y_train_raw, spkrs, save=None):
        """Run undersampling rounds of iterative feature normalisation.

        Parameters
        ----------
        X_train_raw : list
            One f0 contour (1-D numeric sequence) per file.
        y_train_raw : array-like
            One label per file.
        spkrs : array-like
            One speaker identifier per file.
        save : object, optional
            Currently unused; kept for interface compatibility.

        Returns
        -------
        IFN
            ``self``; ``ldc`` holds the last classifier, ``best_ldc`` the best
            held-out classifier (when ``analysis`` is true) and
            ``norm_constants`` the per-speaker scale factors of the last
            resampling round.

        Raises
        ------
        ValueError
            If the inputs differ in length or one of the two classes is empty.
        """
        self.X_train_raw = X_train_raw
        self.y_train_raw = y_train_raw
        self.spkrs = spkrs

        contours = [np.asarray(c, dtype=float) for c in X_train_raw]
        labels = np.asarray(y_train_raw)
        speakers = np.asarray(spkrs)
        if not (len(contours) == len(labels) == len(speakers)):
            raise ValueError("X_train_raw, y_train_raw and spkrs must have the same length")

        is_neutral = labels == self.neutral_label
        num_neu_samples = int(np.sum(is_neutral))
        num_emo_samples = len(labels) - num_neu_samples
        if num_neu_samples == 0 or num_emo_samples == 0:
            raise ValueError("fit needs at least one neutral and one emotional sample")

        # The detector is binary: collapse every non-neutral label to emo_label.
        labels = np.where(is_neutral, self.neutral_label, self.emo_label)

        diff = abs(num_emo_samples - num_neu_samples) / num_neu_samples
        self.balance = diff > self._IMBALANCE_TOLERANCE
        # Resampling only matters for imbalanced data. The configured ss_iter
        # is left untouched so a later fit() on other data is not affected.
        n_ss_iter = self.ss_iter if self.balance else 1

        normalise = self.norm_scheme in ("ifn", "opt")
        ref_avg_f0 = None
        if normalise:
            # Mean of the per-contour means of the reference corpus.
            ref_avg_f0 = float(np.mean([np.mean(c) for c in self.ref_corpus]))

        best_test_acc = -1

        for ss_idx in range(n_ss_iter):
            if self.balance:
                keep = self._balanced_indices(labels, speakers)
            else:
                keep = np.arange(len(labels))

            X_ss = [contours[j] for j in keep]
            y_ss = labels[keep]
            spkrs_ss = speakers[keep]

            if self.analysis:
                # Stratify jointly on label and speaker so every speaker is
                # present with both classes on each side of the split.
                (X_train, X_test, y_train, y_test,
                 spkrs_train, spkrs_test) = train_test_split(
                    X_ss, y_ss, spkrs_ss,
                    test_size=self.analysis_test_size,
                    stratify=np.vstack((y_ss, spkrs_ss)).T)
            else:
                X_train, y_train, spkrs_train = X_ss, y_ss, spkrs_ss
                X_test, y_test, spkrs_test = [], y_ss[:0], spkrs_ss[:0]

            self.norm_constants = {spkr: 1.0 for spkr in np.unique(spkrs_ss)}

            # Working copies: the raw contours and the ground-truth labels
            # must stay intact because every round rescales from the raw data.
            X_train_cur = list(X_train)
            X_test_cur = list(X_test)
            y_train_cur = np.array(y_train, copy=True)

            self.ldc = None

            for ifn_idx in range(self.max_iter):
                X_train_feat = feat_ext(X_train_cur, self.features)

                ldc = discriminant_analysis.LinearDiscriminantAnalysis()
                ldc.fit(X_train_feat, y_train)

                # predict_proba has one column per class, ordered as classes_.
                neu_col = list(ldc.classes_).index(self.neutral_label)
                neu_prob = ldc.predict_proba(X_train_feat)[:, neu_col]

                # Score the held-out data with features that match the ones
                # the classifier was trained on, i.e. before this round's
                # rescaling is applied.
                clf_report = None
                if self.analysis:
                    X_test_feat = feat_ext(X_test_cur, self.features)
                    test_neu_prob = ldc.predict_proba(X_test_feat)[:, neu_col]
                    y_test_pred = np.where(test_neu_prob > self.neu_threshold,
                                           self.neutral_label, self.emo_label)
                    clf_report = classification_report(y_test, y_test_pred,
                                                       output_dict=True)

                y_train_prev = y_train_cur.copy()
                y_train_cur = np.where(neu_prob > self.neu_threshold,
                                       self.neutral_label, self.emo_label)

                for spkr in np.unique(spkrs_train):
                    spkr_idx = np.flatnonzero(spkrs_train == spkr)
                    spkr_prob = neu_prob[spkr_idx]

                    n_detected = int(np.sum(spkr_prob > self.neu_threshold))
                    if n_detected / len(spkr_idx) < self.spkr_file_threshold:
                        # Too few neutral detections: force the files with the
                        # highest neutral posterior to neutral instead.
                        order = np.argsort(-1 * spkr_prob)
                        n_forced = int(np.ceil(self.spkr_file_threshold * len(spkr_idx)))
                        # `order` indexes the speaker subset; map it back to
                        # positions in the full training arrays.
                        y_train_cur[spkr_idx[order[:n_forced]]] = self.neutral_label
                        y_train_cur[spkr_idx[order[n_forced:]]] = self.emo_label

                    if self.norm_scheme == "ifn":
                        neu_mask = y_train_cur[spkr_idx] == self.neutral_label
                    elif self.norm_scheme == "opt":
                        neu_mask = y_train[spkr_idx] == self.neutral_label
                    else:
                        continue

                    neu_files = spkr_idx[neu_mask]
                    if len(neu_files) == 0:
                        # No neutral material: keep the previous scale factor.
                        continue

                    neu_spkr_f0 = float(np.mean([np.mean(X_train[j]) for j in neu_files]))
                    if neu_spkr_f0 == 0:
                        continue

                    scale = ref_avg_f0 / neu_spkr_f0
                    self.norm_constants[spkr] = scale

                    # Multiplying by ref/speaker moves the speaker's neutral
                    # mean f0 onto the reference mean.
                    for j in spkr_idx:
                        X_train_cur[j] = X_train[j] * scale
                    for j in np.flatnonzero(spkrs_test == spkr):
                        X_test_cur[j] = X_test[j] * scale

                # Fraction of training files whose detected label changed.
                change = float(np.mean(y_train_cur != y_train_prev))

                self.ldc = ldc

                if clf_report is not None and clf_report["accuracy"] > best_test_acc:
                    best_test_acc = clf_report["accuracy"]
                    self.best_ldc = ldc

                self._log_iteration(ss_idx, ifn_idx, clf_report)

                if change <= self.switch_threshold:
                    break

        return self

    def _balanced_indices(self, labels, speakers):
        """Return all neutral indices plus, per speaker, an equally sized random draw of emotional indices."""
        neu_idx = np.flatnonzero(labels == self.neutral_label)
        emo_idx = np.flatnonzero(labels != self.neutral_label)

        emo_idx_sample = []
        for spkr in np.unique(speakers):
            n_neu = int(np.sum(speakers[neu_idx] == spkr))
            spkr_emo = emo_idx[speakers[emo_idx] == spkr]
            if n_neu == 0 or len(spkr_emo) == 0:
                continue
            # Draw without replacement whenever the speaker has enough
            # emotional files; otherwise repeat files to reach the same size.
            drawn = np.random.choice(spkr_emo, size=n_neu,
                                     replace=len(spkr_emo) < n_neu)
            emo_idx_sample.extend(drawn.tolist())

        return np.array(emo_idx_sample + neu_idx.tolist(), dtype=int)

    def _log_iteration(self, ss_idx, ifn_idx, clf_report):
        """Pickle the report and models of one round to <log_dir>/ss_iter_<ss_idx>/<ifn_idx>/."""
        ifn_iter_dir = os.path.join(self.log_dir, "ss_iter_" + str(ss_idx), str(ifn_idx))
        os.makedirs(ifn_iter_dir, exist_ok=True)

        artefacts = (
            ("clf_report.pkl", clf_report),
            ("ref_gmms.pkl", self.ref_gmms),
            ("ldc.pkl", self.ldc),
            ("best_ldc.pkl", self.best_ldc),
        )
        for file_name, obj in artefacts:
            with open(os.path.join(ifn_iter_dir, file_name), "wb") as f:
                pickle.dump(obj, f)
                        
                
                
        

In [16]:
# Scratch check: np.vstack stacks a (3, 3) block of ones on top of a (4, 3)
# block of zeros, giving a single (7, 3) array.
np.vstack((np.ones((3, 3)), np.zeros((4, 3))))

array([[1., 1., 1.],
       [1., 1., 1.],
       [1., 1., 1.],
       [0., 0., 0.],
       [0., 0., 0.],
       [0., 0., 0.],
       [0., 0., 0.]])

In [103]:

# Scratch check of the "top 20 %" selection: argsort of the negated values
# orders indices from largest to smallest, and ceil(0.2 * n) is how many are kept.
np.argsort(-1 * np.array([1, 2, 3]))[:int(np.ceil(0.2 * 3))]

array([2])

In [102]:
# Scratch check: np.ceil rounds 0.2 up to 1.0, so a ceil-based count is never 0
# for a non-empty set.
np.ceil(0.2)

1.0

In [109]:
# Scratch check: summing an element-wise == comparison counts the positions
# where the two arrays agree (only the first position here).
np.sum(np.array([1, 2, 3]) == np.array([1, 1, 2]))

1

In [118]:
# Scratch check: with output_dict=True the report is a nested dict that has a
# top-level "accuracy" key next to the per-class and averaged entries.
classification_report([1, 1, 0], [1, 1, 1], output_dict=True, )

  _warn_prf(average, modifier, msg_start, len(result))


{'0': {'precision': 0.0, 'recall': 0.0, 'f1-score': 0.0, 'support': 1},
 '1': {'precision': 0.6666666666666666,
  'recall': 1.0,
  'f1-score': 0.8,
  'support': 2},
 'accuracy': 0.6666666666666666,
 'macro avg': {'precision': 0.3333333333333333,
  'recall': 0.5,
  'f1-score': 0.4,
  'support': 3},
 'weighted avg': {'precision': 0.4444444444444444,
  'recall': 0.6666666666666666,
  'f1-score': 0.5333333333333333,
  'support': 3}}

In [106]:
# Scratch check: train_test_split accepts a ragged list of lists (contours of
# different lengths) together with a plain list of labels and returns lists.
a = [[1], [2, 3], [4, 2 ,1]]
b = ["00", "01", "00"]
train_test_split(a, b, test_size=0.1)

[[[2, 3], [4, 2, 1]], [[1]], ['01', '00'], ['00']]

((array(['aa', 'bb', 'cc', 'dd', 'ee'], dtype='<U2'), array([5, 6, 6, 5, 6])),
 (array(['aa', 'bb', 'cc', 'dd', 'ee'], dtype='<U2'), array([3, 2, 2, 3, 2])),
 (array(['aa', 'bb', 'cc', 'dd', 'ee'], dtype='<U2'), array([8, 8, 8, 8, 8])))

In [43]:
# Scratch check: stacking two 1-D arrays and transposing gives one
# (label, speaker)-style row per sample.
# NOTE(review): `c` is not defined in any visible cell, and the `b` defined
# above holds strings while the displayed output holds integers, so this cell
# depends on stale kernel state and will not survive Restart & Run All.
np.vstack((b, c)).T

array([[1, 2],
       [1, 2],
       [1, 3],
       [1, 3],
       [1, 4],
       [0, 4],
       [0, 5],
       [0, 5],
       [0, 6],
       [0, 6]])

In [115]:
# Scratch check: np.int64 applied to a boolean mask yields a 0/1 integer array.
a = np.arange(10)
np.int64(a > 5)

array([0, 0, 0, 0, 0, 0, 1, 1, 1, 1])