In [1]:
# reads in a midi file and it's corresponding performance, 
# partitions them into chcunks of given length

import json
import pandas as pd
import ast

ASAP="Datasets/asap-dataset/"
json_file=f"{ASAP}/asap_annotations.json"


# read in a json file
with open(json_file) as json_file:
    data = json.load(json_file)

data_entry=0
piece_name = list(data.keys())[data_entry]
print(piece_name)

def get_piece_info(piece_name):
    piece_info=data[piece_name]
    for key in piece_info.keys():
        piece_info[key] = str(piece_info[key])
    piece_info_df = pd.DataFrame(piece_info.items(), columns=['Key', 'Value'])
    return piece_info_df

piece_info=get_piece_info(piece_name)
piece_info

Bach/Fugue/bwv_846/Shi05M.mid


Unnamed: 0,Key,Value
0,performance_beats,"[1.095052, 2.364583, 3.66276, 4.924479, 6.2109..."
1,performance_downbeats,"[4.924479, 9.818359, 14.721355, 19.564453, 24...."
2,performance_beats_type,"{'1.095052': 'b', '2.364583': 'b', '3.66276': ..."
3,perf_time_signatures,"{'4.924479': ['4/4', 4]}"
4,perf_key_signatures,"{'1.095052': [0, 0]}"
5,midi_score_beats,"[0.5, 1.0, 1.5, 2.0, 2.5, 3.0, 3.5, 4.0, 4.5, ..."
6,midi_score_downbeats,"[2.0, 4.0, 6.0, 8.0, 10.0, 12.0, 14.0, 16.0, 1..."
7,midi_score_beats_type,"{'0.5': 'b', '1.0': 'b', '1.5': 'b', '2.0': 'd..."
8,midi_score_time_signatures,"{'2.0': ['4/4', 4]}"
9,midi_score_key_signatures,"{'0.5': [0, 0]}"


In [3]:
performance_beats= ast.literal_eval(piece_info.iloc[2,1])
score_beats= ast.literal_eval(piece_info.iloc[7,1])

# construct a pandas dataframe with columns "Beat Type", "Performance", "Score"
alignment_df = pd.DataFrame(columns=['Beat Type', 'Performance', 'Score'])

performance_beats=[(time,beat_type) for time,beat_type in performance_beats.items()]
score_beats=[(time,beat_type) for time,beat_type in score_beats.items()]

for i in range(len(performance_beats)):
    beat_type=performance_beats[i][1]
    performance_time=performance_beats[i][0]
    score_time=score_beats[i][0]
    alignment_df.loc[i] = [beat_type, performance_time, score_time]
alignment_df

Unnamed: 0,Beat Type,Performance,Score
0,b,1.095052,0.5
1,b,2.364583,1.0
2,b,3.66276,1.5
3,db,4.924479,2.0
4,b,6.210938,2.5
...,...,...,...
101,b,132.488281,51.0
102,b,134.296875,51.5
103,db,136.052083,52.0
104,b,138.016927,52.5


In [4]:
import pretty_midi as pm

midi_path=f"{ASAP}/{piece_name}"
midi_data = pm.PrettyMIDI(midi_path)
x = midi_data.instruments[0].get_piano_roll(fs=100) # shape=(pitch, timestep)

In [5]:

import matplotlib.pyplot as plt
import seaborn as sns
from tqdm import tqdm

from midi_processing import mid2dat_anna

# Stats for tokens in the dataset (performance, score) as a function of number of beats
token_lengths = []
num_performance_beats=[]
for piece_location in tqdm(data.keys()):
    piece_name=piece_location.split("/")[-1]
    file=f"{ASAP}{piece_location}"
    # Save the tokens in a folder
    with open(f"Store/{piece_name}_tokens.txt", "r") as f:
        tokens = f.readlines()
    tokens = [token.strip() for token in tokens]
    token_lengths.append(len(tokens))

    piece_info=get_piece_info(piece_location)

    performance_beats= ast.literal_eval(piece_info.iloc[2,1])

    num_performance_beats.append(len(performance_beats))


  0%|          | 0/1067 [00:00<?, ?it/s]


FileNotFoundError: [Errno 2] No such file or directory: 'Store/Shi05M.mid_tokens.txt'

