In [None]:
# @title 0. Ingest & Sensor Fusion (Zip Support)
# CELL 1 [TAG: parameters]
# Default parameters (Airflow will inject specific zip paths here).
# Keep these as plain literal assignments so an injected-parameters cell can
# override them by simple re-assignment.
# NOTE(review): the "parameters" tag suggests papermill-style injection - confirm.

# Location of the raw trip archive; the load cell expects it to contain
# Accelerometer.csv, Orientation.csv and Location.csv.
INPUT_ZIP_PATH = "s3://raw-gps/trip01.zip"
# Destination of the fused, resampled CSV written by the final cell.
OUTPUT_PROCESSED_PATH = "s3://processed-data/trip01_1hz.csv"
# Leaving this at the literal placeholder makes assign_segment_ids() skip the
# Roads API and emit "dummy_<n>" segment ids instead. Inject the real key at
# run time; never commit it to the notebook.
GOOGLE_ROADS_API_KEY = "YOUR_API_KEY_HERE"


In [None]:
# CELL 2: Imports
import os
import time
import json
import requests
import zipfile
import numpy as np
import pandas as pd
import s3fs


In [None]:
# CELL 3: MinIO Configuration & Helper Functions
# Connection settings come from the environment; the literals are only the
# fallbacks used when a variable is not set.
MINIO_ENDPOINT = os.environ.get("MINIO_ENDPOINT", "http://minio:9000")
MINIO_KEY = os.environ.get("MINIO_ACCESS_KEY", "admin")
MINIO_SECRET = os.environ.get("MINIO_SECRET_KEY", "password123")

# Options handed to pandas when writing the result (see the save cell).
storage_options = dict(
    key=MINIO_KEY,
    secret=MINIO_SECRET,
    client_kwargs={"endpoint_url": MINIO_ENDPOINT},
)

# Filesystem handle used to read the raw zip. It is built from the same
# credentials/endpoint as storage_options, with its own client_kwargs dict.
fs = s3fs.S3FileSystem(
    key=storage_options["key"],
    secret=storage_options["secret"],
    client_kwargs=dict(storage_options["client_kwargs"]),
)

def snap_batch(latitudes, longitudes, api_key, max_attempts=3, retry_delay=1.0):
    """Snap one batch of GPS points to roads via the Google Roads API.

    Args:
        latitudes: Iterable of latitudes (the docstring contract is up to 100
            points per call).
        longitudes: Iterable of longitudes, paired positionally with
            ``latitudes``.
        api_key: Google Roads API key sent as the ``key`` query parameter.
        max_attempts: Total number of HTTP attempts before giving up.
        retry_delay: Seconds to wait between failed attempts.

    Returns:
        The ``snappedPoints`` list from the first HTTP 200 response, or ``[]``
        if the input is empty or every attempt fails.
    """
    path_str = "|".join(f"{lat},{lng}" for lat, lng in zip(latitudes, longitudes))
    if not path_str:
        # Nothing to snap; avoid spending attempts on a request that cannot succeed.
        return []

    url = "https://roads.googleapis.com/v1/snapToRoads"
    params = {"path": path_str, "key": api_key, "interpolate": "false"}

    for attempt in range(max_attempts):
        try:
            resp = requests.get(url, params=params, timeout=10)
            if resp.status_code == 200:
                return resp.json().get("snappedPoints", [])
        except (requests.RequestException, ValueError):
            # Network errors/timeouts and undecodable bodies count as a failed
            # attempt so they are retried instead of aborting the whole run.
            pass
        if attempt < max_attempts - 1:
            # Only back off when another attempt will actually follow.
            time.sleep(retry_delay)
    return []

