In [None]:
import pandas as pd
from load_ulg import load_ulg, timeframe
from tqdm import tqdm
import matplotlib.pyplot as plt
from sklearn.linear_model import LinearRegression, HuberRegressor
import scipy
model_name = "x500"
if model_name == 'x500':
    from geometry import x500 as model
else:
    from geometry import race6 as model
from process import throttle_thrust, find_tau, fit_tau, plot_thrust_curve, torque_angular_acceleration, fit_inertia, plot_torque_angular_acceleration_curve
import numpy as np
import ipywidgets as widgets
%matplotlib widget

In [None]:
if model_name == "x500":
    output_topic = "actuator_motors_mux" # use "actuator_controls" for the unmodified PX4
else:
    output_topic = "actuator_motors" # use "actuator_controls" for the unmodified PX4

### Loading the logfile into a Pandas dataframe

In [None]:
if model_name == 'x500':
    ulog_files = [
        # "logs/test.ulg",
        # "logs/test2.ulg"
        "logs/2023-12-27-x500/log_36_2023-12-27-14-21-32.ulg",
        "logs/2023-12-27-x500/log_37_2023-12-27-14-24-24.ulg"
    ]
else:
    ulog_files = [
        "logs/2023-12-13-nishanth-sysid/log_45_2023-12-13-21-07-46-up-and-down.ulg",
        "logs/2023-12-14-yang-crash/log_60_2023-12-14-20-26-16.ulg"
    ]
dfs = [load_ulg(file) for file in ulog_files]

### Exploring the datapoints in the dataframe

In [None]:
grep = "actuator_motors"
[col for col in dfs[0].columns if grep in col]

### Looking at the outputs to find the relevant timeframe

In [None]:
for i, (file, df) in enumerate(zip(ulog_files, dfs)):
    plt.figure()
    plt.title(f"file {i}: {file}")
    for i in range(4):
        plt.plot(df[f"{output_topic}_control[{i}]"].dropna(), label=f"motor {i}")
    plt.legend()

In [None]:
if model_name == "x500":
    timeframes = [
        # (30, 50),
        # (15, 50),
        (30, 50),
        (10, 40),
    ]
else:
    timeframes = [
        (4, 10),
        (7, 11),
    ]
dfs = [timeframe(df, time_start, time_end) for df, (time_start, time_end) in zip(dfs, timeframes)]

In [None]:
tau_test = 0.03
dfs_tt = [throttle_thrust(df, tau_test, model, output_topic) for df in dfs]
for i, (file, df) in enumerate(zip(ulog_files, dfs_tt)):
    plt.figure()
    plt.title(f"File {i}: {file} tau: {tau_test}")
    plt.plot(df["throttle"].dropna(), label="throttle")
    plt.ylabel("Throttle [0-1]^2")
    plt.xlabel("Time [s]")
    plt.twinx()
    plt.plot(df["thrust"].dropna(), label="acceleration", color="orange")
    plt.ylabel("Thrust [N]")
    plt.show()

In [None]:
# throttle_estimation_files = list(range(len(ulog_files)))
throttle_estimation_files = [0]
tau_correlations = find_tau([dfs[i] for i in throttle_estimation_files], model, output_topic)
tau_argmax = tau_correlations[:, 1].argmax()
tau = tau_correlations[tau_argmax, 0]
plt.plot(tau_correlations[:, 0], tau_correlations[:, 1])
plt.vlines(tau, tau_correlations[:, 1].min(), tau_correlations[:, 1].max(), color="red", label=f"Max Correlation Tau = {tau:.3f}s")
plt.xlabel("Tau [s]")
plt.ylabel("Correlation (throttle, thrust)")
plt.legend()
plt.show()

In [None]:
dfs_tt = [throttle_thrust(df, tau, model, output_topic) for df in [dfs[i] for i in throttle_estimation_files]]

In [None]:
percentile = 0.05

df_tt = pd.concat(dfs_tt)
df_sysid = df_tt[["thrust", "throttle"]].dropna()
thrust = df_sysid["thrust"]
throttle = df_sysid["throttle"]

