In [1]:

%load_ext autoreload

%autoreload 2

In [2]:
import os
import sys

import json
from collections import Counter
import csv
#from compmusic import dunya
import music21
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
#dunya.set_token("52fc6ac49c0b7fc9644404aaf4f9bc1a7088d69d")


sys.path.append('../src/')
import patterns_per_nawba as pn
import nawba_centones
import classification as cl
data_path = '../data'

In [3]:
# nawba mappings to centones
nawba_centones_lookup = nawba_centones.load_and_parse_centones_mapping(os.path.join(data_path, 'centones_nawba.csv'))

# list of scores to parse
andalusian_description =  pd.read_json(os.path.join(data_path, 'andalusian_description.json'))

# mbid mappings to nawbas
nawba_tabs = nawba_centones.load_and_parse_nawba_tabs(os.path.join(data_path, 'nawba_tabs.json'))
mbid_tab_lookup = pn.mbids_per_tab(andalusian_description)
mbid_nawba_lookup = pn.mbids_per_nawba(mbid_tab_lookup, nawba_tabs)

 # score sections lookup   
score_annotations_lookup = pn.score_annotations_lookup()

In [4]:
istihlal_mbids = [mbid for mbid in mbid_nawba_lookup if mbid_nawba_lookup[mbid] == '5']

In [5]:
def pattern_stream_from_score_section(path, offsets, rest_quarter_length=0):
    """
    Load a score from <path> and return an ordered list of notes
    R represents a rest greater than or equal to <rest_quarter_length>
    ...rests shorter than <rest_quarter_length> are ignored

    Fails if score contains chords
    """

    s = music21.converter.parse(path)
    p = s.parts[0]
    section = p.getElementsByOffset(float(offsets[0]), float(offsets[1]),
                                    mustBeginInSpan=False,
                                    includeElementsThatEndAtStart=False).stream()
    # These are all the notes of the whole piece, fails for chords
    notes_and_rests = section.flat.notesAndRests.stream()
    notes = []
    for n in notes_and_rests:
        if n.isRest:
            notes.append('R')
        else:
            notes.append(n.name)
    return notes


In [6]:
def get_notes_from_score_section(scores_path, mbid_nawba_lookup):
    """
    Function that return a dict with mbid: all notes of a score
    :param scores_path: path where the scores are saved
    :return: described dict
    """
    notes_dict = {}
    chord_mbid = []
    for root, dirs, files in os.walk(scores_path):
        for file in files:
            mbid = file.replace('.xml','')
            if mbid in istihlal_mbids:
                if mbid in score_annotations_lookup:
                    notes_dict[mbid] = {}
                    for sections in score_annotations_lookup[mbid]:
                        for offsets in score_annotations_lookup[mbid][sections]:
                            # Fails for scores with chords
                            try:
                                note_stream = pattern_stream_from_score_section(os.path.join(scores_path,file), offsets)
                            except Exception as e:
                                print('{} contains chords and wont be counted'.format(mbid))
                                chord_mbid.append(mbid)
                            if sections not in notes_dict[mbid]:
                                notes_dict[mbid][sections] = [note_stream]
                            else:
                                notes_dict[mbid][sections].append(note_stream)
    return notes_dict

In [7]:
scores_path = '../data/scores_xml/'
notes_dict = get_notes_from_score_section(scores_path, mbid_nawba_lookup)

In [8]:
def extract_pattern_grams(notes, min_n=2, max_n=2):
    """
    For a list of list of notes, <notes>
    Extract all possible note-grams up to a maximum length of <n>
    Converts stream of notes to bag-of-patterns
    """
    num_notes = len(notes)
    comb  = []
    for i in range(num_notes):
        # Final n patterns are counted more than once
        n_ = num_notes - i if max_n > num_notes - i else max_n
        comb.append([notes[i:i+j] for j in range(2,n_+1)])
    flat = [i for c in comb for i in c]
    return ' '.join([''.join(x) for x in flat if len(x) >= min_n if 'R' not in x])

In [9]:
sections_patterns = []
for mbid in notes_dict:
    for sections in notes_dict[mbid]:
        for section in notes_dict[mbid][sections]:
            sections_patterns.append([sections, extract_pattern_grams(section, min_n=3, max_n=10)])

In [10]:
from collections import Counter
import pandas as pd
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.feature_extraction import DictVectorizer
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score
from sklearn.metrics import confusion_matrix
import random
import seaborn as sn
from sklearn import preprocessing

def get_tfidf_distributions(all_recordings):
    """
    For a list of all recordings patterns tf-df-results
    Returns:
        list, each elecment a recording, summarised as a list of (pattern, tf-idf, total_count_of_pattern)
    """
    token_pattern = '[A-a0-9#-]*'
    vectorizer = TfidfVectorizer(lowercase=False, sublinear_tf=True, token_pattern=token_pattern)
    X = vectorizer.fit_transform(all_recordings)
    top_patterns = []
    sorted_vocab = sorted([(k,v) for k,v in vectorizer.vocabulary_.items()], key=lambda y: y[1])
    vocab = [x[0] for x in sorted_vocab]
    for i,x in enumerate(X):
        weights = x.toarray()[0]
        pattern_counts = Counter(all_recordings[i].split(' '))
        this_patterns = []
        for v,w in zip(vocab, weights):
            tf = float(pattern_counts[v])/len(pattern_counts)
            this_patterns.append((v, tf*w, pattern_counts[v]))
        top_patterns.append(this_patterns)
    return top_patterns

