## Setup corpus for [Patterns Knowledge Graph](https://github.com/polifonia-project/patterns-knowledge-graph) (KG) ingest.

In [1]:
# imports

import numpy as np
import pandas as pd
import pickle

In this notebook we apply KG preprocessing to the MTC-ANN sample corpus included in the FoNN repo.
To ingest another corpus, please first apply FoNN's feature extraction and pattern extraction tools via ```./notebooks/feature_extraction_tools_demo.ipynb``` and ```./notebooks/pattern_extraction_demo.ipynb```.

After the corpus has been ingested through the FoNN pipeline, point the 'pattern_corpus_dir' path to ```/[corpus name]/pattern_corpus/```.
Set the 'level' variable to the subdirectory corresponding to the corpus data under investigation: this can be 'note', 'accent' or 'duration_weighted'.

In [None]:
# Load the 'pattern corpus' pattern occurrences matrix from disk.
# NOTE(review): pd.read_pickle unpickles arbitrary objects -- only point this at trusted files.
pattern_corpus_dir = '../mtc_ann_corpus/pattern_corpus/'
level = 'duration_weighted'
in_file = '6gram_locations.pkl'
# 'pattern_corpus_dir' already ends with a slash, so only one separator is added here
in_path = f"{pattern_corpus_dir}{level}/{in_file}"
pattern_occurrences = pd.read_pickle(in_path)
# sanity check: number of rows in the matrix
print(len(pattern_occurrences))


In [3]:
# filter pattern occurrences matrix, retaining patterns which occur twice or more
print(pattern_occurrences)
# Build the frequency table from the matrix loaded above, so this cell runs on a fresh
# kernel (the table was previously derived from itself, i.e. from leftover kernel state).
# Missing counts are filled with 0 *before* the cast to the int16 sparse dtype, since NaN
# has no integer representation. A new frame is created; 'pattern_occurrences' is untouched.
freq_table = pattern_occurrences.fillna(0).astype('Sparse[int16, 0]')
# total number of occurrences of each pattern across all columns
freq_table['freq'] = freq_table.sum(axis=1)
freq_table = freq_table[freq_table['freq'] >= 2]
print(freq_table.head())
# check size
print(len(freq_table))



                    NLB072355_01  NLB072255_01  NLB076303_01  NLB073150_01  \
patterns                                                                     
[1, 1, 1, 1, 1, 1]           NaN           NaN           1.0           2.0   
[1, 1, 1, 1, 1, 2]           NaN           NaN           NaN           1.0   
[1, 1, 1, 1, 1, 3]           NaN           NaN           1.0           NaN   
[1, 1, 1, 1, 1, 4]           NaN           NaN           NaN           NaN   
[1, 1, 1, 1, 1, 5]           NaN           NaN           NaN           NaN   
...                          ...           ...           ...           ...   
[7, 7, 7, 7, 7, 3]           NaN           NaN           NaN           NaN   
[7, 7, 7, 7, 7, 4]           NaN           NaN           NaN           NaN   
[7, 7, 7, 7, 7, 5]           NaN           NaN           NaN           NaN   
[7, 7, 7, 7, 7, 6]           NaN           NaN           NaN           NaN   
[7, 7, 7, 7, 7, 7]           NaN           NaN           NaN    

In [4]:
# Pull the surviving patterns out of the filtered frame's index, converting each
# index value to a tuple so it can be hashed and used for set membership tests.
indices_reformatted = list(map(tuple, freq_table.index))
print(len(indices_reformatted))
# peek at the first few patterns and at the element type inside a pattern
test_subset = indices_reformatted[:3]
print(test_subset)
print(type(test_subset[0][0]))
filtered_patterns = set(indices_reformatted)


3088
[(1, 1, 1, 1, 1, 1), (1, 1, 1, 1, 1, 2), (1, 1, 1, 1, 1, 3)]
<class 'numpy.int16'>


In [5]:
# Read the pickled locations dict from disk.
# NOTE(review): absolute, machine-specific path; pickle.load must only be used on trusted files.
locations_dir = '/Users/dannydiamond/NUIG/Polifonia/MTC/MTC-ANN-2.0/mtc_ann_feat_seq_corpus/locations'
locations_filename = 'dw_locations6.pkl'
locations_in_path = f"{locations_dir}/{locations_filename}"
with open(locations_in_path, 'rb') as locations_raw:
    locations = pickle.load(locations_raw)
# number of top-level entries in the locations dict
print(len(locations))

360


