# Preprocessing Notebook
This notebook prepares a flat dataset of GPS tracking points for further analysis.
* Data is loaded from .gpx files into a Spark SQL `DataFrame` using the custom `StravaLoader` class
* Nested columns are flattened
* Additional attributes such as speed and accumulated distance are calculated
* Tracking points where the athlete is at rest are filtered out, so that rests show up as time gaps when pause times are calculated

## Input parameters
- `dataset` - The name of the dataset to load. Use "strava-activities" or "strava-activities-subset"
- `pause_threshold_minutes` - The minimum gap in minutes between two *activity blocks*. If the time gap between two consecutive tracking points is at least this long, the two points belong to different blocks
- `rest_speed_threshold_kmh[activity_type]` - The upper limit for when to consider the athlete to be *at rest* for each `activity_type`. If the speed is at or below this limit, the athlete is at rest and the tracking point is dropped

In [54]:
dataset = 'strava-activities-subset'
pause_threshold_minutes = 10
rest_speed_threshold_kmh = {
    'Ride': 3
}

## Processing
Import libraries

In [55]:
from pyspark.sql import Window
# Note: sum, max, min and round shadow the Python built-ins of the same name in this notebook
from pyspark.sql.functions import coalesce, concat, lag, lit, lower, sum, unix_timestamp, max, min, \
                                  sin, cos, atan, sqrt, atan2, toRadians, round, avg, first
from classes import StravaLoader

Load data with `StravaLoader`

In [56]:
# Initialize Strava activity loader
sl = StravaLoader('s3', dataset, sc=sc, hiveContext=sqlContext)

# Load the dataset
df = sl.get_dataset()

Info: Using supplied SparkContext and HiveContext


#### Tracking point dataset
Define the window used for consecutive-point calculations and convert timestamps to Unix time in seconds

In [57]:
# Partitioning on <athlete> and <activity_type>
window = Window.partitionBy('athlete', 'activity_type').orderBy('TIME_unix_time')

# Timestamp in seconds
df = df.withColumn( 
    'TIME_unix_time', 
    unix_timestamp(df['time'], "yyyy-MM-dd'T'HH:mm:ss'Z'")
)
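Spark's `unix_timestamp` returns whole seconds since the epoch. Because `'Z'` is quoted in the pattern, it is matched as a literal, so the time is interpreted in Spark's configured local time zone rather than UTC; differences between consecutive points are unaffected except across a daylight-saving change. A plain-Python sketch of the same conversion, treating the suffix as UTC and using timestamps that appear in the output at the end of this notebook:

```python
from datetime import datetime, timezone


def to_unix_seconds(ts: str) -> int:
    """Parse a GPX timestamp such as '2014-06-03T15:39:26Z' as UTC seconds since the epoch."""
    dt = datetime.strptime(ts, "%Y-%m-%dT%H:%M:%SZ").replace(tzinfo=timezone.utc)
    return int(dt.timestamp())


t0 = to_unix_seconds("2014-06-03T15:39:26Z")
t1 = to_unix_seconds("2014-06-03T15:39:27Z")
print(t1 - t0)  # 1
```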

Define a function that calculates the distance and momentary speed between consecutive tracking points

In [58]:
def calculate_speed(df, window):
    
    R = 6371000  # Mean Earth radius in metres
    
    # Time difference in seconds between tracking point and previous tracking point
    df = df.withColumn(
        'TIME_unix_time_diff',
        df['TIME_unix_time'] - lag('TIME_unix_time', count=1).over(window)
    )

    # Latitude and longitude in radians
    df = df.withColumn(
        'DIST_@latR',
        toRadians(df['@lat'])
    )
    df = df.withColumn(
        'DIST_@lonR',
        toRadians(df['@lon'])
    )
    
    # Latitude and longitude of the previous tracking point
    df = df.withColumn(
        'DIST_@latR_prev',
        lag('DIST_@latR', count=1).over(window)
    )
    df = df.withColumn(
        'DIST_@lonR_prev',
        lag('DIST_@lonR', count=1).over(window)
    )
    
    # Difference in latitude and longitude since the previous tracking point
    df = df.withColumn(
        'DIST_@latR_diff',
        coalesce(df['DIST_@latR'] - df['DIST_@latR_prev'], lit(0))
    )
    df = df.withColumn(
        'DIST_@lonR_diff',
        coalesce(df['DIST_@lonR'] - df['DIST_@lonR_prev'], lit(0))
    )

    # Haversine distance calculation between two tracking points
    df = df.withColumn(
        'DIST_a',
        sin(df['DIST_@latR_diff']/2) * sin(df['DIST_@latR_diff']/2)
        + cos(df['DIST_@latR']) * cos(df['DIST_@latR_prev'])
        * sin(df['DIST_@lonR_diff']/2) * sin(df['DIST_@lonR_diff']/2)
    )
    df = df.withColumn(
        'DIST_c',
        2 * atan2(sqrt(df['DIST_a']), sqrt(1 - df['DIST_a']))
    )
    
    # Distance between consecutive points
    df = df.withColumn(
        'DIST_diff_meters',
        coalesce(R * df['DIST_c'], lit(0))
    )
    
    # Momentary speed in km/h; 0 for the first point of a partition (no previous point)
    df = df.withColumn(
        'SPEED_kmh',
        coalesce(
            3.6 * df['DIST_diff_meters'] / df['TIME_unix_time_diff'],
            lit(0)
        )
    )
    
    return df
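The distance columns above implement the haversine formula (`DIST_a`, `DIST_c`) with the same Earth radius. A plain-Python sketch of one step, useful for spot-checking individual rows; returning 0 when there is no previous point matches the `coalesce(..., lit(0))` in `calculate_speed`, while treating a zero time difference the same way is an assumption of this sketch:

```python
import math

EARTH_RADIUS_M = 6371000  # same mean Earth radius as calculate_speed


def haversine_m(lat1, lon1, lat2, lon2):
    """Great-circle distance in metres between two points given in degrees."""
    phi1, phi2 = math.radians(lat1), math.radians(lat2)
    dphi = phi2 - phi1
    dlmb = math.radians(lon2 - lon1)
    a = math.sin(dphi / 2) ** 2 + math.cos(phi1) * math.cos(phi2) * math.sin(dlmb / 2) ** 2
    c = 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))
    return EARTH_RADIUS_M * c


def speed_kmh(dist_m, dt_s):
    """Momentary speed in km/h; 0 when there is no previous point or no elapsed time."""
    if dt_s is None or dt_s <= 0:
        return 0.0
    return 3.6 * dist_m / dt_s


d = haversine_m(59.0, 10.0, 60.0, 10.0)  # one degree of latitude
print(round(d, 1))                   # 111194.9
print(round(speed_kmh(d, 3600), 2))  # 111.19
```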

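Calculate speed and drop points at rest. Only `Ride` activities are kept, since `Ride` is the only type with a threshold in `rest_speed_threshold_kmh`. Speed is recalculated later, once the resting points are gone.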

In [59]:
df = calculate_speed(df, window)
df = df.filter((df['activity_type']=='Ride') & (df['SPEED_kmh']>rest_speed_threshold_kmh['Ride']))
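Removing resting points only exposes a pause if time gaps are measured between the points that remain after the filter. A plain-Python sketch with a made-up ride of `(unix_time, speed_kmh)` pairs and the parameter values from the top of the notebook: before filtering, no gap reaches the pause threshold; after filtering, the gap across the rest does.

```python
REST_SPEED_THRESHOLD_KMH = 3  # the 'Ride' threshold from the input parameters
PAUSE_THRESHOLD_S = 10 * 60   # pause_threshold_minutes = 10, in seconds

# Made-up ride: moving, then slow points every 100 s, then moving again
points = (
    [(0, 20.0), (1, 21.0)]
    + [(t, 0.5) for t in range(100, 701, 100)]
    + [(701, 18.0), (702, 19.0)]
)


def gaps(times):
    """Seconds since the previous point; None for the first point (like lag)."""
    return [None] + [b - a for a, b in zip(times, times[1:])]


gaps_before = gaps([t for t, _ in points])
moving = [(t, v) for t, v in points if v > REST_SPEED_THRESHOLD_KMH]
gaps_after = gaps([t for t, _ in moving])

print(max(g for g in gaps_before if g is not None))  # 100: no pause detected
print(gaps_after)                                    # [None, 1, 700, 1]: 700 s >= 600 s
```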

Derive activity blocks by checking the time gap between consecutive tracking points

In [60]:
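# Recompute the time gap on the filtered data: the gap computed in calculate_speed
# still refers to the previous point *before* resting points were removed
df = df.withColumn(
    'TIME_unix_time_diff',
    df['TIME_unix_time'] - lag('TIME_unix_time', count=1).over(window)
)

# Indicator (0,1) of whether the time gap is at least the threshold (new activity block)
df = df.withColumn(
    'BLOCK_isnew',
    coalesce(
        (df['TIME_unix_time_diff'] >= pause_threshold_minutes * 60).cast('integer'),
        lit(0)
    )
)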

# Running sum of the indicator = sequence number of the activity block per athlete and activity type
df = df.withColumn(
    'BLOCK_seqnum', 
    sum('BLOCK_isnew').over(window)
)

# Activity block id "<athlete>_<activity_type>_<integer>"
df = df.withColumn(
    'BLOCK_id',
    concat(
        df['athlete'],
        lit('_'),
        lower(df['activity_type']),
        lit('_'),
        df['BLOCK_seqnum'].cast('string')
    )
)
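The block ids come from a running sum over a 0/1 "new block" indicator. A plain-Python sketch of the same logic for one athlete and activity type; the athlete name `alice` and the timestamps are made up:

```python
from itertools import accumulate

PAUSE_THRESHOLD_S = 10 * 60  # pause_threshold_minutes = 10


def block_ids(athlete, activity_type, times, threshold_s=PAUSE_THRESHOLD_S):
    """Assign '<athlete>_<activity_type>_<n>' ids to time-ordered points."""
    if not times:
        return []
    gaps = [None] + [b - a for a, b in zip(times, times[1:])]
    # 1 when the gap to the previous point reaches the threshold, else 0 (like BLOCK_isnew)
    is_new = [0 if g is None else int(g >= threshold_s) for g in gaps]
    # Running sum gives the block sequence number (like BLOCK_seqnum)
    return [f"{athlete}_{activity_type.lower()}_{n}" for n in accumulate(is_new)]


print(block_ids("alice", "Ride", [0, 5, 10, 700, 705, 2000]))
# ['alice_ride_0', 'alice_ride_0', 'alice_ride_0', 'alice_ride_1', 'alice_ride_1', 'alice_ride_2']
```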

Recalculate speed within each block and compute block-specific metrics, such as accumulated distance and elapsed time.

In [61]:
window_block = Window.partitionBy('BLOCK_id').orderBy('TIME_unix_time')

# Recalculate speed within each block
df = calculate_speed(df, window_block)

# Current total distance in km within the block
df = df.withColumn(
    'DIST_BLOCK_km',
    sum('DIST_diff_meters').over(window_block) / 1000
)

# Current time elapsed in seconds within the block
df = df.withColumn(
    'TIME_BLOCK_elapsed_seconds',
    df['TIME_unix_time'] - first(df['TIME_unix_time']).over(window_block)
)
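Because `window_block` is ordered, `sum` and `first` over it use a frame from the start of the block up to the current row, which yields running totals. The first point of a block contributes 0 m, since `calculate_speed` coalesces a missing predecessor distance to 0. A plain-Python sketch for one block with made-up values:

```python
from itertools import accumulate


def block_progress(times, diff_meters):
    """Cumulative km and elapsed seconds for the time-ordered points of one block."""
    cumulative_km = [m / 1000 for m in accumulate(diff_meters)]
    elapsed_s = [t - times[0] for t in times]
    return cumulative_km, elapsed_s


km, elapsed = block_progress([100, 101, 103], [0.0, 8.0, 12.0])
print(km)       # [0.0, 0.008, 0.02]
print(elapsed)  # [0, 1, 3]
```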

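Flatten the `DataFrame`: keep and rename the relevant columns, and pull the nested GPX extension fields (temperature, cadence, heart rate) up to top-level columns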

In [62]:
dff = df.select(
    df['athlete'].alias('athlete'), 
    df['activity_type'].alias('activity_type'),
    df['BLOCK_id'].alias('block_id'),
    df['time'].alias('p_timestamp'),
    df['@lat'].alias('p_latitude'), 
    df['@lon'].alias('p_longditude'), 
    df['ele'].alias('p_elevation'),
    df['SPEED_kmh'].alias('p_speed_kmh'),
    df['extensions.gpxtpx:TrackPointExtension.gpxtpx:atemp'].alias('p_temperature'), 
    df['extensions.gpxtpx:TrackPointExtension.gpxtpx:cad'].alias('p_cadence'), 
    df['extensions.gpxtpx:TrackPointExtension.gpxtpx:hr'].alias('p_heart_rate'), 
    df['DIST_diff_meters'].alias('p_diff_distance_meters'),
    df['TIME_unix_time_diff'].alias('p_diff_time_seconds'),
    df['DIST_BLOCK_km'].alias('c_cumlative_distance_km'),
    df['TIME_BLOCK_elapsed_seconds'].alias('c_elapsed_seconds')
)

Save the flat dataset in Parquet format for later use

In [63]:
dff.write.mode('overwrite').parquet('s3n://larsbk/parquet/%s/' % dataset) # sl.path -> sl.root_path, sl.dataset

#### Activity block level aggregation dataset

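Compute the columns needed for time-weighted averages, such as the average heart rate $h_{avg}$ of a block, where $h_i$ is the heart rate at tracking point $i$ and $\Delta t_i = t_{i+1} - t_i$ is the time between point $i$ and the next: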

\begin{equation}
h_{avg} = \frac{1}{\sum_i{\Delta t_i}}\sum_{i}{\Delta t_i \cdot \frac{h_{i+1}+h_i}{2}}
\end{equation}
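A plain-Python sketch of the formula with made-up samples: each interval contributes its duration times the mean of its two endpoint values, and the total is divided by the summed durations. Skipping intervals where either endpoint lacks a heart rate is an assumption of this sketch for handling sensor dropouts; the formula itself assumes every point has a value.

```python
def time_weighted_avg(times, values):
    """Trapezoidal time-weighted average of values sampled at increasing times.

    Intervals where either endpoint is None are skipped in both sums.
    """
    num = 0.0
    den = 0.0
    pairs = list(zip(times, values))
    for (t0, h0), (t1, h1) in zip(pairs, pairs[1:]):
        if h0 is None or h1 is None:
            continue
        dt = t1 - t0
        num += dt * (h0 + h1) / 2
        den += dt
    return num / den if den > 0 else None


# (10 * 105 + 20 * 120) / 30 = 115.0, versus a plain mean of about 113.3
print(time_weighted_avg([0, 10, 30], [100, 110, 130]))  # 115.0
```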

In [64]:
# Get heart rate from extensions struct to use lag function
df = df.withColumn(
    'HEART_RATE',
    df['extensions.gpxtpx:TrackPointExtension.gpxtpx:hr']
)

# Average heart rate of two consecutive tracking points
df = df.withColumn(
    'MIDPOINT_heart_rate',
    0.5 * (df['HEART_RATE'] + lag(df['HEART_RATE'], count=1).over(window_block))
)

Compute all aggregations simultaneously

In [65]:
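from pyspark.sql.functions import when

dfa = df.groupBy(df['BLOCK_id'].alias('block_id')).agg(
    min(df['time']).alias('block_start_time'),
    max(df['DIST_BLOCK_km']).alias('block_total_distance_km'),
    (max(df['TIME_BLOCK_elapsed_seconds'])/60).alias('block_duration_minutes'),
    (max(df['DIST_BLOCK_km']) / (max(df['TIME_BLOCK_elapsed_seconds']) / 3600)).alias('block_avg_speed_kmh'),
    # Time-weighted average heart rate: divide by the summed time gaps (sum of delta t_i),
    # counting only intervals where both endpoints have a heart rate
    (sum(df['TIME_unix_time_diff'] * df['MIDPOINT_heart_rate'])
     / sum(when(df['MIDPOINT_heart_rate'].isNotNull(), df['TIME_unix_time_diff']))).alias('block_avg_heart_rate')
)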
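For a single block, the distance, duration and speed aggregations reduce to a few lines of plain Python. A sketch with made-up values, where `times` and `cumulative_km` stand in for `TIME_unix_time` and `DIST_BLOCK_km` of one block; returning `None` for a zero-length block is a choice of this sketch (with ANSI mode off, Spark returns null for that division).

```python
def block_summary(times, cumulative_km):
    """Block-level metrics from the time-ordered points of one block."""
    duration_s = times[-1] - times[0]
    total_km = max(cumulative_km)
    avg_speed = total_km / (duration_s / 3600) if duration_s > 0 else None
    return {
        "block_total_distance_km": total_km,
        "block_duration_minutes": duration_s / 60,
        "block_avg_speed_kmh": avg_speed,
    }


print(block_summary([0, 1800, 3600], [0.0, 12.0, 25.0]))
# {'block_total_distance_km': 25.0, 'block_duration_minutes': 60.0, 'block_avg_speed_kmh': 25.0}
```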

Save aggregated dataset for future use

In [66]:
dfa.write.mode('overwrite').parquet('s3n://larsbk/parquet/agg/%s/' % dataset)

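Sanity check: show the points within the first 2 seconds of each block. The first point of a block has no predecessor, so its time gap is null.

In [72]: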
dff.filter('c_elapsed_seconds<2').select(['p_timestamp', 'p_diff_time_seconds']).show()

+--------------------+-------------------+
|         p_timestamp|p_diff_time_seconds|
+--------------------+-------------------+
|2014-06-03T15:39:26Z|               null|
|2014-06-03T15:39:27Z|                  1|
|2014-02-08T15:27:10Z|               null|
|2014-02-08T15:18:59Z|               null|
+--------------------+-------------------+

