In [2]:
import librosa
import numpy as np
import matplotlib.pylab as plt
import librosa.display
from scipy.signal import find_peaks
import pandas as pd
from sklearn.discriminant_analysis import LinearDiscriminantAnalysis
from sklearn import svm
import IPython.display as ipd
from sklearn.metrics import classification_report
from scipy import stats
import os
from numpy import linalg as LA
import warnings
import pickle
warnings.simplefilter("ignore")
%matplotlib tk

## Basic Configurations

In [3]:
sr = 44100
n_fft = int(sr * (20/1000)) # frequency resolution for basic features
# n_fft = 441000
window_length = n_fft #frame size for basic features
hop_length = window_length // 2 #time resolution for basic features
segment_size = sr * 1 # 1 second will be used for segmenting and statistics on short time features

## Load Basic Variable Jsons from hard disk

In [4]:
labling_type = 'bi_label'

In [5]:

with open('./pickles/audios.pickle', 'rb') as handle:
    audios = pickle.load(handle)
with open('./pickles/audios.pickle', 'rb') as handle:
    srs = pickle.load(handle)
with open('./pickles/train_audios_names.pickle', 'rb') as handle:
    train_audios_names = pickle.load(handle)
with open('./pickles/test_audios_names.pickle', 'rb') as handle:
    test_audios_names = pickle.load(handle)
# with open(f'./pickles/{labling_type}/training_sample_labeling.pickle', 'rb') as handle:
#     training_sample_labeling = pickle.load(handle)
# with open(f'./pickles/{labling_type}/testing_sample_labeling.pickle', 'rb') as handle:
#     testing_sample_labeling = pickle.load(handle)
with open(f'./pickles/{labling_type}/training_frame_labeling.pickle', 'rb') as handle:
    training_frame_labeling = pickle.load(handle)
with open(f'./pickles/{labling_type}/testing_frame_labeling.pickle', 'rb') as handle:
    testing_frame_labeling = pickle.load(handle)

## Basic Features

In [6]:
def basic_features(audio, n_fft, hop_length, window_length, center=False):
    chroma = librosa.feature.chroma_stft(audio, sr=sr, n_fft=n_fft, win_length=window_length, hop_length=hop_length, center=center)
    S = librosa.feature.melspectrogram(y=audio, sr=sr, hop_length=hop_length, n_fft=n_fft, win_length=window_length, center=center)
    mfcc = librosa.feature.mfcc(S=librosa.power_to_db(S))
    S_diff = np.diff(S, prepend=0)
    flux = LA.norm(S_diff, ord=2, axis=0) ** 2
    stft = librosa.stft(audio, n_fft=n_fft, hop_length=hop_length, win_length=window_length, center=center)
    S, phase = librosa.magphase(stft)
    rms = librosa.feature.rms(S=S, frame_length=window_length, hop_length=hop_length, center=center)[0]
    frequencies = librosa.fft_frequencies(sr=sr, n_fft=n_fft)
    cent = librosa.feature.spectral_centroid(S=S, freq=frequencies)[0]
    rolloff = librosa.feature.spectral_rolloff(S=S, freq=frequencies, roll_percent=0.95)[0]
    zero_crossing_rate = librosa.feature.zero_crossing_rate(audio,center=center, frame_length=window_length, hop_length=hop_length)[0]
    flatness = librosa.feature.spectral_flatness(S=S,center=center)[0]
    bandwidth = librosa.feature.spectral_bandwidth(S=S, sr=sr, hop_length=hop_length, win_length=window_length, center=center)[0]
    
    return {'cent': cent, 'rolloff': rolloff, 'flux': flux, 'zero_crossing_rate': zero_crossing_rate, 'rms': rms, 'flatness': flatness,
    'chroma': chroma, 'mfcc': mfcc, 'bandwidth': bandwidth}

calculate features for all audios

In [57]:
training_features = {}
testing_features = {}

for i,v in audios.items():
    print(f'calculating audio({i}) features...')
    features_list = basic_features(v, n_fft, hop_length, window_length)
    if i in train_audios_names:
        training_features[i] = features_list
    elif i in test_audios_names:
        testing_features[i] = features_list

calculating audio(eatmycountry1609.mp3) features...
calculating audio(theconcert16.mp3) features...
calculating audio(ConscinciasParalelasN11-OEspelhoEOReflexoFantasiasEPerplexidadesParte413-12-1994.mp3) features...
calculating audio(ConscinciasParalelasN7-OsSentidosOSentirEAsNormasParte715-1-1994.mp3) features...
calculating audio(theconcert2.mp3) features...
calculating audio(UTMA-26.mp3) features...
calculating audio(ConscinciasParalelasN3-OsSentidosOSentirEAsNormasParte318-10-1994.mp3) features...