In [6]:
from string import digits
# Tune titles are the keys of the locations dict, in its iteration order.
titles = list(locations)
print(titles[:10])
# Tune id numbers: every digit character of the title, concatenated in order.
variant_id_nums = [''.join(filter(str.isdigit, tune)) for tune in locations]
print(variant_id_nums[:10])

# lookup from id number to title
id_dict = dict(zip(variant_id_nums, titles))


['NLB072355_01', 'NLB072255_01', 'NLB076303_01', 'NLB073150_01', 'NLB072567_01', 'NLB073296_01', 'NLB073269_02', 'NLB076211_01', 'NLB072359_01', 'NLB075739_03']
['07235501', '07225501', '07630301', '07315001', '07256701', '07329601', '07326902', '07621101', '07235901', '07573903']


In [7]:
# map each tune id number to the corresponding key of the locations dict
id_dict_filenames = {id_num: name for id_num, name in zip(variant_id_nums, locations)}

# check output: show the first 10 id / name pairs
for id_num in list(id_dict_filenames)[:10]:
    print(id_num, id_dict_filenames[id_num])

07235501 NLB072355_01
07225501 NLB072255_01
07630301 NLB076303_01
07315001 NLB073150_01
07256701 NLB072567_01
07329601 NLB073296_01
07326902 NLB073269_02
07621101 NLB076211_01
07235901 NLB072359_01
07573903 NLB075739_03


In [8]:
# Filter the locations dict: for every tune keep only the patterns that survived
# the frequency filter (i.e. those present in 'filtered_patterns').
filtered_locations = {
    tune: {
        pattern: hits
        for pattern, hits in pattern_locations.items()
        if pattern in filtered_patterns
    }
    for tune, pattern_locations in locations.items()
}

# Re-key the filtered data by tune id number. This pairs ids and values by position,
# relying on 'id_dict' and 'filtered_locations' sharing the iteration order of 'locations'.
res = dict(zip(id_dict, filtered_locations.values()))
# check output: first 10 tunes
for id_num in list(res)[:10]:
    print(f"{id_num}:\n{res[id_num]}")