In [None]:
plt.figure(figsize=(10,10))
sns.scatterplot(x=num_performance_beats, y=token_lengths)
plt.xlabel("Number of Performance Beats")
plt.ylabel("Number of Tokens")
plt.title("Token Length vs Number of Performance Beats")
# mark minimum and maximum tokens
plt.scatter(num_performance_beats[0], token_lengths[0], color="red")
# label minimum and maximum tokens
plt.annotate(f"{token_lengths[0]}", (num_performance_beats[0], token_lengths[0]))
plt.show()

# divide each token by the number of beats it corresponds to and find the average
token_lengths_per_beat = [token_lengths[i]/num_performance_beats[i] for i in range(len(token_lengths))]
# describe the distribution of token lengths per beat by converting it to a dataframe
token_lengths_per_beat_df = pd.DataFrame(token_lengths_per_beat, columns=['Token Length Per Beat'])
token_lengths_per_beat_df.describe()

In [None]:
'''
1) Take a start beat from the performance and find the end beat which maximises the number of performance tokens
2) Now we know the start beat and end beat, we can find the corresponding tokens in the score
'''
    
def section(performance_tokens, score_tokens, performance_beats,scores_beats, start_beat=0):
    start_beat=start_beat
    end_beat=get_end_beat(performance_tokens, start_beat, performance_beats)

    # For performance
    start_token=beat2TokenPosition(start_beat, performance_beats, performance_tokens)
    end_token=beat2TokenPosition(end_beat, performance_beats, performance_tokens)
    performance_section=performance_tokens[start_token:end_token]

    # For score
    start_token=beat2TokenPosition(start_beat, scores_beats, score_tokens)
    end_token=beat2TokenPosition(end_beat, scores_beats, score_tokens)
    score_section=score_tokens[start_token:end_token]

    return performance_section, score_section



def beat2TokenPosition(beat, beats, tokens):
    beat_time=beats[beat]

    time_shift_positions=[]

    time_elapsed=0
    # Iterate through performance_tokens, if the token is a time_shift token then add the time_shift to the time_elapsed
    for i in range(len(tokens)):
        token=tokens[i]
        if token.type=="TIME_SHIFT":
            time_elapsed+=token.value
            time_shift_positions.append((i, time_elapsed))
    
    # Iterate through time_shift_positions, if the time_elapsed is greater than the beat_time then return the index of the token
    
    for i in range(len(time_shift_positions)):
        if time_shift_positions[i][1]>beat_time:
            return time_shift_positions[i][0]
    

def get_end_beat(performance_tokens, start_beat, performance_beats,max_tokens=512):
    end_beat=start_beat
    
    end_beat_token_position=beat2TokenPosition(end_beat, performance_beats, performance_tokens)
    while end_beat_token_position<max_tokens:
        end_beat+=1
        end_beat_token_position=beat2TokenPosition(end_beat, performance_beats, performance_tokens)
    return end_beat






In [None]:
def get_score_file_name(performance_file_name):
    piece_name=performance_file_name.split("/")[-1]
    for key in data.keys():
        if key.split("/")[-1]==piece_name:
            score_file_name=key.replace(piece_name, "midi_score.mid")
            return score_file_name

performance_file="Bach/Fugue/bwv_846/Shi05M.mid"
start_beat=0


performance_info=get_piece_info(performance_file)

# get beats
performance_beats= ast.literal_eval(performance_info.iloc[2,1])
score_beats= ast.literal_eval(performance_info.iloc[7,1])

# get tokens
performance_tokens= 4

In [None]:
import matplotlib.pyplot as plt
import seaborn as sns
from tqdm import tqdm
import numpy as np
from midi_processing import mid2dat_anna

def get_time_shift_differences(tokens):
    prev_time_shift_index=0
    number_of_tokens_in_between_time_shifts=[]
    for i in range(1,len(tokens)):
        token=tokens[i]
        if "TIME_SHIFT" in token:
            number_of_tokens_in_between_time_shifts.append(i-prev_time_shift_index)
            prev_time_shift_index=i
    return number_of_tokens_in_between_time_shifts

        
# stats for difference in values of successive TIME_SHIFT tokens
time_shift_differences=[]
for piece_location in tqdm(data.keys()):
    piece_name=piece_location.split("/")[-1]
    file=f"{ASAP}{piece_location}"
    # Save the tokens in a folder
    with open(f"Store/{piece_name}_tokens.txt", "r") as f:
        tokens = f.readlines()
    tokens = [token.strip() for token in tokens]
    time_shift_differences.extend(get_time_shift_differences(tokens))



In [None]:
# Describe the distribution of time_shift_differences
time_shift_differences_df = pd.DataFrame(time_shift_differences, columns=['Time Shift Differences'])
time_shift_differences_df.describe()

