In [34]:
import sys, os
project_root = os.path.abspath(os.path.join(os.getcwd(), "../../"))
if project_root not in sys.path:
    sys.path.insert(0, project_root)
from dataspaces_utils.common import get_s3_client, bucket, pd
import xml.etree.ElementTree as ET
import pandas as pd
from fitparse import FitFile
import os
from io import BytesIO  

In [35]:
s3 = get_s3_client()

In [36]:
def parse_fit_to_df(file_bytes):
    fitfile = FitFile(BytesIO(file_bytes))
    records = []
    for record in fitfile.get_messages("record"):
        row = {}
        for field in record:
            row[field.name] = field.value
        records.append(row)
    return pd.DataFrame(records)

In [37]:
def parse_tcx_to_df(file_bytes):
    tree = ET.ElementTree(ET.fromstring(file_bytes.decode("utf-8")))
    ns = {'tcx': 'http://www.garmin.com/xmlschemas/TrainingCenterDatabase/v2'}
    records = []
    for tp in tree.findall('.//tcx:Trackpoint', ns):
        record = {}
        # 1. Timestamp
        time_elem = tp.find('tcx:Time', ns)
        if time_elem is not None:
            record['timestamp'] = pd.to_datetime(time_elem.text)
        # 2. Speed (basic)
        speed_elem = tp.find('tcx:Speed', ns)
        if speed_elem is not None:
            record['speed'] = float(speed_elem.text)
        # 3. Cadence
        cadence_elem = tp.find('tcx:Cadence', ns)
        if cadence_elem is not None:
            record['cadence'] = float(cadence_elem.text)
        # 4. Heart rate
        hr_elem = tp.find('.//tcx:HeartRateBpm/tcx:Value', ns)
        if hr_elem is not None:
            record['heart_rate'] = int(hr_elem.text)
        # 5. Power (and speed) inside Extensions > TPX
        extensions_elem = tp.find('tcx:Extensions', ns)
        if extensions_elem is not None:
            tpx_elem = extensions_elem.find('.//', ns)
            if tpx_elem is not None:
                for elem in tpx_elem:
                    tag = elem.tag.split('}')[-1].lower()
                    if tag == 'watts':
                        record['power'] = float(elem.text)
                    elif tag == 'speed':
                        record['speed'] = float(elem.text)  # Override if found here
        # Append record if it has a timestamp (required)
        if 'timestamp' in record:
            records.append(record)
    return pd.DataFrame(records)


In [16]:
# 📥 Select one .fit or .tcx file from bronze/original
response = s3.list_objects_v2(Bucket=bucket, Prefix="bronze/original/")
keys = [obj["Key"] for obj in response.get("Contents", []) if obj["Key"].endswith((".fit", ".tcx"))]

# Show available files
print("Available files:")
for key in keys:
    print("🔹", key)

# 🔧 Choose a file to test
test_key = keys[0]  # Change index if you want another one
print(f"\n✅ Testing file: {test_key}")

# Download file from S3
obj = s3.get_object(Bucket=bucket, Key=test_key)
file_bytes = obj["Body"].read()
extension = os.path.splitext(test_key)[-1]

# Run the parser
try:
    if extension == ".fit":
        df = parse_fit_to_df(file_bytes)
        display(df)
except Exception as e:
    print(f"❌ Error during parsing: {e}")

Available files:
🔹 bronze/original/bronze_activity_2025-04-14_bf81db1b.fit
🔹 bronze/original/bronze_activity_2025-04-14_f5a06ce9.tcx
🔹 bronze/original/bronze_activity_2025-04-26_2ff07ffe.fit

✅ Testing file: bronze/original/bronze_activity_2025-04-14_bf81db1b.fit


