In [1]:
from __future__ import print_function
from pathlib import Path
import numpy as np, scipy, matplotlib.pyplot as plt, IPython.display as ipd
import librosa, librosa.display
import pandas as pd
from tqdm import tqdm
from scipy.optimize import minimize
import warnings

%matplotlib inline

In [2]:
cd ../libs

/Users/zhonglingjiang/Desktop/Fall 2019/Big Data Analytics/music_emotion_recognition/libs


In [3]:
from rp_extract.rp_extract import rp_extract
from rp_extract.audiofile_read import *
from rp_extract.rp_plot import *

In [4]:
cd ../notebooks

/Users/zhonglingjiang/Desktop/Fall 2019/Big Data Analytics/music_emotion_recognition/notebooks


In [5]:
audio_path = Path.cwd().parent / '1000songs' / 'clips_45seconds'

## Now aggregate everything
## Part I: Feature Extraction

In [6]:
def append_val(d, k, v):
    if k not in d:
        d[k] = [v]
    else:
        d[k].append(v)
    return d

def temporal_centroid(envelope):
    """computes the temporal centroid of an onset envelope"""
    D = np.abs(librosa.stft(envelope))
    times = librosa.times_like(D)

    onset_strength = librosa.onset.onset_strength(y=envelope, sr=sr)
    
    try:
        temporal_centroid = sum(onset_strength * times) / sum(onset_strength)
    except RuntimeWarning:
        temporal_centroid = np.nan
    
    return temporal_centroid

def log_attack_time(envelope, sr, thresh_percent):
    D = np.abs(librosa.stft(envelope))
    times = librosa.times_like(D)
    onset_strength = librosa.onset.onset_strength(y=envelope, sr=sr)
    
    stop_attack_index = np.argmax(onset_strength)
    stop_attack_value = envelope[stop_attack_index]
    thresh = stop_attack_value * thresh_percent / 100
    
    try:
        start_attack_index = [x > thresh for x in onset_strength].index(True)
    except ValueError:
        return np.nan
    
    if start_attack_index == stop_attack_index:
        start_attack_index -= 1

    log_attack_time =  np.log10(times[stop_attack_index] - times[start_attack_index])
    
    return log_attack_time

def extract_features(signal, sr):
    """Given and a signal and its sampling rate, compute all the features"""
    
    # Temporal Features
    onset_samples = np.unique(librosa.onset.onset_detect(signal, sr=sr, backtrack=True, units='samples'))
    all_envelopes = np.split(signal, onset_samples)
    
    zero_crossings = np.array([sum(librosa.zero_crossings(x, pad=False)) for x in all_envelopes])
    zero_features = np.array([np.mean(zero_crossings), np.std(zero_crossings)])
    
    temporal_centroids = np.array([temporal_centroid(x) for x in all_envelopes])
    temporal_centroids = temporal_centroids[~np.isnan(temporal_centroids)]
    temporal_cen_features = np.array([np.mean(temporal_centroids), np.std(temporal_centroids)])
    
    log_attacks = np.array([log_attack_time(x, sr, 50) for x in all_envelopes])
    log_attacks = log_attacks[~np.isnan(log_attacks)]
    log_attack_features = np.array([np.mean(log_attacks), np.std(log_attacks)])
    
    # Rhythmic Feautres (without dimension reduction for now)
    rhythm = rp_extract(signal, sr, extract_rh=True, transform_db=True, transform_phon=True, transform_sone=True,          
        fluctuation_strength_weighting=True, 
        skip_leadin_fadeout=1,             
        step_width=1)
    rhythm_hist = rhythm['rh']
    rhythm_mean = np.array([np.mean(rhythm_hist)])
    
    all_features = np.concatenate([zero_features, temporal_cen_features, log_attack_features, rhythm_hist, rhythm_mean])
    
    return all_features

In [7]:
warnings.filterwarnings('ignore')
audio_path = Path.cwd().parent / '1000songs' / 'clips_45seconds'
all_mp3_paths = list(audio_path.glob('**/*.mp3'))
song_ids = []
audio_train = [] 
for path in tqdm(all_mp3_paths):
    if len(audio_train) < 10: #we'll start off with only 10 training pieces to save time
        signal, sr = librosa.load(str(path))
        try:
            song_features = extract_features(signal, sr)
            audio_train.append(song_features)
            song_ids.append(int((str(path).split('/')[-1].split('.')[0])))

        except ValueError as e:
            print(path)
            continue
    else:
        break

audio_train = np.array(audio_train).T

  1%|          | 10/1000 [00:51<1:22:42,  5.01s/it]

In [8]:
audio_train.shape

(67, 10)

In [9]:
# we need a test song
test_song = '108.mp3'
signal, sr = librosa.load(str(path))
audio_test = extract_features(signal, sr)