In [None]:
import random
random.seed(420)
import pandas as pd
import pickle
from tqdm import tqdm

def beat2TokenPosition(beat, beats, tokens,time_shift_positions=[]):
    beat_time=beats[beat]

    if len(time_shift_positions)==0:
        time_shift_positions=getTimeShifts(tokens)

    for i in range(len(time_shift_positions)):
        if time_shift_positions[i][1]>beat_time:
            return time_shift_positions[i-1][0]

def getTimeShifts(tokens):
    time_shift_positions=[]
    time_elapsed=0
    for i in range(len(tokens)):
        token=tokens[i]
        if token.type=="TIME_SHIFT":
            time_elapsed+=token.value/1000
            time_shift_positions.append((i, time_elapsed))
    return time_shift_positions

def splitTokens(tokens,beats):
    split_tokens=[]
    
    time_shift_positions=getTimeShifts(tokens)

    # find tokens between consequtive beats
    for i in range(len(beats)-1):
        start_beat=i
        end_beat=i+1
        start_token=beat2TokenPosition(start_beat, beats, tokens,time_shift_positions)
        end_token=beat2TokenPosition(end_beat, beats, tokens,time_shift_positions)
        split_tokens.append(tokens[start_token:end_token])
    
    return(split_tokens)
    

def read_annotations(annotations_file):
        annotations=pd.read_json(annotations_file).transpose()
        annotations['score_filename'] = annotations.index.map(lambda x: f"{'/'.join(x.split('/')[:-1])}/midi_score.mid")
        annotations.index=annotations.index.rename('performance_filename')
        annotations.reset_index(inplace=True)
        annotations = annotations[annotations['score_and_performance_aligned'] == True]
        annotations.rename(columns={'midi_score_beats': 'score_beats'}, inplace=True)
        return annotations

def getPerformanceBeats(annotations, performance_filename):
    performance_beats=annotations[annotations['performance_filename']==performance_filename]['performance_beats']
    performance_beats=list(performance_beats.iloc[0])
    return performance_beats
def getScoreBeats(annotations, score_filename):
    score_beats=annotations[annotations['score_filename']==score_filename]['score_beats']
    score_beats=list(score_beats.iloc[0])
    return score_beats
 
def genData(pairs,score_dict,perf_dict,annotations):
    # iterate over train pairs and generate data
    data = []
    for i in tqdm(range(len(pairs))):
        # read filenames
        score_filename=pairs[i][0]
        performance_filename=pairs[i][1]

        # get tokens
        score_tokens = score_dict[score_filename]
        performance_tokens = perf_dict[performance_filename]

        # get beats
        score_beats = getScoreBeats(annotations, score_filename)
        performance_beats = getPerformanceBeats(annotations, performance_filename)

        # split tokens
        score_split = splitTokens(performance_tokens, performance_beats)
        performance_split=splitTokens(score_tokens, score_beats)

        # add to train data
        data.append((score_split, performance_split))
    return data


In [None]:
dataset_dir = "Datasets/asap-dataset"
train_split=0.8
store_dir="Store/asap-dataset"

# read annotations
annotations=read_annotations(f"{dataset_dir}/asap_annotations.json")

# find all pairs of score, performance
score_performance_pairs = []
for i in range(len(annotations)):
    score_performance_pairs.append((annotations.iloc[i]['score_filename'], annotations.iloc[i]['performance_filename']))

# split into train and test after shuffling
random.shuffle(score_performance_pairs)
train_pairs = score_performance_pairs[:int(len(score_performance_pairs)*train_split)]
test_pairs = score_performance_pairs[int(len(score_performance_pairs)*train_split):]

# read in tokens
with open(f'{store_dir}/perf_dict.pickle', 'rb') as handle:
    perf_dict = pickle.load(handle)
with open(f'{store_dir}/score_dict.pickle', 'rb') as handle:
    score_dict = pickle.load(handle)

# save train data
train_data = genData(train_pairs,score_dict,perf_dict,annotations)
with open(f'{store_dir}/train_data.pickle', 'wb') as handle:
    pickle.dump(train_data, handle, protocol=pickle.HIGHEST_PROTOCOL)

# save test data
test_data = genData(test_pairs,score_dict,perf_dict,annotations)
with open(f'{store_dir}/test_data.pickle', 'wb') as handle:
    pickle.dump(test_data, handle, protocol=pickle.HIGHEST_PROTOCOL)

print("Data generated")