Save basic training and testing features on hard disk

In [11]:
with open('pickles/training_basic_features.pickle', 'wb') as handle:
    pickle.dump(training_features, handle, protocol=pickle.HIGHEST_PROTOCOL)
with open('pickles/testing_basic_features.pickle', 'wb') as handle:
    pickle.dump(testing_features, handle, protocol=pickle.HIGHEST_PROTOCOL)

you can load basic features from disk

In [7]:
with open('./pickles/training_basic_features.pickle', 'rb') as handle:
    training_features = pickle.load(handle)
with open('./pickles/testing_basic_features.pickle', 'rb') as handle:
    testing_features = pickle.load(handle)

## Moving Statistics Method

In [8]:
def moving_method(arr_, window_length, hop_length, method, mode='valid'):
    arr = np.array(arr_, ndmin=2)
    def method_per_frame():
        start = 0
        if mode == 'valid':
            while start+window_length <= arr.shape[1]:
                yield getattr(np, method)(arr[:, start:start+window_length], axis=1, keepdims=True)
                start += hop_length
        else:
            while start < arr.shape[1]:
                yield getattr(np, method)(arr[:, start:start+window_length], axis=1, keepdims=True)
                start += hop_length
    return np.hstack(list(method_per_frame()))

## Normalizing 

In [9]:
def normalizing(y_, model='z-score'):
    y = y_.copy()
    if model == 'z-score':
        return stats.zscore(y)
    mean_ = np.mean(y, axis=0, keepdims=True)
    y -= np.repeat(mean_, y.shape[0], axis=0)
    max_ = np.max(np.abs(y), axis=0, keepdims=True)
    min_ = np.min(y, axis=0, keepdims=True)
    range_ = (max_ - min_)/2
    y /= np.repeat(max_, y.shape[0], axis=0)
    
    return y

In [10]:
merged_features = {**training_features, **testing_features}
merged_labels = {**training_frame_labeling, **testing_frame_labeling}
all_audio_names = list(merged_labels.keys())

test_audios_names = list(testing_features.keys())
train_audios_names = list(training_features.keys())
np.random.seed(51328973)
np.random.shuffle(all_audio_names)
audio_lens = [len(merged_labels[audio_name]) for audio_name in all_audio_names]
total_lens = np.sum(audio_lens)
split_ratio = 0.8
i = 0
split_ratio_i = 0
while np.abs(split_ratio_i-split_ratio) > np.abs(split_ratio-(np.sum(audio_lens[:i+1])/total_lens)):
    i += 1
    split_ratio_i = np.sum(audio_lens[:i])/total_lens
train_audios_names = all_audio_names[:i]
test_audios_names = all_audio_names[i:]

In [11]:
print('Train Audios', train_audios_names, 'Test Audios', test_audios_names)

Train Audios ['ConscinciasParalelasN11-OEspelhoEOReflexoFantasiasEPerplexidadesParte413-12-1994.mp3', 'eatmycountry1609.mp3', 'theconcert16.mp3', 'ConscinciasParalelasN7-OsSentidosOSentirEAsNormasParte715-1-1994.mp3', 'theconcert2.mp3'] Test Audios ['UTMA-26.mp3', 'ConscinciasParalelasN3-OsSentidosOSentirEAsNormasParte318-10-1994.mp3']


## Desired Features list

In [12]:
features_names = ['cent', 'rolloff', 'flux', 'zero_crossing_rate', 'rms', 'flatness', 'chroma', 'mfcc', 'bandwidth']
# features_names = ['cent']

