<a href="https://colab.research.google.com/github/nuentsa/cloud-data-tools/blob/main/runtastic_data/extract_and_load_adidas_running_data.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [54]:
from google.colab import userdata
# Global attributes
# We will retrieve these specific top-level fields from each sport session file
top_level_fields = [
    "version", "duration", "pause", "calories", "dehydration_volume",
    "start_time_timezone_offset", "end_time_timezone_offset", "start_time",
    "end_time", "created_at", "updated_at", "id", "sport_type_id"
]
# Each sport session has a list of features. We will load only the following ones
features_to_load = ["weather", "map", "track_metrics", "initial_values"]
supported_feature_attributes = ["temperature", "wind_speed", "wind_direction",
                                "humidity", "start_latitude", "start_longitude",
                                "distance", "average_speed", "average_pace",
                                "max_speed", "elevation_gain", "elevation_loss", "distance", "duration"]
# We will save and unzip  the  runtastic file here.
work_dir = "/home/runtastic/"

# Extract the connection string from the notebook secrets
conn_string = userdata.get('postgres_conn_string')

# This File ID is the identifier of your zip file stored in Google Drive
# Oonce you store the zip file in Google drive, retrieve its identifier to store it as secrets in Google Colab
# It should look like this laggVyWshwcyP6kEI-y_W3P8D26sz
runtastic_file_id = userdata.get('runtastic_file_id') # TODO add this file id in a secret

In [None]:
# @title Connect to Postgres DB and Create Table to Load Extracted Data
# A database connection is needed to load all data extracted from runtastic files
# We also create all required tables if not already exist

import psycopg2
from google.colab import userdata
import psycopg2
conn_string = userdata.get('postgres_conn_string')
try:
    conn = psycopg2.connect(conn_string)
    cur = conn.cursor()
    print("Successfully connected to a postgres instance and database")
except Exception as e:
    raise ValueError(f"Database connection failed: {e}")

# Store all sport sessions loaded from the zip file
sessions_table = "sessions"
# Store all fastest loaded from each sport session
fastest_segments_table = "fastest_segments"

# Create the session table if not exist

create_table_sql = """
DROP TABLE sport_type CASCADE;
CREATE TABLE IF NOT EXISTS sport_type (
  id INT PRIMARY KEY,
  activity VARCHAR(255)
);
DROP TABLE sessions;
CREATE TABLE IF NOT EXISTS sessions (
  version FLOAT,
  duration FLOAT,
  pause FLOAT,
  calories FLOAT,
  dehydration_volume FLOAT,
  start_time_timezone_offset FLOAT,
  end_time_timezone_offset FLOAT,
  start_time TIMESTAMP WITH TIME ZONE,
  end_time TIMESTAMP WITH TIME ZONE,
  created_at TIMESTAMP WITH TIME ZONE,
  updated_at TIMESTAMP WITH TIME ZONE,
  id VARCHAR(255) PRIMARY KEY,
  sport_type_id INT REFERENCES sport_type(id),
  temperature FLOAT,
  wind_speed FLOAT,
  wind_direction FLOAT,
  humidity FLOAT,
  start_latitude FLOAT,
  start_longitude FLOAT,
  distance FLOAT,
  average_speed FLOAT,
  average_pace FLOAT,
  max_speed FLOAT,
  elevation_gain FLOAT,
  elevation_loss FLOAT
);
"""

# TODO Create the table to store the fastest_segments
try:
    # Execute the CREATE TABLE statement
    cur.execute(create_table_sql)
    # Commit the changes and close the connection
    conn.commit()
    cur.close()
    conn.close()
    print ("All Tables created successfully")
except Exception as e:
    raise ValueError(f"failed to create the sessions table: {e}")




