In [1]:
import partitura
import numpy as np
import itertools
import os
import matplotlib.pyplot as plt
from basismixer.performance_codec import PerformanceCodec, get_performance_codec, to_matched_score
from basismixer.performance_codec import tempo_by_average, tempo_by_derivative 
from tensorly.decomposition import robust_pca
import plotly.express as px
import metric_learn
from sklearn.manifold import TSNE




In [2]:
# Define the path of the match alignments.
# The directory is resolved two levels above the current working directory and
# built with os.path.join so that it works on every platform (the previous
# hard-coded "\\" separators were only valid on Windows). The trailing empty
# component keeps a trailing separator, so `MATCHSCORE_DIR + file` still works.
MATCHSCORE_DIR = os.path.join(
    os.path.dirname(os.path.dirname(os.getcwd())), "alignments", "match_4", ""
)

# Description of the analysed piece:
#   "structure" maps a section name to a (start, end) pair that is later used
#               to slice the per-performance feature matrix,
#   "step"      is the analysis window step passed to the analysis functions,
#   "measure"   is a per-measure length; presumably in the same score units as
#               "step" -- TODO confirm.
file_structure = {
    "name": "Mozart",
    "structure": {
        "a1_1": (1, 5),
        "a2_1": (5, 9),
        "a1_2": (9, 13),
        "a2_2": (13, 17),
        "a1_3": (17, 21),
        "a2_3": (21, 27),
        "a1_4": (27, 31),
        "a2_4": (31, 36)
    },
    "step": 1,
    "measure": 6
}

In [3]:
def _collect_window_notes(note_array, performance_array, index, look_in,
                          fix_start, fix_end, is_BM):
    """Collect the entries of ``note_array[index:look_in]`` that sound in a window.

    A note belongs to the window when its onset lies in
    ``[fix_start, fix_end]`` or when it starts at/before ``fix_start`` and is
    still sounding at ``fix_start``.

    Returns
    -------
    selected : list
        The matching notes (``is_BM`` False) or the entries of
        ``performance_array`` at the same absolute positions (``is_BM`` True).
    relative_indices : list(int)
        Positions of the matching notes relative to ``index``.
    """
    selected = list()
    relative_indices = list()
    for ind, note in enumerate(note_array[index:look_in]):
        note_start = note[0]  # onset
        note_end = note[0] + note[1]  # onset + duration
        starts_inside = fix_start <= note_start <= fix_end
        sounding_at_start = note_start <= fix_start and note_end >= fix_start
        if starts_inside or sounding_at_start:
            relative_indices.append(ind)
            if is_BM:
                # `ind` is relative to the slice start, so it has to be shifted
                # by `index` to address the matching row of performance_array.
                selected.append(performance_array[index + ind])
            else:
                selected.append(note)
    return selected, relative_indices


