In [10]:
import csv
import fitparse
import gzip
import os
import pytz
import datetime

from tcxreader.tcxreader import TCXReader, TCXTrackPoint
from gpxcsv import gpxtolist
from tqdm import tqdm
from dateutil import parser

import functools

def debug(func):
    """Decorator that reports which file was being processed when ``func`` fails.

    The wrapped function is called unchanged and its return value is passed
    through. If it raises, a ``Failed for file: <name>`` line is printed and the
    original exception is re-raised.

    The file name is taken from the first positional argument, or from the
    ``filename`` keyword argument when the function is called with keywords only.
    """
    @functools.wraps(func)
    def wrapper_debug(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except Exception:
            # Resolve the name only on failure so keyword-only calls
            # (e.g. parse_gpx(filename=...)) do not crash the wrapper itself.
            fn = args[0] if args else kwargs.get('filename', '<unknown>')
            print(f'Failed for file: {fn}')
            # Bare raise keeps the original traceback intact.
            raise
    return wrapper_debug

def get_paths():
    """Return the project's data folders, creating any that do not exist yet.

    Returns:
        dict: maps ``'raw'``, ``'processed'`` and ``'metadata'`` to their paths.
        The paths are relative (``../data/...``), so they resolve against the
        current working directory.
    """
    paths = {'raw':'../data/raw',
             'processed':'../data/processed',
             'metadata':'../data/metadata'}
    for folder in paths.values():
        # makedirs also creates the missing parent '../data' (os.mkdir would raise
        # FileNotFoundError on a fresh checkout) and is a no-op when the folder exists.
        os.makedirs(folder, exist_ok=True)
    return paths

def standardize_time(timestamp):
    """Render a timestamp as a ``'%Y-%m-%d %H:%M:%S'`` string in US/Eastern time.

    Args:
        timestamp: ``None``, a ``datetime.datetime``, or a string that
            ``dateutil.parser.parse`` understands
            (e.g. ``'2023-09-13T16:38:54Z'``).

    Returns:
        ``None`` when ``timestamp`` is ``None``; otherwise the formatted string,
        without a UTC offset (e.g. ``'2023-02-05 18:59:12'``).

    Values without timezone information are treated as UTC. Previously only
    strings were converted while ``datetime`` objects were formatted as-is, so
    rows from different parsers ended up in different time zones.
    NOTE(review): this assumes the naive datetimes produced by the FIT/TCX
    readers are UTC — confirm against those libraries.
    """
    if timestamp is None:
        return None
    if isinstance(timestamp, datetime.datetime):
        parsed_date = timestamp
    else:
        parsed_date = parser.parse(timestamp)
    if parsed_date.tzinfo is None:
        # No zone information: interpret the wall-clock value as UTC.
        parsed_date = parsed_date.replace(tzinfo=pytz.utc)
    est_date = parsed_date.astimezone(pytz.timezone('US/Eastern'))
    return est_date.strftime('%Y-%m-%d %H:%M:%S')

def get_expected_format():
    """Return the column order shared by every parsed output row, as a tuple."""
    columns = ['time', 'distance', 'lat', 'lon', 'elev', 'power', 'cadence', 'heart_rate']
    return tuple(columns)

def parse_gzip(filename):
    """Return the decompressed contents (bytes) of a gzip file in the raw folder."""
    source = f"{get_paths()['raw']}/{filename}"
    with gzip.open(source, 'rb') as handle:
        return handle.read()

def write_output(filename, output, track_name):
    """Write parsed rows to a CSV and log the file in the processed-files ledger.

    Args:
        filename: source file name; the text before its first ``.`` becomes the
            CSV name.
        output: iterable of rows to write to ``<processed>/<stem>.csv``.
        track_name: label recorded next to the file in
            ``<metadata>/processed_files.csv`` (rows are appended).
    """
    folders = get_paths()
    stem = filename.split('.')[0]
    csv_options = dict(delimiter=',', quotechar='"', quoting=csv.QUOTE_MINIMAL)

    rows_path = f"{folders['processed']}/{stem}.csv"
    with open(rows_path, 'w', newline='') as rows_file:
        csv.writer(rows_file, **csv_options).writerows(output)

    ledger_path = f"{folders['metadata']}/processed_files.csv"
    with open(ledger_path, 'a', newline='') as ledger_file:
        csv.writer(ledger_file, **csv_options).writerow([filename, stem, track_name])

@debug
def parse_gpx(filename,write_output=False):
    """Parse a GPX file from the raw folder into rows in the standard column order.

    Args:
        filename: name of the GPX file inside the raw data folder.
        write_output: when true, also persist the rows through the module-level
            ``write_output()`` helper.

    Returns:
        list[list]: one row per point, ordered as ``get_expected_format()``.
        Fields a point does not carry are ``None``.
    """
    raw = get_paths()['raw']
    gpx_list = gpxtolist(f'{raw}/{filename}')
    # Guard against the column order changing without this row layout being updated.
    # It does not depend on the data, so it is checked once instead of per point.
    assert get_expected_format() == ('time','distance','lat','lon','elev','power','cadence','heart_rate')
    track_name = None
    output = []
    for point in gpx_list:
        if not track_name:
            # First non-empty 'name' seen on a point becomes the track name.
            track_name = point.get('name')
        output.append([standardize_time(point.get('time')),
                       point.get('distance'),
                       point.get('lat'),
                       point.get('lon'),
                       point.get('ele'),
                       point.get('power'),
                       point.get('cad'),
                       point.get('heart_rate')])
    if write_output:
        # The boolean parameter shadows the module-level write_output() function;
        # calling the bare name raised "TypeError: 'bool' object is not callable".
        globals()['write_output'](filename, output, track_name)
    return output

@debug
def parse_fit(filename, content=None, write_output=False):
    """Parse a FIT file from the raw folder into rows in the standard column order.

    Args:
        filename: name of the FIT file inside the raw data folder.
        content: unused; kept so existing callers keep working.
        write_output: when true, also persist the rows through the module-level
            ``write_output()`` helper.

    Returns:
        list[list]: one row per ``record`` message, ordered as
        ``get_expected_format()``. Fields a record does not carry are ``None``.

    Raises:
        KeyError: if a record message has no ``timestamp`` field.
    """
    # Divisor used to turn FIT position values into degrees:
    # https://gis.stackexchange.com/questions/371656/garmin-fit-coordinate-system
    # https://gis.stackexchange.com/questions/122186/convert-garmin-or-iphone-weird-gps-coordinates
    semicircles_per_degree = 11930465
    feet_per_metre = 3.28084

    raw = get_paths()['raw']
    fitfile = fitparse.FitFile(f'{raw}/{filename}')
    # Guard against the column order changing without this row layout being updated.
    # It does not depend on the data, so it is checked once instead of per record.
    assert get_expected_format() == ('time','distance','lat','lon','elev','power','cadence','heart_rate')
    output = []
    track_name = filename.split('.')[0]
    for record in fitfile.get_messages("record"):
        row_output = {}
        for data in record:
            name = data.name
            value = data.value
            if name in ('position_lat', 'position_long'):
                # A falsy position (None or 0) is treated as "no fix" and left out,
                # as in the original implementation.
                if not value:
                    continue
                value = value / semicircles_per_degree
            elif name == 'timestamp':
                value = standardize_time(value)
            elif name in ('distance', 'power', 'cadence', 'heart_rate'):
                pass  # stored as reported
            elif name == 'altitude':
                # Only a missing altitude is skipped; 0 (sea level) is a real reading
                # that the previous truthiness test silently dropped.
                if value is None:
                    continue
                # NOTE(review): metres are converted to feet here, while parse_gpx and
                # parse_tcx pass elevation through unchanged — confirm the intended unit.
                if data.units == 'm':
                    value = value * feet_per_metre
            else:
                continue
            row_output[name] = value
        final_row_output = [row_output['timestamp'],
                            row_output.get('distance', None),
                            row_output.get('position_lat', None),
                            row_output.get('position_long', None),
                            row_output.get('altitude', None),
                            row_output.get('power', None),
                            row_output.get('cadence', None),
                            row_output.get('heart_rate', None)]
        output.append(final_row_output)
    if write_output:
        # The boolean parameter shadows the module-level write_output() function;
        # calling the bare name raised "TypeError: 'bool' object is not callable".
        globals()['write_output'](filename, output, track_name)
    return output

#Some of the TCX files have extra spaces at the beginning of each line - this will fix those
def preprocess_tcx(filename):
    """Strip leading whitespace from every line of a raw TCX file, in place.

    The file is only rewritten when at least one line actually changes, so
    already-clean raw files are no longer truncated and rewritten on every parse.
    Line endings of files that need no stripping are therefore left as they are.

    Args:
        filename: name of the TCX file inside the raw data folder.
    """
    path = f"{get_paths()['raw']}/{filename}"
    with open(path, 'r') as f:
        lines = f.readlines()
    stripped = [line.lstrip() for line in lines]
    if stripped == lines:
        return
    with open(path, 'w') as f:
        f.writelines(stripped)

@debug
def parse_tcx(filename,write_output=False):
    """Parse a TCX file from the raw folder into rows in the standard column order.

    The raw file is first cleaned in place by ``preprocess_tcx`` (leading
    whitespace removed from each line) and then read with ``TCXReader``.

    Args:
        filename: name of the TCX file inside the raw data folder.
        write_output: when true, also persist the rows through the module-level
            ``write_output()`` helper.

    Returns:
        list[list]: one row per trackpoint, ordered as ``get_expected_format()``.
        Fields a trackpoint does not carry are ``None``.
    """
    preprocess_tcx(filename)
    tcx_reader = TCXReader()

    raw = get_paths()['raw']
    data = tcx_reader.read(f'{raw}/{filename}', only_gps=False)
    # Guard against the column order changing without this row layout being updated.
    # It does not depend on the data, so it is checked once instead of per trackpoint.
    assert get_expected_format() == ('time','distance','lat','lon','elev','power','cadence','heart_rate')
    # The track name is derived from the file name (text before the first '.').
    track_name = filename.split('.')[0]
    output = []
    for trackpoint in data.trackpoints:
        tpd = trackpoint.to_dict()
        output.append([standardize_time(tpd.get('time')),
                       tpd.get('distance'),
                       tpd.get('latitude'),
                       tpd.get('longitude'),
                       tpd.get('elevation'),
                       tpd.get('Watts'),
                       tpd.get('cadence'),
                       tpd.get('hr_value')])
    if write_output:
        # The boolean parameter shadows the module-level write_output() function;
        # calling the bare name raised "TypeError: 'bool' object is not callable".
        globals()['write_output'](filename, output, track_name)
    return output

def unzip_file(filename):
    """Decompress a gzip archive in the raw folder next to the archive itself.

    Args:
        filename: archive name inside the raw folder; its last three characters
            (expected to be ``.gz``) are dropped to form the output name.

    Returns:
        str: name of the decompressed file (relative to the raw folder).
    """
    import shutil

    raw_dir = get_paths()['raw']
    target_name = filename[:-3]
    with gzip.open(f'{raw_dir}/{filename}', 'rb') as compressed, \
            open(f'{raw_dir}/{target_name}', 'wb') as extracted:
        shutil.copyfileobj(compressed, extracted)
    return target_name

def parse_file(filename, write_output=False):
    """Dispatch a raw file to the parser matching its extension.

    ``.gz`` archives are decompressed with ``unzip_file`` and the resulting file
    is dispatched again. Extensions are matched case-insensitively and must
    include the dot, so ``RIDE.FIT`` is recognised and ``foogpx`` is not.

    Args:
        filename: file name inside the raw data folder.
        write_output: forwarded to the parser.

    Returns:
        tuple[bool, str]: ``(True, name)`` when a parser handled the file,
        ``(False, name)`` when the extension is unsupported. ``name`` is the file
        that was finally dispatched (the decompressed name for archives).
    """
    lowered = filename.lower()
    if lowered.endswith('.gpx'):
        parse_gpx(filename, write_output=write_output)
    elif lowered.endswith('.fit'):
        parse_fit(filename, write_output=write_output)
    elif lowered.endswith('.tcx'):
        parse_tcx(filename, write_output=write_output)
    elif lowered.endswith('.gz'):
        return parse_file(unzip_file(filename), write_output=write_output)
    else:
        return False, filename
    return True, filename

from tqdm import tqdm

def parse_all_files(write_output=False):
    """Run ``parse_file`` over every file in the raw data folder.

    Args:
        write_output: forwarded to ``parse_file``. Defaults to ``False`` (the
            previous behaviour), which parses everything without writing CSVs.

    Returns:
        list[str]: names that ``parse_file`` reported as unsupported.
    """
    raw = get_paths()['raw']
    all_files = os.listdir(raw)
    present = set(all_files)
    # unzip_file() extracts archives into the same folder, so after a first run the
    # listing holds both 'X' and 'X.gz'. Skip the archive in that case: 'X' is parsed
    # on its own in this same loop, and handling both would process it twice.
    pending = [name for name in all_files
               if not (name.endswith('.gz') and name[:-3] in present)]
    failed = []
    for file in tqdm(pending):
        success, name = parse_file(file, write_output=write_output)
        if not success:
            failed.append(name)
    return failed

In [21]:
# Run the whole batch. parse_all_files() calls parse_file() with its default
# write_output=False, so this pass only exercises the parsers; no CSVs are written.
# The list displayed below holds the names parse_file() could not handle.
parse_all_files()

100%|█████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 3387/3387 [16:06<00:00,  3.50it/s]


[]

In [2]:
# Snapshot the raw-data directory listing for the ad-hoc checks in the following cells.
raw = get_paths()['raw']
all_files = os.listdir(raw)

In [4]:
# Peek at the first few entries of the listing.
all_files[:5]

['10002481365.fit',
 '10002481365.fit.gz',
 '10010559674.fit',
 '10010559674.fit.gz',
 '10022163615.tcx']

In [11]:
# Parse a single FIT file in memory (write_output=False: nothing is written to disk).
output = parse_fit('10002481365.fit',write_output=False)

10002481365


In [9]:
from tcxreader.tcxreader import TCXReader, TCXTrackPoint

# Read one TCX file directly with tcxreader to explore the object it returns.
tcx_reader = TCXReader()
filename = '9392651436.tcx'

# only_gps=False: presumably keeps trackpoints that have no coordinates — confirm in the tcxreader docs.
data = tcx_reader.read(f'../data/raw/{filename}', only_gps=False)

In [13]:
# Inspect the keys a trackpoint exposes; parse_tcx reads its row values from this dict.
data.trackpoints[0].to_dict()

{'time': datetime.datetime(2023, 3, 21, 11, 20, 49),
 'longitude': None,
 'latitude': None,
 'distance': 9.19,
 'elevation': None,
 'hr_value': 97,
 'cadence': 84,
 'Speed': 9.04,
 'Watts': 173.0,
 'Resistance': 173.0}

In [27]:
# Probe for a track name on the TCX object: this raises AttributeError (see the output below).
# parse_tcx uses the file name stem as the track name instead.
data.name

AttributeError: 'TCXExercise' object has no attribute 'name'

In [6]:
from gpxcsv import gpxtolist

# NOTE: this raises NameError unless the cell defining parse_gpx has already been run in this kernel.
filename = 'Lunch_Ride.gpx'
parse_gpx(filename)

NameError: name 'parse_gpx' is not defined

In [28]:
import datetime

# Sample timestamps for exercising standardize_time, each named after the file format it stands for:
# an ISO-8601 string with a 'Z' suffix, a naive datetime object, and a naive date-time string.
gpx_time = '2023-09-13T16:38:54Z'
tcx_time = datetime.datetime(2023, 3, 21, 11, 20, 49)
fit_time = '2023-02-05 23:59:12'

In [43]:
from dateutil import parser
import pytz
import datetime

def standardize_time(timestamp):
    """Render a timestamp as a ``'%Y-%m-%d %H:%M:%S'`` string in US/Eastern time.

    This cell redefines ``standardize_time``; whichever definition runs last in
    the kernel is the one the parsers use.

    Args:
        timestamp: ``None``, a ``datetime.datetime``, or a string that
            ``dateutil.parser.parse`` understands
            (e.g. ``'2023-09-13T16:38:54Z'``).

    Returns:
        ``None`` when ``timestamp`` is ``None`` (points without a time are passed
        in by the parsers); otherwise the formatted string, without a UTC offset
        (e.g. ``'2023-02-05 18:59:12'``).

    Values without timezone information are treated as UTC. Previously only
    strings were converted while ``datetime`` objects were formatted as-is.
    NOTE(review): this assumes the naive datetimes produced by the FIT/TCX
    readers are UTC — confirm against those libraries.
    """
    if timestamp is None:
        return None
    if isinstance(timestamp, datetime.datetime):
        parsed_date = timestamp
    else:
        parsed_date = parser.parse(timestamp)
    if parsed_date.tzinfo is None:
        # No zone information: interpret the wall-clock value as UTC.
        parsed_date = parsed_date.replace(tzinfo=pytz.utc)
    est_date = parsed_date.astimezone(pytz.timezone('US/Eastern'))
    return est_date.strftime('%Y-%m-%d %H:%M:%S')
    

In [44]:
# A string without timezone information is treated as UTC and rendered in US/Eastern
# (23:59:12 -> 18:59:12 in the output below).
standardize_time(fit_time)

'2023-02-05 18:59:12'