# REDBACK OPS - SunCycle
*Mark Telley*

Connect to Wahoo, retrieve User Details, Power Zone and FIT File / Handle Fit File and process prior to ingestion into a Date Warehouse / Table

## WAHOO API

In [None]:
from google.colab import drive

# Mount Google Drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
# Load Cred from secure location
creds_file_path = '/content/drive/MyDrive/Colab Notebooks/Assets/Wahoo/cred.txt'

# Get Key Cred details
with open(creds_file_path, 'r') as f:
    lines = f.readlines()
    _, client_id = lines[0].strip().split(" = ", 1)
    _, client_secret = lines[1].strip().split(" = ", 1)

In [None]:
import requests

# Set OAuth2 credentials and other parameters
redirect_uri = 'https://www.WEBSITE.com' 
scopes = 'user_read%20+workouts_read%20+power_zones_read%20+power_zones_write'
base_url = 'https://api.wahooligan.com'

# Step 1: Redirect user to Wahoo login page for authorisation (Sandbox)
auth_url = f'{base_url}/oauth/authorize?client_id={client_id}&redirect_uri=\
{redirect_uri}&scope={scopes}&response_type=code'
print(f'Please go to this URL to authorize the app: {auth_url}')

# User to open link, authorise, and enter code
# auth_code = ""
auth_code = input('Enter the code from the redirect URI: ') 


In [None]:
# Step 2: Exchange authorisation code for access and refresh tokens
token_url = f'{base_url}/oauth/token'
payload = {
    'client_id': client_id,
    'client_secret': client_secret,
    'redirect_uri': redirect_uri,
    'grant_type': 'authorization_code',
    'code': auth_code
}

In [None]:
# Check Expiry Time
import time
response = requests.post(token_url, data=payload)
if response.status_code != 200:
    raise Exception(f'Error getting access token: {response.text}')
access_token = response.json()['access_token']
refresh_token = response.json()['refresh_token']
expires_in = response.json()['expires_in']
expiration_time = time.time() + expires_in
human_readable_time = time.ctime(expiration_time)
print(human_readable_time)

Mon May  8 06:18:42 2023


In [None]:
# CURL - Quick Check
!curl --header 'Authorization: Bearer {access_token}'\
 https://api.wahooligan.com/v1/user

In [None]:
from datetime import datetime
# Step 3: Use access token to get authenticated user details
user_url = f'{base_url}/v1/user'
headers = {'Authorization': f'Bearer {access_token}'}
response = requests.get(user_url, headers=headers)
if response.status_code != 200:
    raise Exception(f'Error getting user details: {response.text}')
user_details = response.json()

# Print the user details
print(f'User ID:{user_details["id"]}') 
print(f'Name: {user_details["first"]} {user_details["last"]}')
gender = user_details["gender"]
if gender == 0:
    print("Gender: Male")
elif gender == 1:
    print("Gender: Female")
else:
    print("Gender: Prefer not to say")
print(f'Height: {user_details["height"]} meters')
print(f'Weight: {user_details["weight"]} kg')
# Convert to date object
dob = datetime.strptime(user_details["birth"], '%Y-%m-%d').date() 
today = datetime.today().date()  # Get today's date
age = today.year - dob.year - ((today.month, today.day) < (dob.month, dob.day))
print(f'Birthday: {user_details["birth"]}')
print(f"Current age is {age} years.")

---
POWER ZONES

In [None]:
# No Power (Sandbox restriction) - CHECK Again
url = 'https://api.wahooligan.com/v1/power_zones'

response = requests.get(url, headers=headers)

if response.status_code == 200:
    # Print the response body
    print(response.json())
else:
    # Print the status code and error message
    print(f'Error: {response.status_code} - {response.reason}')

[]


Create Power Zones

In [None]:
!curl --header 'Authorization: Bearer {access_token}'  -X POST -d 'power_zone[zone_1]=100&power_zone[zone_2]=120&power_zone[zone_3]=120&power_zone[zone_4]=120&power_zone[zone_5]=120&power_zone[zone_6]=120&power_zone[zone_7]=120&power_zone[ftp]=400&power_zone[zone_count]=8&power_zone[workout_type_family_id]=6&power_zone[critical_power]=324' https://api.wahooligan.com/v1/power_zones

Power Zone ID = 3437134

Get Power Zones