In [None]:
# @title Save in Postgres the static list of Runtastic Sport Types
sport_type_sql = """
INSERT INTO sport_type (id, activity)
SELECT *
FROM (
  VALUES
  (1, 'Running'),
  (62, 'Speed Skiing'),
  (2, 'Nordic Walking'),
  (63, 'PushUps'),
  (3, 'Cycling'),
  (64, 'SitUps'),
  (4, 'Mountain Biking'),
  (65, 'PullUps'),
  (5, 'Other'),
  (66, 'Squats'),
  (6, 'Inline Skating'),
  (7, 'Hiking'),
  (68, 'Baseball'),
  (8, 'Cross-country skiing'),
  (69, 'Crossfit'),
  (9, 'Skiing'),
  (70, 'Dancing'),
  (10, 'Snowboarding'),
  (71, 'Ice Hockey'),
  (11, 'Motorbike'),
  (72, 'Skateboarding'),
  (13, 'Snowshoeing'),
  (73, 'Zumba'),
  (14, 'Treadmill'),
  (74, 'Gymnastics'),
  (15, 'Ergometer'),
  (75, 'Rugby'),
  (16, 'Elliptical'),
  (76, 'Standup Paddling'),
  (17, 'Rowing'),
  (77, 'Sixpack'),
  (18, 'Swimming'),
  (78, 'Butt Training'),
  (19, 'Walking'),
  (80, 'Leg Training'),
  (20, 'Riding'),
  (81, 'Results Workout'),
  (21, 'Golfing'),
  (82, 'Trail Running'),
  (22, 'Race Cycling'),
  (84, 'Plogging'),
  (23, 'Tennis'),
  (85, 'Wheelchair'),
  (24, 'Badminton'),
  (86, 'E Biking'),
  (25, 'Squash'),
  (87, 'Scootering'),
  (26, 'Yoga'),
  (88, 'Rowing Machine'),
  (27, 'Aerobics'),
  (89, 'Stair Climbing'),
  (28, 'Martial Arts'),
  (90, 'Jumping Rope'),
  (29, 'Sailing'),
  (91, 'Trampoline'),
  (30, 'Windsurfing'),
  (92, 'Bodyweight Training'),
  (31, 'Pilates'),
  (93, 'Tabata'),
  (32, 'Rock Climbing'),
  (94, 'Callisthenics'),
  (33, 'Frisbee'),
  (95, 'Suspension Training'),
  (34, 'Strength Training'),
  (96, 'Powerlifting'),
  (35, 'Volleyball'),
  (97, 'Olympic Weightlifting'),
  (36, 'Handbike'),
  (98, 'Stretching'),
  (37, 'Cross Skating'),
  (99, 'Mediation'),
  (38, 'Soccer'),
  (100, 'Bouldering'),
  (42, 'Surfing'),
  (101, 'Via Ferrata'),
  (43, 'Kitesurfing'),
  (102, 'Pade'),
  (44, 'Kayaking'),
  (103, 'Pole Dancing'),
  (45, 'Basketball'),
  (104, 'Boxing'),
  (46, 'Spinning'),
  (105, 'Cricket')
) AS new_data (id, activity)
WHERE NOT EXISTS (
  SELECT 1
  FROM sport_type
  WHERE id = new_data.id
)
"""
try:
    conn = psycopg2.connect(conn_string)
    cur = conn.cursor()
    # Execute the CREATE TABLE statement
    cur.execute(sport_type_sql)
    # Commit the changes and close the connection
    conn.commit()
    cur.close()
    conn.close()
    print ("Sport Types initialized successfully")
except Exception as e:
    raise ValueError(f"failed to initialize the sport type: {e}")

In [57]:
# @title Authenticate to  Google Drive and download the file.

from pickle import TRUE
# Import PyDrive and associated libraries.
# This only needs to be done once per notebook.
from pydrive2.auth import GoogleAuth
from pydrive2.drive import GoogleDrive
from google.colab import auth
from oauth2client.client import GoogleCredentials
import os

# Authenticate and create the PyDrive client.
# This only needs to be done once per notebook.
auth.authenticate_user()
gauth = GoogleAuth()
gauth.credentials = GoogleCredentials.get_application_default()
drive = GoogleDrive(gauth)

# Download a file based on its file ID.

downloaded_file = drive.CreateFile({'id': file_id})

os.makedirs(work_dir, exist_ok=TRUE)
file_path = os.path.join(work_dir, "runtastic_data_export.zip")
downloaded_file.GetContentFile(file_path)


In [None]:
# @title Unzip the data
# Extract all files from the downloaded archive
import os
import zipfile

#   Recursively displays the folders beneath a specific folder in hierarchical way.
def display_folders(folder_path, indent=0):
  # Get all subfolders and files in the current folder
  subfolders = [f for f in os.listdir(folder_path) if os.path.isdir(os.path.join(folder_path, f))]
  files = [f for f in os.listdir(folder_path) if not os.path.isdir(os.path.join(folder_path, f))]

  # Print the current folder name with indentation
  print(" " * indent + folder_path)
  # Recursively display subfolders
  for subfolder in subfolders:
    display_folders(os.path.join(folder_path, subfolder), indent + 2)

# Open the zip file and extract it
zipobject = zipfile.ZipFile(file_path)
# Unzip all files in a specific directory
zipobject.extractall(work_dir)

# Display the contents of the extracted zip
display_folders(work_dir)

