In [1]:
import datetime
import os

import pandas as pd
from google.cloud import bigquery
from google.cloud import storage
from google.cloud.exceptions import NotFound

from training_data_angles import analyze_gait_data

## Notebook aim

The aim of this notebook is to download the .json files containing the runners' coordinates from a Google Cloud Storage bucket, obtain the angles using the `analyze_gait_data` function in training_data_angles.py, and write each result both to a local CSV file and to its own table in a BigQuery dataset.

### Creating list of filenames to loop through

In [2]:
# Load the run metadata and build one "<sub_id>/<filename>" path per row.
jsons = pd.read_csv("data/meta/run_data_meta.csv")

json_filenames = [
    f"{subject}/{recording}"
    for subject, recording in zip(jsons["sub_id"], jsons["filename"])
]

### Google Cloud Set-up

In [3]:
# --- Input: Google Cloud Storage -------------------------------------------
# Bucket holding the raw JSON files and the prefix (folder) inside it.
GCS_INPUT_BUCKET_NAME = 'stridecare'
GCS_INPUT_PREFIX = 'RunInjuryClinic/'

# --- Output: BigQuery --------------------------------------------------------
# Project and dataset that receive one table per processed file.
BQ_PROJECT_ID = 'stridecare-461809'
BQ_DATASET_ID = 'angle_csvs'

# --- Output: local data folder -----------------------------------------------
# Processed CSVs are also written below this directory.
LOCAL_OUTPUT_BASE_DIR = './processed_angles_data'

# Temporary directory to download JSON files to. The timestamp suffix gives
# every execution of this cell its own directory name.
LOCAL_TEMP_JSON_DIR = f"temp_gait_json_files_{datetime.datetime.now().strftime('%Y%m%d%H%M%S')}"
os.makedirs(LOCAL_TEMP_JSON_DIR, exist_ok=True)  # Create the directory if it doesn't exist

# Initialize GCS and BigQuery clients (each client is created exactly once)
gcs_client = storage.Client()
bq_client = bigquery.Client(project=BQ_PROJECT_ID)

print(f"Starting gait analysis pipeline for GCS input bucket: {GCS_INPUT_BUCKET_NAME}/{GCS_INPUT_PREFIX}")
print(f"Processed angles will be uploaded to BigQuery dataset: {BQ_PROJECT_ID}.{BQ_DATASET_ID}")
print(f"Processed CSVs will also be saved locally to: {LOCAL_OUTPUT_BASE_DIR}")


Starting gait analysis pipeline for GCS input bucket: stridecare/RunInjuryClinic/
Processed angles will be uploaded to BigQuery dataset: stridecare-461809.angle_csvs
Processed CSVs will also be saved locally to: ./processed_angles_data


### Dataset check and processing pipeline

In [None]:
# End-to-end pipeline:
#   1. make sure the target BigQuery dataset exists (create it if missing),
#   2. for every "<sub_id>/<filename>" entry in json_filenames, download the
#      JSON from GCS, compute the angles with analyze_gait_data, save them as a
#      local CSV and load them into a per-file BigQuery table,
#   3. remove the temporary downloads and report how many files were processed.

# Initialised before the try block so the summary print at the bottom still
# works when setup fails before the processing loop is reached.
processed_files_count = 0

