In [246]:
import numpy as np
from os.path import join as joinpath
import pandas as pd
import os, os.path, copy
from math import pow, log 
from scipy.signal import find_peaks
import matplotlib.pyplot as plt
import re

In [247]:
path = "/Users/polinap/Yandex.Disk.localized/RESEARCH_RU/VOCAL_NOTES_PROJECT/RUSSIAN/ANALYSIS/" # path to ANALYSIS folder
song = "" # leave "" to run through all songs
divider = "__"
tonal_centre_frequency = -1 # you can set the tonal centre manually 

# notes file expected at: <path>/<song>/<transcriber>/<song><divider><transcriber><divider>'notes.csv'
# segments file optional at: <path>/<song>/<transcriber>/<song><divider><transcriber><divider>'segments.csv'

In [248]:
# automatically detect transcribers and collect note files
def detect_transcribers(song):
    songpath = joinpath(path,song)
    transcriber_list = next(os.walk(songpath))[1]

    notespath_list = []
    segmentspath_list = []
    tlist = copy.deepcopy(transcriber_list)

    for transcriber in tlist:
        notesfile = song + divider + transcriber + divider + 'notes.csv'
        segmentsfile = song + divider + transcriber + divider + 'segments.csv'
        notespath = joinpath(path,song,transcriber,notesfile)
        segmentspath = joinpath(path,song,transcriber,segmentsfile)
        if os.path.isfile(notespath):
            notespath_list.append(notespath)
            if os.path.isfile(segmentspath):
                segmentspath_list.append(segmentspath)
        else:
            transcriber_list.remove(transcriber)
    print("transcribers: ", transcriber_list)
    print("note files: ", notespath_list)
    print("segment files: ", segmentspath_list)
    
    return transcriber_list, notespath_list, segmentspath_list

# custom csv read
def read_from_csv(notespath):
    notes = pd.read_csv(notespath, header=None) # doesn't read headers

    # remove columns beyond the first three
    if len(notes.columns) > 3:
        notes = notes.iloc[:,:3]

    # remove headers
    if notes.iloc[0,0]=="TIME":
        notes = notes.iloc[1:,:]
        notes.reset_index(drop=True, inplace=True)

    notes.columns = ['TIME', 'VALUE', 'DURATION']
    
    notes['VALUE'] = notes['VALUE'].astype(float)
    if all(notes['VALUE']%1==0):        # all integers
        notes['VALUE'] = notes['VALUE'].astype(int)

    return notes

In [249]:
##% tonal centre and hertz to cents (frequency to interval) calculation functions

# get the last occurence of the longest note - the first approximation for the tonal centre
def get_longest_note(note_pitch_array, note_dur_array):
    durations_reversed = note_dur_array[::-1]
    longest_note_idx = len(durations_reversed) - np.argmax(durations_reversed) - 1
    longest_note_pitch = note_pitch_array[longest_note_idx]
    #print("longest note frequency: ", longest_note_pitch)
    return longest_note_pitch

# convert note pitches into cents relative to the longest note pitch
def pitch2cent(hertz_from, hertz_to):
    # hertz_from > hertz_to will return a negative value
    return round(1200*log(hertz_to/hertz_from,2))

def pitch2interval_func(tonal_centre):
    # construct a function which takes a pitch and returns the interval to tonal centre
    return lambda pitch: pitch2cent(tonal_centre, pitch)

# calculate the tonal centre for a join of all transcriptions
def get_tonal_centre(pitches_list, durations_list):
    note_pitch_array = np.concatenate(pitches_list)
    note_dur_array = np.concatenate(durations_list)

    longest_note_pitch = get_longest_note(note_pitch_array, note_dur_array) # the first approximation for the tonal centre

    pitch2interval = pitch2interval_func(longest_note_pitch) # this function only has one argument
    pitch2interval_v = np.vectorize(pitch2interval) # vectorise to be able to apply to each element of an array
    note_cents_array = pitch2interval_v(note_pitch_array)
    
    # find histogram peak of notes in cents - a better approximation of the tonal centre
    mybins = 60
    h, hedges = np.histogram(note_cents_array, bins=mybins)
    
    # plot the histogram for the join of both transcriptions
    #plt.bar(hedges[:-1],h, width=10, align='edge')
    #plt.show()
    
    peaks, _ = find_peaks(h, distance=5, prominence=2)
    estimated_tonal_centre = hedges[peaks[h[peaks].argmax()]]
    estimated_tonal_centre_pitch = pow(2, estimated_tonal_centre/1200)*longest_note_pitch
    #print("estimated tonal centre: ", estimated_tonal_centre, " cents from the longest note")
    #print("estimated tonal centre frequency: ", estimated_tonal_centre_pitch)
    
    return estimated_tonal_centre_pitch, longest_note_pitch, note_cents_array