Unnamed: 0,altitude,distance,enhanced_altitude,heart_rate,temperature,timestamp,enhanced_speed,speed,cadence,fractional_cadence,unknown_87,accumulated_power,power
0,74.2,0.00,74.2,95,21,2025-04-14 17:52:23,,,,,,,
1,74.2,0.00,74.2,95,21,2025-04-14 17:52:24,,,,,,,
2,74.2,0.00,74.2,96,21,2025-04-14 17:52:25,,,,,,,
3,74.2,0.00,74.2,98,21,2025-04-14 17:52:26,,,,,,,
4,74.2,0.00,74.2,99,21,2025-04-14 17:52:27,,,,,,,
...,...,...,...,...,...,...,...,...,...,...,...,...,...
4120,74.2,5990.58,74.2,99,21,2025-04-14 19:01:03,0.0,0.0,0.0,0.0,0.0,10041.0,0.0
4121,74.2,5990.58,74.2,99,21,2025-04-14 19:01:04,0.0,0.0,0.0,0.0,0.0,10041.0,0.0
4122,74.2,5990.58,74.2,98,21,2025-04-14 19:01:05,0.0,0.0,0.0,0.0,0.0,10041.0,0.0
4123,74.2,5990.58,74.2,97,21,2025-04-14 19:01:06,0.0,0.0,0.0,0.0,0.0,10041.0,0.0


In [17]:
# 📥 Select one .fit or .tcx file from bronze/original
response = s3.list_objects_v2(Bucket=bucket, Prefix="bronze/original/")
keys = [obj["Key"] for obj in response.get("Contents", []) if obj["Key"].endswith((".fit", ".tcx"))]

# Show available files
print("Available files:")
for key in keys:
    print("🔹", key)

# 🔧 Choose a file to test
test_key = keys[1]  # Change index if you want another one
print(f"\n✅ Testing file: {test_key}")

# Download file from S3
obj = s3.get_object(Bucket=bucket, Key=test_key)
file_bytes = obj["Body"].read()
extension = os.path.splitext(test_key)[-1]

# Run the parser
try:
    if extension == ".tcx":
        df = parse_tcx_to_df(file_bytes)
        display(df)
except Exception as e:
    print(f"❌ Error during parsing: {e}")

Available files:
🔹 bronze/original/bronze_activity_2025-04-14_bf81db1b.fit
🔹 bronze/original/bronze_activity_2025-04-14_f5a06ce9.tcx
🔹 bronze/original/bronze_activity_2025-04-26_2ff07ffe.fit

✅ Testing file: bronze/original/bronze_activity_2025-04-14_f5a06ce9.tcx


Unnamed: 0,timestamp,cadence,heart_rate,speed,power
0,2025-04-14 17:52:02.618000+00:00,0.0,0,0.000000,0.0
1,2025-04-14 17:52:03.618000+00:00,0.0,0,6.886672,106.0
2,2025-04-14 17:52:04.618000+00:00,0.0,0,6.859050,106.0
3,2025-04-14 17:52:05.618000+00:00,0.0,0,6.893552,106.0
4,2025-04-14 17:52:06.618000+00:00,0.0,0,6.893552,107.0
...,...,...,...,...,...
3837,2025-04-14 18:55:59.618000+00:00,0.0,0,2.549858,49.0
3838,2025-04-14 18:56:00.618000+00:00,0.0,0,1.953638,36.0
3839,2025-04-14 18:56:01.618000+00:00,0.0,0,1.420653,24.0
3840,2025-04-14 18:56:02.618000+00:00,0.0,0,0.966114,16.0


In [48]:
# List .fit and .tcx files in bronze/original/
response = s3.list_objects_v2(Bucket=bucket, Prefix="bronze/original/")
keys = [obj["Key"] for obj in response.get("Contents", []) if obj["Key"].endswith((".fit", ".tcx"))]

for key in keys:
    print(f"Processing: {key}")
    extension = os.path.splitext(key)[-1]

    obj = s3.get_object(Bucket=bucket, Key=key)
    file_bytes = obj["Body"].read()

    # Parse
    try:
        if extension == ".fit":
            df = parse_fit_to_df(file_bytes)
        elif extension == ".tcx":
            df = parse_tcx_to_df(file_bytes)
        else:
            print(f"Unknown extension: {extension}")
            continue
    except Exception as e:
        print(f"Failed to parse {key}: {e}")
        continue

    if df.empty:
        print(f"Empty DataFrame for {key}, skipping.")
        continue

    # Sanitize unsupported types (e.g., objects, time) before saving
    for col in df.columns:
        if df[col].dtype == "object":
            df[col] = df[col].astype(str)

    # Define S3 key
    bronze_name = os.path.basename(key).replace(extension, ".parquet")
    s3_key = f"bronze/parquet/{bronze_name}"

    # Convert DataFrame to Parquet in memory
    try:
        # Create Parquet in memory
        buffer = BytesIO()
        df.to_parquet(buffer, index=False, engine="pyarrow")
        buffer.seek(0)  

        # # Upload safely
        # s3.put_object(
        #     Bucket=bucket,
        #     Key=s3_key,
        #     Body=buffer.read(),  
        #     ContentType="application/octet-stream"
        # )
        # Upload using upload_fileobj — safest for binary streams
        s3.upload_fileobj(
            Fileobj=buffer,
            Bucket=bucket,
            Key=s3_key
        )
        print(f"Saved directly to Supabase: {s3_key}")

    except Exception as e:
        print(f"Failed to save/upload parquet for {key}: {e}")

