In [None]:
from __future__ import print_function

import librosa.display
import librosa
import numpy as np
import matplotlib.pyplot as plt
from IPython.display import Audio
from scipy.ndimage.filters import uniform_filter1d



import dissonant
import os
import scipy
import pandas as pd

import import_ipynb
from scipy import stats
import spleeter

from pydub import AudioSegment
import soundfile
from scipy.signal import savgol_filter
from scipy import signal
import scipy.signal
from matplotlib.lines import Line2D


In [None]:
%run Only_features.ipynb     #defining our custom functions

In [None]:
def music_loading(file_name, path, sr = 44100):
    """Load a WAV file, trim leading/trailing silence and save the trimmed copy.

    Parameters
    ----------
    file_name : str
        File name without the ``.wav`` extension.
    path : str
        (Relative) path prefix. It is concatenated directly with ``file_name``,
        so it must already end with a path separator.
    sr : int, optional
        Sampling rate passed to ``librosa.load`` (default 44100).

    Returns
    -------
    y : the trimmed audio signal returned by ``librosa.effects.trim``
    sr : the sampling rate returned by ``librosa.load``
    trimmed_audio_path : str
        Location of the written ``<file_name>_trim.wav`` file.
    """
    audio_fpath = path + file_name + '.wav'
    print(audio_fpath)

    # Honour the caller's sampling rate; previously 44100 was hard-coded here
    # and the ``sr`` argument was silently ignored.
    y, sr = librosa.load(audio_fpath, sr = sr)

    # Trimming is done right after loading so that every downstream feature is
    # computed on the same, silence-free signal.
    y, _ = librosa.effects.trim(y=y, frame_length=sr, top_db=60)

    trimmed_audio_path = path + file_name + "_trim" + '.wav'
    soundfile.write(trimmed_audio_path, y, sr)

    return y, sr, trimmed_audio_path

In [None]:
def feature_extraction(y,sr, trimmed_audio_path, category = "polyphonic",  start_bpm = 80):
    """Extract all features, z-score them and merge them on a common time axis.

    The individual extractors (``tempo_extraction``, ``onset_frequency``,
    ``extract_loudness``, ``pitch_extraction``, ``dissonance_extraction``) are
    defined outside this cell (presumably by ``%run Only_features.ipynb``).

    Parameters
    ----------
    y, sr : audio signal and its sampling rate.
    trimmed_audio_path : str
        Path of the trimmed audio file, handed to ``pitch_extraction``.
    category : str, optional
        Forwarded to ``pitch_extraction`` (default ``"polyphonic"``).
    start_bpm : number, optional
        Forwarded to ``tempo_extraction``.

    Returns
    -------
    df_all_features : DataFrame
        ``time``, the z-scored pitch columns, ``tempo_z``, ``loudness_z``,
        ``onset_freq_z`` and ``dissonance_z`` (NaN where a feature has no value
        at a pitch time stamp).
    df_plot : DataFrame
        Same frame with gaps forward-filled, for plotting.
    pitch_df : the raw output of ``pitch_extraction``.
    onset_env : the onset envelope returned by ``onset_frequency``.
    """
    # --- raw features -----------------------------------------------------
    tempo, t_tempo = tempo_extraction(y, sr, start_bpm = start_bpm)
    onset_env, onset_freq, t_onset = onset_frequency(y, sr)
    loudness, t_loudness = extract_loudness(y, sr)
    # Forward the caller's category; it used to be hard-coded to "polyphonic",
    # which made the parameter a no-op.
    pitch_df = pitch_extraction(trimmed_audio_path, sr, category = category)
    dissonance, t_dissonance = dissonance_extraction(pitch_df, sr)

    # --- tempo & loudness ---------------------------------------------------
    # Both share the tempo time axis; only the first row of ``loudness`` is used.
    df_tempo_loudness = pd.DataFrame(list(zip(t_tempo, tempo, loudness[0])),
                                     columns = ["time", "tempo", "loudness"])
    df_tempo_loudness['tempo_z'] = stats.zscore(df_tempo_loudness['tempo'])
    df_tempo_loudness['loudness_z'] = stats.zscore(df_tempo_loudness['loudness'])

    # --- pitch ----------------------------------------------------------------
    # Pool all pitch lines so they are standardised against a common mean/std ...
    df_long = pd.melt(pitch_df, id_vars='time')
    pitch_values = df_long['value']
    df_long["pitch_z"] = (pitch_values - pitch_values.mean())/pitch_values.std(ddof=0)

    # ... then split them back into one column per pitch line.
    df_wide_pitch_z = df_long.pivot(index='time', columns='variable', values='pitch_z')
    df_wide_pitch_z = df_wide_pitch_z.reset_index()

    # The pitch time stamps act as the master time axis (left joins below).
    df_loudness_tempo_pitch = pd.merge(df_wide_pitch_z,
                                       df_tempo_loudness[["time", "tempo_z", "loudness_z"]],
                                       on = "time", how = "left")

    # --- dissonance -----------------------------------------------------------
    df_dissonance = pd.DataFrame(list(zip(t_dissonance, dissonance)),
                                 columns = ["time", "dissonance"])
    df_dissonance = df_dissonance.dropna()
    df_dissonance['dissonance_z'] = stats.zscore(df_dissonance['dissonance'])

    # --- onset frequency --------------------------------------------------------
    df_onset_freq = pd.DataFrame(list(zip(t_onset, onset_freq)),
                                 columns = ["time", "onset_freq"])
    df_onset_freq['onset_freq_z'] = stats.zscore(df_onset_freq['onset_freq'])

    # --- merging ------------------------------------------------------------------
    merge_onset = pd.merge(df_loudness_tempo_pitch,
                           df_onset_freq[["time", "onset_freq_z"]],
                           on = 'time', how = 'left')
    df_all_features = pd.merge(merge_onset,
                               df_dissonance[["time", "dissonance_z"]],
                               on = "time", how = 'left')

    # Forward-fill the NaNs left by the left joins so the curves plot as
    # continuous lines (``fillna(method='ffill')`` is deprecated in pandas).
    df_plot = df_all_features.ffill()

    return df_all_features, df_plot, pitch_df, onset_env