# recalculate notes relative to the new tonal centre
def recalculate_intervals_4new_tc(tonal_centre_frequency, longest_note_pitch, note_cents_array):
    tonal_centre_cents = pitch2cent(longest_note_pitch, tonal_centre_frequency)
    note_cents_array = note_cents_array - tonal_centre_cents
    return note_cents_array

def write_out_cents_files(notespath_list, notes_list, pitches_list, note_cents_array, tc_string):
    # substitute note frequencies by notes in cents in the dataframes
    for notes, pitches in zip(notes_list, pitches_list):
        cents = note_cents_array[0:len(pitches)]
        notes['VALUE'] = pd.Series(cents)
        note_cents_array = note_cents_array[len(pitches):]    
    # write to file
    for notes, notespath in zip(notes_list, notespath_list):
        fileout = notespath.replace("notes", "notes_cents_tc"+tc_string)
        notes.to_csv(fileout, index = False)
        print("saved: ", fileout)

In [250]:
##% construct comparison original vs corrected note pitches
# TODO this should create a modified svl file to align with both the pitch trace and original notes
# segments.svl can be used for that. What seems sensible is to construct cents diff

def compare_original_corrected_notes(notespath_list, segmentspath_list, tonal_centre_frequency):
    
    notes_list = []
    for notespath in notespath_list:
        notes = read_from_csv(notespath)
        notes_list.append(notes)

    segments_list = []
    for segmentspath in segmentspath_list:
        segments = read_from_csv(segmentspath)
        segments_list.append(segments)

    for notes, segments, notespath in zip(notes_list, segments_list, notespath_list):
        note_pitches = notes['VALUE'].astype(float).to_numpy()
        segment_pitches = segments['VALUE'].astype(float).to_numpy()
        
        if len(note_pitches) != len(segment_pitches):
            print('WARNING: no. of segments != no. of notes for path ', notespath_list)
            
        else:     
            # transform to cents
            pitch2interval = pitch2interval_func(tonal_centre_frequency)
            pitch2interval_v = np.vectorize(pitch2interval) 
            note_cents = pitch2interval_v(note_pitches)
            segment_cents = pitch2interval_v(segment_pitches)

            # calculate diff and put into the dataframe instead of note pitch values
            cents_diff = note_cents - segment_cents
            pitch_correction_diff_df = notes
            pitch_correction_diff_df['VALUE'] = pd.Series(cents_diff)

            # add min and max for colour calibration in SV - the same for all tracks and transcriptions
            pitch_correction_diff_df.loc[-2] = [0.001, -500, 0.001]
            pitch_correction_diff_df.loc[-1] = [0.002, 500, 0.001]
            pitch_correction_diff_df.index = pitch_correction_diff_df.index + 2
            pitch_correction_diff_df.sort_index(inplace=True)

            # only retain non-zero entries
            pitch_correction_diff_df = pitch_correction_diff_df[pitch_correction_diff_df['VALUE']!=0]

            # save
            fileout_pitches = notespath.replace("notes", "pitch_correction_diff")
            pitch_correction_diff_df.to_csv(fileout_pitches, index = False)
            print("saved: ", fileout_pitches)
    

In [251]:
##% colour coding equalisation functions

# change vertical resolution in an .svl note file
def change_vertical_resolution_svl(filepath, min, max):
    with open(filepath, 'r') as file:
        data = file.read()
    data = re.sub("minimum=\"[0-9.]*\"", "minimum=\""+str(min)+"\"", data)
    data = re.sub("maximum=\"[0-9.]*\"", "maximum=\""+str(max)+"\"", data)
    outfilepath = filepath.replace(".svl", "_min"+str(min)+"_max"+str(max)+".svl")
    with open(outfilepath, 'w') as outfile:
        outfile.write(data)
    print('saved: ', outfilepath)

