In [None]:
import glob
import operator
import re
from functools import lru_cache

import numpy as np
import pandas as pd
from fitparse import FitFile

In [None]:
import plotly.express as px
import plotly.graph_objects as go
from plotly.subplots import make_subplots
import plotly.colors
from jupyter_dash import JupyterDash
import dash_core_components as dcc
import dash_html_components as html
from dash.dependencies import Input, Output, State, ALL
import folium

In [None]:
# Default maximum heart rate (bpm) used until the dashboard slider changes it.
MAX_HR = 180

# Default zones: lower bound of each zone (percent of maximum heart rate)
# mapped to the colour used to draw that zone.
DEFAULT_ZONES = {
    0: 'rgb(204,204,204)',
    50: 'rgb(194,202,202)',
    60: 'rgb(70,199,238)',
    70: 'rgb(106,204,44)',
    80: 'rgb(249,191,29)',
    90: 'rgb(222,16,91)',
}

# Mutable module state: read by the helper functions below and reassigned by
# the Dash callbacks when the user changes the settings.
zone_data = list(DEFAULT_ZONES.items())
max_hr = MAX_HR

In [None]:
@lru_cache(maxsize=10)
def rgb2hex(rgb):
    """Convert an ``rgb(r, g, b)`` colour string to ``#rrggbb`` hex notation.

    Parameters
    ----------
    rgb : str
        Colour such as ``'rgb(70,199,238)'`` or ``'rgb(70.0, 199.5, 238.25)'``.
        Fractional parts are discarded (components are truncated, not rounded).

    Returns
    -------
    str
        Lower-case hex colour, e.g. ``'#46c7ee'``.

    Raises
    ------
    ValueError
        If the string does not contain exactly three numeric components.
    """
    # Capture only the integer part of each component. The optional
    # non-capturing group swallows the *entire* fractional part so that its
    # digits are not picked up as extra components ('199.25' must produce
    # '199', not '199' followed by '5').
    components = re.findall(r'([0-9]+)(?:\.[0-9]+)?', rgb)
    if len(components) != 3:
        raise ValueError(
            "expected three colour components in {!r}, found {}".format(rgb, len(components))
        )
    return "#" + "".join("{:02x}".format(int(component)) for component in components)

In [None]:
def get_continuous_color(intermed):
    """Look up the colour for a heart rate percentage on the zone colour scale.

    The scale is built from the module-level ``zone_data`` list, skipping
    zones whose colour is an empty string; the colour of the last remaining
    zone is repeated at 100%.

    Parameters
    ----------
    intermed : number
        Position on the scale as a percentage (it is divided by 100 here).

    Returns
    -------
    str
        Colour string: a scale colour at or beyond either end, otherwise the
        value produced by ``plotly.colors.find_intermediate_color``.
    """
    fraction = intermed / 100.0

    coloured_zones = [(pct, colour) for pct, colour in zone_data if colour != '']
    stops = [pct / 100.0 for pct, _ in coloured_zones]
    stops.append(1.0)
    colours = [colour for _, colour in coloured_zones]
    colours.append(colours[-1])
    colorscale = plotly.colors.make_colorscale(colours, scale=stops)

    if not colorscale:
        raise ValueError("colorscale must have at least one color")

    # At or beyond either end of the scale there is nothing to interpolate.
    if fraction <= 0 or len(colorscale) == 1:
        return colorscale[0][1]
    if fraction >= 1:
        return colorscale[-1][1]

    # Walk the stops until the first one that is not below the requested
    # position; the stop before it is the lower neighbour.
    for stop_position, stop_colour in colorscale:
        if fraction > stop_position:
            lower_position, lower_colour = stop_position, stop_colour
            continue
        upper_position, upper_colour = stop_position, stop_colour
        break

    relative = (fraction - lower_position) / (upper_position - lower_position)
    return plotly.colors.find_intermediate_color(
        lowcolor=lower_colour,
        highcolor=upper_colour,
        intermed=relative,
        colortype="rgb",
    )