# List all sport sessions
session_files = []
for filename in os.listdir(os.path.join(work_dir, "Sport-sessions")):
  if filename.endswith(".json"):
    session_files.append(os.path.join(work_dir, "Sport-sessions", filename))

# List all gps-data files
gps_data_files = []
for filename in os.listdir(os.path.join(work_dir, "Sport-sessions", "GPS-data")):
  if filename.endswith(".json"):
    gps_data_files.append(os.path.join(work_dir, "Sport-sessions", "GPS-data", filename))


In [None]:
# @title Load all Sport Sessions
import pandas as pd
import json

def process_features(features, id):
  """
  Function to extract and flatten features data, returning the features as a dictionary
  We will use this function to extract and flatten data in the features field
  Each feature has specific metrics and data points : weather, location,
  A list of fastest segments is provided separately as a list of dictionaries.

  Args:
    features: list of features
    id: id of the current session file
  Return:
    feature_values: dictionary of values of interest extracted from each relevant feature
    fastest_segments: list of segments enriched by the sport session id
  """
  feature_values = {}
  segments = []
  for feature in features:
    if feature["type"] == "fastest_segments":
      fastest_segments = feature["attributes"]["segments"]
      segments = extract_fastest_segments(id, fastest_segments)
      continue
    if feature["type"] not in features_to_load:
      continue
    for key, value in feature["attributes"].items():
      if isinstance(value, dict):
        # We will not support the embedding json object here
        continue
      if key not in supported_feature_attributes:
        # We are not interested in this data dimension for now
        continue
      feature_values[key]=value
  return feature_values, segments

def extract_fastest_segments(id, array_of_segments):
  """
  Retrieve the list of fastest segments beneath the features record
  It has the form


  Args:
    array_of_segments: the json array having the list of segments
    id: id of the sport session
  """
  segments = []
  for segment in array_of_segments:
    segment["id"] = id
    segments.append(segment)
  return segments

# Initialize empty DataFrame to store the sport sessions and the fastest segments
sessions_df = pd.DataFrame()
fastest_segments_df = pd.DataFrame()
# Read each JSON file, process features, and append data to the dataframe
try:
  for filename in session_files:
    with open(filename, "r") as f:
      data = json.load(f)
      # Extract core data
      loaded_data = {}
      for col in top_level_fields:
        loaded_data[col] = data[col]
      # Process features and get a dictionary of feature values
      feature_values, segments = process_features(data["features"], data["id"])
      loaded_data.update(feature_values)
      current_row_df = pd.DataFrame.from_dict(loaded_data, orient='index')
      sessions_df = pd.concat([sessions_df, current_row_df.T], ignore_index=True)

      current_segments_df = pd.DataFrame.from_dict(segments)
      fastest_segments_df = pd.concat([fastest_segments_df, current_segments_df], ignore_index=True)
except Exception as e:
    print(f"Error processing file '{filename}': {e}")
fastest_segments_df

In [None]:
# @title Handle non provided values
# Some features are not present in all sessions like temperature or humidity,
# which lead to NaN fields
nan_columns = sessions_df.columns[sessions_df.isnull().any()]
if len(nan_columns) > 0:
  print('data dimensions with NaN values: ', end=' ')
  print(' '.join(nan_columns))

# Replace them with a predefined sentinel value
sentinel_value = -99999
sessions_df.fillna(sentinel_value, inplace=True)


In [None]:
# @title Convert all time-related columns to appropriate values

import datetime

time_cols = ['start_time', 'end_time', 'created_at', 'updated_at']

# Convert time columns to UTC
sessions_df[time_cols] = sessions_df[time_cols].apply(
    lambda col: pd.to_datetime(col, unit='ms', utc=True), axis=1
)
fastest_segments_df['started_at'] = pd.to_datetime(fastest_segments_df['started_at'], unit='ms', utc=True)
# Convert all duration to seconds instead of milliseconds
duration_cols = ['duration', 'pause', 'start_time_timezone_offset', 'end_time_timezone_offset' ]
for col in duration_cols:
  sessions_df[col] = sessions_df[col] / 1000
fastest_segments_df['duration'] = fastest_segments_df['duration'] / 1000

fastest_segments_df.head()

In [64]:
# @title Compute some statistics from all sport sessions
sessions_df.describe()