def add_maxmin_to_files(notespath_list):
    notes_list = []
    concat_list = []
    for notespath in notespath_list:
        notes = read_from_csv(notespath)
        notes_list.append(notes)
        concat_list.append(notes['VALUE'].astype(float))

    # combine the pitches of all transcriptions, find max and min values
    df = pd.concat(concat_list, axis=1)
    array = df.to_numpy()

    mymax = np.nanmax(array)
    mymin = np.nanmin(array)

    # add max and min values at the start of each transcription
    for notes in notes_list:
        notes.loc[-2] = [0.001, mymax, 0.001]
        notes.loc[-1] = [0.002, mymin, 0.001]
        notes.index = notes.index + 2 
        notes.sort_index(inplace=True)
        if all(notes['VALUE']%1==0): 
            notes['VALUE'] = notes['VALUE'].astype(int)
        else: 
            notes['VALUE'] = notes['VALUE'].astype(float)
        
    # save
    for notes, notespath in zip(notes_list, notespath_list):
        fileout_eqrange = notespath.replace('.csv', '_eqrange.csv')
        notes.to_csv(fileout_eqrange, index = False)
        print("saved: ", fileout_eqrange)
    
    return mymin, mymax
    
# set minimum and maximum flags in .svl files to equalise vertical resolution
def equalise_pitch_ranges_svl(notespath_list, mymin, mymax):
    for notespath in notespath_list:
        svl_file = notespath.replace('.csv','.svl')
        if os.path.isfile(svl_file):      
            change_vertical_resolution_svl(svl_file, mymin, mymax)

# adjust pitches files ranges to equalise colour coding in SV
def equalise_pitch_ranges(transcriber_list, fileext_list):
    notespath_list = []
    for transcriber in transcriber_list:
        for fileext in fileext_list:
            filename = song + divider + transcriber + divider + fileext + '.csv'
            notespath = joinpath(path,song,transcriber,filename)
            if os.path.isfile(notespath):
                notespath_list.append(notespath)
                
    min, max = add_maxmin_to_files(notespath_list)
    return min, max
    

In [252]:
##% howmanynotes, alignment functions

# alignment cost function
def getOverlap(segment1, segment2):
    return max(0, min(segment1[1], segment2[1]) - max(segment1[0], segment2[0]))
def getOverlapCost(segment1, segment2):
    overlap = getOverlap(segment1, segment2)
    if overlap == 0:
        return 1
    len1 = segment1[1] - segment1[0]
    len2 = segment2[1] - segment2[0]
    return 1 - max(overlap/len1, overlap/len2)

# match segment sequences with dtw
def mydtw(series_1, series_2):
    # copied from simpledtw, changed cost function
	matrix = np.zeros((len(series_1) + 1, len(series_2) + 1))
	matrix[0,:] = np.inf
	matrix[:,0] = np.inf
	matrix[0,0] = 0
	for i, vec1 in enumerate(series_1):
		for j, vec2 in enumerate(series_2):
			cost = getOverlapCost(vec1,vec2)
			matrix[i + 1, j + 1] = cost + min(matrix[i, j + 1], matrix[i + 1, j], matrix[i, j])
	matrix = matrix[1:,1:]
	i = matrix.shape[0] - 1
	j = matrix.shape[1] - 1
	matches = []
	mappings_series_1 = [list() for v in range(matrix.shape[0])]
	mappings_series_2 = [list() for v in range(matrix.shape[1])]
	while i > 0 or j > 0:
		matches.append((i, j))
		mappings_series_1[i].append(j)
		mappings_series_2[j].append(i)
		option_diag = matrix[i - 1, j - 1] if i > 0 and j > 0 else np.inf
		option_up = matrix[i - 1, j] if i > 0 else np.inf
		option_left = matrix[i, j - 1] if j > 0 else np.inf
		move = np.argmin([option_diag, option_up, option_left])
		if move == 0:
			i -= 1
			j -= 1
		elif move == 1:
			i -= 1
		else:
			j -= 1
	matches.append((0, 0))
	mappings_series_1[0].append(0)
	mappings_series_2[0].append(0)
	matches.reverse()
	for mp in mappings_series_1:
		mp.reverse()
	for mp in mappings_series_2:
		mp.reverse()
	
	return matches, mappings_series_1, mappings_series_2