In [None]:
def bin_info():
    """Compute heart rate bin edges and labels from the current zone settings.

    Reads the module-level ``zone_data`` and ``max_hr``.

    Returns
    -------
    tuple
        ``(bin_values, bin_names)``: the bin edges in bpm (each zone's lower
        bound plus a final ``np.inf``) and one label per zone, formatted as
        ``"a%-b%"`` or, for the open-ended last zone, ``"a%+"``.
    """
    lower_bounds = [zone[0] for zone in zone_data]

    bin_values = [max_hr * pct / 100 for pct in lower_bounds]
    bin_values.append(np.inf)

    # Pair every lower bound with the next zone's lower bound; the last zone
    # has no upper bound, which is signalled with infinity.
    upper_bounds = lower_bounds[1:] + [np.inf]
    bin_names = []
    for lower, upper in zip(lower_bounds, upper_bounds):
        if upper != np.inf:
            bin_names.append("{}%-{}%".format(lower, upper))
        else:
            bin_names.append("{}%+".format(lower))

    return bin_values, bin_names

In [None]:
def zone_info():
    """Return the lower bound and display colour of every configured zone.

    Reads the module-level ``zone_data``.

    Returns
    -------
    tuple
        ``(zone_n, zone_color)``: the zone lower bounds (percentages) and, for
        each of them, the colour returned by ``get_continuous_color``.
    """
    lower_bounds = [zone[0] for zone in zone_data]
    colours = list(map(get_continuous_color, lower_bounds))
    return lower_bounds, colours

In [None]:
def get_session_time(file_path):
    """Return the `time_created` field from a FIT file.

    Parameters
    ----------
    file_path : str
        The file location of the FIT file

    Returns
    -------
    datetime or None
        <class 'datetime.datetime'> representation of when the FIT file was
        created, or None when the file has no `file_id` message or that
        message carries no `time_created` field.
    """
    # The plain file handle gets its own context manager so it is closed even
    # if constructing FitFile fails while parsing the header.
    with open(file_path, 'rb') as fit_handle, FitFile(fit_handle) as fitfile:
        # Ask for the `file_id` message through the public API rather than
        # assuming it is the second raw message in the file. get_messages is
        # a generator, so this stops at the first match.
        file_id = next(fitfile.get_messages('file_id'), None)
        if file_id is None:
            return None
        return file_id.get_value('time_created')

In [None]:
def get_sessions(pattern='data/*.fit'):
    """Return the FIT sessions matching `pattern` with their creation time.

    Uses `glob` instead of shelling out to `ls`, so it works outside IPython
    and returns an empty list (rather than the shell's error text) when no
    file matches.

    Parameters
    ----------
    pattern : str, optional
        Glob pattern of the FIT files to list. Defaults to ``'data/*.fit'``.

    Returns
    -------
    list
        A list of tuples with the file path as first element and its creation
        time as the second element, ordered by file path.
    """
    # glob makes no ordering promise; sort to keep the listing deterministic.
    return [
        (filepath, get_session_time(filepath))
        for filepath in sorted(glob.glob(pattern))
    ]

In [None]:
# Fields kept from each FIT `record` message; load_session_data adds any that
# are missing from a file as all-NaN columns.
USE_COLUMNS = ["timestamp","distance","enhanced_speed","heart_rate","cadence","position_lat","position_long","enhanced_altitude"]
# Renames applied by load_session_data to drop the `enhanced_` prefix.
RENAME_COLUMNS = {"enhanced_speed": "speed", "enhanced_altitude": "altitude"}

