In [None]:
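"""
Export annotations from checked TONY session files (`*_checked.ton`).

For every checked file this notebook writes the note onsets/durations, the
continuous pitch track (Hz and MIDI), per-note summary information, and a plot
of the annotated notes.
"""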

In [48]:
# imports
import sys
import os
import numpy as np
import pandas as pd
import bz2
import re
import matplotlib.pyplot as plt
import pylab as pl
import matplotlib.patches as patches
from matplotlib import collections  as mc
# Matches every run of characters that is neither a digit nor a dot.
# The TON parsers substitute it with "" to keep only the numeric part of a token.
is_non_decimal = re.compile(r"[^\d.]+")

# Root directory scanned by the export loop (one sub-folder per participant).
# NOTE(review): absolute, machine-specific path - adjust before running elsewhere.
dir_needs_check = '/Users/Lenovo/Desktop/estudo'

In [49]:
#Function to find the files. Change it to find different formats of files.
def renditionList(dirLookup,
                  endsWith='.ton',
                  skipIfYouSeeThisOutput=''):
    """
    List the renditions found in a directory.

    A rendition is the name of a file in `dirLookup` that ends with `endsWith`,
    with that ending removed.

    Renditions that were already processed are skipped: a rendition is left out
    when `dirLookup` contains a file named `<rendition><skipIfYouSeeThisOutput>`.

    Parameters
    ----------
    dirLookup : str
        Directory to search.
    endsWith : str
        File-name ending that identifies an input file.
    skipIfYouSeeThisOutput : str
        Ending of the output file whose presence marks a rendition as done.

    Returns
    -------
    list of str
        Rendition names, in the order returned by `os.listdir`.

    Notes
    -----
    As a side effect the process working directory is changed to `dirLookup`.
    """
    # Resolve first so that a relative dirLookup still works after the chdir.
    lookup_dir = os.path.abspath(dirLookup)
    os.chdir(lookup_dir)

    renditions = []
    # List the lookup directory itself, not the filesystem root.
    for filename in os.listdir(lookup_dir):
        if not filename.endswith(endsWith):
            continue

        # Slice from the front so an empty `endsWith` keeps the whole name
        # (`name[0:-0]` would be an empty string).
        rendition = filename[:len(filename) - len(endsWith)]

        # don't add to list if output file exists already
        already_done = os.path.join(lookup_dir, rendition + skipIfYouSeeThisOutput)
        if not os.path.isfile(already_done):
            renditions.append(rendition)

    return renditions

def ton_to_onsets(full_filepath, sample_rate=44100):
    """
    Read the note onsets and durations from a bz2-compressed TON file.

    A line is treated as a note when it contains "point frame", "value",
    "label" and "duration". The numbers are taken from the space-separated
    tokens containing "frame" (onset) and "duration".

    Parameters
    ----------
    full_filepath : str
        Path of the bz2-compressed TON file.
    sample_rate : int or float
        Number of frames per second, used to convert frames to seconds.

    Returns
    -------
    pandas.DataFrame
        Columns "Onset" and "Duration" in seconds, rounded to 3 decimals,
        one row per note.
    """
    # The context manager closes the compressed file once it has been read.
    with bz2.BZ2File(full_filepath) as ton_file:
        content = ton_file.read().decode("utf-8").rstrip().split("\n")

    note_markers = ("point frame", "value", "label", "duration")

    onset_frame = []
    duration_frames = []
    for line in content:
        if not all(marker in line for marker in note_markers):
            continue
        for token in line.split(" "):
            # keep only digits and dots of the token, e.g. frame="123" -> 123
            if "frame" in token:
                onset_frame.append(int(is_non_decimal.sub("", token)))
            if "duration" in token:
                duration_frames.append(int(is_non_decimal.sub("", token)))

    # convert frame counts to seconds
    cur_df_notes = pd.DataFrame(
        {
            "Onset": np.array(onset_frame).astype(float) / sample_rate,
            "Duration": np.array(duration_frames).astype(float) / sample_rate,
        }
    )

    return cur_df_notes.round(3)

def ton_to_pitch(full_filepath, sample_rate=44100, hop_size=256):
    """
    Read the pitch track from a bz2-compressed TON file as a regular time series.

    Lines containing "point frame", "value" and "label" are split in two groups:
    those without "duration" are pitch points, those with "duration" are notes.
    The sparse pitch points are expanded onto a grid of `hop_size` frames that
    runs up to the last pitch point or the end of the last note, whichever is
    later. Grid positions without a pitch point, and pitch values of 0, are NaN.

    Parameters
    ----------
    full_filepath : str
        Path of the bz2-compressed TON file.
    sample_rate : int or float
        Number of frames per second, used to convert frames to seconds.
    hop_size : int
        Grid step in frames (256 is described in this notebook as the TONY
        default).

    Returns
    -------
    pandas.DataFrame
        Columns "Time" (s), "Hz" and "MIDI", rounded to 3 decimals.
    """
    # Read from the path given by the caller; the context manager closes the file.
    with bz2.BZ2File(full_filepath) as ton_file:
        content = ton_file.read().decode("utf-8").rstrip().split("\n")

    frame_subset = []
    pitch_subset = []
    onset_frame = []
    duration_frames = []
    for line in content:
        if not ("point frame" in line and "value" in line and "label" in line):
            continue
        tokens = line.split(" ")
        if "duration" in line:
            # note annotation: only needed to find where the annotations end
            for token in tokens:
                if "frame" in token:
                    onset_frame.append(int(is_non_decimal.sub("", token)))
                if "duration" in token:
                    duration_frames.append(int(is_non_decimal.sub("", token)))
        else:
            # pitch point
            for token in tokens:
                if "frame" in token:
                    frame_subset.append(int(is_non_decimal.sub("", token)))
                if "value" in token:
                    pitch_subset.append(float(is_non_decimal.sub("", token)))

    # calculate the final frame of the annotations
    # (pitch track must extend that far); 0 when there is nothing to read
    last_pitch_frame = max(frame_subset, default=0)
    if onset_frame:
        final_frame_anno = int(np.max(np.array(onset_frame) + np.array(duration_frames)))
    else:
        final_frame_anno = 0
    max_frame = max(last_pitch_frame, final_frame_anno)

    # Dictionary lookup instead of scanning the list for every grid position.
    # setdefault keeps the first value if a frame occurs more than once.
    pitch_by_frame = {}
    for frame, pitch in zip(frame_subset, pitch_subset):
        pitch_by_frame.setdefault(frame, pitch)

    # turn sparse format pitch information to continuous pitch information
    # NOTE(review): the grid stops before max_frame, so a point lying exactly on
    # max_frame is not part of the track - confirm this is intended.
    frame_expand = np.arange(0, max_frame, hop_size)
    pitch_expand = np.array(
        [pitch_by_frame.get(frame, np.nan) for frame in frame_expand.tolist()],
        dtype=float,
    )
    pitch_expand[pitch_expand == 0] = np.nan

    # place continuous pitch info into dataframe
    cur_pitch_track = pd.DataFrame(
        {
            "Time": frame_expand / sample_rate,
            "Hz": pitch_expand,
            "MIDI": 12 * np.log2(pitch_expand / 440) + 69,
        }
    )

    return cur_pitch_track.round(3)

In [36]:
def calc_note_info(onsets_durs, Hz_track):
    """
    Summarise the pitch of every note from the continuous pitch track.

    For each note the pitch samples between onset and offset are located in
    `Hz_track`, the outer quarters are trimmed away, and statistics are
    computed on the remaining middle section. Notes whose middle section is
    empty or has no valid (non-NaN) pitch sample are left out.

    Parameters
    ----------
    onsets_durs : pandas.DataFrame
        Columns "Onset" and "Duration" in seconds, one row per note.
    Hz_track : pandas.DataFrame
        Columns "Time" (seconds) and "Hz". Unvoiced samples are NaN.
        Assumes "Time" contains no NaN.

    Returns
    -------
    pandas.DataFrame
        One row per kept note, rounded to 2 decimals, with the columns
        note_num, prop_note_ok, iqtl_median_Hz, iqtl_median_midi, iqtl_range,
        note_onset, note_length, iqtl_note_length and interval_semitones
        (difference to the previous note's median MIDI; NaN for the first note).
    """
    columns = [
        "note_num",
        "prop_note_ok",
        "iqtl_median_Hz",
        "iqtl_median_midi",
        "iqtl_range",
        "note_onset",
        "note_length",
        "iqtl_note_length",
    ]

    # positional numpy views: indices below are positions, not index labels
    times = Hz_track["Time"].to_numpy(dtype=float)
    hz = Hz_track["Hz"].to_numpy(dtype=float)

    records = []
    if times.size > 0:
        note_bounds = zip(
            onsets_durs["Onset"].astype(float),
            onsets_durs["Duration"].astype(float),
        )
        for onset_s, duration_s in note_bounds:
            offset_s = onset_s + duration_s

            # nearest samples in the track; works if the time values differ slightly
            idx_on = int(np.argmin(np.abs(times - onset_s)))
            idx_off = int(np.argmin(np.abs(times - offset_s)))

            # consider middle section of note only (interquartile [iqtl] range 25-75)
            trim = int(np.ceil((idx_off - idx_on) / 4))
            idx_on += trim
            idx_off -= trim

            middle_hz = hz[idx_on:idx_off]
            if middle_hz.size == 0:
                # note too short to have a middle section; nothing to measure
                continue
            middle_midi = 12 * np.log2(middle_hz / 440) + 69

            # what proportion of the note has valid values?
            prop_valid = 1 - np.isnan(middle_hz).sum() / middle_hz.size
            if not prop_valid > 0:
                continue

            # NaN-aware percentiles, consistent with the NaN-aware medians
            q75, q25 = np.nanpercentile(middle_midi, [75, 25])

            records.append(
                {
                    "note_num": len(records) + 1,
                    "prop_note_ok": prop_valid,
                    "iqtl_median_Hz": np.nanmedian(middle_hz),
                    "iqtl_median_midi": np.nanmedian(middle_midi),
                    "iqtl_range": q75 - q25,
                    "note_onset": onset_s,
                    "note_length": duration_s,
                    "iqtl_note_length": times[idx_off] - times[idx_on],
                }
            )

    # passing `columns` keeps every column present when no note was kept
    note_info = pd.DataFrame(records, columns=columns)

    # get the intervals (since n-1, first entry is NA to align with notes in same table)
    note_info["interval_semitones"] = note_info["iqtl_median_midi"].astype(float).diff()

    return note_info.round(2)


In [50]:
def plot_annotation(onsets_durs, Hz_track, note_info, filename, title=None):
    """
    Plot the pitch track with the annotated notes and save the figure.

    The MIDI pitch track is drawn in red. Every note gets vertical bars at its
    onset and offset, its running number, a dot at the median pitch, and a
    rectangle as tall as its interquartile range (light blue when the range is
    below 1 semitone, yellow otherwise).

    Parameters
    ----------
    onsets_durs : pandas.DataFrame
        Not used by the plot; kept so existing calls keep working.
    Hz_track : pandas.DataFrame
        Columns "Time" and "MIDI".
    note_info : pandas.DataFrame
        Columns "note_onset", "note_length", "iqtl_median_midi", "iqtl_range".
    filename : str
        Path of the image file to write (saved at 300 dpi).
    title : str, optional
        Figure title. Defaults to the base name of `filename`, so the function
        does not depend on notebook-level variables.
    """
    fig, ax = plt.subplots(figsize=(15, 10))

    ax.set_title(os.path.basename(filename) if title is None else title)
    ax.set_xlabel('Recording time (s)')
    ax.set_ylabel('MIDI note number (Middle C = 60)')

    ax.plot(Hz_track['Time'], Hz_track['MIDI'], 'r')
    ax.set_axisbelow(True)

    # positional access, so the plot does not depend on note_info's index labels
    for note_idx in range(len(note_info)):
        note = note_info.iloc[note_idx]
        median_midi = note['iqtl_median_midi']

        x_0 = note['note_onset']
        x_100 = note['note_onset'] + note['note_length']
        x_width = x_100 - x_0
        x_50 = x_0 + x_width / 2
        y1 = median_midi - 0.5
        y2 = median_midi + 0.5

        # note boundaries and running note number
        ax.plot([x_0, x_0], [y1, y2], color='k', linewidth=2)
        ax.plot([x_100, x_100], [y1, y2], color='k', linewidth=2)
        ax.text(x_0, y2 + 0.2, str(note_idx + 1), ha='center')

        # interquartile range of the note, centred on its median
        iqr = note['iqtl_range']
        y1_iqr = median_midi - iqr * 0.5
        y2_iqr = median_midi + iqr * 0.5
        box_color = '#bddfeb' if iqr < 1 else 'y'
        ax.add_patch(patches.Rectangle((x_0, y1_iqr), x_width, y2_iqr - y1_iqr,
                                       linewidth=1, color=box_color, zorder=0.5))

        ax.scatter(x_50, median_midi, color='k', s=100)

    # Axis styling depends on all notes, so it is applied once, and only when
    # there is at least one note with a pitch to derive the limits from.
    med_f0 = note_info['iqtl_median_midi']
    if len(med_f0) > 0 and med_f0.notna().any():
        y_low = np.nanmin(np.floor(med_f0)) - 2
        y_high = np.nanmax(np.ceil(med_f0)) + 2
        ax.set_yticks(np.arange(y_low, y_high, step=1))
        ax.set_ylim(y_low, y_high)
        # positional flag: the `b=` keyword is not accepted by current Matplotlib
        ax.grid(True, which='major', axis='y')

    fig.tight_layout()
    fig.savefig(filename, dpi=300)
    plt.close(fig)

In [51]:
# Export note bounds, pitch track, note info and a plot for every checked TON file.
# Outputs are written next to the TON file, using explicit paths instead of
# changing the working directory.

# ending that identifies a checked TON file; removed to build the output names
checked_suffix = "_checked.ton"

# only keep notes that exceed a minimum duration (seconds)
minimum_note_duration = 0.05

# column order of the note info table
note_info_columns = [
    'participant',
    'trial',
    'note_num',
    'prop_note_ok',
    'iqtl_median_Hz',
    'iqtl_median_midi',
    'iqtl_range',
    'note_onset',
    'note_length',
    'iqtl_note_length',
    'interval_semitones',
]

# get list of folders to check; hidden entries and plain files are skipped
list_folders = sorted(
    entry for entry in os.listdir(dir_needs_check)
    if not entry.startswith('.')
    and os.path.isdir(os.path.join(dir_needs_check, entry))
)

# loop through each folder (one per participant)
for folder in list_folders:

    # get list of original TON session files in folder
    folder_path = os.path.join(dir_needs_check, folder)
    list_TON = sorted(
        name for name in os.listdir(folder_path) if name.endswith(checked_suffix)
    )

    # loop through files
    # NOTE(review): keep the names `f` and `f_fullpath`; helper functions in this
    # notebook may read them as globals.
    for f in list_TON:

        f_fullpath = os.path.join(folder_path, f)
        # output path without ending, e.g. <folder>/<participant>_<trial>
        out_stem = os.path.join(folder_path, f[:-len(checked_suffix)])

        # open up the onset/duration information from TON file
        onsets_durs = ton_to_onsets(f_fullpath)
        onsets_durs = onsets_durs[onsets_durs.Duration > minimum_note_duration]

        # save onset/duration info to disk
        onsets_durs.to_csv(out_stem + '_notebounds.csv', index=False)

        # get continuous pitch information from TON file and save it to disk
        Hz_track = ton_to_pitch(f_fullpath)
        Hz_track.to_csv(out_stem + '_pitchtrack.csv', index=False)

        # calculate note info (median pitch / duration / etc)
        note_info = calc_note_info(onsets_durs, Hz_track)

        # add the participant's ID and the trial ID (first two "_"-separated
        # parts of the file name)
        name_parts = f.split('_')
        note_info['participant'] = name_parts[0]
        note_info['trial'] = name_parts[1]

        # reorder the columns
        note_info = note_info[note_info_columns]

        # save note information as CSV
        note_info.to_csv(out_stem + '_note_info.csv', index=False)

        # visualize the annotation
        plot_annotation(onsets_durs, Hz_track, note_info, out_stem + '_visualized.png')
        