# Least-Squares Estimate of Cycling Parameters

In [None]:
import sys; sys.path.append('..')
import numpy as np
import fitparse
import pandas as pd
import math
import plotly.express as px

from utils.paths import data_folder

In [None]:
def parse_fitfile(
  filepath: str,
  columns: list[str],
  verbose: bool = False,
) -> pd.DataFrame:
  """Parse the "record" messages of a .fit file into a DataFrame.

  Only records that contain every requested column are kept; records that
  lack one or more of them are skipped.

  Parameters
  ----------
  filepath
    Path of the .fit file, handed to ``fitparse.FitFile``.
  columns
    Field names to extract. Each one becomes a DataFrame column.
  verbose
    If True, print the field names of every record and a note for each
    record that is skipped.

  Returns
  -------
  pd.DataFrame
    One row per complete record, one column per entry of ``columns``.

  Notes
  -----
  The file must include at least timestamp, distance, and altitude.

  Reading is best effort: if iterating over the records raises, the error is
  printed and the rows collected up to that point are returned.
  """
  ff = fitparse.FitFile(filepath)
  df = {col: [] for col in columns}

  try:
    for record in ff.get_messages("record"):
      # Get the record into nicer key-value format.
      record_data = {field.name: field.value for field in record}
      if verbose:
        print(record_data.keys())

      if any(col not in record_data for col in df):
        if verbose:
          # The timestamp may itself be the missing field, so look it up
          # without indexing (indexing used to raise KeyError here).
          ts = record_data.get("timestamp")
          print(f"Skipping incomplete record @ {ts}")
        continue

      for col in df:
        df[col].append(record_data[col])

  except Exception as e:
    # Deliberately broad: a failure while reading ends the iteration, but the
    # records gathered so far are still worth returning.
    print("Error while iterating over records:")
    print(e)

  return pd.DataFrame(df)


def calculate_grade(df):
  """Add per-row road grade and slope angle to ``df``.

  Adds four columns, computed from the ``altitude`` and ``distance`` columns:

  * ``dy``    - altitude change since the previous row
  * ``dx``    - distance change since the previous row
  * ``grade`` - ``dy / dx``; 0 wherever the ratio is undefined
  * ``theta`` - ``atan(grade)``, in radians

  The frame is modified in place and also returned.
  """
  df["dy"] = df["altitude"].diff(1)
  df["dx"] = df["distance"].diff(1)

  # dx == 0 (no distance covered) yields NaN for 0/0 but +/-inf for a non-zero
  # dy. Neither is a meaningful slope, so both are treated as flat ground.
  raw_grade = df["dy"] / df["dx"]
  df["grade"] = raw_grade.replace([np.inf, -np.inf], np.nan).fillna(0)

  df["theta"] = np.arctan(df["grade"].astype(float))
  return df


def calculate_speed_diff(df):
  """Add a ``speed_diff`` column holding the row-to-row change in ``speed``.

  Differences that come out as NaN (e.g. the first row, which has no
  predecessor) are stored as 0. The frame is modified in place and returned.
  """
  step = df["speed"].diff(periods=1)
  df["speed_diff"] = step.fillna(0)
  return df


def calculate_dt(df):
  """Add a ``dt`` column: time elapsed since the previous row.

  When differencing ``timestamp`` yields timedeltas, ``dt`` is expressed in
  seconds. Otherwise (e.g. numeric timestamps) the raw difference is kept.
  Rows without a difference (the first row) get 0. The frame is modified in
  place and returned.
  """
  delta = df["timestamp"].diff(1)

  if delta.dtype.kind == "m":
    # timedelta64 column: convert to float seconds in one vectorized step
    # rather than filling a timedelta column with an integer and then
    # inspecting every element's type.
    seconds = delta.dt.total_seconds()
  else:
    # Not a timedelta dtype: convert any timedelta-like elements and pass
    # everything else through untouched.
    seconds = delta.map(
      lambda x: x.total_seconds() if hasattr(x, "total_seconds") else x
    )

  df["dt"] = seconds.fillna(0)
  return df

In [None]:
def estimate_parameters(
  df: pd.DataFrame,
  mass: float = 72 + 10,
  air_density: float = 1.20,
  gravity: float = 9.81,
):
  """Fit three cycling parameters to ride data by linear least squares.

  Each row of ``df`` contributes one equation ``b ~= A @ x`` with

  * ``b  = speed_diff + dt * gravity * sin(theta)``
  * ``A0 = dt * power / (mass * speed)``
  * ``A1 = -dt * gravity * cos(theta)``
  * ``A2 = -dt * air_density * speed**2 / (2 * mass)``

  The notebook's driver cell reads the solution as ``x[0] = 1 - L``,
  ``x[1] = Crr`` and ``x[2] = CdA``.

  Parameters
  ----------
  df
    Must provide the columns ``speed_diff``, ``dt``, ``speed``, ``power`` and
    ``theta``. ``speed`` appears in a denominator, so rows with zero speed
    should be removed beforehand.
  mass
    Total mass in kg (default 72 + 10).
  air_density
    Air density in kg/m3.
  gravity
    Gravitational acceleration in m/s2.

  Returns
  -------
  tuple
    ``(x, residuals, rank, s, yhat, b)``: the four outputs of
    ``np.linalg.lstsq``, followed by the fitted values ``A @ x`` and the
    target vector ``b``. ``residuals`` is an empty array when the system is
    rank deficient or has no more rows than unknowns.
  """
  dv = df["speed_diff"].to_numpy()
  dt = df["dt"].to_numpy()
  vt = df["speed"].to_numpy()
  P_legs = df["power"].to_numpy()
  theta = df["theta"].to_numpy()

  b = dv + dt * gravity * np.sin(theta)
  c1 = dt * P_legs / (mass * vt)
  c2 = -dt * gravity * np.cos(theta)
  c3 = -dt * air_density * vt**2 / (2 * mass)
  A = np.column_stack((c1, c2, c3))

  # Pass rcond explicitly: the implicit default differs between NumPy
  # releases and emits a FutureWarning on the older ones.
  x, residuals, rank, s = np.linalg.lstsq(A, b, rcond=None)

  yhat = A @ x

  return x, residuals, rank, s, yhat, b


