In [9]:
import time

def stopwatch(func, *args, **kwargs):
    print("Begin executing function...")
    start = time.time()
    funcReturn = func(*args, **kwargs)
    stop = time.time()
    print('Finished executing function...')
    duration = (stop-start)*1000
    print(f'Time to execute function (milliseconds): {duration}')
    return funcReturn

def test(a):
    print(a)


stopwatch(test, 'a')

Begin executing function...
a
Finished executing function...
Time to execute function (milliseconds): 0.0


In [52]:
data_dir = "data/raw_training/training_data/"
df = loadTrainingData(data_dir)

df.head(3)


loading raw data from  data/raw_training/training_data/


Reading from .txt files in data/raw_training/training_data/: 100%|██████████| 942/942 [00:00<00:00, 4864.56it/s]
Getting audio file segmentation information: 100%|██████████| 3163/3163 [00:00<00:00, 5234.55it/s]


patient_id,murmur_in_patient,audio_file,annotation_file,segments,murmur_in_recording,recording_location,sampling_frequency,total_locations,murmur_locations,most_audible_location,outcome,age,sex,height,weight,pregnancy_status,sys_mur_timing,sys_mur_shape,sys_mur_pitch,sys_mur_grading,sys_mur_quality,dia_mur_timing,dia_mur_shape,dia_mur_pitch,dia_mur_grading,dia_mur_quality,campaign,additional_id
i64,str,str,str,list [str],str,str,i64,i64,list [str],str,str,str,str,str,str,str,str,str,str,str,str,str,str,str,str,str,str,str
13918,"""Present""","""13918_AV.wav""","""13918_AV.tsv""","[""1.14675+1.779916"", ""1.779916+2.320036"", ... ""8.610845+9.198623""]","""Absent""","""AV""",4000,4,"[""TV""]","""TV""","""Abnormal""","""Child""","""Male""","""98.0""","""15.9""","""False""","""Holosystolic""","""Plateau""","""Low""","""I/VI""","""Blowing""","""nan""","""nan""","""nan""","""nan""","""nan""","""CC2015""","""nan"""
13918,"""Present""","""13918_PV.wav""","""13918_PV.tsv""","[""1.214+1.794094"", ""1.794094+2.365901"", ... ""5.191788+5.744259""]","""Absent""","""PV""",4000,4,"[""TV""]","""TV""","""Abnormal""","""Child""","""Male""","""98.0""","""15.9""","""False""","""Holosystolic""","""Plateau""","""Low""","""I/VI""","""Blowing""","""nan""","""nan""","""nan""","""nan""","""nan""","""CC2015""","""nan"""
13918,"""Present""","""13918_TV.wav""","""13918_TV.tsv""","[""0.1085+0.700176"", ""0.700176+1.240176"", ... ""4.640176+5.200176""]","""Present""","""TV""",4000,4,"[""TV""]","""TV""","""Abnormal""","""Child""","""Male""","""98.0""","""15.9""","""False""","""Holosystolic""","""Plateau""","""Low""","""I/VI""","""Blowing""","""nan""","""nan""","""nan""","""nan""","""nan""","""CC2015""","""nan"""


In [49]:
df.filter(pl.col('annotation_file')=='50782_MV_1.tsv')

patient_id,murmur_in_patient,audio_file,annotation_file,segments,murmur_in_recording,recording_location,sampling_frequency,total_locations,murmur_locations,most_audible_location,outcome,age,sex,height,weight,pregnancy_status,sys_mur_timing,sys_mur_shape,sys_mur_pitch,sys_mur_grading,sys_mur_quality,dia_mur_timing,dia_mur_shape,dia_mur_pitch,dia_mur_grading,dia_mur_quality,campaign,additional_id
i64,str,str,str,list [str],str,str,i64,i64,list [str],str,str,str,str,str,str,str,str,str,str,str,str,str,str,str,str,str,str,str


In [50]:
import os
from pydoc import cli
import polars as pl
import numpy as np
import helpers.audio_tools as adt
import helpers.lut as lut
import tqdm

def __getCacheDir():
    return 'cache'

def loadTrainingData(data_dir, encode_data=False):
    #file to store ingested data, inside cache_dir directory
    cache_dir = __getCacheDir()
    cache_file = 'ingested_data.json'
    
    #check if data has already been ingested and stored in cache_file
    os.makedirs(cache_dir, exist_ok=True)
    desired_audio_files = [x for x in os.listdir(data_dir) if x.endswith('.wav')]
    data_is_saved = __checkCachedDataframe(desired_audio_files, cache_dir, cache_file)
    
    #load df from save file if it exists, otherwise generate df from raw data
    if data_is_saved:
        print('loading data from save file: ', cache_dir + '/' + cache_file)
        df = pl.read_json(cache_dir + '/' + cache_file)
    else:
        print("loading raw data from ", data_dir)
        nested_data = ['audio_file', 'annotation_file', 'recording_location']
        df = (
            __ingest_data(data_dir)
            .explode(nested_data)           #explode df so that each audio file and its corresponding recording location is on its own line
            .pipe(__getMurmurInRecording)   #get murmur_in_recording (whether a murmur is present in the corresponding recording)
            .pipe(__getSegments, data_dir)  #get start, end points of segments in audio files
            .pipe(reorderCols)
        )
        # #save df to file. Future calls to loadData will load the df from this file
        # df.write_json(cache_dir + '/' + cache_file)

    if encode_data:
        df = encodeData(df)

    return df