In [None]:
def plot_features(df_plot,color_list):
    """Draw every feature column of ``df_plot`` against ``time`` in one figure.

    The last four columns are moved directly behind the first column before
    plotting, so they are drawn (and listed in the legend) first. Colours are
    taken from ``color_list`` in that plotting order.
    """
    plt.figure(figsize=(20, 8))

    # Reorder: first column, then the last four, then everything in between.
    all_cols = df_plot.columns.tolist()
    ordered_cols = [all_cols[0]] + all_cols[-4:] + all_cols[1:-4]
    df_plot = df_plot[ordered_cols]

    custom_lines = []
    feature_list = []
    for idx, feature_name in enumerate(df_plot.columns[1:]):
        colour = color_list[idx]
        plt.plot(df_plot.time, df_plot[feature_name], color = colour)
        # Proxy handle so the legend shows a thick line in the same colour.
        custom_lines.append(Line2D([0], [0], color = colour, lw = 3))
        feature_list.append(feature_name)

    plt.legend(custom_lines, feature_list,
               loc='right', bbox_to_anchor=(1.2, 0.37), fancybox=True,
               facecolor='white', framealpha=1,
               edgecolor = "black", fontsize = 20)

    plt.title("All Features over Time", fontsize = 30)
    plt.ylabel("Z-Scores", fontsize = 20)
    plt.xlabel("Time (s)", fontsize = 20)


In [None]:
def feature_smoothing(df_plot, window_size = None):
    """Smooth every feature column with a moving-average (uniform) filter.

    Parameters
    ----------
    df_plot : DataFrame
        First column is skipped; all remaining columns are smoothed. Must
        contain a ``time`` column, which is copied to the result unchanged.
    window_size : int, optional
        Length of the uniform filter in rows. When omitted, the function falls
        back to its historical behaviour and derives the size from a
        notebook-global audio signal ``y`` as ``int(len(y)/30)``. Prefer passing
        the size explicitly so the result does not depend on hidden kernel state.

    Returns
    -------
    DataFrame with the smoothed feature columns followed by ``time``.

    Raises
    ------
    NameError
        If ``window_size`` is omitted and no global ``y`` exists.
    ValueError
        If an explicit ``window_size`` is smaller than 1.
    """
    if window_size is None:
        try:
            audio = y  # global defined elsewhere in the notebook (hidden state)
        except NameError:
            raise NameError(
                "feature_smoothing: pass window_size explicitly or define the "
                "global audio signal `y` before calling"
            ) from None
        # A filter length of 0 is rejected by scipy, so use at least one row.
        window_size = max(1, int(len(audio)/30))
    else:
        window_size = int(window_size)
        if window_size < 1:
            raise ValueError("window_size must be >= 1")

    feature_names = df_plot.columns[1:]
    # NaNs are replaced by 0 before filtering, as a NaN would otherwise spread
    # over the whole filter window.
    dict_smooth = {
        f: uniform_filter1d(df_plot[f].fillna(0), size = window_size)
        for f in feature_names
    }

    features_smooth = pd.DataFrame(dict_smooth)
    features_smooth['time'] = df_plot['time']

    return features_smooth