def estimate_parameters_twostep(
  df: pd.DataFrame,
  verbose: bool = False,
  outlier_percentile: float = 95,
):
  """Fit the parameters, discard the worst-fitting rows, then fit again.

  A first call to ``estimate_parameters`` gives a per-row error
  ``b - yhat``. Rows whose absolute error is not strictly below the
  ``outlier_percentile``-th percentile are dropped, and the fit is repeated
  on the remaining rows.

  Parameters
  ----------
  df
    Input accepted by ``estimate_parameters``. It is not modified.
  verbose
    If True, print how many rows were removed as outliers.
  outlier_percentile
    Percentile (0-100) of the absolute first-pass error used as the cut-off.

  Returns
  -------
  tuple
    ``(x, residuals, rank, s, yhat, b, df)``: the second fit's outputs from
    ``estimate_parameters`` plus the filtered frame. The frame carries the
    first-pass ``error`` and ``abs_error`` columns.
  """
  _, _, _, _, yhat, b = estimate_parameters(df)

  # Work on a copy: the input is often a filtered slice of another frame, and
  # adding columns to it would mutate the caller's data (and trigger pandas'
  # SettingWithCopyWarning).
  df = df.copy()
  df["error"] = b - yhat
  df["abs_error"] = np.abs(df["error"])
  before = len(df)

  cutoff = np.percentile(df["abs_error"], outlier_percentile)
  df = df[df["abs_error"] < cutoff]

  if verbose:
    print(f"Removed {before - len(df)} outliers")

  x, residuals, rank, s, yhat, b = estimate_parameters(df)

  return x, residuals, rank, s, yhat, b, df

In [None]:
# Fields every record must provide to be kept by parse_fitfile.
cols = [
  'timestamp',
  'distance',
  'altitude',
  'position_lat',
  'position_long',
  'power',
  'speed',
]

filepath = data_folder("fit/Great_Brook_Farm.fit")
# filepath = data_folder("fit/IM_Santa_Cruz_70.3.fit")
df = parse_fitfile(filepath, cols, verbose=False)
df = calculate_grade(df)
df = calculate_speed_diff(df)
df = calculate_dt(df)

# The first row has no predecessor, so its diff-based columns are only
# zero-filled placeholders.
df.drop(labels=[0], inplace=True)

# Get rid of any rows where there is no change in time, or a long pause.
df = df[df.dt == 1]
# Get rid of rows where speed is not above 1 (the model divides by speed).
df = df[df.speed > 1]

x, residuals, rank, s, y_hat, b, df2 = estimate_parameters_twostep(df, verbose=True)
L = 1 - x[0]
Crr = x[1]
CdA = x[2]

print("Optimized parameters:")
print(f"* L = {L:.4f}")
print(f"* Crr = {Crr:.4f}")
print(f"* CdA = {CdA:.4f}")
# lstsq returns an empty residual array for a rank-deficient system or one
# with no more rows than unknowns; indexing it would raise IndexError.
if len(residuals) > 0:
  print(f"* Residuals = {residuals[0]:.4f}")
else:
  print("* Residuals = n/a")
print(f"* Rank = {rank}")
print(f"* Singular values = {s}")

In [None]:
# First-pass fit error of the retained rows, plotted against time.
# NOTE(review): the title mentions actual vs. predicted values, but only the
# "error" column is drawn -- confirm which was intended.
fig = px.line(
  data_frame=df2,
  x="timestamp",
  y=["error"],
  title="Actual vs. Predicted Acceleration",
)
fig.show()

In [None]:
# Speed, altitude and power of the retained rows on one shared time axis.
fig = px.line(
  data_frame=df2,
  x="timestamp",
  y=["speed", "altitude", "power"],
  title="Variables",
)
fig.show()

In [None]:
# Distribution of the absolute first-pass error among the retained rows.
fig = px.histogram(
  data_frame=df2,
  x="abs_error",
  title="Error Distribution",
)
fig.show()

# Final expression of the cell, so the notebook displays its value: the 95th
# percentile of the absolute error.
np.percentile(df2["abs_error"], 95)