# for a notes file, collect segments
def collect_segments(notespath):
    notes = read_from_csv(notespath)
    onsets = notes['TIME'].astype(float).to_numpy()

    durations = notes['DURATION'].astype(float).to_numpy()
    if min(durations) == 0:
        for idx in range(len(duration)):
            if durations[idx] == 0:
                durations[idx] = 0.00001

    offsets = np.sum([onsets, durations],axis=0)
    if onsets[0] < 0:
        onsets[0] = 0
    if offsets[0] <= 0:
        offsets[0] = 0.000001
    segments = np.array(list(zip(onsets, offsets)))
        
    return segments

# collect note clusters (where number of segments/notes differs between transcribers) from an alignment
def get_note_clusters(segments1, segments2, mapping_1, mapping_2):
    cluster_segments = []
    for idx in range(len(mapping_1)):
        if len(mapping_1[idx]) > 1:
            start1, stop1 = segments1[idx]
            start2 = segments2[mapping_1[idx][0]][0]
            stop2 = segments2[mapping_1[idx][-1]][1]
            start = min(start1, start2)
            stop = max(stop1, stop2)
            cluster_segments.append([start, stop])
    for idx in range(len(mapping_2)):
        if len(mapping_2[idx]) > 1:
            start1, stop1 = segments2[idx]
            start2 = segments1[mapping_2[idx][0]][0]
            stop2 = segments1[mapping_2[idx][-1]][1]
            start = min(start1, start2)
            stop = max(stop1, stop2)
            cluster_segments.append([start, stop])
            
    return cluster_segments

def write_clusters_2csv(cluster_segments, transcriber1, transcriber2):
    df = pd.DataFrame(cluster_segments)
    df.columns = ["ONSET", "OFFSET"]
    df["DURATION"] = df['OFFSET'] - df['ONSET']
    df.columns = ["ONSET", "OFFSET", "DURATION"]
    df = df[["ONSET", "DURATION"]]

    outfile = "howmanynotes" + divider + transcriber1 + divider + transcriber2 + '.csv'
    outpath = joinpath(path,song,outfile)
    df.to_csv(outpath, index = False)
    print("saved: ", outfile)
    
    
# main    
def calculate_howmanynotes_groups(transcriber_list):
    for count, transcriber1 in enumerate(transcriber_list[:-1]):
        for transcriber2 in transcriber_list[count+1:]:

            notespath_list = []
            filename1 = song + divider + transcriber1 + divider + 'notes.csv'
            filename2 = song + divider + transcriber2 + divider + 'notes.csv'
            notespath1 = joinpath(path,song,transcriber1,filename1)
            notespath2 = joinpath(path,song,transcriber2,filename2)

            segments1 = collect_segments(notespath1)
            segments2 = collect_segments(notespath2)

            matches, mapping_1, mapping_2 = mydtw(segments1, segments2)
            cluster_segments = get_note_clusters(segments1, segments2, mapping_1, mapping_2)
            write_clusters_2csv(cluster_segments, transcriber1, transcriber2)

In [253]:
##% run all SV scripts for one song