Unnamed: 0,version,duration,pause,calories,dehydration_volume,start_time_timezone_offset,end_time_timezone_offset,temperature,wind_speed,wind_direction,humidity,start_latitude,distance,elevation_gain,elevation_loss
count,137.0,137.0,137.0,137.0,137.0,137.0,137.0,137.0,137.0,137.0,137.0,137.0,137.0,137.0,137.0
mean,6.145985,2891.111869,1.141248,897.70073,1124.489051,5045.255474,5045.255474,-1446.489051,-1445.218978,-1241.861314,-1391.189781,43.654765,7789.49635,25.547445,26.182482
std,4.612228,1298.565837,3.135842,277.689955,404.723619,1771.173207,1771.173207,12039.451583,12039.608435,12064.753013,12046.216335,0.380097,2389.745034,17.38025,17.373162
min,3.0,446.688,0.0,137.0,159.0,3600.0,3600.0,-99999.0,-99999.0,-99999.0,-99999.0,43.56426,1189.0,0.0,0.0
25%,4.0,2124.474,0.0,679.0,817.0,3600.0,3600.0,8.0,8.0,130.0,59.0,43.635475,6015.0,15.0,15.0
50%,5.0,2660.042,0.0,891.0,1084.0,3600.0,3600.0,14.0,13.0,260.0,71.0,43.635635,7594.0,26.0,26.0
75%,7.0,3398.19,1.214,1086.0,1358.0,7200.0,7200.0,17.0,19.0,290.0,80.0,43.63575,9422.0,33.0,32.0
max,47.0,13905.235,27.974,1562.0,2163.0,7200.0,7200.0,34.0,55.0,350.0,100.0,48.059555,19002.0,126.0,131.0


In [65]:
sessions_df.dtypes

version                                     int64
duration                                  float64
pause                                     float64
calories                                    int64
dehydration_volume                          int64
start_time_timezone_offset                float64
end_time_timezone_offset                  float64
start_time                    datetime64[ns, UTC]
end_time                      datetime64[ns, UTC]
created_at                    datetime64[ns, UTC]
updated_at                    datetime64[ns, UTC]
id                                         object
sport_type_id                              object
temperature                               float64
wind_speed                                float64
wind_direction                            float64
humidity                                  float64
start_latitude                            float64
start_longitude                            object
distance                                    int64


In [None]:
# @title Load  sport sesssions in the postgresql table
import pandas as pd
import psycopg2

# Replace with your connection details
conn = psycopg2.connect(conn_string)
cur = conn.cursor()

# Define the unique columns for checking existing data
unique_cols = ['id']  # Adjust based on your unique identifier

try:
  # Fetch existing data for comparison
  existing_data_sql = f"SELECT {', '.join(unique_cols)} FROM {sessions_table};"
  cur.execute(existing_data_sql)
  existing_data = pd.DataFrame(cur.fetchall(), columns=unique_cols)

  # Merge dataframes to identify new rows
  new_data = pd.merge(sessions_df, existing_data, how='left', on=unique_cols, indicator=True)
  new_data = new_data[new_data['_merge'] == 'left_only'][sessions_df.columns]

  # Insert only new rows into the table
  if not new_data.empty:
      insert_sql = """
      INSERT INTO sessions (
          version, duration, pause, calories, dehydration_volume,
          start_time_timezone_offset, end_time_timezone_offset, start_time, end_time,
          created_at, updated_at, id, sport_type_id, temperature, wind_speed, wind_direction,
          humidity, start_latitude, start_longitude, distance, average_speed,	average_pace,	max_speed,
          elevation_gain,	elevation_loss)
          VALUES (%s, %s, %s, %s, %s, %s, %s, %s,%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
      """

      data_tuples = new_data.to_records(index=False)
      cur.executemany(insert_sql, data_tuples)
  # Commit changes and close connection
  conn.commit()
  print("Data inserted successfully!")
except Exception as e:
    raise ValueError(f"failed to load data into the sessions table: {e}")

cur.close()
conn.close()



In [None]:
# @title Display the location of all Sport Sessions on a map
import folium
from folium import plugins
sessions_map = folium.Map(location=[43.6047, 1.4442], zoom_start=5) # My main location, TODO, get a centroid of all points.
sessions_cl = plugins.MarkerCluster().add_to(sessions_map)
# Loop through the dataframe and add the location of each session to the cluster
for lat, lng, label in zip(sessions_df.start_latitude, sessions_df.start_longitude, sessions_df.start_time):
  folium.Marker(
      location=[lat, lng],
      popup=label,
      icon=None
  ).add_to(sessions_cl)


# Display the map
sessions_map

In [None]:
# @title Export the  extracted sport sessions in an excel sheet
from google.colab import files

excel_filename = 'runtastic_data.xlsx'
excel_filepath = os.path.join(work_dir, excel_filename)
sessions_df.to_excel(excel_filepath)
files.download(excel_filepath)