In [None]:
!curl --header 'Authorization: Bearer {access_token}' https://api.wahooligan.com/v1/power_zones

Delete the Power Zone created

In [None]:
!curl -X DELETE --header 'Authorization: Bearer {access_token}' https://api.wahooligan.com/v1/power_zones/3437134

{"success":"Power Zone has been deleted"}

Check if the power Zones are empty:

In [None]:
!curl --header 'Authorization: Bearer {access_token}' https://api.wahooligan.com/v1/power_zones

[]

---

In [None]:
import os

# Set path
folder_path = '/content/drive/MyDrive/Colab Notebooks/Assets/Wahoo'

# Delete existing files
extensions = ('.csv', '.fit')
for filename in os.listdir(folder_path):
    if filename.lower().strip().endswith(extensions):
        file_path = os.path.join(folder_path, filename)
        os.remove(file_path)
        print(f"Deleted file: {file_path}")

In [None]:
# Step 4: Use access token to get the last 4 workouts
# TODO: UPDATE TO JUST THE LAST WORKOUT
workouts_url = f'{base_url}/v1/workouts?per_page=4'
headers = {'Authorization': f'Bearer {access_token}'}
response = requests.get(workouts_url, headers=headers)
if response.status_code != 200:
    raise Exception(f'Error getting workouts: {response.text}')
workouts = response.json()['workouts']

# Print the last 4 workouts
# TODO: SKIP THIS IN PRODUCTION
for i, workout in enumerate(workouts):
    print(f'{i + 1}: Workout ID {workout["id"]} on {workout["updated_at"]}')

# Step 5: Let the user select a workout
# TODO: SKIP THIS IN PRODUCTION
while True:
    selection = input('Select a workout (1-4): ')
    try:
        selection = int(selection)
        if 1 <= selection <= 4:
            break
        else:
            print('Invalid selection.')
    except ValueError:
        print('Invalid selection.')

In [None]:
workoutID = workout["id"]

In [None]:
!curl --header 'Authorization: Bearer {access_token}'\
 https://api.wahooligan.com/v1/workouts/{workoutID}/workout_summary

In [None]:
import json
import pandas as pd

user_url = f'{base_url}/v1/workouts/{workoutID}/workout_summary'
headers = {'Authorization': f'Bearer {access_token}'}
response = requests.get(user_url, headers=headers)
if response.status_code != 200:
    raise Exception(f'Error getting user details: {response.text}')
workout_details = response.json()

# Print the workout details
print(f'Workout ID: {workout_details["id"]}')
print(f'Ascent Accumulated: {workout_details["ascent_accum"]}')
print(f'Average Cadence: {workout_details["cadence_avg"]}')
print(f'Calories Accumulated: {workout_details["calories_accum"]}')
print(f'Distance Accumulated: {workout_details["distance_accum"]}')
print(f'Active Duration Accumulated: {workout_details["duration_active_accum"]}')
print(f'Paused Duration Accumulated: {workout_details["duration_paused_accum"]}')
print(f'Total Duration Accumulated: {workout_details["duration_total_accum"]}')
print(f'Average Heart Rate: {workout_details["heart_rate_avg"]}')
print(f'Last Bike Normalized Power: {workout_details["power_bike_np_last"]}')
print(f'Last Bike Training Stress Score: {workout_details["power_bike_tss_last"]}')
print(f'Average Power: {workout_details["power_avg"]}')
print(f'Average Speed: {workout_details["speed_avg"]}')
print(f'Work Accumulated: {workout_details["work_accum"]}')
print(f'File URL: {workout_details["file"]["url"]}')
print(f'Created At: {workout_details["created_at"]}')
print(f'Updated At: {workout_details["updated_at"]}')

# JSON response
json_response = json.dumps(workout_details)

# Load JSON response
data = json.loads(json_response)

# Flatten nested JSON into a flat dictionary
def flatten_json(json_data, prefix=''):
    flattened_data = {}
    for key, value in json_data.items():
        if isinstance(value, dict):
            flattened_data.update(flatten_json(value, prefix + key + '_'))
        else:
            flattened_data[prefix + key] = value
    return flattened_data

flattened_data = flatten_json(data)

# Create DataFrame
df_workout_sum = pd.DataFrame([flattened_data])

# Drop the original "created_at" and "updated_at" columns
df_workout_sum.drop(columns=['created_at', 'updated_at'], inplace=True)