In [None]:
def feature_resampling_10Hz(features_smooth, df_plot):
    """Down-sample the smoothed and unsmoothed feature frames to roughly 10 Hz.

    The feature sampling rate is estimated as ``len(features_smooth) /
    max(time)`` and every ``int(rate/10)``-th row of both frames is kept.

    Returns
    -------
    all_features_10Hz, all_features_10Hz_unsmoothed : DataFrame
        Both frames are re-indexed with ``reset_index()``, so the previous row
        index is kept as a leading ``index`` column.
    """
    sr_features = len(features_smooth)/max(features_smooth['time'])

    # A feature rate below 10 Hz used to yield a step of 0, which makes the
    # slice raise "slice step cannot be zero"; keep every row in that case.
    step = max(1, int(sr_features/10))

    all_features_10Hz = features_smooth.iloc[::step].reset_index()
    all_features_10Hz_unsmoothed = df_plot.iloc[::step].reset_index()

    return all_features_10Hz, all_features_10Hz_unsmoothed

In [5]:
def tension_extraction_10Hz_window(all_features_10Hz, model_configuration = ['optimized', 'original']):
    """Predict a tension curve using feature-specific attentional/memory windows.

    For every analysis position (hop of about 0.25 s) and every feature column a
    straight line is fitted to the upcoming "attentional" window. If the
    feature has a memory window, a second line is fitted to the samples
    preceding the position and used to damp or amplify the attentional slope.
    The per-feature slopes are summed, projected over the global integration
    window and blended into the running prediction, which is z-scored at the
    end.

    Parameters
    ----------
    all_features_10Hz : DataFrame
        Must contain a ``time`` column. Feature columns are taken from
        ``columns[4:]`` and recognised by name: ``dissonance_z``, ``tempo_z``,
        ``loudness_z``, ``onset_freq_z``, ``pitch`` or any name starting with
        ``line`` (one pitch line each).
    model_configuration : {'optimized', 'original'}
        Window set to use. The list default only enumerates the allowed
        choices; when a list/tuple is passed its first element is used.

    Returns
    -------
    DataFrame with columns ``time`` and ``prediction`` (z-scored).

    Raises
    ------
    ValueError
        If ``model_configuration`` is not one of the allowed choices.
    """
    # The list default matched neither branch below, so a call without an
    # explicit configuration crashed with UnboundLocalError. Resolve it to its
    # first entry instead.
    if isinstance(model_configuration, (list, tuple)):
        if len(model_configuration) == 0:
            raise ValueError("model_configuration must be 'optimized' or 'original'")
        model_configuration = model_configuration[0]

    ##add the actual optimized values
    if model_configuration == 'optimized':
        attentional_windows = {'dissonance':1, 'tempo':1, 'loudness':1, 'pitch':1, 'onset_freq':1}
        memory_windows = {'dissonance':1, 'tempo':1, 'loudness':1, 'pitch':1, 'onset_freq':1}
        global_integration_window = 2
    elif model_configuration == 'original':
        attentional_windows = {'dissonance':8.5, 'tempo':2, 'loudness':2, 'pitch':2, 'onset_freq':1}
        memory_windows= {'dissonance':13, 'tempo':2, 'loudness':0, 'pitch':2, 'onset_freq':1}
        global_integration_window = 1
    else:
        raise ValueError(
            "model_configuration must be 'optimized' or 'original', got %r" % (model_configuration,)
        )

    # Column name -> key into the window dictionaries ("line*" handled below).
    window_keys = {
        'dissonance_z': 'dissonance',
        'tempo_z': 'tempo',
        'loudness_z': 'loudness',
        'pitch': 'pitch',
        'onset_freq_z': 'onset_freq',
    }

    n_rows = len(all_features_10Hz)

    # get the exact sampling rate for our features
    sr_features = n_rows/max(all_features_10Hz['time'])
    samples_per_second = round(sr_features)

    # Hop size between analysis positions (0.25 s recommended), at least 1 row.
    shift = int(round(0.25 * sr_features))
    if shift == 0:
        shift = 1

    epsilon = .0001   # slopes inside (-epsilon, epsilon) count as "no change"
    decay = .001      # amount subtracted from a flat slope after a flat memory
    gain = 5          # recommended amplification when both slopes agree in sign

    # Each pitch line contributes 1/pitch_num of its slope.
    pitch_num = len([col for col in all_features_10Hz if col.startswith('line')])

    # NOTE(review): the positional selection ``columns[4:]`` is kept from the
    # original; whether the first four columns are meant to be excluded
    # depends on the layout of the incoming frame -- confirm against the caller.
    feature_columns = all_features_10Hz.columns[4:]

    for i in range(0, n_rows, shift):
        slopes = []
        for f in feature_columns:
            if f in window_keys:
                key = window_keys[f]
            elif f.startswith("line"):
                key = 'pitch'
            else:
                # Columns without a window configuration (e.g. 'time') are not
                # features. They used to be fitted with whatever windows the
                # previous column left behind and summed into the prediction.
                continue

            attention_window = attentional_windows[key]
            memory_window = memory_windows[key]
            n_samp_a = int(attention_window*samples_per_second)
            n_samp_m = int(memory_window*samples_per_second)

            # Attentional window: rows [i, i + n_samp_a), cut at the frame end.
            end_a = min(i + n_samp_a, n_rows)
            curr_win = all_features_10Hz[f].iloc[i:end_a]
            if len(curr_win) < 2:
                # A line fit needs at least two points.
                continue

            slope = np.polyfit(np.arange(len(curr_win)), curr_win, 1)[0]
            if f.startswith('line'):
                slope = slope*(1/pitch_num)

            # Memory window: the rows preceding the current position. It is
            # only used once there are at least 2 points behind us (i > 2).
            if memory_window > 0 and i > 2:
                end_m = i - 1
                start_m = max(end_m - n_samp_m, 0)
                curr_win_m = all_features_10Hz[f].iloc[start_m:end_m]
                prev_slope = np.polyfit(np.arange(len(curr_win_m)), curr_win_m, 1)[0]

                if (-epsilon < slope < epsilon) and (-epsilon < prev_slope < epsilon):
                    # No change now following no change before: let the slope
                    # decay slightly.
                    slope = slope - decay
                elif (slope > 0 and prev_slope > 0) or (slope < 0 and prev_slope < 0):
                    # Same direction in both windows: strengthen the trend.
                    slope = slope * gain

            slopes.append(slope)

        overall_slope = sum(slopes)

        # Project the summed slope over the global integration window
        # (x runs 1..n here), cut at the frame end.
        n_samp_g = int(global_integration_window*samples_per_second)
        end_g = min(i + n_samp_g, n_rows)
        cur_slope = overall_slope*np.arange(1, end_g - i + 1)

        if i < (n_rows - 1):
            if i == 0:
                prediction = np.asarray(cur_slope, dtype=float)
            else:
                head = prediction[:i]
                startval = prediction[i]

                # Average the new projection with the part of the existing
                # prediction it overlaps, then shift it to join ``head``
                # continuously.
                overlap = len(prediction) - i
                middle = (cur_slope[:overlap] + prediction[i:])/2
                if middle.size != 0:
                    middle = middle + (startval - middle[0])

                # NOTE(review): the "+1" skips one sample of the projection,
                # so the prediction ends up one row shorter than the input --
                # possibly a leftover of 1-based indexing; kept unchanged.
                end_chunk = cur_slope[len(middle)+1:]

                # Was ``middle.size!=0 & endChunk.size!=0``: ``&`` binds tighter
                # than ``!=``, so the condition was always False and the tail
                # was never aligned with the end of ``middle``.
                if middle.size != 0 and end_chunk.size != 0:
                    end_chunk = end_chunk + (middle[-1] - end_chunk[0])

                prediction = np.concatenate([head, middle, end_chunk])

    prediction = stats.zscore(prediction)

    # zip() truncates to the shorter of the two sequences.
    df_slopes = pd.DataFrame(list(zip(all_features_10Hz['time'], prediction)),
                             columns = ['time', 'prediction'])

    return df_slopes