def load_session_data(file_path):
    """Load the `record` messages of a FIT file into a heart rate DataFrame.

    Only the columns in `USE_COLUMNS` are kept (missing ones are added as NaN)
    and renamed per `RENAME_COLUMNS`. Only rows where heart_rate data exists
    are kept. The data is resampled to 1 second frequency with interpolated
    values, the heart rate is smoothed with a 20 second rolling mean, and each
    row is labelled with its heart rate zone and zone colour.

    Parameters
    ----------
    file_path : str
        The file location of the FIT file

    Returns
    -------
    dataframe
        Pandas DataFrame indexed by timestamp, with the extra categorical
        columns `hr_binned` (zone label) and `hr_zone_color` (zone colour).

    Raises
    ------
    ValueError
        If the file contains no record with heart rate data.
    """
    # The plain file handle gets its own context manager so it is closed even
    # if constructing FitFile fails; records are materialised before closing.
    with open(file_path, 'rb') as fit_handle, FitFile(fit_handle) as fitfile:
        records = [record.get_values() for record in fitfile.get_messages('record')]

    df = pd.DataFrame(records)
    # Explicit copy: the frame is extended below and must not be a view.
    df = df.loc[:, df.columns.isin(USE_COLUMNS)].copy()
    for c in USE_COLUMNS:
        if c not in df.columns:
            df[c] = np.nan
    df = df.rename(columns=RENAME_COLUMNS).dropna(subset=['heart_rate'])
    if df.empty:
        # Without this guard the resample below fails with an unrelated
        # "only valid with DatetimeIndex" style error.
        raise ValueError("no heart rate records found in {!r}".format(file_path))

    df = df.set_index('timestamp').resample('s').interpolate()
    df['heart_rate'] = df['heart_rate'].rolling('20s').mean()

    bin_values, bin_names = bin_info()
    _, zone_colors = zone_info()
    df['hr_binned'] = pd.cut(df.heart_rate, bin_values, labels=bin_names)
    df['hr_zone_color'] = pd.cut(df.heart_rate, bin_values, labels=zone_colors)
    return df

In [None]:
def figure_for_heart_rate_timeseries(df):
    """Build the heart rate time series figure, coloured by heart rate zone.

    Parameters
    ----------
    df
        The Pandas DataFrame used.  Columns required: timestamp as index,
        heart_rate, hr_binned, hr_zone_color

    Returns
    -------
    figure
        Plotly figure with one bar trace per zone colour that has data and a
        thin grey line tracing the heart rate on top.
    """
    figure = go.Figure()

    # One bar trace per zone colour; groups without any rows are skipped.
    for _, zone_rows in df.groupby('hr_zone_color'):
        if not len(zone_rows):
            continue
        zone_bars = go.Bar(
            x=zone_rows.index,
            y=zone_rows.heart_rate,
            name=zone_rows.iloc[0].hr_binned,
            marker={'color': zone_rows.hr_zone_color, 'line.width': 0},
        )
        figure.add_trace(zone_bars)

    outline = go.Scatter(
        x=df.index,
        y=df.heart_rate,
        line={'color': 'darkgrey', 'width': 1},
        showlegend=False,
    )
    figure.add_trace(outline)

    figure.update_layout(
        title='Heart Rate Zones with 20s Rolling Mean',
        xaxis={'title': 'Date and Time'},
        yaxis={'title': 'Heart Rate (bpm)'},
        bargap=0,
    )
    return figure

In [None]:
def figure_for_heart_rate_distribution(df):
    """Build the horizontal bar chart of time spent in each heart rate zone.

    Parameters
    ----------
    df
        The Pandas DataFrame used.  Column required: hr_binned

    Returns
    -------
    figure
        Plotly figure
    """
    # Row counts per zone divided by 60; this yields minutes assuming one row
    # per second, which is how load_session_data resamples the data.
    minutes_in_zone = df.hr_binned.value_counts(sort=False) / 60

    _, zone_colors = zone_info()
    bars = go.Bar(
        x=minutes_in_zone,
        y=minutes_in_zone.index,
        orientation='h',
        marker_color=list(zone_colors),
    )

    fig = go.Figure(bars)
    fig.update_layout(
        title='Heart Rate Zones Distribution',
        xaxis={'title': 'Time in Zone (minutes)'},
        yaxis={'title': 'Heart Rate Zone'},
    )
    return fig