df_workout_sum.T

In [None]:
!curl --header 'Authorization: Bearer {access_token}' https://api.wahooligan.com/v1/workouts/{workoutID}

In [None]:
user_url = f'{base_url}/v1/workouts/{workoutID}'
headers = {'Authorization': f'Bearer {access_token}'}
response = requests.get(user_url, headers=headers)
if response.status_code != 200:
    raise Exception(f'Error getting user details: {response.text}')
workout_details = response.json()

# Print the workout details
print(f'Workout ID: {workout_details["id"]}')
print(f'Starts: {workout_details["starts"]}')
print(f'Minutes: {workout_details["minutes"]}')
print(f'Name: {workout_details["name"]}')
print(f'Plan ID: {workout_details["plan_id"]}')
print(f'Workout Token: {workout_details["workout_token"]}')
print(f'Workout Type ID: {workout_details["workout_type_id"]}')
print(f'Workout Summary: {workout_details["workout_summary"]}')

# Convert timestamp to AEST timezone
created_at_aest = pd.to_datetime(workout_details["created_at"]).tz_convert('Australia/Melbourne')
updated_at_aest = pd.to_datetime(workout_details["updated_at"]).tz_convert('Australia/Melbourne')

print(f'Created At: {created_at_aest}')
print(f'Updated At: {updated_at_aest}')

# JSON response
json_response = json.dumps(workout_details)

# Load JSON response
data = json.loads(json_response)

# Flatten nested JSON into a flat dictionary
def flatten_json(json_data, prefix=''):
    flattened_data = {}
    for key, value in json_data.items():
        if isinstance(value, dict):
            flattened_data.update(flatten_json(value, prefix + key + '_'))
        else:
            flattened_data[prefix + key] = value
    return flattened_data

flattened_data = flatten_json(data)

# Create DataFrame
df_workout_ = pd.DataFrame([flattened_data])

# Add new date variables to the DataFrame
df_workout_['created_at_aest'] = created_at_aest
df_workout_['updated_at_aest'] = updated_at_aest

# Drop the original "created_at" and "updated_at" columns
df_workout_.drop(columns=['created_at', 'updated_at'], inplace=True)
df_workout_.T


In [None]:
# CURL - Quick Check POWER 215508043
!curl --header 'Authorization: Bearer {access_token}' \
 https://api.wahooligan.com/v1/power_zones/

[]

In [None]:
# Step 6: Use access token to get the workout summary and FIT file URL
workout_id = workouts[selection - 1]['id']
workout_summary_url = f'{base_url}/v1/workouts/{workout_id}/workout_summary'
headers = {'Authorization': f'Bearer {access_token}'}
response = requests.get(workout_summary_url, headers=headers)
if response.status_code != 200:
    raise Exception(f'Error getting workout summary: {response.text}')
workout_summary = response.json()

In [None]:
# Get FIT file URL from workout summary
fit_file_url = workout_summary['file']['url']

# Download FIT file and save to specified path
response = requests.get(fit_file_url)
if response.status_code != 200:
    raise Exception(f'Error downloading FIT file: {response.text}')

fit_file_path = os.path.join(folder_path, f'{workout_id}.fit')

with open(fit_file_path, 'wb') as f:
    f.write(response.content)

print(f'Downloaded FIT file for workout ID {workout_id} to {fit_file_path}')

Downloaded FIT file for workout ID 215508043 to /content/drive/MyDrive/Colab Notebooks/Assets/Wahoo/215508043.fit


## CONVERSION