def getSpectrogram(df, data_dir):

    #check if df already has a spectrogram column
    if 'spectrogram' in df.columns:
        return df

    spectro_dir = __getCacheDir() + '/spectrograms'
    os.makedirs(spectro_dir, exist_ok=True)

    #check if spectrograms have already been created and stored in cache
    desired_spectros = df.get_column('audio_file').apply(lambda x: x.replace('.wav', '.npy')).to_list()
    spectros_are_saved = __checkCachedSpectros(desired_spectros, spectro_dir)

    if spectros_are_saved:
        length = df.height
        progress = tqdm.tqdm(total=length, desc="Loading spectrograms from cache")
        out = df.with_column(
            pl.col('audio_file').str.replace('.wav', '.npy')
            .apply(lambda x: __function_with_logUpdater(progress, np.load(spectro_dir + '/' + x)))
            .alias('spectrogram')
        )
        progress.close()
    else:
        length = df.height * 2
        progress = tqdm.tqdm(total=length, desc="Generating spectrograms")
        out = df.with_column(
            pl.col('audio_file')
            .apply(lambda x: __function_with_logUpdater(progress, __file_to_spectro(x, path=data_dir, output_folder=spectro_dir)))
            .apply(lambda x: __function_with_logUpdater(progress, np.load(spectro_dir + '/' + x)))
            .alias('spectrogram')
        )
        progress.close()
    
    out = reorderCols(out)
    return out


def encodeData(data):
    #assume no duplicate column names

    #check if data is a polars dataframe
    if not isinstance(data, pl.internals.frame.DataFrame):
        raise Exception('data is of unsupported type "{}". Supported types include polars.internals.frame.DataFrame'.format(data.type()))

    cipher = lut.getCipher()
    data_columns = data.columns
    numeric_data = [x for x in ['patient_id', 'total_locations', 'sampling_frequency', 'height', 'weight', 'additional_id'] if x in data_columns]
    cipher_friendly_data = [x for x in cipher.keys() if x in data_columns and x!='murmur_locations']
    unencoded_data = [x for x in data_columns if x not in (*numeric_data, *cipher_friendly_data, 'murmur_locations')]

    out = data.select([
        #cast numeric data to float type
        pl.col(numeric_data)
        .cast(pl.datatypes.Float64),

        #in murmur_locations: encode each element of each list in column using cipher
        pl.col('murmur_locations').arr.eval(pl.element().apply(lambda x: cipher['murmur_locations'][x])),

        #encode cipher friendly data
        pl.col(cipher_friendly_data)
        .map(lambda x: __applyCipher(x, cipher)),

        pl.col(unencoded_data)
    ])

    out = reorderCols(out)

    return out


def reorderCols(data):
    #assume no duplicate column names in data
    all_cols = data.columns

    #desired order of columns
    ordered_cols = [
        'patient_id',           
        'murmur_in_patient',               
        'audio_file', 
        'annotation_file',
        'segments',
        'spectrogram',
        'mfcc',
        'murmur_in_recording',         
        'recording_location',  
        'sampling_frequency',   
        'total_locations',        
        'murmur_locations',     
        'most_audible_location',
        'outcome',              
        'age',                  
        'sex',                  
        'height',               
        'weight',               
        'pregnancy_status',     
        'sys_mur_timing',       
        'sys_mur_shape',        
        'sys_mur_pitch',        
        'sys_mur_grading',      
        'sys_mur_quality',      
        'dia_mur_timing',       
        'dia_mur_shape',        
        'dia_mur_pitch',        
        'dia_mur_grading',      
        'dia_mur_quality',      
        'campaign',             
        'additional_id',        
    ]
    ordered_cols = [x for x in ordered_cols if x in all_cols]

    #remaining columns with no specified order
    unordered_cols = sorted(set(all_cols).difference(set(ordered_cols)))

    #columns are ordered as specified in order_cols.
    #remaining columns are included afterwards.
    out = data.select([*ordered_cols, *unordered_cols])
    
    return out
    

def splitDataframe(df, split_ratio=0.8):
    total_size = df.height
    head_size = round(split_ratio * total_size)
    tail_size = total_size - head_size

    df = df.sample(frac=1.0, shuffle=True)
    head_df = df.head(head_size)
    tail_df = df.tail(tail_size)
    
    return head_df, tail_df

########################################################################
#   PRIVATE FUNCTIONS
########################################################################