In [11]:
only_patterns = [x[1] for x in sections_patterns]
distributions = get_tfidf_distributions(only_patterns)

In [12]:
def average_tfidf(distributions, indices):
    """
    For a list of tfidf results (from model.get_tfidf_distributions)
    Return:
        Dataframe of average tf-idf across recordings of the same nawba
        ...[nawba, pattern, tf-idf, frequency]
    """
    frame = zip_nawba(distributions, indices)
    frame_grouped = frame.groupby(['index', 'pattern'])\
                         .agg({'tf-idf': 'mean', 'frequency': 'sum'})\
                         .reset_index()
    return frame_grouped


def zip_nawba(distributions, indices):
    """
    Convert distributions output to DF
    """
    zip_nawba = [
        [(n,x,y,z) for x,y,z in d] \
        for n,d in zip(indices, distributions)
    ]
    frame = pd.DataFrame(
        [y for x in zip_nawba for y in x],
        columns=['index', 'pattern', 'tf-idf', 'frequency']
    )
    return frame

In [13]:
frame_grouped = average_tfidf(distributions, [x[0] for x in sections_patterns])

In [14]:
frame_grouped

Unnamed: 0,index,pattern,tf-idf,frequency
0,inṣirāf,,0.000000,0
1,inṣirāf,AAA,0.000918,261
2,inṣirāf,AAAA,0.000157,49
3,inṣirāf,AAAAA,0.000005,6
4,inṣirāf,AAAAAG,0.000000,0
5,inṣirāf,AAAAAGE,0.000000,0
6,inṣirāf,AAAAAGEE,0.000000,0
7,inṣirāf,AAAAAGEEF,0.000000,0
8,inṣirāf,AAAAAGEEFD,0.000000,0
9,inṣirāf,AAAAB-,0.000000,0


### Classification

In [15]:
corresponding_sections = [x[0] for x in sections_patterns]

In [16]:
df = zip_nawba(distributions, corresponding_sections)

In [17]:
list_of_sections = mylist = list(dict.fromkeys(corresponding_sections))

In [18]:
section_patterns={}
for section in list_of_sections:
    cont = 0
    this_frame = frame_grouped[(frame_grouped['index'] == section)].sort_values(by='tf-idf', ascending=False)
    likely_patterns = this_frame[['pattern','tf-idf']]
    likely_patterns = likely_patterns.values.tolist()
    if section not in section_patterns:
        section_patterns[section]=[]
    for pattern in likely_patterns[:10]:
        section_patterns[section].append(pattern)

In [19]:
section_patterns

{'mišālia': [['FED', 0.009530580064484246],
  ['EFEF', 0.00829734543768562],
  ['EFE', 0.007039243271887708],
  ['FEF', 0.006986637522275103],
  ['EDC', 0.006164799488139136],
  ['EDCCC', 0.004882118272167268],
  ['FFE', 0.004676779908171378],
  ['EDDD', 0.004618212917938627],
  ['EFEFEF', 0.004618212917938627],
  ['EFEFEFG', 0.004618212917938627]],
 'tawāšī': [['GFE', 0.0020476265045236215],
  ['FED', 0.0019639628936384567],
  ['EDC', 0.0019002177842133712],
  ['GFED', 0.001673350500404252],
  ['CDE', 0.0014946541002338859],
  ['FEDC', 0.0013957618269212877],
  ['FGF', 0.0013104573950982574],
  ['GFEDC', 0.00109094090639226],
  ['EFG', 0.0010308892241602035],
  ['DEF', 0.0010277736580738626]],
 'muassa‘': [['GFE', 0.003878961510190407],
  ['FED', 0.0034924822460849422],
  ['EDC', 0.0031487058541129656],
  ['GFED', 0.002814542950432407],
  ['FEDC', 0.002693035864410414],
  ['AGF', 0.0021606590941116214],
  ['AGFE', 0.002150209905477527],
  ['EFG', 0.0019748752584738223],
  ['GFEDC', 0.

In [27]:
def train_classifier(df, nawba_groups):
    patterns = [i[0] for s in nawba_groups.values() for i in s]
            
    frame = df[df['pattern'].isin(patterns)]
    data = frame.pivot_table(values='frequency', columns='pattern', index='index')\
                 .reset_index()
    #mbid_nawba_dict = {x:y for x,y in mbid_nawba}
    #data['index'] = data['index'].apply(lambda y: mbid_nawba_dict[y])
        
    train, test = train_test_split(data, test_size=0.6)
    
    y = train['index']
    X = train[[x for x in patterns if x in train.columns]]
    
    clf = LogisticRegression(
        random_state=42,
        C=0.01,
        solver='liblinear',
        multi_class='ovr'
    ).fit(X, y)
    
    test_preds = clf.predict(test[[x for x in patterns if x in test.columns]])
    y_true = test['index']
    
    #plt.figure(figsize=(10,10))
    #sn.heatmap(confusion_matrix(test_preds, y_true))
    
    return accuracy_score(test_preds, y_true)

In [31]:
our_results = [train_classifier(df, section_patterns) for x in range(100)]

In [32]:
print('Results: {} ({})'.format(round(np.mean(our_results)*100, 2), round(np.std(our_results)*100,2)))

Results: 0.0 (0.0)