In [None]:
# Install
!pip install fitparse

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting fitparse
  Downloading fitparse-1.2.0.tar.gz (65 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m65.7/65.7 kB[0m [31m2.6 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: fitparse
  Building wheel for fitparse (setup.py) ... [?25l[?25hdone
  Created wheel for fitparse: filename=fitparse-1.2.0-py3-none-any.whl size=68227 sha256=31ef6d0122c4c672b23875cb24c26c44614e0b74e125325a4e8e0000902dc504
  Stored in directory: /root/.cache/pip/wheels/81/67/7b/77a2f8ba348bafbbad6262a80bc51be27b2f9fccbaefc74671
Successfully built fitparse
Installing collected packages: fitparse
Successfully installed fitparse-1.2.0


In [None]:
import fitparse
import pytz
# import re

In [None]:
# Schema
allowed_fields = [
    "timestamp",
    "position_lat",
    "position_long",
    "distance",
    "enhanced_altitude",
    "altitude",
    "ascent",
    "grade",
    "calories",
    "enhanced_speed",
    "speed",
    "heart_rate",
    "temperature",
    "cadence",
    # "fractional_cadence",
    "power",
    "left_right_balance",
    "gps_accuracy",
    "descriptor",
    "product_name",
    "serial_number",
]

# TIME DATE
required_fields = ["timestamp"]
UTC = pytz.UTC
CST = pytz.timezone("Australia/Melbourne")

In [None]:
""" 
This code defines a function called "write_fit_file_to_csv" which takes a 
FitFile object as input and converts its data into a CSV file format. 
The function filters out only the required fields from the FitFile messages,
 and then writes them into a new CSV file. The output file path is 
 "test_output.csv" by default, but can be changed by providing a 
 new path as a parameter.
"""

import csv
def write_fit_file_to_csv(fitfile, output_file="test_output.csv"):
    messages = fitfile.messages
    # raw data in messages in one line
    data = []
    for m in messages:
        skip = False
        if not hasattr(m, "fields"):
            continue
        fields = m.fields
        # check for important data types
        mdata = {}
        for field in fields:
            # print(field) print varaibles
            if field.name in allowed_fields:
                if field.name == "timestamp":
                    mdata[field.name] = \
                    UTC.localize(field.value).astimezone(CST)
                else:
                    mdata[field.name] = field.value
        for rf in required_fields:
            if rf not in mdata:
                skip = True

        if not skip:
            data.append(mdata)
    # write to csv
    with open(output_file, "w", newline="") as f:
        writer = csv.writer(f)
        writer.writerow(allowed_fields)
        print(allowed_fields)
        for entry in data:
            line_file = []
            for k in allowed_fields:
                data_var = str(entry.get(k, ""))
                # print(entry," ", k," " ,data_var)
                line_file.append(data_var)
            print(line_file)
            writer.writerow(line_file)
    print("wrote %s" % output_file)

In [None]:
"""
This code defines a function called "handlefitfile" which processes the 
most recent ".fit" file found in a specified directory by converting it to 
a ".csv" file format using the "fitparse" and "csv" libraries. It first gets 
the list of all files in the specified directory, filters out non-FIT files, 
and then gets the most recent FIT file by checking the last modified time of 
each file. It then converts this FIT file to a CSV file by calling 
the "write_fit_file_to_csv" function, and saves the output CSV file into a
Pandas DataFrame. The function returns the path to the output CSV file and
the corresponding Pandas DataFrame.
"""

def handlefitfile():
  # Get the list of files in the "Exports" directory
  path = folder_path
  files = os.listdir(path)

  # Filter out non-FIT files
  fit_files = [file.strip().lower() for file in files if \
               file.strip().lower().endswith(".fit")]

  # Check if there are any FIT files in the directory
  if not fit_files:
      print("No FIT files found in directory.")
      return

  # Get the most recent FIT file in the directory
  file = max(fit_files, key=lambda x:os.path.getmtime(os.path.join(path, x)))

  # Get the path of the most recent FIT file
  file_path = os.path.join(path, file)

  # Process the file
  new_filename = file_path[:-4] + ".csv"

  # Read data from the CSV file into a Pandas DataFrame
   
  fitfile = fitparse.FitFile(file_path, 
                             data_processor=
                             fitparse.StandardUnitsDataProcessor())
  print(f'Converting {file_path}')
  write_fit_file_to_csv(fitfile, new_filename)
  print(new_filename)

  print("Finished conversion")
  
  # Save to pandas.DF
  df = pd.read_csv(new_filename)

  return new_filename, df

In [None]:
# RUN
try:
    processed_file, df = handlefitfile()
except Exception as e:
    print(f'An error occurred: {str(e)}') 

## PREPROCESING

In [None]:
import os

# Convert timestamp to AEST timezone
df['timestamp_AEST'] = pd.to_datetime(df['timestamp'], utc=True) \
                      .dt.tz_convert('Australia/Melbourne')
df['date_AEST'] = df['timestamp_AEST'].dt.date

# Filter out irrelevant columns
df = df[['timestamp', 'timestamp_AEST', 'date_AEST', 'position_lat', 
         'position_long', 'distance', 'enhanced_altitude', 'altitude',
         'ascent', 'grade', 'calories', 'enhanced_speed', 'heart_rate',
         'temperature', 'cadence', 'power', 'left_right_balance',
         'gps_accuracy']]

# Rename column
df = df.rename(columns={'altitudeS': 'altitude'})

# Set hardcoded values for userID, age, gender, weight, FTP
df['userID'] = user_details["id"]
df['workout_ID'] = workoutID
df['age'] = age
gender_str = ""
if gender == 0:
    gender_str = "Male"
elif gender == 1:
    gender_str = "Female"
else:
    gender_str = "Prefer not to say"
df['gender'] = gender_str
df['weight'] = user_details["weight"]
FTP = 302 # manually set
df['FTP'] = FTP 

# Filter out rows with null distance and header row
df = df[df['distance'].notnull() & (df['timestamp'] != 'timestamp')]

# Sort by timestamp
df.sort_values(by='timestamp', inplace=True)

# Use df as the source table
result_df = df.copy()  

# Define the power zone rules as a percentage of FTP
power_zones = [
  {'zone': 'Zone 1', 'ftp_percent': 0.55, 
   'min_percent': 0, 'max_percent': 0.55},
  {'zone': 'Zone 2', 'ftp_percent': 0.75,
   'min_percent': 0.55, 'max_percent': 0.75},
  {'zone': 'Zone 3', 'ftp_percent': 0.90, 
   'min_percent': 0.75, 'max_percent': 0.90},
  {'zone': 'Zone 4', 'ftp_percent': 1.05, 
   'min_percent': 0.90, 'max_percent': 1.05},
  {'zone': 'Zone 5', 'ftp_percent': 1.20,
   'min_percent': 1.05, 'max_percent': 1.20},
  {'zone': 'Zone 6', 'ftp_percent': 1.50, 
   'min_percent': 1.20, 'max_percent': 1.50},
  {'zone': 'Zone 7', 'ftp_percent': '>1.50',
   'min_percent': 1.50, 'max_percent': None},
]

# Define a function to apply the rules to each value in the dataframe
def map_power_zone(value, ftp):
    for zone in power_zones:
        if zone['ftp_percent'] == '>1.50':
            if value >= ftp * 1.5:
                return zone['zone']
        else:
            if value >= ftp * zone['min_percent'] and \
             (zone['max_percent'] is None or value < ftp * zone['max_percent']):
                return zone['zone']
    return None

# Apply the function to each value in the 'power' column
result_df['power_zone'] = \
 result_df.apply(lambda row: map_power_zone(row['power'], row['FTP']), axis=1)


# Process the file
processed_file = processed_file[:-4] + "_processed.csv"

# Save to CSV
result_df.to_csv(processed_file, index=False)

result_df.head(18)


In [None]:
result_df.columns

Index(['timestamp', 'timestamp_AEST', 'date_AEST', 'position_lat',
       'position_long', 'distance', 'enhanced_altitude', 'altitude', 'ascent',
       'grade', 'calories', 'enhanced_speed', 'heart_rate', 'temperature',
       'cadence', 'power', 'left_right_balance', 'gps_accuracy', 'userID',
       'workout_ID', 'age', 'gender', 'weight', 'FTP', 'power_zone'],
      dtype='object')

In [None]:
result_df.info()

<class 'pandas.core.frame.DataFrame'>
Int64Index: 8233 entries, 2 to 8938
Data columns (total 25 columns):
 #   Column              Non-Null Count  Dtype                              
---  ------              --------------  -----                              
 0   timestamp           8233 non-null   object                             
 1   timestamp_AEST      8233 non-null   datetime64[ns, Australia/Melbourne]
 2   date_AEST           8233 non-null   object                             
 3   position_lat        8233 non-null   float64                            
 4   position_long       8233 non-null   float64                            
 5   distance            8233 non-null   float64                            
 6   enhanced_altitude   8233 non-null   float64                            
 7   altitude            8233 non-null   float64                            
 8   ascent              8232 non-null   float64                            
 9   grade               8232 non-null   float

## PUSH TO DATA WAREHOUSE

In [None]:
"""
BigQuery or Similar sql create a table execution here
"""

In [None]:
result_df.to_csv(processed_file, index=False)

In [None]:
processed_file

In [None]:
result_df

In [None]:
df_workout_

In [None]:
df_workout_sum