In [None]:
def figure_for_heart_rate_facet(df):
    """Build the facet chart: one scatter row per remaining data column.

    The distance and position columns are dropped, the remaining non-key
    columns are stacked into long form, and every point is coloured by its
    heart rate zone.

    Parameters
    ----------
    df
        The Pandas DataFrame used.  Columns required: timestamp as index,
        heart_rate, hr_binned, hr_zone_color, distance, position_lat,
        position_long

    Returns
    -------
    figure
        Plotly figure
    """
    key_columns = ['timestamp', 'heart_rate', 'hr_binned', 'hr_zone_color']

    # Wide -> long: everything that is not a key column becomes a
    # (data, value) pair; NaN values are kept.
    metrics = df.drop(columns=['distance', 'position_lat', 'position_long'])
    keyed = metrics.reset_index().set_index(key_columns)
    long_form = keyed.stack(dropna=False).reset_index()
    long_form = long_form.rename(columns={'level_4': 'data', 0: 'value'})
    long_form = long_form.set_index('timestamp')

    zone_colormap = dict(zip(bin_info()[1], zone_info()[1]))

    fig = px.scatter(
        long_form,
        x=long_form.index,
        y=long_form['value'],
        color=long_form['hr_binned'],
        facet_row=long_form['data'],
        color_discrete_map=zone_colormap,
    )
    # Each facet gets its own y range and no axis title.
    fig.update_yaxes(matches=None, title=None)
    fig.update_layout(xaxis={'title': 'Date and Time'}, bargap=0)
    fig.update_traces(marker={'size': 3})
    return fig

In [None]:
def generate_map(df, output_path='temp.html'):
    """Render the session's GPS track as a folium map saved to an HTML file.

    Every point with a position is drawn as a small circle coloured by its
    heart rate zone, and the first and last points get "Start" / "Stop"
    markers. When the session has no usable position data (for example an
    indoor workout) an empty base map is saved instead of failing.

    Parameters
    ----------
    df
        The Pandas DataFrame used.  Columns required: position_lat,
        position_long, hr_zone_color, hr_binned
    output_path : str, optional
        Where the HTML map is written. Defaults to ``'temp.html'``.

    Returns
    -------
    None
    """
    # FIT files store positions in semicircles; 2**31 semicircles = 180 degrees.
    semicircles_to_degrees = 180 / 2**31
    position_columns = ['position_lat_degrees', 'position_long_degrees']

    track = df.copy()
    track["position_lat_degrees"] = track["position_lat"] * semicircles_to_degrees
    track["position_long_degrees"] = track["position_long"] * semicircles_to_degrees
    track["color"] = track["hr_zone_color"].apply(rgb2hex)
    track = track.dropna(subset=['position_long_degrees', 'position_lat_degrees'])

    m = folium.Map()
    if track.empty:
        # No GPS fix at all: there are no bounds or endpoints to draw, so
        # save the plain base map and stop here.
        m.save(output_path)
        return

    coordinates = track[position_columns]
    m.fit_bounds([coordinates.min().values.tolist(), coordinates.max().values.tolist()])

    folium.Marker(coordinates.iloc[0].tolist(), popup="Start").add_to(m)
    folium.Marker(coordinates.iloc[-1].tolist(), popup="Stop").add_to(m)

    points = zip(track["position_lat_degrees"], track["position_long_degrees"])
    for point, color, binned in zip(points, track["color"], track["hr_binned"]):
        folium.CircleMarker(location=point,
                            radius=2,
                            popup=binned,
                            color=color).add_to(m)

    m.save(output_path)

In [None]:
sessions = get_sessions()
app = JupyterDash(__name__)

