In [1]:
%pip install findspark

Note: you may need to restart the kernel to use updated packages.


In [2]:
import findspark
findspark.init()
import pyspark

In [3]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('Spark Demo').master('local[*]').getOrCreate()
spark

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/12/05 11:32:52 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [4]:
import fitdecode
import pandas as pd
import matplotlib.pyplot as plt
import numpy as np
import os
import warnings
warnings.filterwarnings("ignore")

# spark libs
import functools
from functools import reduce
from pyspark.sql.types import *
from pyspark.sql import DataFrame
import pyspark.sql.functions as f
from pyspark.sql.functions import *
from pyspark.sql.functions import isnan, when, count, col, row_number
from pyspark.sql.window import Window

In [5]:
def DecodeFit_DataFrame(file_path: str, frame_name: str = 'record', lat_long_update: bool = True, debug: bool = False) -> pd.DataFrame:
    """_summary_

    Args:
        file_path (str): path of raw fit file
        frame_name (str, optional): _description_. Defaults to 'record'.
        lat_long_update (bool, optional): _description_. Defaults to True.
        debug (bool, optional): _description_. Defaults to False.

    Returns:
        pd.DataFrame: processed dataframe from source fit file
    """
    # Initialize some useful variables for the loops
    check_list = good_list = []
    list_check = {}
    df_activity = pd.DataFrame([])

    # Open the file with fitdecode
    with fitdecode.FitReader(file_path) as file:
        
        # Iterate through the .FIT frames
        for frame in file:
            
            # Procede if the frame object is the correct data type
            if isinstance(frame, fitdecode.records.FitDataMessage):
                
                # Add the frames and their corresponding counts to a dictionary for debugging
                if frame.name not in check_list:
                    check_list.append(frame.name)
                    list_check[frame.name] = 1
                else:
                    list_check.update({frame.name: list_check.get(frame.name) + 1})
                
                # If the current frame is a record, we'll reset the row_dict variable
                # and add the field values for all fields in the good_list variable
                if frame.name == frame_name:
                    row_dict = {}
                    for field in frame.fields: 
                        if field.name.find('unknown') < 0:
                            if field.name not in good_list and field.name.find('unknown') < 0:
                                good_list.append(field.name)
                            row_dict[field.name] = frame.get_value(field.name)
                    
                    # Append this row's dictionary to the main dataframe
                    df_activity = pd.concat([df_activity, pd.DataFrame([row_dict])], ignore_index = True)
        
        # Update the Long/Lat columns to standard degrees
        if lat_long_update:
            for column in ['position_lat', 'position_long']:
                df_activity[column] = df_activity[column].apply(lambda x: x / ((2**32)/360))
        
        # If you want to check to see which frames are in the file, print the list_check variable
        if debug:
            print(list_check)

    return df_activity

In [6]:
directory_path = '/Users/mateo.wheeler/Library/Mobile Documents/com~apple~CloudDocs/Documents/Documents - HF Mac/py-projects/coros-run-metrics/fit_files'
file_list = []

for filename in os.listdir(directory_path):
    file_path = os.path.join(directory_path, filename)
    if os.path.isfile(file_path):
        filename = filename[:-4]
        file_list.append(filename)

print(file_list)

df_activity_combined = pd.DataFrame([])
for file in file_list:


    df_activity = DecodeFit_DataFrame(f'{directory_path}/{file}.fit', frame_name = 'record')

    df_activity['elapsed_time'] = df_activity['timestamp'].apply(lambda x: x - df_activity.loc[0, 'timestamp'])
    df_activity['distance_miles'] = df_activity['distance']*0.000621371192
    df_activity['altitude_feet'] = df_activity['altitude']*3.28084
    df_activity['pace_minutes_per_mile'] = 26.8224/df_activity['speed']
    df_activity['timestamp'] = pd.to_datetime(df_activity['timestamp'])
    df_activity['run'] = file

    df_activity_combined = pd.concat([df_activity_combined, df_activity], ignore_index = True)

    print(f"Processing '{file}' complete")

print(f"Combined run event pandas dataframe has {len(df_activity_combined)} records")

['LakewoodTrailRun20241128074748', 'BoulderTrailRun20241126062604', 'BoulderTrailRun20241124062708', 'BoulderTrailRun20241205063840', 'BoulderTrailRun20241122063002']
Processing 'LakewoodTrailRun20241128074748' complete
Processing 'BoulderTrailRun20241126062604' complete
Processing 'BoulderTrailRun20241124062708' complete
Processing 'BoulderTrailRun20241205063840' complete
Processing 'BoulderTrailRun20241122063002' complete
Combined run event pandas dataframe has 21543 records


In [13]:
df_run_events = spark.createDataFrame(df_activity_combined)
df_run_events = df_run_events.replace(float('inf'), None)\
.fillna(0)