acceleration = thrust / model.mass
real_acceleration = (acceleration - model.gravity).abs()
real_acceleration_percentile = real_acceleration.quantile(percentile)
hovering_throttles = (throttle[real_acceleration < real_acceleration_percentile]/4) ** 0.5
hovering_throttle = hovering_throttles.median()
plt.title("Hovering Throttle Distribution")
counts, bin_edges, patches = plt.hist(hovering_throttles, bins=100)
plt.vlines(hovering_throttle, 0, max(counts), color="red", label=f"Median ({hovering_throttle:.3f})")
plt.xlabel("Hovering Throttle [0-1]")
plt.legend()
plt.show()
print(f"Hovering throttle: {hovering_throttle} (per motor)")

In [None]:

correlation, (slope, intercept) = fit_tau(dfs_tt, tau, model, output_topic)
plot_thrust_curve(df_tt, model, output_topic, tau, slope, intercept, hovering_throttle)

In [None]:
dfs_tac = [torque_angular_acceleration(df, model, output_topic, tau, slope, intercept) for df in dfs]
df_tac = pd.concat(dfs_tac)
tac_correlation = df_tac["torque_x"].corr(df_tac["dw_x"])
print(f"Correlation: {tac_correlation}")
for i, (file, df_tt_tac) in enumerate(zip(ulog_files, dfs_tac)):

    plt.figure()
    plt.title(f"File {i}: {file}")
    plt.plot(df_tt_tac["vehicle_angular_velocity_xyz_derivative[0]"].dropna(), label="vehicle_angular_velocity_xyz_derivative[0]")
    plt.plot(df_tt_tac["dw_x"].dropna(), label="dw_x")
    plt.show()

    plt.figure()
    plt.title(f"File {i}: {file}")
    plt.plot(df_tt_tac["dw_x"].dropna(), label="dw_x")
    plt.legend()
    plt.twinx()
    plt.plot(df_tt_tac["torque_x"].dropna(), label="torque_x", color="red", linestyle="--")
    plt.legend()
    plt.show()
    plt.figure()

In [None]:
# angular_acceleration_files = list(range(len(ulog_files)))
inertia_estimation_files = [1] 
I_x, I_y = fit_inertia(dfs, model, output_topic, tau, slope, intercept)
plot_torque_angular_acceleration_curve([dfs[i] for i in inertia_estimation_files], model, output_topic, tau, slope, intercept)

### Load Inertias from Other Quadrotors to estimate Izz

In [None]:
df_inertias = pd.read_csv("quadrotor_inertias.csv")

In [None]:
df_inertias["i_zz_ratio_mean"] = df_inertias["i_zz"] * 2 / (df_inertias["i_xx"] + df_inertias["i_yy"])
df_inertias["i_zz_ratio_max"] = df_inertias["i_zz"] / (df_inertias[["i_xx", "i_yy"]].max(axis=1))
df_inertias

In [None]:
counts, bin_edges, patches = plt.hist(df_inertias["i_zz_ratio_mean"], bins=10)
plt.title("Roll/Pitch (mean) -> Yaw Inertia Ratio Distribution")
plt.vlines(df_inertias["i_zz_ratio_mean"].median(), 0, max(counts), color="red", label=f"Median ({df_inertias['i_zz_ratio_mean'].median():.3f})")
plt.vlines(df_inertias["i_zz_ratio_mean"].mean(), 0, max(counts), color="black", label=f"Mean ({df_inertias['i_zz_ratio_mean'].mean():.3f})")
plt.legend()
plt.show()
plt.title("Roll/Pitch (max) -> Yaw Inertia Ratio Distribution")
plt.hist(df_inertias["i_zz_ratio_max"], bins=10)
plt.vlines(df_inertias["i_zz_ratio_max"].median(), 0, max(counts), color="red", label=f"Median ({df_inertias['i_zz_ratio_max'].median():.3f})")
plt.vlines(df_inertias["i_zz_ratio_max"].mean(), 0, max(counts), color="black", label=f"Mean ({df_inertias['i_zz_ratio_max'].mean():.3f})")
plt.legend()
plt.show()