# Shared slider component for the zone boundaries; update_zones_count sets its
# value and places it inside `zone_slider_container`.
rslider = dcc.RangeSlider(
    id='zone_slider',
    min=30,
    max=100,
    value=[],
    pushable=1,
    marks={x:'{}%'.format(x) if x % 10 == 0 else '' for x in range(30,101,5)},
    included=False,
)

app.layout = html.Div([
    html.H1("Heart Rate Dashboard"),
    html.Label([
        "Workout Sessions",
        dcc.Dropdown(
            id='sessions_dropdown', clearable=False,
            # Preselect the last listed session; with no sessions available
            # leave the dropdown empty instead of failing on sessions[-1].
            value = sessions[-1][0] if sessions else None,
            options=[
                {'label': d, 'value': f}
                for f,d in sessions
            ],
            style={'width': '200px'}
        ),
    ]),
    html.Div(id='output_text', children="Session"),
    html.Br(),
    html.Hr(),
    html.Br(),
    # Initial text is derived from MAX_HR so it matches the slider default below.
    html.Div(id='max_hr_display', children="Your maximum heart rate: {}".format(MAX_HR)),
    dcc.Slider(
        id='max_heart_rate',
        min=130,
        max=210,
        step=1,
        value=MAX_HR,
        marks={x:'{} bpm'.format(x) if x % 10 == 0 else '' for x in range(130,211,5)},
        included=False,
    ),
    html.Br(),
    html.Hr(),
    html.Br(),
    html.Label([
        "Number of heart rate zones: ",
        dcc.Dropdown(
            id='num_zones',
            options=[{'label':x, 'value':x} for x in range(2,9)],
            value=6,
            # A cleared dropdown would send None to the zone-count callback.
            clearable=False,
        ),
    ]),
    html.Div(id="zone_slider_container", children=""),
    html.Div(id="zone_color_container", children=[]),
    html.Button("Update Zone Settings",
                id='save_config'),
    html.Br(),
    html.Hr(),
    html.Br(),
    html.Iframe(id="map_frame"),
    dcc.Graph(
        id='heart_rate_timeseries',
        figure=go.Figure()
    ),
    dcc.Graph(
        id='heart_rate_distribution',
        figure=go.Figure()
    ),
    dcc.Graph(
        id='heart_rate_facet',
        figure=go.Figure()
    ),
])

@app.callback(
    Output('zone_slider_container', 'children'),
    Output('zone_color_container', 'children'),
    Input('num_zones', 'value'),
    state=[State('zone_color_container', 'children'),],
)
def update_zones_count(num_zones, color_container_children):
    """Resize the zone configuration to `num_zones` zones.

    Rewrites the module-level ``zone_data``, updates the shared range slider
    with the new zone boundaries, and keeps one colour text input per zone.

    Parameters
    ----------
    num_zones : int or None
        Requested number of zones. ``None`` (a cleared dropdown) keeps the
        current number of zones instead of raising.
    color_container_children : list or None
        Current children of the colour container; existing inputs are reused.

    Returns
    -------
    tuple
        ``(slider, children)`` for the slider container and the colour
        container respectively.
    """
    global zone_data

    if num_zones is None:
        # The dropdown can be cleared; `None * [100]` below would raise.
        num_zones = len(zone_data)
    color_container_children = list(color_container_children or [])

    zone_n = [x[0] for x in zone_data]
    zone_c = [x[1] for x in zone_data]
    # Truncate to num_zones, padding new zones with a bound of 100 ...
    zone_n = (zone_n[:num_zones] + num_zones * [100])[:num_zones]
    # ... then cap zone i at 100 - (num_zones - i), so the last bound is at
    # most 99 and each earlier one at most one less than its successor's cap.
    zone_n = [min(x,100+ind-num_zones) for ind, x in enumerate(zone_n)]
    # New zones start without a colour (empty string).
    zone_c = (zone_c[:num_zones] + num_zones * [''])[:num_zones]
    zone_n[0] = 0
    if zone_c[-1] == '':
        # The last zone always needs a colour.
        zone_c[-1] = 'rgb(222,16,91)'
    zone_data = list(zip(zone_n,zone_c))
    rslider.value = zone_n[1:]

    # Keep the inputs that already exist and append one for each new zone.
    color_container_children = color_container_children[:num_zones]
    for i in range(len(color_container_children), num_zones):
        color_container_children.append(html.Div(id={'type': 'zones_info','index': i}, children=[
            dcc.Input(
                id={'type': 'zone_color_input','index': i},
                type="text",
                value=zone_c[i],
            ),
        ]),)

    return (rslider,
            color_container_children,
           )