def standard_analysis(
        attr, stat, note_array, performance_array, duration, forward_step_lim, 
        step, NUMBER_OF_WINDOWS=1, number_of_stats=1, is_BM=False
        ):
    '''
    A generalized definition for match score analysis.

    The piece is scanned with windows that start at ``i * step`` for
    ``i = 1 .. dim - 2``. For every start, ``NUMBER_OF_WINDOWS`` windows of
    length ``step, 2 * step, ...`` are analysed; the notes found in each window
    are passed to ``attr`` and the result is stored in row ``i - 1``.
    Rows/windows in which no note is found stay zero.

    Parameters:
    -----------
    attr : function
        Feature extractor called as ``attr(x, stat)``, i.e.
        expressiveness_of_segment or tempo_of_segment. It must return
        ``number_of_stats`` values.
    stat : function
        A statistical measure, i.e. mean, std or variance.
    note_array : structured array
        The note array; element ``[0]`` of a note is read as the onset and
        element ``[1]`` as the duration. Notes are expected to be ordered
        by onset.
    performance_array : structured array
        The expressive parameters, aligned index by index with ``note_array``.
        Only read when ``is_BM`` is True.
    duration : float
        The duration of the analysed material, in the same units as ``step``.
    forward_step_lim : int
        The maximum spread of the note array indices we search for every
        window (multiplied by the window number for longer windows).
    step : float
        The step for the analysis window.
    NUMBER_OF_WINDOWS : int
        How many windows are analysed per step.
    number_of_stats : int
        The length of the vector returned by ``attr``.
    is_BM : bool
        If True the windows are filled from ``performance_array``, otherwise
        from ``note_array``.

    Returns:
    --------
    X : array
        Shape ``(dim, number_of_stats)`` when ``NUMBER_OF_WINDOWS == 1``,
        otherwise ``(dim, NUMBER_OF_WINDOWS, number_of_stats)``.
    '''
    # normalize duration
    duration = duration / step
    step_unit = 1
    dim = int(round((duration - (NUMBER_OF_WINDOWS * step_unit)) / step_unit) + 1)
    # Experimenting with array resolution
    if NUMBER_OF_WINDOWS == 1:
        X = np.zeros((dim, number_of_stats))
    else:
        X = np.zeros((dim, NUMBER_OF_WINDOWS, number_of_stats))

    total_notes = len(note_array)
    index = 0  # absolute position in note_array where the search starts
    for i in range(1, dim - 1, step_unit):
        fix_start = i * step
        ind_list = list()
        for j in range(1, NUMBER_OF_WINDOWS + 1):
            # Bound the search to a limited number of notes after `index`.
            if total_notes - index > forward_step_lim * j:
                look_in = forward_step_lim * j + index
            else:
                look_in = total_notes
            fix_end = fix_start + (j * step)
            x, ind_list = _collect_window_notes(
                note_array, performance_array, index, look_in,
                fix_start, fix_end, is_BM
            )
            if x:
                if NUMBER_OF_WINDOWS == 1:
                    X[i - 1] = attr(x, stat)
                else:
                    X[i - 1][j - 1] = attr(x, stat)
        # Advance the search start to the first note found for this step
        # (taken from the last window analysed, as before).
        if ind_list:
            index += min(ind_list)
    return X

def tempo_of_segment(x, stat):
    """
    Tempo feature extraction for one analysis window.

    Parameters:
    -----------
    x : list(tuples)
        A segment of the note_array. Every entry is unpacked into six fields;
        the third and the sixth are ignored.
    stat : function
        A statistical function applied to each tempo estimate.

    Returns:
    --------
    [stat(y1), stat(y2)] : list
        ``stat`` of the first element returned by ``tempo_by_average`` and of
        the first element returned by ``tempo_by_derivative``.

    """
    # Transpose the note rows into one column per field.
    columns = list(zip(*x))
    score_onsets, score_durations, _, performed_onsets, performed_durations, _ = columns

    # Both estimators take the same four columns in the same order.
    tempo_inputs = (score_onsets, performed_onsets, score_durations, performed_durations)
    averaged = tempo_by_average(*tempo_inputs)[0]
    derived = tempo_by_derivative(*tempo_inputs)[0]
    return [stat(averaged), stat(derived)]

def expressiveness_of_segment(x, stat):
    """
    Expressive feature extraction for one analysis window.

    Parameters:
    -----------
    x : list(tuples)
        A segment of the performance array; it is unpacked into four columns
        named beat_period, velocity, timing and articulation_log.
    stat : function
        A statistical function applied to each column.

    Returns:
    --------
    [stat(beat_period), stat(velocity), stat(timing)] : list
        The statistics of the first three expressive columns. The fourth
        column (articulation_log) is unpacked but not used.

    """
    # Transpose the rows into columns; the unpacking requires exactly four.
    tempo_col, loudness_col, timing_col, _articulation_col = list(zip(*x))
    return [stat(column) for column in (tempo_col, loudness_col, timing_col)]