In [None]:
std_mean = df_inertias["i_zz_ratio_mean"].std()
std_max = df_inertias["i_zz_ratio_max"].std()
print(f"Predictive standard deviation: mean: {std_mean:.3f} max: {std_max:.3f}")
use_mean = std_mean < std_max
if use_mean:
    inertia_ratio = df_inertias["i_zz_ratio_mean"].median()
    print(f"Using mean: {inertia_ratio:.3f}")
else:
    inertia_ratio = df_inertias["i_zz_ratio_max"].median()
    print(f"Using max, inertia ratio: {inertia_ratio:.3f}")
I_z = (I_x + I_y)/2 * inertia_ratio if use_mean else max(I_x, I_y) * inertia_ratio
print(f"Estimated I_z: {I_z:.3f}")
I = np.diag([I_x, I_y, I_z])
I

In [None]:
z_inertia_estimation_files = [1]
dfs_tac = [torque_angular_acceleration(df, model, output_topic, tau, slope, intercept) for df in [dfs[i] for i in z_inertia_estimation_files]]

In [None]:
df_orig = dfs_tac[0]
selected_cols = [*[f"{output_topic}_control[{i}]" for i in range(4)], *[f"torque_{i}" for i in ["x", "y", "z"]], *[f"dw_{i}" for i in ["x", "y", "z"]]]
df = df_orig[selected_cols].dropna(how="all").copy()
old_index = df.index
df.index = pd.to_datetime(df.index, unit="s")
df = df.interpolate(method="time")
df.index = old_index
df = df.dropna()
throttle2thrust = lambda throttle: throttle ** 2 * slope + intercept / 4
throttle2draginput = lambda i, throttle: throttle2thrust(throttle) * model.rotor_torque_directions[i]
for motor_i in range(4):
    for axis_i, axis in enumerate(["x", "y", "z"]):
        df[f"torque_drag_input[{motor_i}]_{axis}"] = df[f"{output_topic}_control[{motor_i}]"].ewm(halflife=f"{tau*np.log(2)} s", times=pd.to_datetime(df.index, unit="s")).mean().map(lambda throttle: throttle2draginput(motor_i, throttle)[axis_i])

In [None]:
thrust_angular_acceleration_z = df["torque_z"] / I_z
residual_angular_acceleration_z = (-df["dw_z"]) - thrust_angular_acceleration # convert dw_z from FRD to FLU
residual_torque_z = residual_angular_acceleration_z * I_z

In [None]:
drag_input_z = sum([df[f"torque_drag_input[{motor_i}]_z"] for motor_i in range(4)])

model = LinearRegression()
print(f"Correlation {drag_input_z.corr(residual_torque_z)}")
model.fit(drag_input_z.values.reshape(-1, 1), residual_torque_z.values)
Kd, Kd_intercept = (model.coef_[0], model.intercept_)

x = np.linspace(drag_input_z.min(), drag_input_z.max(), 100)
y = x * Kd + Kd_intercept
fig, ax = plt.subplots()
plt.scatter(drag_input, residual_torque_z)
line, = plt.plot(x, y, color="red", label=f"Kd = {Kd:.3f}")
plt.ylabel("Residual Torque [Nm]")
plt.xlabel("Drag Input [N]")
plt.title("Thrust -> Torque (Drag) Curve")
plt.legend()


slider = widgets.FloatSlider(
    value=Kd,
    min=0,
    max=1,
    step=0.01,
    description='Kd'
)

display(slider)

def update_plot(change):
    global Kd
    Kd = change.new
    line.set_ydata(x * change.new + Kd_intercept)
    line.set_label(f"Kd = {change.new:.3f}")
    fig.canvas.draw_idle()
    ax.legend()

slider.observe(update_plot, names='value')

In [None]:
print(f"Kf[0]: {intercept/4:.5f}")
print(f"Kf[2]: {slope:.5f}")
print(f"Kd: {Kd:.5f}")
print(f"Ixx: {I_x:.5f}")
print(f"Iyy: {I_y:.5f}")
print(f"Izz: {I_z:.5f}")