Processing: bronze/original/bronze_activity_2025-04-14_bf81db1b.fit
Saved directly to Supabase: bronze/parquet/bronze_activity_2025-04-14_bf81db1b.parquet
Processing: bronze/original/bronze_activity_2025-04-14_f5a06ce9.tcx
Saved directly to Supabase: bronze/parquet/bronze_activity_2025-04-14_f5a06ce9.parquet
Processing: bronze/original/bronze_activity_2025-04-26_2ff07ffe.fit
Saved directly to Supabase: bronze/parquet/bronze_activity_2025-04-26_2ff07ffe.parquet


In [51]:
# Verify that the file exists and is a valid Parquet file
try:
	obj = s3.get_object(Bucket=bucket, Key=s3_key)
	buffer = BytesIO(obj['Body'].read())
	
	# Attempt to read the file as a Parquet file
	df_check = pd.read_parquet(buffer, engine="pyarrow")
	display(df_check.head())
except Exception as e:
	print(f"❌ Error reading Parquet file from {s3_key}: {e}")

❌ Error reading Parquet file from bronze/parquet/bronze_activity_2025-04-26_2ff07ffe.parquet: Could not open Parquet input source '<Buffer>': Parquet magic bytes not found in footer. Either the file is corrupted or this is not a parquet file.


In [40]:
# Setup
s3 = get_s3_client()

# List all parquet files in bronze/parquet/
response = s3.list_objects_v2(Bucket=bucket, Prefix="bronze/parquet/")
keys = [obj["Key"] for obj in response.get("Contents", []) if obj["Key"].endswith(".parquet")]

print(f"📦 Found {len(keys)} Parquet files in bronze/parquet/")

all_dfs = []

# Loop through and read each parquet file
for key in keys:
    try:
        print(f"⬇️ Reading: {key}")
        obj = s3.get_object(Bucket=bucket, Key=key)
        buffer = BytesIO(obj["Body"].read())

        df = pd.read_parquet(buffer, engine="pyarrow")
        df["source_file"] = key  # Optional: track file of origin
        all_dfs.append(df)

    except Exception as e:
        print(f"❌ Failed to read {key}: {e}")

# Combine all into a single DataFrame
if all_dfs:
    combined_df = pd.concat(all_dfs, ignore_index=True)
    print(f"\n✅ Loaded {len(combined_df)} total rows from {len(all_dfs)} files")
    display(combined_df.head())
else:
    print("⚠️ No valid parquet files were loaded.")

📦 Found 3 Parquet files in bronze/parquet/
⬇️ Reading: bronze/parquet/bronze_activity_2025-04-14_bf81db1b.parquet
❌ Failed to read bronze/parquet/bronze_activity_2025-04-14_bf81db1b.parquet: Could not open Parquet input source '<Buffer>': Parquet magic bytes not found in footer. Either the file is corrupted or this is not a parquet file.
⬇️ Reading: bronze/parquet/bronze_activity_2025-04-14_f5a06ce9.parquet
❌ Failed to read bronze/parquet/bronze_activity_2025-04-14_f5a06ce9.parquet: Could not open Parquet input source '<Buffer>': Parquet magic bytes not found in footer. Either the file is corrupted or this is not a parquet file.
⬇️ Reading: bronze/parquet/bronze_activity_2025-04-26_2ff07ffe.parquet
❌ Failed to read bronze/parquet/bronze_activity_2025-04-26_2ff07ffe.parquet: Could not open Parquet input source '<Buffer>': Parquet magic bytes not found in footer. Either the file is corrupted or this is not a parquet file.
⚠️ No valid parquet files were loaded.