try:
    input_bucket = gcs_client.bucket(GCS_INPUT_BUCKET_NAME)

    # Ensure the BigQuery dataset exists
    try:
        bq_client.get_dataset(BQ_DATASET_ID)
        print(f"BigQuery Dataset '{BQ_DATASET_ID}' already exists.")
    except NotFound:
        # Only a missing dataset triggers creation. Any other failure
        # (permissions, network, ...) propagates to the outer handler instead
        # of being mistaken for "dataset not found".
        print(f"BigQuery Dataset '{BQ_DATASET_ID}' not found. Creating it...")
        dataset = bigquery.Dataset(f"{BQ_PROJECT_ID}.{BQ_DATASET_ID}")
        dataset.location = "EU" # Or your desired location (e.g., "US", "asia-east1")
        bq_client.create_dataset(dataset, timeout=30)
        print(f"BigQuery Dataset '{BQ_DATASET_ID}' created successfully.")

    # Split every "<sub_id>/<filename>" path into its two components; the last
    # two path segments are used, anything with fewer segments is skipped.
    json_files_to_process = []
    for json_file in json_filenames:
        parts = json_file.split('/')
        if len(parts) >= 2:
            json_files_to_process.append({"sub_id": parts[-2], "json_file": parts[-1]})
        else:
            print(f"Warning: Skipping invalid path format in list: {json_file}")

    if not json_files_to_process:
        print("No JSON files specified in 'json_files_to_process'. Please ensure your list is populated.")

    for entry in json_files_to_process:
        sub_id = entry.get("sub_id")
        json_filename = entry.get("json_file")

        # Guards against empty path segments (e.g. a leading or trailing '/').
        if not sub_id or not json_filename:
            print(f"Skipping invalid entry in list: {entry}")
            continue

        # Construct the full storage path for the input JSON file in GCS
        gcs_input_blob_path = f"{GCS_INPUT_PREFIX}{sub_id}/{json_filename}"

        print(f"\nProcessing GCS blob: {gcs_input_blob_path}")
        local_json_filepath = os.path.join(LOCAL_TEMP_JSON_DIR, json_filename)

        # Per-file best effort: a failure on one file is reported and the loop
        # moves on to the next entry.
        try:
            # Download the JSON file locally
            input_blob = input_bucket.blob(gcs_input_blob_path)
            input_blob.download_to_filename(local_json_filepath)
            print(f"Downloaded '{gcs_input_blob_path}' to '{local_json_filepath}'")

            # Process the file using the imported analyze_gait_data function.
            # The result is used as a pandas DataFrame below (.empty, .to_csv).
            angles_df = analyze_gait_data(local_json_filepath)

            if not angles_df.empty:
                base_json_name = os.path.splitext(json_filename)[0] # e.g., "20110531T161051"

                # --- 1. Save to local data folder (as CSV) ---
                local_output_sub_dir = os.path.join(LOCAL_OUTPUT_BASE_DIR, sub_id)
                os.makedirs(local_output_sub_dir, exist_ok=True) # Create sub_id folder locally
                local_csv_filename = f"{base_json_name}_angles.csv"
                local_csv_filepath = os.path.join(local_output_sub_dir, local_csv_filename)

                angles_df.to_csv(local_csv_filepath, index=False)
                print(f"Saved angles locally to: '{local_csv_filepath}'")

                # --- 2. Upload to BigQuery (as a new table) ---
                # '.' and '-' in the file stem are replaced with underscores to
                # build the table name.
                # NOTE(review): the table name depends on the file name only,
                # not on sub_id. With WRITE_TRUNCATE below, two subjects with an
                # identically named JSON would overwrite each other's table -
                # confirm file names are unique across subjects.
                bq_table_name = f"angles_{base_json_name.replace('.', '_').replace('-', '_')}"
                bq_table_id = f"{BQ_PROJECT_ID}.{BQ_DATASET_ID}.{bq_table_name}"

                # Configure BigQuery load job (e.g., overwrite existing table if it has the same name)
                job_config = bigquery.LoadJobConfig(
                    write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE, # Overwrites if table exists
                    # Consider WRITE_APPEND if you want to add rows to an existing table with the same schema
                    # or WRITE_EMPTY if you want the job to fail if the table already exists.
                )

                load_job = bq_client.load_table_from_dataframe(
                    angles_df, bq_table_id, job_config=job_config
                )
                load_job.result() # Wait for the job to complete

                print(f"Uploaded angles to BigQuery table: '{bq_table_id}'")
                processed_files_count += 1
            else:
                print(f"  '{json_filename}': No angle data returned by analyze_gait_data. Skipping all outputs.")

        except Exception as e:
            print(f"Error processing '{gcs_input_blob_path}': {e}")
        finally:
            # Clean up: remove the downloaded JSON file
            if os.path.exists(local_json_filepath):
                os.remove(local_json_filepath)
                print(f"Cleaned up local temporary JSON file: '{local_json_filepath}'")

except Exception as e:
    print(f"An unhandled error occurred: {e}")
finally:
    # Clean up the temporary local JSON download directory
    if os.path.exists(LOCAL_TEMP_JSON_DIR):
        try:
            os.rmdir(LOCAL_TEMP_JSON_DIR) # Will only remove if empty
            print(f"Cleaned up temporary JSON download directory: '{LOCAL_TEMP_JSON_DIR}'")
        except OSError:
            print(f"Warning: Could not remove non-empty temporary JSON directory: '{LOCAL_TEMP_JSON_DIR}'. Please remove manually if empty.")

print(f"\nProcessing complete. Total files processed and outputted: {processed_files_count}")

BigQuery Dataset 'angle_csvs' already exists.

Processing GCS blob: RunInjuryClinic/100433/20101005T132240.json
Downloaded 'RunInjuryClinic/100433/20101005T132240.json' to 'temp_gait_json_files_20250605210341/20101005T132240.json'
Saved angles locally to: './processed_angles_data/100433/20101005T132240_angles.csv'
Uploaded angles to BigQuery table: 'stridecare-461809.angle_csvs.angles_20101005T132240'
Cleaned up local temporary JSON file: 'temp_gait_json_files_20250605210341/20101005T132240.json'

Processing GCS blob: RunInjuryClinic/100434/20101117T132240.json
Downloaded 'RunInjuryClinic/100434/20101117T132240.json' to 'temp_gait_json_files_20250605210341/20101117T132240.json'
Saved angles locally to: './processed_angles_data/100434/20101117T132240_angles.csv'
Uploaded angles to BigQuery table: 'stridecare-461809.angle_csvs.angles_20101117T132240'
Cleaned up local temporary JSON file: 'temp_gait_json_files_20250605210341/20101117T132240.json'

Processing GCS blob: RunInjuryClinic/1005