In [1]:
import os
import re

In [2]:
def setup_data_directories(aRawDirectory, aStageDirectory, aDataStoreName):
    '''
    Create the working directories used by the loading pipeline.

    Each directory is created together with any missing parent directories.
    Directories that already exist are left untouched, so calling this
    repeatedly is safe.

    Args:
        aRawDirectory : where to store the downloaded data
        aStageDirectory : where to convert data to CSV & TXT
        aDataStoreName : directory in which to establish the HDF5 DataStore

    Returns:
        None

    Raises:
        OSError : if a path cannot be created, e.g. it already exists as a
                  regular file
    '''
    for directory in (aRawDirectory, aStageDirectory, aDataStoreName):
        # exist_ok only tolerates an existing *directory*; an existing file at
        # the same path still raises, exactly like the old errno/isdir check.
        os.makedirs(directory, exist_ok=True)


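Note: BeautifulSoup is used below with the `lxml` parser, so `lxml` must be installed.
It can be installed system-wide with `apt install python3-lxml`, or into the notebook's Python environment with the next cell.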


In [3]:
!pip3 install lxml

[33mYou are using pip version 8.1.2, however version 9.0.1 is available.
You should consider upgrading via the 'pip install --upgrade pip' command.[0m


In [4]:
def download_physionet_files(aDatabase='mghdb', aTargetDataDirectory='./data', shouldClean=False,
                             useExtensions=('atr', 'dat', 'hea')):
    '''
    Download the data files of a PhysioBank database from Physionet.

    The database index page is scraped for links whose names end in one of
    `useExtensions` (case-insensitive). Each linked file is saved under
    `aTargetDataDirectory`, using the link target as its file name. Files that
    already exist locally are skipped unless `shouldClean` is set, so an
    interrupted download can simply be run again.

    Args:
        aDatabase            : PhysioBank database name, default 'mghdb'
        aTargetDataDirectory : directory in which to store raw files
        shouldClean          : re-download files that already exist locally
        useExtensions        : file extensions to fetch; different databases
                               have different files, default ('atr', 'dat', 'hea').
                               An empty sequence downloads nothing.

    Returns:
        None

    Raises:
        requests.HTTPError : if the index page or a data file cannot be fetched
    '''
    if not useExtensions:
        # Nothing requested. (An empty pattern used to match every link on the
        # page, including navigation links such as '../'.)
        return

    import requests
    from bs4 import BeautifulSoup

    # Match link names that END in one of the extensions, e.g. '100.dat'.
    extensionRegex = r'\.(?:' + '|'.join(re.escape(ext) for ext in useExtensions) + r')$'

    urlPhysionetDB = 'https://www.physionet.org/physiobank/database/' + aDatabase + '/'
    indexResponse = requests.get(urlPhysionetDB)
    indexResponse.raise_for_status()

    # Scrape the list of all data files out of the database page
    scraper = BeautifulSoup(indexResponse.content, "lxml")
    hrefElements = (pageElement['href'] for pageElement in scraper.find_all('a', href=True))
    # A page may link the same file more than once; fetch each file once.
    dataElements = sorted({href for href in hrefElements if re.search(extensionRegex, href, re.I)})

    for dataLink in dataElements:
        dataURL = urlPhysionetDB + dataLink
        localDataFile = os.path.join(aTargetDataDirectory, dataLink)

        if os.path.isfile(localDataFile) and not shouldClean:  # don't re-download
            continue

        print('downloading {0}'.format(localDataFile))
        # Fetch BEFORE touching the target: a failed request must not leave an
        # empty file that later runs would mistake for a finished download.
        dataResponse = requests.get(dataURL)
        dataResponse.raise_for_status()
        # Write to a side file and rename, so a crash mid-write leaves no
        # truncated file under the final name.
        partialFile = localDataFile + '.part'
        with open(partialFile, 'wb') as localFileHandle:
            localFileHandle.write(dataResponse.content)
        os.replace(partialFile, localDataFile)

def download_annotation_metadata():
    '''
    Download the MIT-BIH annotation-code table and tag each code by category.

    The last <table> on the MIT-BIH directory introduction page is parsed into
    a DataFrame that has (at least) the columns 'Symbol' and 'Meaning'.

    Rows labelled 20 and 36 of the scraped table are dropped, and the symbols
    at rows 0, 37, 38 and 39 are overwritten by hand.
    NOTE(review): these hard-coded positions assume the page layout at the
    time of writing; re-check them if the page changes.

    Boolean columns 'artifact', 'arrythmia', 'other' and 'signal' are then
    added. A row is True in a column when its 'Meaning' contains one of that
    category's keywords (case-sensitive substring match). Non-string meanings
    are treated as matching nothing.

    Args:
        None

    Returns:
        DataFrame : one row per annotation code, re-indexed from 0

    Raises:
        requests.HTTPError : if the page cannot be fetched
    '''
    import requests
    from bs4 import BeautifulSoup
    import pandas as pd

    # Scrape the annotation table from the MIT-BIH info page
    annotationURL = 'https://www.physionet.org/physiobank/database/html/mitdbdir/intro.htm#annotations'
    response = requests.get(annotationURL)
    response.raise_for_status()
    scraper = BeautifulSoup(response.content, "lxml")
    tableElement = scraper.select('table')[-1]

    metadata = pd.read_html(str(tableElement), header=0)[0]
    # Drop the two unwanted rows by label (`.ix` was removed in pandas 1.0).
    # drop() returns a new frame, so the column assignment below does not
    # write into a view of the scraped table.
    metadata = metadata.drop([20, 36]).reset_index(drop=True)
    metadata['Symbol'] = metadata['Symbol'].astype(str)

    # Hand-corrected symbols (positions after re-indexing; see NOTE above)
    metadata.loc[0, 'Symbol'] = 'N'
    metadata.loc[37, 'Symbol'] = 'M'
    metadata.loc[38, 'Symbol'] = 'P'
    metadata.loc[39, 'Symbol'] = 'T'

    # Keywords that place an annotation code in a category. The column name
    # 'arrythmia' is kept as-is because callers select on it.
    categoryKeywords = {
        'artifact': ['artifact', 'Unclassified', 'Non-conducted', 'Fusion'],
        'arrythmia': ['flutter', 'bigeminy', 'tachycardia', 'fibrillation'],
        'other': ['bradycardia', 'Abberated', 'Premature', 'escape'],
        'signal': ['Signal quality', 'Extreme noise', 'Missed beat', 'Pause', 'Tape slippage']
    }

    def has_words(aTestString, aKeyList):
        '''True when aTestString is a string containing any key in aKeyList.'''
        return isinstance(aTestString, str) and any(key in aTestString for key in aKeyList)

    for category, keywords in categoryKeywords.items():
        # bind `keywords` now rather than relying on the loop variable
        metadata[category] = metadata['Meaning'].apply(
            lambda meaning, keys=keywords: has_words(meaning, keys))

    return metadata

In [5]:
def convert_physionet_data_to_csv(aDatabase, aSourceDirectory, aTargetDirectory, shouldClean=False,
                                  maxProcesses=6):
    '''
    Convert raw PhysioBank records to CSV samples and TXT annotations.

    Every record in `aSourceDirectory` (files named '<three digits>.<ext>') is
    converted with the WFDB command-line tools:

        rdsamp -r <record> -c -v -pe      -> <target>/<aDatabase>.<record>.csv
        rdann  -r <record> -a atr -v -e   -> <target>/<aDatabase>.ann.<record>.txt

    Outputs that already exist are skipped unless `shouldClean` is set. At
    most `maxProcesses` converters run at once, and all of them have finished
    when this function returns. If a converter exits with a non-zero status,
    its output file is deleted so that the next run retries it.

    Args:
        aDatabase        : database name, used as the output file prefix
        aSourceDirectory : directory holding the downloaded raw record files
        aTargetDirectory : directory receiving the converted files
        shouldClean      : re-convert even when the output file already exists
        maxProcesses     : maximum number of concurrent converter processes

    Returns:
        None

    Raises:
        OSError : if a converter cannot be started (e.g. the WFDB tools are
                  not on PATH); the partially created output file is removed
    '''
    import subprocess

    recordNames = set()
    for dataFileName in os.listdir(aSourceDirectory):
        found = re.search(r'^(\d\d\d)\.', dataFileName)
        if found:
            recordNames.add(found.group(1))

    # Collect the (command, output file) conversions that still need doing.
    jobs = []
    for recordName in sorted(recordNames):
        recordPath = aSourceDirectory + '/' + recordName
        targetSample = aTargetDirectory + '/' + aDatabase + '.' + recordName + '.csv'
        targetAnnotation = aTargetDirectory + '/' + aDatabase + '.ann.' + recordName + '.txt'
        if shouldClean or not os.path.isfile(targetSample):
            jobs.append((['rdsamp', '-r', recordPath, '-c', '-v', '-pe'], targetSample))
        if shouldClean or not os.path.isfile(targetAnnotation):
            jobs.append((['rdann', '-r', recordPath, '-a', 'atr', '-v', '-e'], targetAnnotation))

    def _start(aCommand, aTargetPath):
        '''Launch aCommand with its stdout written to aTargetPath.'''
        print(aTargetPath)
        handle = open(aTargetPath, 'wb')
        try:
            # argument list + file handle instead of a shell redirect: no
            # quoting problems, and a missing tool raises instead of silently
            # leaving an empty output file behind
            process = subprocess.Popen(aCommand, stdout=handle)
        except OSError:
            handle.close()
            os.remove(aTargetPath)
            raise
        return process, handle, aTargetPath

    def _finish(aJob):
        '''Wait for a started conversion; delete its output if it failed.'''
        process, handle, targetPath = aJob
        try:
            returnCode = process.wait()
        finally:
            handle.close()
        if returnCode != 0:
            print('conversion failed ({0}), removing {1}'.format(returnCode, targetPath))
            os.remove(targetPath)

    limit = max(1, int(maxProcesses))
    running = []
    try:
        for command, targetPath in jobs:
            while len(running) >= limit:
                _finish(running.pop(0))  # oldest first
            running.append(_start(command, targetPath))
    finally:
        # every started converter is waited for, even if a later start failed
        for job in running:
            _finish(job)


In [6]:
def build_hdf5_data_store(aSourceDataDirectory, aTargetHDF5):
    '''
    Write every staged record (samples CSV + annotation TXT) into one HDF5 file.

    Files in `aSourceDataDirectory` are paired by the first three-digit run in
    their file names. Names ending in 'csv' are sample files; names containing
    'ann' are annotation files. Each complete pair is merged on elapsed time
    and stored under the key 'Record_<nnn>'. The annotation code table from
    download_annotation_metadata() is stored under 'Annotation_Metadata'.

    The import is incremental. The modification times of each record's two
    source files are kept under 'Import_checksums', and a record whose sources
    have not changed since its last successful import is skipped. A record is
    marked as imported only after its frame has been written, so an
    interrupted run re-imports it next time.

    Args:
        aSourceDataDirectory: full path of directory containing mitdb csvs and txt annotations
        aTargetHDF5: fullpath of target file

    Returns:
        None
    '''
    import time
    from datetime import datetime
    import pandas as pd

    # -- pair sample and annotation files by record number -------------------
    sampleFiles = {}
    annotationFiles = {}
    for fileName in os.listdir(aSourceDataDirectory):
        # Search the file name only: the full path may contain other digits
        # (e.g. a directory called 'run123').
        recordMatch = re.search(r'(\d\d\d)', fileName)
        if recordMatch is None:
            continue
        recordNumber = recordMatch.group(1)
        fullPath = os.path.join(aSourceDataDirectory, fileName)
        if re.search('csv$', fileName):
            sampleFiles[recordNumber] = fullPath
        if re.search('ann', fileName):
            annotationFiles[recordNumber] = fullPath

    annotationMetadata = download_annotation_metadata()
    # written before the checksum lookups below, which read the same file
    annotationMetadata.to_hdf(aTargetHDF5, 'Annotation_Metadata')

    # -- incremental-import bookkeeping ---------------------------------------
    def source_checksum(aSampleFile, anAnnotationFile):
        '''Modification-time fingerprint of a record's two source files.'''
        # Same format as earlier versions, so checksums already stored stay valid.
        return '[{0}{1}]'.format(time.ctime(os.path.getmtime(aSampleFile)),
                                 time.ctime(os.path.getmtime(anAnnotationFile)))

    def read_checksums():
        '''Return the stored checksum table, or None if nothing was imported yet.'''
        try:
            return pd.read_hdf(aTargetHDF5, 'Import_checksums')
        except KeyError:
            return None

    def record_is_current(aRecordName, aChecksum):
        '''True when aRecordName was last imported from sources with aChecksum.'''
        checksums = read_checksums()
        if checksums is None:
            return False
        stored = checksums.loc[checksums['record_name'] == aRecordName, 'checksum']
        return len(stored) > 0 and stored.iloc[0] == aChecksum

    def mark_record_imported(aRecordName, aChecksum):
        '''Store aChecksum for aRecordName, adding or updating its row.'''
        newRow = pd.DataFrame({'record_name': [aRecordName], 'checksum': [aChecksum]})
        checksums = read_checksums()
        if checksums is None:
            checksums = newRow
        else:
            isRecord = checksums['record_name'] == aRecordName
            if isRecord.any():
                checksums.loc[isRecord, 'checksum'] = aChecksum
            else:
                # DataFrame.append was removed in pandas 2.0
                checksums = pd.concat([checksums, newRow[list(checksums.columns)]],
                                      ignore_index=True)
        checksums.to_hdf(aTargetHDF5, 'Import_checksums')

    # -- parsing of the converted files ---------------------------------------
    baseTime = datetime.strptime('00:00.00', '%M:%S.%f')

    def to_elapsed(aTimeString):
        '''Convert a '%M:%S.%f' time stamp into a timedelta from zero.'''
        return datetime.strptime(aTimeString, '%M:%S.%f') - baseTime

    def import_sample_data(aSampleFile):
        '''
        Args:
            aSampleFile :  CSV PHYSIONET data

        Returns:
            DataFrame indexed by elapsed time, with float columns
            'MLII_milliVolts' and 'V5_milliVolts'
        '''
        dataframe = pd.read_csv(aSampleFile, skiprows=2)
        dataframe.columns = ['Elapsed_Microseconds', 'MLII_milliVolts', 'V5_milliVolts']
        # drop the first remaining row as well (positional; `.ix` is gone from pandas)
        dataframe = dataframe.iloc[1:].reset_index(drop=True)
        dataframe = dataframe.astype({'MLII_milliVolts': float, 'V5_milliVolts': float})
        # each time stamp has one enclosing character on either side, stripped by [1:-1]
        dataframe.index = dataframe.Elapsed_Microseconds.apply(lambda x: to_elapsed(x[1:-1]))
        return dataframe.drop('Elapsed_Microseconds', axis=1)

    def import_annotation_data(anAnnotationFile):
        '''
        Args:
            anAnnotationFile : rdann text output

        Returns:
            DataFrame indexed by elapsed time with integer columns
            'Sample_num', 'Sub', 'Chan', 'Num' and columns 'Type', 'Aux'
        '''
        # a regex separator needs the python parser engine
        dataframe = pd.read_csv(anAnnotationFile, sep=r'\s\s+|\t| C', engine='python')
        dataframe.columns = ['Elapsed_Microseconds', 'Sample_num', 'Type', 'Sub', 'Chan', 'Num', 'Aux']
        dataframe.index = dataframe.Elapsed_Microseconds.apply(to_elapsed)
        dataframe = dataframe.drop('Elapsed_Microseconds', axis=1)
        return dataframe.astype({'Sample_num': int, 'Sub': int, 'Chan': int, 'Num': int})

    def import_samples_and_annotations(aSampleCsvFile, anAnnotationTxtFile, aMetadataSet):
        '''
        Merge one record's samples and annotations on their elapsed-time index
        and add 0/1 columns 'arrythmia_events' and 'normal_events'.

        Args:
            aSampleCsvFile : MIT-BIH sample CSV
            anAnnotationTxtFile : MIT-BIH annotation TXT
            aMetadataSet : annotation table with boolean column 'arrythmia'

        Returns:
            Pandas data frame
        '''
        sampleDataFrame = import_sample_data(aSampleCsvFile)
        annotationDataFrame = import_annotation_data(anAnnotationTxtFile)
        df = pd.concat([sampleDataFrame, annotationDataFrame], axis=1)

        # Labels from MIT-BIH site
        arrythmiaSymbols = aMetadataSet[aMetadataSet.arrythmia].Symbol.tolist()
        normalSymbols = ['N', 'L', 'R']
        labels = ['arrythmia', 'normal']

        for label, symbols in zip(labels, [arrythmiaSymbols, normalSymbols]):
            # 1 where either the Type or the Aux annotation is one of the symbols
            df[label + '_events'] = df[['Type', 'Aux']].isin(symbols).any(axis=1).astype(int)

        # add up each, just to see...
        print('calculating event occurances...')
        for label in labels:
            numEvents = len(df[df[label + '_events'] == 1].index)
            print('{0} {1} events'.format(numEvents, label))

        return df

    # -- import each complete record --------------------------------------------
    for recordNumber in sorted(sampleFiles):
        if recordNumber not in annotationFiles:
            print('skipping record {0}: no annotation file'.format(recordNumber))
            continue

        recordName = 'Record_' + recordNumber
        sampleFile = sampleFiles[recordNumber]
        annotationFile = annotationFiles[recordNumber]

        # skip if we've already imported this data
        checksum = source_checksum(sampleFile, annotationFile)
        if record_is_current(recordName, checksum):
            continue

        print(recordName)
        combinedDataFrame = import_samples_and_annotations(sampleFile, annotationFile, annotationMetadata)
        combinedDataFrame.to_hdf(aTargetHDF5, recordName)
        # record the checksum only once the data is safely stored
        mark_record_imported(recordName, checksum)



In [7]:
def load_all_data(aDbName, aDbDir, aStagingDir, anHdfDir, anHdfFilename):
    '''
    Run the whole loading pipeline: create directories, download the raw
    files, convert them to CSV/TXT, then build the HDF5 store.

    Each stage skips work that is already done, so the pipeline can be
    interrupted and simply run again.

    Args:
        aDbName       : PhysioBank database name, e.g. 'mitdb'
        aDbDir        : raw data goes in this directory
        aStagingDir   : the raw data is converted to csv & txt here
        anHdfDir      : where the HDF is stored
        anHdfFilename : full path (inside anHdfDir) of the HDF5 file

    Returns:
        None
    '''
    setup_data_directories(aRawDirectory=aDbDir,
                           aStageDirectory=aStagingDir,
                           aDataStoreName=anHdfDir)

    download_physionet_files(aDatabase=aDbName, aTargetDataDirectory=aDbDir)

    convert_physionet_data_to_csv(aDatabase=aDbName,
                                  aSourceDirectory=aDbDir,
                                  aTargetDirectory=aStagingDir)

    build_hdf5_data_store(aSourceDataDirectory=aStagingDir, aTargetHDF5=anHdfFilename)



In [8]:
# Pipeline locations: raw downloads, converted CSV/TXT files, and the HDF5 store
mitdb_dir = 'data/raw_data'
stage_dir = 'data/stage_data'
hdf_dir = 'data/hdf5'
hdf_filename = os.path.join(hdf_dir, 'mit-bih.hdf')  # derived so the two cannot disagree

# No separate mkdir of 'data' is needed: setup_data_directories (called by
# load_all_data) creates every directory, parents included, with os.makedirs.
load_all_data('mitdb', mitdb_dir, stage_dir, hdf_dir, hdf_filename)