In [13]:
segment_frames_nums = librosa.time_to_frames(1, sr=sr, hop_length=hop_length, n_fft=n_fft)
testing_features_final = np.vstack([np.hstack([moving_method(np.vstack([merged_features[audio_name][feature_name] for feature_name in features_names]), segment_frames_nums, (segment_size//4)//hop_length, method) for audio_name in test_audios_names]) for method in ['mean', 'std']])
testing_labels = np.hstack([merged_labels[audio_name] for audio_name in test_audios_names])
training_features_final = np.vstack([np.hstack([moving_method(np.vstack([merged_features[audio_name][feature_name] for feature_name in features_names]), segment_frames_nums, (segment_size//4)//hop_length, method) for audio_name in train_audios_names]) for method in ['mean', 'std']])
training_labels = np.hstack([merged_labels[audio_name] for audio_name in train_audios_names])

## Dynamic Programming

In [14]:
def classification_dp(feature_seq, transition_cost=None, local_cost=None, alpha=0.1):
    feature_seq_arr = np.array(feature_seq, ndmin=2)
    def state_eq_distance (state_index_0, state_index_1):
        return np.sqrt(np.sum((feature_seq_arr[:, state_index_1] - feature_seq_arr[:, state_index_0])**2))
    def default_transition_cost (state_index_0, state_index_1):
        eq_distance = state_eq_distance(state_index_0, state_index_1)
        return 1/eq_distance if eq_distance > 1e-6 else 0
    if transition_cost is None: transition_cost = default_transition_cost
    if local_cost is None: local_cost = state_eq_distance

    costs_pre = np.zeros((feature_seq_arr.shape[1], feature_seq_arr.shape[1], 2))
    for i in range(feature_seq_arr.shape[1]):
        for j in range(feature_seq_arr.shape[1]):
            costs_pre[i, j, 0], costs_pre[i, j, 1] = alpha*transition_cost(i, j), (1-alpha)*local_cost(i, j)

    DP_table = [[((1-alpha)*local_cost(0, i), -1) for i in range(feature_seq_arr.shape[1])]]
    for i in range(1, feature_seq_arr.shape[1]):
        DP_table += [[]]
        for j in range(feature_seq_arr.shape[1]):
            all_costs = [DP_table[-2][k][0]+costs_pre[k, j, 0]+costs_pre[i, j, 1] for k in range(j+1)]
            argmin_cost = np.argmin(all_costs)
            DP_table[-1] += [(all_costs[argmin_cost], argmin_cost)]
            del all_costs

    optimal_path = [np.argmin([cost for cost, state_i in DP_table[-1]])]
    i = -1
    while i > -feature_seq_arr.shape[1]:
        optimal_path = [DP_table[i][optimal_path[0]][1]] + optimal_path
        i -= 1
    result = feature_seq_arr[:, optimal_path]
    return result

def moving_classification_dp(feature_seq, window_length, transition_cost=None, local_cost=None, alpha=0.1):
    improved_feature_seq = np.array(feature_seq.copy(), ndmin=2)
    i = 0
    while i < improved_feature_seq.shape[1]:
        improved_feature_seq[:, i:i+window_length] = classification_dp(improved_feature_seq[:, i:i+window_length], transition_cost, local_cost, alpha)
        i += window_length
    return improved_feature_seq

## Evaluations

In [15]:
clf = LinearDiscriminantAnalysis()
clf.fit(training_features_final.T, training_labels)
predicts = clf.predict(testing_features_final.T)
transforms = clf.transform(testing_features_final.T)
print(classification_report(testing_labels, predicts))

              precision    recall  f1-score   support

           0       0.93      0.93      0.93     15778
           1       0.84      0.84      0.84      6911

    accuracy                           0.90     22689
   macro avg       0.89      0.89      0.89     22689
weighted avg       0.90      0.90      0.90     22689



In [16]:
clf = LinearDiscriminantAnalysis()
clf.fit(normalizing(training_features_final.T), training_labels)
predicts = clf.predict(normalizing(testing_features_final.T))
transforms = clf.transform(normalizing(testing_features_final.T))
print(classification_report(testing_labels, predicts))

              precision    recall  f1-score   support

           0       0.91      0.96      0.93     15778
           1       0.89      0.77      0.83      6911

    accuracy                           0.90     22689
   macro avg       0.90      0.86      0.88     22689
weighted avg       0.90      0.90      0.90     22689



In [17]:
# times = librosa.frames_to_time(range(500), sr=sr, n_fft=s, hop_length=segment_size//2)
plt.plot(predicts[:700])
plt.plot(testing_labels[:700])

[<matplotlib.lines.Line2D at 0x7faaf2d227d0>]

In [73]:
SVM = svm.SVC()
SVM.fit(moving_classification_dp(clf.transform(normalizing(training_features_final.T)).T, 100, alpha=0.99).T, training_labels)
dp_transform = moving_classification_dp(clf.transform(normalizing(testing_features_final.T)).T, 100, alpha=0.99).T
dp_predicts = SVM.predict(dp_transform)
print(classification_report(testing_labels, dp_predicts))

              precision    recall  f1-score   support

           0       0.95      0.98      0.97     15778
           1       0.96      0.89      0.92      6911

    accuracy                           0.95     22689
   macro avg       0.95      0.94      0.95     22689
weighted avg       0.95      0.95      0.95     22689



In [22]:

plt.plot(transforms[:1000])
plt.plot(dp_transform[:1000])

[<matplotlib.lines.Line2D at 0x7fa73299add0>]

In [21]:
plt.plot(dp_predicts[2500:3700]*0.7)
plt.plot(testing_labels[2500:3700])
plt.plot(predicts[2500:3700]*0.4)


[<matplotlib.lines.Line2D at 0x7f9e62da24a0>]