def perform_analysis(file, attr, stat, number_of_windows):
    """Perform the analysis of one match file.

    Parameters
    ----------
    file : str
        The file name + extension, relative to MATCHSCORE_DIR.
    attr : tuple(function, str)
        A (feature function, name) pair, i.e. an entry built from
        expressiveness_of_segment or tempo_of_segment. When the name is
        "tempo" the windows are filled from the matched note array, otherwise
        from the encoded performance array.
    stat : tuple(function, str)
        A (statistic function, name) pair, i.e. mean, std or variance.
        Only the function is used.
    number_of_windows : int
        How many windows are analysed per step.

    Returns
    -------
    X : np.array
        The output of standard_analysis: shape (N, number_of_stats) for a
        single window, otherwise (N, number_of_windows, number_of_stats).
    performance_SSM : np.array
        The (N, N) self-similarity matrix: dot products between the rows of
        the low-rank part of the robust PCA of X.
    """
    step = file_structure["step"]
    # os.path.join works whether or not MATCHSCORE_DIR ends with a separator.
    match_fn = os.path.join(MATCHSCORE_DIR, file)
    ppart, alignment, spart = partitura.load_match(match_fn, create_part=True)
    note_array, _ = to_matched_score(spart, ppart, alignment)
    parameter_names = ['beat_period', 'velocity', 'timing', 'articulation_log']
    pc = get_performance_codec(parameter_names)
    performance_array, _ = pc.encode(spart, ppart, alignment)

    # Sort both arrays together by score onset *before* counting polyphony:
    # itertools.groupby only merges consecutive equal keys, so on unsorted
    # input simultaneous notes could be counted in separate groups.
    note_array, performance_array = zip(
        *sorted(zip(note_array, performance_array), key=lambda pair: pair[0][0])
    )

    durations = [n[1] for n in note_array if n[1] != 0]
    min_duration = min(durations)
    max_duration = max(durations)
    max_polyphony = max(
        len(list(group)) for _, group in itertools.groupby(note_array, key=lambda n: n[0])
    )
    # Upper bound on how many notes past the current position one window may span.
    forward_step_lim = int(max_duration / min_duration + max_polyphony)
    duration = note_array[-1][0] + max_duration - step

    if attr[1] == "tempo":
        is_BM = False
        dummy = note_array[-1]
    else:
        is_BM = True
        dummy = performance_array[-1]

    attr = attr[0]
    stat = stat[0]

    # Probe the feature function with a single entry to learn its output length.
    number_of_stats = len(attr([dummy], stat))
    X = standard_analysis(attr, stat, note_array, performance_array, duration, forward_step_lim, step, number_of_windows, number_of_stats, is_BM)
    low_rank_part, sparse_part = robust_pca(X, reg_E=0.04, learning_rate=1.2, n_iter_max=20)

    # Self-similarity = inner product between the feature blocks of two time
    # steps. Flattening each block gives one code path for 2-D and 3-D X.
    # The former np.tensordot(L, L.T) contracted the window axis against the
    # stat axis, which only ran when both had the same size and did not
    # compute an inner product.
    flat = low_rank_part.reshape(low_rank_part.shape[0], -1)
    performance_SSM = np.dot(flat, flat.T)

    return X, performance_SSM


def plot_tsne(X, y, z=None, colormap=plt.cm.Paired, random_state=None):
    """
    Plot the t-SNE embedding of X.

    Parameters
    ----------
    X : np.array
        Some multidimensional data in an array (N, M).
    y : np.array
        Labels that are translated to colors.
    z : np.array, optional
        Labels that are translated to positions on the z-axis. When omitted a
        2-D scatter plot is drawn, otherwise a 3-D one.
    colormap : matplotlib colormap, optional
        Currently unused; kept so existing calls keep working.
    random_state : int or None, optional
        Forwarded to TSNE; pass an int to obtain a reproducible embedding.
    """

    tsne = TSNE(random_state=random_state)
    X_embedded = tsne.fit_transform(X)

    # `z == None` would be evaluated element-wise for an array and raise
    # "truth value of an array is ambiguous"; an identity test is required.
    if z is None:
        fig = px.scatter(x=X_embedded[:, 0], y=X_embedded[:, 1], color=y)
    else:
        fig = px.scatter_3d(x=X_embedded[:, 0], y=X_embedded[:, 1], z=z, color=y)
    fig.show()


In [4]:
# Declare the attributes and statistics as (function, name) pairs
attributes = [(tempo_of_segment, "tempo"),
              (expressiveness_of_segment, "expressive")
             ]
statistic_methods = [(np.mean, "mean"),
                     (np.std, "std"),
                     (np.var, "var")
                    ]

# Set the number of windows
# tempo takes 2, expressiveness 3 and mixed takes 6
number_of_windows = 3

data = list()
labels = list()
# os.listdir returns entries in arbitrary order; sorting makes the order of
# `data`/`labels` (and therefore the plots built from them) reproducible.
for file in sorted(os.listdir(MATCHSCORE_DIR)):
    if file.startswith("Mozart") and file.endswith(".match"):
        X, performance_SSM = perform_analysis(file, attributes[1], statistic_methods[1], number_of_windows)
        # Flatten every time step to one feature vector; -1 also covers the
        # 2-D result returned for a single window.
        data.append(X.reshape(X.shape[0], -1))
        # The last three characters of the file stem identify the performance.
        labels.append(os.path.splitext(file)[0][-3:])
        