In [10]:
audio_test.shape

(67,)

## Part II: Kernel Density Estimation

In [11]:
kernel_density = pd.read_csv('../Time_Average_Gamma_0_1.csv', header=0)
kernel_density.head()

Unnamed: 0,song_id,"[0,0]","[0,1]","[0,2]","[0,3]","[0,4]","[0,5]","[0,6]","[0,7]","[0,8]",...,"[15,6]","[15,7]","[15,8]","[15,9]","[15,10]","[15,11]","[15,12]","[15,13]","[15,14]","[15,15]"
0,2,0.003781,0.003843,0.003895,0.003934,0.003962,0.003978,0.003981,0.003972,0.00395,...,0.003695,0.003687,0.003667,0.003636,0.003594,0.003541,0.003479,0.003406,0.003325,0.003236
1,3,0.003811,0.00387,0.003917,0.003952,0.003975,0.003986,0.003984,0.00397,0.003944,...,0.003705,0.003692,0.003667,0.003632,0.003585,0.003528,0.003461,0.003385,0.0033,0.003208
2,4,0.003245,0.00333,0.003406,0.003473,0.00353,0.003577,0.003614,0.003639,0.003653,...,0.003959,0.003987,0.004003,0.004006,0.003996,0.003974,0.00394,0.003895,0.003837,0.003769
3,5,0.003561,0.003652,0.003734,0.003806,0.003868,0.003918,0.003956,0.003983,0.003996,...,0.003621,0.003645,0.003657,0.003659,0.003649,0.003627,0.003595,0.003552,0.003498,0.003434
4,7,0.003059,0.003153,0.003239,0.003318,0.003387,0.003448,0.003498,0.003539,0.003568,...,0.004027,0.004073,0.004107,0.004129,0.004137,0.004133,0.004115,0.004085,0.004043,0.003988


In [12]:
# Just take a subsample - 4 songs ['752.mp3', '746.mp3', '791.mp3', '949.mp3' ]
# For code debug purpose
song_ids 
# selected_kernel_density = kernel_density.loc[kernel_density['song_id'].isin(selected_song_id), ]

[752, 746, 791, 949, 785, 975, 961, 550, 236, 222]

In [13]:
#selected_kernel_density.drop(['song_id'], axis=1).values

## Part III: Mapping Factor Learning

In [14]:
# Non-negative sparse representation (SR+) seems require solution >0
# Need to enforce additional condition

lamda = 0.01
def estimate(x, beta):
    """The function estimates linear outcome given an observation and a coeffcient vector."""
    yhat = np.dot(x, beta)
    return yhat

def RSS_L1(beta, X, y, lamda):
    y_hat = np.dot(X, beta)
    return np.sum((y - y_hat)**2) + lamda* np.sum(np.abs(beta))

beta0 = np.random.normal(0, 1, audio_train.shape[1])
res = minimize(fun=RSS_L1, x0=beta0, args=(audio_train, audio_test, lamda))
beta_hat = res.x
mapping_factors = beta_hat
print('The estimated β vector is: \n')
print(mapping_factors)

The estimated β vector is: 

[ 0.25330639  0.59165405  0.30269512  0.10057767 -0.46341318  0.16934319  0.56199342  0.19025291 -0.97427703 -0.16782478]


### Part IV: Emotion Space Mapping

In [15]:
pred_emotion = np.zeros(16 * 16)
for i, s_id in enumerate(song_ids):
    if (kernel_density.loc[kernel_density['song_id'] == s_id, ].shape[0] == 0):
        emotion_val = np.zeros(16 * 16)
    else: 
        emotion_val = kernel_density.loc[kernel_density['song_id'] == s_id, ]. \
                        drop(['song_id'], axis=1).values[0]
    pred_emotion += mapping_factors[i] * emotion_val
pred_emotion

array([0.00159123, 0.00167126, 0.00174831, 0.00182168, 0.00189073, 0.00195482, 0.00201337, 0.00206584, 0.00211173, 0.00215062, ..., 0.00245644, 0.00250547, 0.00254659, 0.00257943, 0.00260368,
       0.00261912, 0.00262563, 0.00262317, 0.0026118 , 0.00259165])

In [122]:
# # Be aware of the order of songs when doing this step!
# e = selected_kernel_density.drop(['song_id'], axis=1).values
# pred_espace = np.dot(e.T, mapping_factors)
# print(pred_espace)
# len(pred_espace)

[0.01008467 0.01022267 0.01033025 0.01040642 0.01045046 0.01046197 0.01044083 0.01038725 0.01030172 0.01018504 ... 0.00970694 0.00965688 0.00957712 0.00946841 0.00933175 0.00916838 0.00897979
 0.00876765 0.00853384 0.00828036]


256