def __ingest_data(data_dir):

    data = lut.getClinicalData()
    clinical_iterables = lut.getClinicalIterables()

    #loop through txt files in directory
    total_txt_files = len([x for x in os.listdir(data_dir) if x.endswith('.txt')])
    progress = tqdm.tqdm(total=total_txt_files, desc="Reading from .txt files in "+data_dir)
    for file in os.listdir(data_dir):
        if file.endswith(".txt"):
            # open text file
            with open(data_dir + "/" + file, "r") as f:

                # read first line
                line = f.readline()
                # split line into list
                line = line.strip().split(" ")
                data['patient_id'].append(int(line[0]))
                data['total_locations'].append(int(line[1]))
                data['sampling_frequency'].append(int(line[2]))
                
                # loop through each line to check if it matches with an iterables or if it contains a wav file
                audio_files = []
                annotation_files = []
                recording_locations = []
                for line in f:
                    # check if line contains .wav
                    if ".wav" in line:
                        # split the line 
                        line_split = line.strip().split(" ")
                        audio_files.append(line_split[2]) 
                        annotation_files.append(line_split[3])
                        recording_locations.append(line_split[0]) 
                    #loop through iterables to check if line matches with any of them
                    else:
                        for iterable in clinical_iterables:
                            if line.startswith(clinical_iterables[iterable] + ":"):
                                # get the value of the iterable
                                value = line.split(': ', 1)[1].strip()
                                # add the value to the data
                                data[iterable].append(value)
                                break

                data['audio_file'].append(audio_files)
                data['annotation_file'].append(annotation_files)
                data['recording_location'].append(recording_locations)
                progress.update(1)
    progress.close()

    #save data in polars Dataframe
    df = pl.DataFrame(data)

    #split each element in murmur_locations (type=str) into list
    df = df.with_column(pl.col('murmur_locations').str.split(by='+'))

    return df


def __getMurmurInRecording(data):
    out = data.with_column(
        pl.when(pl.col('murmur_in_patient').is_in(['Absent', 'Unknown']))
        .then(pl.col('murmur_in_patient'))
        .when(pl.all([
            pl.col('murmur_in_patient')=='Present',
            pl.col('recording_location').is_in('murmur_locations')
        ]))
        .then('Present')
        .otherwise('Absent')
        .alias('murmur_in_recording')
    )

    return out


def __readAnnotationFile(file_path):
    # Store file contents with 3 columns and n rows as a nested list
    with open(file_path, 'r') as f:
        file = [line.strip().split() for line in f]

    # Iterate through each line and identify every occurence of the sequence '1,2,3,4' in column 3
    # Each of these sequences corresponds to one time segment
    # Store the start time (column 1) and the end time (column 2) of every segment in a list as a string seperated by '+'
    segments = []
    line = 0
    while line<len(file)-3:
        for i in range(4):
            if int(file[line+i][2]) != 1+i:
                line += 1
                break
        else:
            start, end = file[line][0], file[line+3][1]
            segments.append(f'{start}+{end}')
            line += 4

    if len(segments)<1:
        segments = [""]

    return segments

#filters out unsegmented data
def __getSegments(data, data_dir):
    progress = tqdm.tqdm(total=data.height, desc='Getting audio file segmentation information')
    out = data.with_column(
        pl.col('annotation_file').apply(lambda x: os.path.join(data_dir, x))
        .apply(lambda x: __function_with_logUpdater(progress, __readAnnotationFile(x)))
        .alias('segments')
    )
    out = out.filter(pl.col('segments').arr.first() != "")
    progress.close()
    return out


def __file_to_spectro(file, path="", output_folder='', sr =4000):
    os.makedirs(output_folder, exist_ok=True)
    spectro = adt.wav_to_spectro(path + "/" + file, sr=sr)
    spectro_file = file.replace('.wav', '.npy')
    np.save(output_folder + '/' + spectro_file, spectro)

    return spectro_file


def __function_with_logUpdater(progress, func):
    out = func
    progress.update(1)
    return out

#assumes data_dir and cache_dir both exist
def __checkCachedDataframe(desired_audio_files, cache_dir, save_file):            
    #check if save file exists in cache_dir
    if save_file not in os.listdir(cache_dir):
        data_is_saved = False
    else:
        try:
            #check if saved data matches the desired data
            saved_data = pl.read_json(cache_dir + '/' + save_file)
            saved_audio_files = saved_data.get_column('audio_file').to_list()
            if set(saved_audio_files) == set(desired_audio_files):
                data_is_saved = True
            else:
                data_is_saved = False
        except Exception as e:
            print(
                'An unexpected error occured while trying to check the cache file \n' +
                '\'%s\' in the cache directory \'%s\'\n' % (save_file, cache_dir) +
                ' (see below for error message). \n\n' +
                'Troubleshooting:\n' + 
                ' - make sure that the cache directory exists\n' + 
                ' - try deleting cache file and running program again\n'
            )
            raise
    return data_is_saved

#assumes cache_dir exists
def __checkCachedSpectros(desired_spectros, spectro_dir):
    saved_spectros = os.listdir(spectro_dir)
    if set(desired_spectros).issubset(set(saved_spectros)):
        spectros_are_saved = True
    else:
        spectros_are_saved = False
    return spectros_are_saved

    

def __applyCipher(col, cipher):
    col_name = col.name
    out = col.apply(lambda x: cipher[col_name][x])
    return out