In [5]:


step = file_structure["step"]
# Use the configured measure length instead of repeating the literal 6.
measure = file_structure["measure"]

X = list()
y = list()
z = list()
for perf, label in zip(data, labels):
    for section_name, (section_start, section_end) in file_structure["structure"].items():
        # Convert the 1-based section bounds to row indices of the feature matrix.
        first_row = int((section_start - 1) * measure / step)
        last_row = int((section_end - 1) * measure / step)
        X.append(perf[first_row:last_row].tolist())
        y.append(label)
        z.append(section_name)

# Uniformize all windows: zero-pad every section to the longest one.
# The last dimension is the width of one flattened time step
# (number of attribute values * number of windows), read from the data.
longest_section = max(len(section) for section in X)
feature_width = data[0].shape[1]
X_new = np.zeros([len(X), longest_section, feature_width])
for i, section in enumerate(X):
    if section:  # an empty section simply stays all-zero
        X_new[i][0:len(section)] = section

X = X_new.reshape(X_new.shape[0], X_new.shape[1]*X_new.shape[2])

# Quantifying Similarity within subsections of Performances

Our objective is to find a way to quantify similarity of performances for which we have a matched score.

First we have to define similarity in this context :

#### Performance Similarity :
We define two groups of performance Similarity. 
- One group consists of Auto-Similarity or similarity between sections of one performance. 
- The second group consists of similarities between different performances.



### MATCH: Music Alignment Tool CHest

MATCH is a toolkit, with an associated match file format, for aligning audio recordings of different renditions of the same piece of music.

This means we can relate aspects of a performance (e.g. local tempo, dynamics, and articulation) to the score notation of the performed piece.


## Reasons to do this 

There are several things that motivate this quest for performance similarity: 

- Generate more accurate performances.
- See if repetitions of sections are performed similarly.
- Investigate if musical structure (sections, melody, etc.) can be automatically captured by performance aspects. 


    The merits of a successful capture of performance similarity are limited only by our imagination of possible use cases.


## What questions are we trying to answer ?

- What elements affect our perception of performance similarity ?
- What makes performances similar ?
- How can we measure performance similarity ?

## What was our motivation ?

Our motivation is the graph below, based on an analysis model I created with the help of the partitura and basismixer libraries. We analyzed a segment of Mozart's Piano Sonata K. 331 performed by 22 different pianists.


In the graph below we see an unsupervised grouping of the analysis based on expressive attributes, such as local tempo, velocity, timing, and articulation. Color represents the different performers and the z-axis represents the subsections of the musical segment:


In [8]:
from sklearn.decomposition import PCA

# Fixed seed so the stochastic t-SNE embedding (and the figure) is reproducible.
RANDOM_STATE = 0

# Transform the features with metric_learn's Covariance model.
cov = metric_learn.Covariance()
X_cov = cov.fit_transform(X)

# Whitened PCA of the transformed features.
pca = PCA(whiten=True)
X_pca = pca.fit_transform(X_cov)

# Embed with t-SNE; color encodes the labels in `y`, the z-axis the labels in `z`.
tsne = TSNE(random_state=RANDOM_STATE)
X_embedded = tsne.fit_transform(X_pca)
fig = px.scatter_3d(x=X_embedded[:, 0], y=X_embedded[:, 1], z=z, color=y)
fig.show()

In [10]:
import plotly.graph_objects as go
import plotly.io as pio

# Rebuild the 3-D scatter of the embedding, give it a title and export it as a
# standalone HTML page (auto_open=True also opens it in the browser).
scatter = px.scatter_3d(x=X_embedded[:, 0], y=X_embedded[:, 1], z=z, color=y)
fig = go.Figure(scatter)
fig.update_layout(title_text='Covariance_Sampling')

pio.write_html(fig, file='covariance_sampling.html', auto_open=True)

### The survey

The survey we created is intended to be a guideline for continuing experiments and improving the analysis algorithm.
If our method (current or future) agrees with the survey results, then we can advance to a full-scale cognitive auditory experiment.