07235501:
{(4, 4, 4, 5, 5, 5): [0], (4, 4, 5, 5, 5, 4): [1], (4, 5, 5, 5, 4, 3): [2], (5, 5, 5, 4, 3, 3): [3], (5, 5, 4, 3, 3, 1): [4], (5, 4, 3, 3, 1, 1): [5], (4, 3, 3, 1, 1, 1): [6], (3, 3, 1, 1, 1, 1): [7], (3, 1, 1, 1, 1, 3): [8], (1, 1, 1, 1, 3, 3): [9], (1, 1, 1, 3, 3, 2): [10], (1, 1, 3, 3, 2, 2): [11], (1, 3, 3, 2, 2, 3): [12], (3, 3, 2, 2, 3, 3): [13], (3, 2, 2, 3, 3, 3): [14], (2, 2, 3, 3, 3, 3): [15], (2, 3, 3, 3, 3, 2): [16], (3, 3, 3, 3, 2, 1): [17], (3, 3, 3, 2, 1, 1): [18], (3, 3, 2, 1, 1, 1): [19], (3, 2, 1, 1, 1, 2): [20], (2, 1, 1, 1, 2, 2): [21], (1, 1, 1, 2, 2, 2): [22], (1, 1, 2, 2, 2, 2): [23], (1, 2, 2, 2, 2, 5): [24], (2, 2, 2, 2, 5, 5): [25], (2, 2, 2, 5, 5, 5): [26], (2, 2, 5, 5, 5, 3): [27], (2, 5, 5, 5, 3, 2): [28], (5, 5, 5, 3, 2, 1): [29], (5, 5, 3, 2, 1, 2): [30], (5, 3, 2, 1, 2, 2): [31], (3, 2, 1, 2, 2, 3): [32], (2, 1, 2, 2, 3, 3): [33], (1, 2, 2, 3, 3, 2): [34], (2, 2, 3, 3, 2, 2): [35], (2, 3, 3, 2, 2, 2): [36], (3, 3, 2, 2, 2, 2): [37], (3, 2, 2, 2

In [9]:
# calculate tune lengths (beats and 1/8 notes):

import os

def calculate_tune_lengths(target_dir):
    """Return a dict mapping tune id number to a length derived from its csv line count.

    Every file in 'target_dir' whose name ends in '.csv' is read, and its value is the
    number of lines in the file plus one.

    The key for each file is built from the digit characters of its name (extension
    removed), which is the same rule the notebook uses to build its tune id numbers.
    Keys therefore no longer depend on the arbitrary order returned by os.listdir,
    nor on a notebook-level id dict being defined before this function is called.

    Args:
        target_dir: path of the directory holding one csv file per tune.

    Returns:
        dict of {tune id number (str): line count + 1 (int)}.
    """
    tune_durations = {}
    # sorted() only makes the resulting dict order deterministic; lookups are by key
    for file_name in sorted(os.listdir(target_dir)):
        if not file_name.endswith('.csv'):
            continue
        tune_id_num = ''.join(ch for ch in file_name[:-4] if ch.isdigit())
        file_path = f"{target_dir}/{file_name}"
        with open(file_path) as content:
            # NOTE(review): the '+ 1' is kept from the original implementation; the count
            # already includes any header line -- confirm the intended offset.
            tune_durations[tune_id_num] = sum(1 for _ in content) + 1

    return tune_durations

# counting tune length: run calculate_tune_lengths over the corpus csv directory.
# NOTE(review): absolute, machine-specific path -- must be edited to run elsewhere.
corpus_path = '/Users/dannydiamond/NUIG/Polifonia/MTC/MTC-ANN-2.0/mtc_ann_feat_seq_corpus/feat_seq_dw'
tune_lengths = calculate_tune_lengths(corpus_path)
# Disabled: the same computation for a second directory, giving lengths in 1/8 notes.
# # counting (1/8) notes
# notes_path = '/Users/dannydiamond/NUIG/Polifonia/thesession/revised_feat_seq_corpus_no_pickups/duration_weighted'
# duration_eighth_notes = calculate_tune_lengths(notes_path)

In [10]:
# Combine dicts generated in above cell: title (key): dict (val -- 'duration_beats': num, 'duration_eighth_notes': num)

# durations = {}
# for tune in duration_beats:
#     durations[tune] = {'duration_beats': duration_beats[tune], 'duration_eighth_notes': duration_eighth_notes[tune]}
#
# for tune in duration_beats:
#     durations[tune] = duration_beats[tune]

In [11]:
import csv
# Extract the target feature column from the csv file of every tune.
feature = 'diatonic_scale_degree'
in_dir = corpus_path
# List the directory once and apply the '.csv' filter to BOTH derived lists, so that
# 'filenames' and 'inpaths' always line up entry-for-entry even if the directory
# contains other files. The listing order is left untouched on purpose: a later cell
# compares 'filenames' against the ordering of the locations dict.
csv_filenames = [filename for filename in os.listdir(in_dir) if filename.endswith(".csv")]
filenames = [filename[:-4] for filename in csv_filenames]
inpaths = [f"{in_dir}/{filename}" for filename in csv_filenames]
assert inpaths, f"no .csv files found in {in_dir}"
# identify csv col names from the header row of the first file
with open(inpaths[0]) as testfile:
    csv_reader = csv.reader(testfile, delimiter=',')
    cols = next(csv_reader)
    colsmap = {col_name: i for i, col_name in enumerate(cols)}
    assert feature in colsmap, f"column '{feature}' not found in {inpaths[0]}"
# identify index of target column
target_col_idx = colsmap[feature]
# Extract the target column from all tunes to a list of string arrays.
# np.atleast_1d keeps a file with a single data row iterable (genfromtxt would
# otherwise return a 0-d array for it).
raw_data = [
    np.atleast_1d(
        np.genfromtxt(
            path,
            dtype=str,
            delimiter=',',
            usecols=target_col_idx,
            skip_header=1,
        )
    )
    for path in inpaths
]


In [12]:


# Convert every tune's raw string values to ints (via float); blank entries become 0.
feat_seq_data = [
    [0 if raw_val == '' else int(float(raw_val)) for raw_val in tune_vals]
    for tune_vals in raw_data
]

# key the sequences by tune id number (paired by position)
feat_seq_data = dict(zip(variant_id_nums, feat_seq_data))

# confirm the csv file ordering matches the 'locations' variable above
print(filenames == list(locations))

# check output: first two tunes
for id_num in list(feat_seq_data)[:2]:
    print(id_num, feat_seq_data[id_num])

True
07235501 [4, 4, 4, 5, 5, 5, 4, 3, 3, 1, 1, 1, 1, 3, 3, 2, 2, 3, 3, 3, 3, 2, 1, 1, 1, 2, 2, 2, 2, 5, 5, 5, 3, 2, 1, 2, 2, 3, 3, 2, 2, 2, 2, 1, 1, 1]
07225501 [5, 1, 1, 3, 5, 5, 5, 6, 6, 6, 5, 5, 5, 1, 2, 3, 3, 3, 2, 1, 3, 1, 5, 5, 5, 1, 2, 2, 2, 2, 2, 3, 5, 5, 5, 3, 3, 1, 5, 5, 5, 4, 4]


In [13]:
# # add tune family annotation -- step 1: read and format tune family names:
# tune_fams_dir = '/Users/dannydiamond/NUIG/Polifonia/thesession/test_corpus/tune_fams_ground_truth'
# tune_family_names = [filename for filename in os.listdir(tune_fams_dir) if filename.endswith('.csv')]
#
# tune_family_names_formatted = [
#     "Jenny's Welcome to Charlie",
#     "Drowsy Maggie",
#     "Gilderoy",
#     "Greig's Pipes",
#     "Blackbird",
#     "Hob or Nob",
#     "O'Sullivan's March",
#     "Road to Lisdoonvarna",
#     "Lord McDonald's",
#     "Johnny Cope"
# ]
#
# fam_names = dict(zip(tune_family_names_formatted, tune_family_names))
# print(fam_names)


In [14]:
# # add tune family annotation -- step 2: compile all tune variants in each family
#
# # base path to The Session ground truth dir:
# grd_truth_basepath = '/Users/dannydiamond/NUIG/Polifonia/thesession/test_corpus/tune_fams_ground_truth/'
# grd_truth_paths  = {}
# for formatted, raw in fam_names.items():
#     grd_truth_path = grd_truth_basepath + raw # 'tune_family_name' variable is defined in above code cell.
#     assert os.path.isfile(grd_truth_path)
#     # print(f"Ground truth path for {formatted}:")
#     # print(grd_truth_path)
#     # fix
#     grd_truth_paths[formatted] = grd_truth_path
#
# print(grd_truth_paths)
#
#
# def format_input_data(in_path):
#     """For a given tune family csv file located at 'in_path', this function extracts and lists all tune titles."""
#     if 'precision' not in in_path:
#         data = pd.read_csv(in_path)
#         return data['title'].tolist()
#
# # Execute above function and print sample output:
# grd_truth = {}
# tot = 0
# for fam in grd_truth_paths:
#     grd_truth[fam] = format_input_data(grd_truth_paths[fam])
#     check = len(grd_truth[fam])
#     print(f"Ground truth data for {fam}:")
#     for title in grd_truth[fam]:
#         print(title)
#     print(check)
#     tot += check
#     print('\n')
# print(tot)


In [15]:
# # add tune family annotation -- step 3: add annotation for all tunes in corpus
# tune_fam_annotation_numeric = {}
# for tune in list(locations):
#     tune_fam_annotation_numeric[''.join(i for i in tune if i.isdigit())] = ''
#     for fam in grd_truth:
#         if tune in grd_truth[fam]:
#             # print(f"{tune} in {fam}")
#             tune_fam_annotation_numeric[''.join(i for i in tune if i.isdigit())] = fam
#
# print({i for i in tune_fam_annotation_numeric if tune_fam_annotation_numeric[i]=="Jenny's Welcome to Charlie"})
# print(len(tune_fam_annotation_numeric))


In [16]:
# Set up the named tuple type used for each tune's output record:
from typing import NamedTuple


class KG_Data(NamedTuple):
    """One tune's record: identifiers, metadata, pattern locations and feature data."""
    identifiers: str  # previously 'id_number'
    title: str
    # the former 'tune_family' field has been removed
    feature: str
    level: str
    n_vals: tuple
    duration: int  # previously 'duration_beats'
    pattern_locations: dict  # previously 'locations'
    data: list  # previously 'feature_sequence_data'

# Preview, for the first 10 tunes, the values that will populate each KG_Data record:
# id number, title, feature sequence, tune length and filtered pattern locations.
for tune_id_num, tune_title in list(id_dict.items())[:10]:
    # (tune family annotation is disabled: tune_fam_annotation_numeric[tune_id_num])
    print(
        tune_id_num,
        tune_title,
        feat_seq_data[tune_id_num],
        tune_lengths[tune_id_num],
        res[tune_id_num],
        '\n',
        sep='\n',
    )







07235501
NLB072355_01
[4, 4, 4, 5, 5, 5, 4, 3, 3, 1, 1, 1, 1, 3, 3, 2, 2, 3, 3, 3, 3, 2, 1, 1, 1, 2, 2, 2, 2, 5, 5, 5, 3, 2, 1, 2, 2, 3, 3, 2, 2, 2, 2, 1, 1, 1]
48
{(4, 4, 4, 5, 5, 5): [0], (4, 4, 5, 5, 5, 4): [1], (4, 5, 5, 5, 4, 3): [2], (5, 5, 5, 4, 3, 3): [3], (5, 5, 4, 3, 3, 1): [4], (5, 4, 3, 3, 1, 1): [5], (4, 3, 3, 1, 1, 1): [6], (3, 3, 1, 1, 1, 1): [7], (3, 1, 1, 1, 1, 3): [8], (1, 1, 1, 1, 3, 3): [9], (1, 1, 1, 3, 3, 2): [10], (1, 1, 3, 3, 2, 2): [11], (1, 3, 3, 2, 2, 3): [12], (3, 3, 2, 2, 3, 3): [13], (3, 2, 2, 3, 3, 3): [14], (2, 2, 3, 3, 3, 3): [15], (2, 3, 3, 3, 3, 2): [16], (3, 3, 3, 3, 2, 1): [17], (3, 3, 3, 2, 1, 1): [18], (3, 3, 2, 1, 1, 1): [19], (3, 2, 1, 1, 1, 2): [20], (2, 1, 1, 1, 2, 2): [21], (1, 1, 1, 2, 2, 2): [22], (1, 1, 2, 2, 2, 2): [23], (1, 2, 2, 2, 2, 5): [24], (2, 2, 2, 2, 5, 5): [25], (2, 2, 2, 5, 5, 5): [26], (2, 2, 5, 5, 5, 3): [27], (2, 5, 5, 5, 3, 2): [28], (5, 5, 5, 3, 2, 1): [29], (5, 5, 3, 2, 1, 2): [30], (5, 3, 2, 1, 2, 2): [31], (3, 2, 1, 2, 

In [17]:
# Populate the output list with one KG_Data record per tune in 'id_dict'.
# 'feature', 'level' and 'n_vals' are the same fixed values for every record.
output = [
    KG_Data(
        identifiers=tune_id_num,
        title=tune_title,
        feature='diatonic scale degree',
        level='(duration-weighted) note',
        n_vals=(4, 5, 6),
        duration=tune_lengths[tune_id_num],
        pattern_locations=res[tune_id_num],
        data=feat_seq_data[tune_id_num],
    )
    for tune_id_num, tune_title in id_dict.items()
]

# inspect the first record and the number of records
print(output[0])
print(len(output))

KG_Data(identifiers='07235501', title='NLB072355_01', feature='diatonic scale degree', level='(duration-weighted) note', n_vals=(4, 5, 6), duration=48, pattern_locations={(4, 4, 4, 5, 5, 5): [0], (4, 4, 5, 5, 5, 4): [1], (4, 5, 5, 5, 4, 3): [2], (5, 5, 5, 4, 3, 3): [3], (5, 5, 4, 3, 3, 1): [4], (5, 4, 3, 3, 1, 1): [5], (4, 3, 3, 1, 1, 1): [6], (3, 3, 1, 1, 1, 1): [7], (3, 1, 1, 1, 1, 3): [8], (1, 1, 1, 1, 3, 3): [9], (1, 1, 1, 3, 3, 2): [10], (1, 1, 3, 3, 2, 2): [11], (1, 3, 3, 2, 2, 3): [12], (3, 3, 2, 2, 3, 3): [13], (3, 2, 2, 3, 3, 3): [14], (2, 2, 3, 3, 3, 3): [15], (2, 3, 3, 3, 3, 2): [16], (3, 3, 3, 3, 2, 1): [17], (3, 3, 3, 2, 1, 1): [18], (3, 3, 2, 1, 1, 1): [19], (3, 2, 1, 1, 1, 2): [20], (2, 1, 1, 1, 2, 2): [21], (1, 1, 1, 2, 2, 2): [22], (1, 1, 2, 2, 2, 2): [23], (1, 2, 2, 2, 2, 5): [24], (2, 2, 2, 2, 5, 5): [25], (2, 2, 2, 5, 5, 5): [26], (2, 2, 5, 5, 5, 3): [27], (2, 5, 5, 5, 3, 2): [28], (5, 5, 5, 3, 2, 1): [29], (5, 5, 3, 2, 1, 2): [30], (5, 3, 2, 1, 2, 2): [31], (3, 2, 

In [18]:
# # slice output & write to disc
# sample_output = output[:200]
# print(len(sample_output))
# print(sample_output[0])

# # write sample output
# sample_output_filename = 'thesession_kg_data.pkl'
# sample_output_path =  '/Users/dannydiamond/Desktop/' + sample_output_filename
# with open(sample_output_path, 'wb') as f_out:
#     pickle.dump(sample_output, f_out)

In [19]:
# Write the list of KG_Data records to a pickle file inside 'locations_dir'.
output_filename = 'mtc_kg_data_6.pkl'
locations_output_path = f"{locations_dir}/{output_filename}"
with open(locations_output_path, 'wb') as f_out:
    pickle.Pickler(f_out).dump(output)