df_ts_agg = df_run_events.groupby('run').agg(min('timestamp').alias('min_ts'), max('timestamp').alias('max_ts'))

df_ts_agg = df_ts_agg.withColumn('run_time_seconds',col("max_ts").cast("long") - col('min_ts').cast("long"))\
.withColumn('run_time_minutes',round(col('run_time_seconds')/60))

df_ts_agg.show()

+--------------------+-------------------+-------------------+----------------+----------------+
|                 run|             min_ts|             max_ts|run_time_seconds|run_time_minutes|
+--------------------+-------------------+-------------------+----------------+----------------+
|LakewoodTrailRun2...|2024-11-28 07:47:48|2024-11-28 08:55:04|            4036|            67.0|
|BoulderTrailRun20...|2024-11-26 06:26:04|2024-11-26 07:22:09|            3365|            56.0|
|BoulderTrailRun20...|2024-11-24 06:27:08|2024-11-24 08:34:17|            7629|           127.0|
|BoulderTrailRun20...|2024-12-05 06:38:40|2024-12-05 07:44:51|            3971|            66.0|
|BoulderTrailRun20...|2024-11-22 06:30:02|2024-11-22 07:28:54|            3532|            59.0|
+--------------------+-------------------+-------------------+----------------+----------------+



In [14]:
df_run_events2 = df_run_events.join(df_ts_agg, ['run'], 'left')
df_run_events2.dtypes

[('run', 'string'),
 ('activity_type', 'string'),
 ('timestamp', 'timestamp'),
 ('distance', 'double'),
 ('enhanced_speed', 'double'),
 ('speed', 'double'),
 ('Effort Pace', 'double'),
 ('enhanced_altitude', 'double'),
 ('altitude', 'double'),
 ('step_length', 'double'),
 ('cadence', 'double'),
 ('heart_rate', 'double'),
 ('power', 'double'),
 ('accumulated_power', 'double'),
 ('position_lat', 'double'),
 ('position_long', 'double'),
 ('elapsed_time', 'interval day to second'),
 ('distance_miles', 'double'),
 ('altitude_feet', 'double'),
 ('pace_minutes_per_mile', 'double'),
 ('min_ts', 'timestamp'),
 ('max_ts', 'timestamp'),
 ('run_time_seconds', 'bigint'),
 ('run_time_minutes', 'double')]

In [15]:
df_run_events2.select('pace_minutes_per_mile').show()

+---------------------+
|pace_minutes_per_mile|
+---------------------+
|                  0.0|
|                  0.0|
|                  0.0|
|                  0.0|
|                  0.0|
|                  0.0|
|                  0.0|
|                  0.0|
|                  0.0|
|    18.05006729475101|
|    9.015932773109244|
|    9.092338983050846|
|    9.160655737704918|
|    9.188900308324769|
|    9.214153211954654|
|    9.258681394546082|
|    9.303642039542144|
|    9.391596638655463|
|     9.47453196750265|
|    9.541942369263607|
+---------------------+
only showing top 20 rows



Exception ignored in: <_io.BufferedWriter name=5>
Traceback (most recent call last):
  File "/Users/mateo.wheeler/Library/Mobile Documents/com~apple~CloudDocs/Documents/Documents - HF Mac/py-projects/coros-run-metrics/coros/lib/python3.13/site-packages/pyspark/python/lib/pyspark.zip/pyspark/daemon.py", line 193, in manager
BrokenPipeError: [Errno 32] Broken pipe


In [29]:
df_run_events2.groupby('activity_type','run','run_time_minutes').agg(max('distance_miles'), 
                                avg('pace_minutes_per_mile'), 
                                *[expr(f"approx_percentile(pace_minutes_per_mile, {q}) as approx_best_pace") for q in [0.10]],
                                avg('power'),
                                max('power'),
                                avg('cadence'),
                                max('cadence'),
                                avg('heart_rate'),
                                max('heart_rate')
                                ).show()

+-------------+--------------------+----------------+-------------------+--------------------------+-----------------+------------------+----------+-----------------+------------+------------------+---------------+
|activity_type|                 run|run_time_minutes|max(distance_miles)|avg(pace_minutes_per_mile)| approx_best_pace|        avg(power)|max(power)|     avg(cadence)|max(cadence)|   avg(heart_rate)|max(heart_rate)|
+-------------+--------------------+----------------+-------------------+--------------------------+-----------------+------------------+----------+-----------------+------------+------------------+---------------+
|      running|LakewoodTrailRun2...|            67.0|  6.802237430790879|         9.849714544181122|6.677221807318894|220.38679490512087|     471.0|83.18377956849493|       124.0|152.87210813621004|          192.0|
|      running|BoulderTrailRun20...|            56.0|   6.19372862246528|          9.27750633259695|           6.7056|217.53021560886728|   