@app.callback(
    Output('max_hr_display', 'children'),
    Input("max_heart_rate", "value"),
)
def update_zone_info(max_hr):
    """Return the text for `max_hr_display` showing the slider's current value."""
    label = f"Your maximum heart rate: {max_hr}"
    return label


@app.callback(
    Output('output_text', 'children'),
    Output('heart_rate_timeseries', 'figure'),
    Output('heart_rate_distribution', 'figure'),
    Output('heart_rate_facet', 'figure'),
    Output('map_frame','srcDoc'),
    Input("sessions_dropdown", "value"),
    Input("save_config", "n_clicks"),
    Input("max_heart_rate", "value"),
    state=[
        State({'type': 'zone_color_input', 'index': ALL}, 'value'),
        State('zone_slider_container', 'children'),
    ],
)
def change_session(session, n_clicks, hr, zone_color_input, zone_slider):
    """Reload the selected session and rebuild every chart and the map.

    Stores the chosen maximum heart rate and zone settings in the module-level
    ``max_hr`` / ``zone_data`` before loading, so the loaders and figure
    builders pick them up.

    Parameters
    ----------
    session : str or None
        Path of the FIT file to show. ``None`` yields empty placeholders.
    n_clicks : int or None
        Click count of the "Update Zone Settings" button; only used as a trigger.
    hr : int
        Maximum heart rate selected on the slider.
    zone_color_input : list
        Values of the zone colour inputs (empty until they are rendered).
    zone_slider : dict
        Serialized slider component; its ``props.value`` holds the zone bounds.

    Returns
    -------
    tuple
        Session summary text, the three figures, and the map HTML.
    """
    global max_hr, zone_data
    max_hr = hr
    if zone_color_input != []:
        # Zone 0 always starts at 0%; the slider only holds the other bounds.
        zone_data = list(zip([0] + zone_slider['props']['value'],zone_color_input))

    if session is None:
        # Nothing selected (e.g. no FIT files found): show empty placeholders
        # instead of failing while trying to open a file.
        return ("No session selected", go.Figure(), go.Figure(), go.Figure(), "")

    df = load_session_data(session)
    start = df.index.min()
    end = df.index.max()
    output_text__children = "Session from {} to {}, duration: {}".format(start,end,end-start)

    generate_map(df)
    # folium writes the map as UTF-8; read it back the same way and close the
    # handle deterministically.
    with open('temp.html', 'r', encoding='utf-8') as map_file:
        map_frame__src_doc = map_file.read()

    return (output_text__children,
            figure_for_heart_rate_timeseries(df),
            figure_for_heart_rate_distribution(df),
            figure_for_heart_rate_facet(df),
            map_frame__src_doc,
           )

# Start the Dash server for the app.
# NOTE(review): mode='external' is passed, so the app is presumably served at a
# URL to open in a separate browser tab rather than rendered inline in the
# notebook — confirm against the installed jupyter_dash version.
app.run_server(mode='external')

In [None]:
# for s in get_sessions():
# #     if s[0] == "data/3042255479.fit":
#     df = load_session_data(s[0])
#     display(df.head())
#     break