def run_all_SV_scripts(song):
    print("---------------\nRunning SV scripts for song: ", song)
    
    transcriber_list, notespath_list, segmentspath_list = detect_transcribers(song)
    if len(transcriber_list) < 2:
        print("WARNING: not enough transcribers")
    
    else: 
        # calculate the tonal centre for a joint of all transcriptions
        notes_list = []
        pitches_list = []
        durations_list = []
        for notespath in notespath_list:
            notes = read_from_csv(notespath)
            notes_list.append(notes)
            pitches_list.append(notes['VALUE'].astype(float).to_numpy())
            durations_list.append(notes['DURATION'].astype(float).to_numpy())

        estimated_tonal_centre_pitch, longest_note_pitch, note_cents_array = get_tonal_centre(pitches_list, durations_list)

        # change the tonal centre manually if needed
        tonal_centre_frequency = estimated_tonal_centre_pitch
        tc_string = str('%.2f' %tonal_centre_frequency)
        print("tonal centre frequency: ", tc_string)
        # calculate notes as intervals in cents
        note_cents_array = recalculate_intervals_4new_tc(tonal_centre_frequency, longest_note_pitch, note_cents_array)
        write_out_cents_files(notespath_list, notes_list, pitches_list, note_cents_array, tc_string)

        # construct comparison original vs corrected note pitches
        compare_original_corrected_notes(notespath_list, segmentspath_list, tonal_centre_frequency)
        # adjust pitches ranges across files to equalise colour coding in SV
        min, max = equalise_pitch_ranges(transcriber_list, ["notes", "segments"])
        equalise_pitch_ranges_svl(notespath_list, min, max)
        equalise_pitch_ranges(transcriber_list, ["notes_cents_tc"+tc_string])
        # calculate groups of howmanynotes
        calculate_howmanynotes_groups(transcriber_list)

In [254]:
if len(song) > 0:
    run_all_SV_scripts(song)
else:
    song_list = next(os.walk(path))[1]
    for song in song_list:
        run_all_SV_scripts(song)

---------------
Running SV scripts for song:  Selo_veselo_2
transcribers:  []
note files:  []
segment files:  []
---------------
Running SV scripts for song:  Selo_veselo_5
transcribers:  []
note files:  []
segment files:  []
---------------
Running SV scripts for song:  Selo_veselo_4
transcribers:  []
note files:  []
segment files:  []
---------------
Running SV scripts for song:  Selo_veselo_3
transcribers:  []
note files:  []
segment files:  []
---------------
Running SV scripts for song:  Uzh_ja_dumala_1 
transcribers:  []
note files:  []
segment files:  []
---------------
Running SV scripts for song:  Da_po_zoriushke_1
transcribers:  ['PP', 'OV', 'YN']
note files:  ['/Users/polinap/Yandex.Disk.localized/RESEARCH_RU/VOCAL_NOTES_PROJECT/RUSSIAN/ANALYSIS/Da_po_zoriushke_1/PP/Da_po_zoriushke_1__PP__notes.csv', '/Users/polinap/Yandex.Disk.localized/RESEARCH_RU/VOCAL_NOTES_PROJECT/RUSSIAN/ANALYSIS/Da_po_zoriushke_1/OV/Da_po_zoriushke_1__OV__notes.csv', '/Users/polinap/Yandex.Disk.locali

saved:  howmanynotes__OV__YN.csv
---------------
Running SV scripts for song:  Selo_veselo_1
transcribers:  []
note files:  []
segment files:  []
---------------
Running SV scripts for song:  Temno_li_na_nebe
transcribers:  ['OV']
note files:  ['/Users/polinap/Yandex.Disk.localized/RESEARCH_RU/VOCAL_NOTES_PROJECT/RUSSIAN/ANALYSIS/Temno_li_na_nebe/OV/Temno_li_na_nebe__OV__notes.csv']
segment files:  []
---------------
Running SV scripts for song:  Kak_letala_jara
transcribers:  ['PP', 'OV']
note files:  ['/Users/polinap/Yandex.Disk.localized/RESEARCH_RU/VOCAL_NOTES_PROJECT/RUSSIAN/ANALYSIS/Kak_letala_jara/PP/Kak_letala_jara__PP__notes.csv', '/Users/polinap/Yandex.Disk.localized/RESEARCH_RU/VOCAL_NOTES_PROJECT/RUSSIAN/ANALYSIS/Kak_letala_jara/OV/Kak_letala_jara__OV__notes.csv']
segment files:  ['/Users/polinap/Yandex.Disk.localized/RESEARCH_RU/VOCAL_NOTES_PROJECT/RUSSIAN/ANALYSIS/Kak_letala_jara/PP/Kak_letala_jara__PP__segments.csv', '/Users/polinap/Yandex.Disk.localized/RESEARCH_RU/VOCA