In [19]:
def plot_tension_and_features_10Hz(df_slopes, all_features_10Hz_unsmoothed, color_list):
    """Plot the unsmoothed features together with the tension prediction.

    Parameters
    ----------
    df_slopes : DataFrame
        Needs the columns ``time`` and ``prediction``.
    all_features_10Hz_unsmoothed : DataFrame
        The first two columns are not plotted; it must contain a ``time``
        column. The last four columns are moved directly behind the first two
        before plotting.
    color_list : sequence
        One colour per plotted feature, in plotting order.
    """
    plt.figure(figsize = [20,8])

    # Reorder: first two columns, then the last four, then everything else.
    cols = all_features_10Hz_unsmoothed.columns.tolist()
    cols_reorder = cols[0:2] + cols[-4:] + cols[2:-4]
    all_features_plot = all_features_10Hz_unsmoothed[cols_reorder]

    custom_lines = []
    feature_list = []
    for count, f in enumerate(all_features_plot.columns[2:]):
        colour = color_list[count]
        plt.plot(all_features_plot.time, all_features_plot[f], color = colour)
        # The legend handle must use the same colour as the plotted line. The
        # counter used to be incremented before building the handle, so every
        # legend entry showed the next feature's colour (and the last one could
        # run past the end of color_list).
        custom_lines.append(Line2D([0], [0], color = colour, lw = 3))
        feature_list.append(f)

    # Tension prediction: square markers plus a thick connecting line.
    plt.plot(df_slopes['time'], df_slopes['prediction'], color = "black", fillstyle='none',
             marker = "s", markerfacecolor = "black", linestyle = "none", markersize = 6)
    plt.plot(df_slopes['time'], df_slopes['prediction'], color = "black",
             linewidth = 3, alpha = 1)

    plt.legend(custom_lines, feature_list,
               loc='right', bbox_to_anchor=(1.2, 0.37), fancybox=True,
               facecolor='white', framealpha=1,
               edgecolor = "black", fontsize = 20)

    plt.title("All Features and the Tension Prediction over Time", fontsize = 30)
    plt.ylabel("Z-Scores", fontsize = 20)
    plt.xlabel("Time (s)", fontsize = 20)