def assign_segment_ids(df, api_key):
    """Return a copy of ``df`` with a ``segment_id`` column added.

    Args:
        df: DataFrame with ``latitude`` and ``longitude`` columns. It is never
            modified; both code paths work on a copy.
        api_key: Google Roads API key. The literal placeholder
            ``"YOUR_API_KEY_HERE"`` disables the API call and produces
            ``"dummy_<index // 60>"`` ids instead.

    Returns:
        A new DataFrame. With a real key, ``segment_id`` holds the ``placeId``
        of each snapped point; rows without a snapped point inherit the
        nearest id via forward-fill then back-fill.
    """
    df_out = df.copy()

    if api_key == "YOUR_API_KEY_HERE":
        # Offline fallback: bucket rows into groups of 60 by index value.
        df_out['segment_id'] = "dummy_" + (df_out.index // 60).astype(str)
        return df_out

    n_rows = len(df_out)
    segment_ids = np.array([None] * n_rows, dtype=object)

    # The API is queried in batches of at most 100 points.
    for start in range(0, n_rows, 100):
        end = min(start + 100, n_rows)
        batch = df_out.iloc[start:end]
        snapped = snap_batch(batch['latitude'].tolist(), batch['longitude'].tolist(), api_key)
        for pt in snapped:
            offset = pt.get("originalIndex")
            place_id = pt.get("placeId")
            if offset is None or place_id is None:
                continue
            # originalIndex is relative to the batch; ignore anything outside it
            # rather than writing into another batch's rows.
            if 0 <= offset < end - start:
                segment_ids[start + offset] = place_id

    df_out['segment_id'] = segment_ids
    df_out['segment_id'] = df_out['segment_id'].ffill().bfill()
    return df_out


In [None]:
# CELL 4: Load Raw Data from Zip (S3)
# Reads the three sensor CSVs out of the trip archive. If anything fails, the
# cell falls back to a synthetic 600 s trip so the rest of the notebook can run.
print(f"Extracting data from {INPUT_ZIP_PATH}...")
try:
    with fs.open(INPUT_ZIP_PATH, 'rb') as f:
        with zipfile.ZipFile(f) as z:
            with z.open('Accelerometer.csv') as acc_file:
                acc = pd.read_csv(acc_file)
            with z.open('Orientation.csv') as ori_file:
                ori = pd.read_csv(ori_file)
            with z.open('Location.csv') as loc_file:
                loc = pd.read_csv(loc_file)
    print("✅ CSVs loaded successfully from Zip.")
except Exception as exc:  # pylint: disable=broad-except
    # Deliberate best-effort fallback; the message makes the substitution visible.
    print(f"⚠️ Error reading zip ({exc}). Using synthetic data...")
    t = np.linspace(0, 600, 60000)
    # The fusion cell parses 'time' with unit='ns', so the synthetic timestamps
    # must be nanoseconds. Storing seconds here would squeeze the whole trip
    # into 600 ns and collapse the 1 Hz resample to a single row.
    t_ns = (t * 1e9).astype('int64')
    acc = pd.DataFrame({'time': t_ns, 'seconds_elapsed': t, 'x': 0, 'y': 0, 'z': 9.8})
    ori = pd.DataFrame({'time': t_ns, 'seconds_elapsed': t, 'qx': 0, 'qy': 0, 'qz': 0, 'qw': 1})
    t_loc = np.linspace(0, 600, 600)
    loc = pd.DataFrame({
        'time': (t_loc * 1e9).astype('int64'),
        'seconds_elapsed': t_loc,
        'latitude': 0,
        'longitude': 0,
        'speed': 10,
        'bearing': 0,
        'altitude': 0
    })


In [None]:
# CELL 5: Sensor Fusion (Standard Logic)
# The accelerometer timeline is the master clock; orientation and GPS are
# interpolated onto it.
t_master = acc['seconds_elapsed'].values
t_dt = pd.to_datetime(acc['time'], unit='ns')

# Interpolate Orientation (columns may be named qx..qw or x..w)
qx = np.interp(t_master, ori['seconds_elapsed'], ori.get('qx', ori.get('x')))
qy = np.interp(t_master, ori['seconds_elapsed'], ori.get('qy', ori.get('y')))
qz = np.interp(t_master, ori['seconds_elapsed'], ori.get('qz', ori.get('z')))
qw = np.interp(t_master, ori['seconds_elapsed'], ori.get('qw', ori.get('w')))
# Component-wise interpolation does not preserve unit length, so re-normalise.
q_norm = np.sqrt(qx**2 + qy**2 + qz**2 + qw**2)
qx, qy, qz, qw = qx / q_norm, qy / q_norm, qz / q_norm, qw / q_norm

# Rotate Acceleration: v' = v + w*t + q_vec x t, with t = 2 * (q_vec x v)
a_dev = acc[['x', 'y', 'z']].values
q_vec = np.stack([qx, qy, qz], axis=1)
w = qw
t_vec = 2 * np.cross(q_vec, a_dev)
a_world = a_dev + (w[:, None] * t_vec) + np.cross(q_vec, t_vec)

# Interpolate GPS & Calculate Forward Acc
gps_interp = {}
for col in ['speed', 'bearing', 'latitude', 'longitude']:
    samples = loc[col].to_numpy(dtype=float)
    if col == 'bearing':
        # Bearing is circular: interpolating 359 -> 1 as plain numbers would
        # sweep through 180. Unwrap first so the interpolation takes the short
        # way round.
        # NOTE(review): assumes bearing is in degrees without NaNs - confirm.
        samples = np.rad2deg(np.unwrap(np.deg2rad(samples)))
    gps_interp[col] = np.interp(t_master, loc['seconds_elapsed'], samples)
# Fold the unwrapped bearing back into [0, 360).
gps_interp['bearing'] = np.mod(gps_interp['bearing'], 360.0)

bearing_rad = np.deg2rad(gps_interp['bearing'])
# Project the horizontal world-frame acceleration onto the heading direction.
# NOTE(review): presumably a_world[:, 0] is East and a_world[:, 1] is North - confirm.
acc_forward = a_world[:, 0] * np.sin(bearing_rad) + a_world[:, 1] * np.cos(bearing_rad)


In [None]:
# CELL 6: Resample & Process
# Assemble the high-rate fused signals on the accelerometer timeline.
df_hr = pd.DataFrame({
    't': t_dt,
    'seconds_elapsed': t_master,
    'speed_kmh': gps_interp['speed'] * 3.6,
    'acc_forward': acc_forward,
    'latitude': gps_interp['latitude'],
    'longitude': gps_interp['longitude']
})

# Downsample to 1 Hz using the per-second median; seconds with no samples are dropped.
# Lowercase 's' is the non-deprecated seconds alias.
df_1Hz = df_hr.set_index('t').resample('1s').median().dropna().reset_index()
print("Running Snap-to-Roads...")
# The save cell writes df_final, so it must be defined here. With the
# placeholder API key, assign_segment_ids() returns dummy ids and makes no
# network call.
df_final = assign_segment_ids(df_1Hz, GOOGLE_ROADS_API_KEY)


In [None]:
# CELL 7: Save to S3
# Write the 1 Hz frame with segment ids to the configured output location,
# without the index column.
print(f"Saving to {OUTPUT_PROCESSED_PATH}...")
df_final.to_csv(
    OUTPUT_PROCESSED_PATH,
    storage_options=storage_options,
    index=False,
)
print("✅ Done.")