In [3]:
def tension_extraction_10Hz_weights(all_features_10Hz, model_configuration = ['optimized', 'original']):
    """Predict a tension curve from weighted feature slopes (shared windows).

    At every analysis position (hop of about 0.25 s) a straight line is fitted
    to each feature inside one common attentional window. The slopes are
    combined as a weighted sum (weights normalised to sum to 1). A line fitted
    to the preceding part of the *prediction itself* (memory window) damps or
    amplifies that sum. The result is projected over the attentional window,
    blended into the running prediction and z-scored at the end.

    Parameters
    ----------
    all_features_10Hz : DataFrame
        Must contain a ``time`` column. Weights are assigned by column name
        over ``columns[1:]`` (``dissonance_z``, ``tempo_z``, ``loudness_z``,
        ``onset_freq_z``, ``pitch``, or names starting with ``line`` which
        share the pitch weight equally). Slopes are computed for the weighted
        columns within ``columns[1:-1]``.
    model_configuration : {'optimized', 'original'}
        Weight/window set to use. The list default only enumerates the allowed
        choices; when a list/tuple is passed its first element is used.

    Returns
    -------
    DataFrame with columns ``time`` and ``prediction`` (z-scored).

    Raises
    ------
    ValueError
        If ``model_configuration`` is not one of the allowed choices.
    """
    # The list default matched neither branch below, so a call without an
    # explicit configuration crashed with UnboundLocalError. Resolve it to its
    # first entry instead.
    if isinstance(model_configuration, (list, tuple)):
        if len(model_configuration) == 0:
            raise ValueError("model_configuration must be 'optimized' or 'original'")
        model_configuration = model_configuration[0]

    if model_configuration == 'optimized':
        weights_dict = {'dissonance':2, 'tempo':1, 'loudness':7, 'pitch':8, 'onset_freq':1}
        windows_dict = {'attention':3, 'memory':2}
    elif model_configuration == 'original':
        weights_dict = {'dissonance':1, 'tempo':2, 'loudness':3, 'pitch':1, 'onset_freq':2}
        windows_dict = {'attention':3, 'memory':3}
    else:
        raise ValueError(
            "model_configuration must be 'optimized' or 'original', got %r" % (model_configuration,)
        )

    # Column name -> key into ``weights_dict`` ("line*" handled below).
    weight_keys = {
        'dissonance_z': 'dissonance',
        'tempo_z': 'tempo',
        'loudness_z': 'loudness',
        'pitch': 'pitch',
        'onset_freq_z': 'onset_freq',
    }

    # The number of pitch lines may differ between pieces; together they share
    # the single pitch weight.
    pitch_num = len([col for col in all_features_10Hz if col.startswith('line')])

    # Key the weights by column name. They used to be collected in a list and
    # zipped onto ``columns[1:]`` positionally, so one unrecognised column
    # shifted every following weight onto the wrong feature.
    weights = {}
    for f in all_features_10Hz.columns[1:]:
        if f in weight_keys:
            weights[f] = weights_dict[weight_keys[f]]
        elif f.startswith("line"):
            weights[f] = weights_dict['pitch']/pitch_num
    scale_weights = sum(weights.values())

    n_rows = len(all_features_10Hz)

    # get the exact sampling rate for our features
    sr_features = n_rows/max(all_features_10Hz['time'])

    # Hop size (0.25 s recommended). For feature rates below 4 Hz this used to
    # become 0, which makes range() raise; use at least one row.
    shift = max(1, int(0.25*sr_features))

    epsilon = .0001   # slopes inside (-epsilon, epsilon) count as "no change"
    decay = .001      # amount subtracted from a flat slope after a flat memory
    gain = 5          # recommended amplification when both slopes agree in sign

    #fixed window durations
    attention_window = windows_dict['attention']
    memory_window = windows_dict['memory']
    n_samp_m = int(memory_window*round(sr_features))
    n_samp_a = int(attention_window*round(sr_features))

    # Features entering the slope sum: first and last column are excluded
    # positionally (as before); columns without a weight are skipped.
    feature_columns = [f for f in all_features_10Hz.columns[1:-1] if f in weights]

    for i in range(0, n_rows, shift):
        # Attentional window: rows [i, i + n_samp_a), cut at the frame end.
        end_a = min(i + n_samp_a, n_rows)
        x_a = np.arange(end_a - i)
        curr_win = all_features_10Hz.iloc[i:end_a]

        slopes_weighted = []
        if len(curr_win) > 1:  # a line fit needs at least two points
            for f in feature_columns:
                slope = np.polyfit(x_a, curr_win[f], 1)[0]
                slopes_weighted.append(slope*(weights[f]/scale_weights))
        overall_slope = sum(slopes_weighted)

        # Memory window: slope of the prediction built so far, over the rows
        # just before the current position. Needs at least 2 points (i > 2).
        if memory_window > 0 and i > 2:
            end_m = i - 1
            start_m = max(end_m - n_samp_m + 1, 0)
            curr_win_m = prediction[start_m:end_m+1]
            prev_slope = np.polyfit(np.arange(len(curr_win_m)), curr_win_m, 1)[0]

            if (-epsilon < overall_slope < epsilon) and (-epsilon < prev_slope < epsilon):
                # No change now following no change before: let it decay.
                overall_slope = overall_slope - decay
            elif (overall_slope > 0 and prev_slope > 0) or (overall_slope < 0 and prev_slope < 0):
                # Same direction in both windows: strengthen the trend.
                overall_slope = overall_slope * gain

        # Project the slope over the attentional window (x runs 0..n-1 here).
        cur_slope = overall_slope*x_a

        if i < (n_rows - 1):
            if i == 0:
                prediction = np.asarray(cur_slope, dtype=float)
            else:
                head = prediction[:i]
                startval = prediction[i]

                # Average the new projection with the part of the existing
                # prediction it overlaps, then shift it to join ``head``
                # continuously.
                overlap = len(prediction) - i
                middle = (cur_slope[:overlap] + prediction[i:])/2
                if middle.size != 0:
                    middle = middle + (startval - middle[0])

                # NOTE(review): the "+1" skips one sample of the projection,
                # so the prediction ends up one row shorter than the input --
                # possibly a leftover of 1-based indexing; kept unchanged.
                end_chunk = cur_slope[len(middle)+1:]

                # Was ``middle.size!=0 & endChunk.size!=0``: ``&`` binds tighter
                # than ``!=``, so the condition was always False and the tail
                # was never aligned with the end of ``middle``.
                if middle.size != 0 and end_chunk.size != 0:
                    end_chunk = end_chunk + (middle[-1] - end_chunk[0])

                prediction = np.concatenate([head, middle, end_chunk])

    prediction = stats.zscore(prediction)

    # zip() truncates to the shorter of the two sequences.
    df_slopes = pd.DataFrame(list(zip(all_features_10Hz['time'], prediction)),
                             columns = ['time', 'prediction'])

    return df_slopes