Reference: https://www.kaggle.com/code/hzning/0-81-easy-is-all-you-need

In [19]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
# import polar as pl
import torch.nn.functional as F
import torch.nn as nn
import torch
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from scipy.spatial.transform import Rotation as R
from sklearn.model_selection import StratifiedKFold
from sklearn.preprocessing import LabelEncoder, StandardScaler

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Configures
FOLDS = 5
LR_INIT = 3e-3
WD = 3e-5

## 1. 📈 IMU and Gyro feature extraction

In [2]:
class ImuExtractor(nn.Module):
    def __init__(self):
        super().__init__()

        k = 15  # todo try to change this
        self.lpf_acc = nn.Conv1d(3, 3, k, padding="same", groups=3, bias=False)
        self.lpf_gyro = nn.Conv1d(3, 3, k, padding="same", groups=3, bias=False)

    def forward(self, imu: torch.Tensor):
        B, C, T = imu.shape

        acc = imu[:, :3, :]  # acc_x, acc_y, acc_z
        gyro = imu[:, 3:6, :]  # gyro_x, gyro_y, gyro_z
        extra = imu[:, 6:, :]

        # 1) Magnitude
        acc_mag = torch.norm(acc, dim=1, keepdim=True) # todo try bias=True as author did
        gyro_mag = torch.norm(gyro, dim=1, keepdim=True)

        # 2) Jerk
        jerk = F.pad(acc_mag.diff(), (1, 0))
        gyro_delta = F.pad(gyro_mag.diff(), (1, 0))

        # 3) energy # todo try without this
        acc_pow = acc**2
        gyro_pow = gyro**2

        # LPF / HPF
        acc_lpf = self.lpf_acc(acc)
        acc_hpf = acc - acc_lpf
        gyro_lpf = self.lpf_gyro(gyro)
        gyro_hpf = gyro - gyro_lpf

        # fmt: off
        features = [
            acc, gyro,
            jerk, gyro_delta,
            acc_pow, gyro_pow,
            acc_lpf, acc_hpf, gyro_lpf, gyro_hpf,
        ]
        # fmt: on

        return torch.cat(features, dim=1)

dummy_inp = torch.randn((64, 6, 40))
ImuExtractor()(dummy_inp).shape

torch.Size([64, 26, 40])

## 2. 🧠 Building Blocks

In [3]:
class SEBlock(nn.Module):
    def __init__(self, c_in, r):
        super().__init__()

        self.squeeze = nn.AdaptiveAvgPool1d(1)
        self.excitation = nn.Sequential(
            nn.Linear(c_in, c_in // r, bias=False),  # todo try with bias
            nn.ReLU(inplace=True),
            nn.Linear(c_in // r, c_in, bias=False),  # todo try with bias
            nn.Sigmoid(),
        )

    def forward(self, x: torch.Tensor):
        squeezed = self.squeeze(x).squeeze(-1)  # B x C
        weights = self.excitation(squeezed).unsqueeze(-1)  # B x C x 1

        return weights * x


SEBlock(6, 8)(dummy_inp).shape



torch.Size([64, 6, 40])

## 🥟 Baseline Network

In [None]:
class BaseNet(nn.Module):
    def __init__(self, c_in, n_classes):
        super().__init__()

        # fmt: off
        self.cnn_layer = nn.Sequential(
            nn.Conv1d(c_in, 32, 3, padding="same"),
            nn.BatchNorm1d(32),
            nn.ReLU(),
            nn.MaxPool1d(4),
            nn.Dropout(0.1),
            
            nn.Conv1d(32, 64, 3, padding="same"),
            nn.BatchNorm1d(64),
            nn.ReLU(),
            nn.MaxPool1d(2),
            nn.Dropout(0.1),
        )
        
        self.pool_layer = nn.AdaptiveAvgPool1d(1)
        
        self.fc_layer = nn.Sequential(
            nn.Linear(64, n_classes),
        )
        # fmt: on

    def forward(self, x: torch.Tensor):
        x = self.cnn_layer(x)
        x = self.pool_layer(x).squeeze(-1)
        x = self.fc_layer(x)
        return x


BaseNet(c_in=dummy_inp.shape[1], n_classes=5)(dummy_inp).shape

torch.Size([64, 5])

## 🧹 Data Cleaning

## 👷 Feature Engineering

In [5]:
dummy_df = pd.read_csv("../data/raw/train.csv")
dummy_demographs_df = pd.read_csv("../data/raw/train_demographics.csv")
df = pd.merge(
    dummy_df,
    dummy_demographs_df,
    how="left",
    on="subject",
)

df.drop(columns=[col for col in df.columns if col.startswith("tof_")], inplace=True)

In [28]:
def get_linear_acc(df: pd.DataFrame):
    acc = df[["acc_x", "acc_y", "acc_z"]].values
    quats = df[["rot_x", "rot_y", "rot_z", "rot_w"]].values

    zero_quat_mask = np.isclose(quats, 0).all(axis=1)
    quats[zero_quat_mask] = np.array([0, 0, 0, 1])

    rotation = R.from_quat(quats)
    gravity_world = np.array([0, 0, 9.81])
    gravity_sensor = rotation.inv().apply(gravity_world)
    linear_acc = acc - gravity_sensor

    result_df = pd.DataFrame(linear_acc, columns=["linear_acc_x", "linear_acc_y", "linear_acc_z"], index=df.index)
    return result_df


def feature_engineer(df: pd.DataFrame, inplace=False):
    if not inplace:
        df = df.copy()
    
    df = df.ffill().bfill().fillna(0)
    df["acc_mag"] = np.linalg.norm(df[["acc_x", "acc_y", "acc_z"]], axis=1)
    df["rot_angle"] = 2 * np.arccos(df["rot_w"]).clip(-1, 1)
    df["acc_mag_jerk"] = df.groupby("sequence_id")["acc_mag"].diff().fillna(0)
    df["rot_angle_vel"] = df.groupby("sequence_id")["rot_angle"].diff().fillna(0)

    # Linear Acceleration
    df.drop(columns=["linear_acc_x", "linear_acc_y", "linear_acc_z"], errors="ignore", inplace=True)
    linear_acc_df = df.groupby("sequence_id").apply(get_linear_acc).droplevel(0)
    df = df.join(linear_acc_df)
    df["linear_acc_mag"] = np.linalg.norm(df[["linear_acc_x", "linear_acc_y", "linear_acc_z"]].values, axis=1)
    df[["linear_jerk_x", "linear_jerk_y", "linear_jerk_z"]] = df.groupby("sequence_id")[["linear_acc_x", "linear_acc_y", "linear_acc_z"]].diff().fillna(0)
    
    # Angular Distance
    # Angular Velocity
    
    return df

In [None]:
rot = cleaned_df[['rot_x', 'rot_y', 'rot_z', 'rot_w']].values
rotation = R.from_quat(rot)
# rotation.to_euler

Rotation.from_matrix(array([[[-0.71159303,  0.53540577,  0.45494618],
                             [ 0.10008689, -0.56367398,  0.81991113],
                             [ 0.69542647,  0.62897719,  0.34751937]],
                     
                            [[-0.72723823,  0.52836582,  0.43812569],
                             [ 0.05508883, -0.59131937,  0.80455368],
                             [ 0.68417087,  0.60923802,  0.400923  ]],
                     
                            [[-0.75361443,  0.57504313,  0.31841905],
                             [-0.18349828, -0.64921224,  0.7381408 ],
                             [ 0.63118435,  0.49784421,  0.59477514]],
                     
                            ...,
                     
                            [[ 0.51943721,  0.83472896,  0.182791  ],
                             [ 0.75953171, -0.54902959,  0.34882388],
                             [ 0.39153106, -0.04235654, -0.91918951]],
                     
             

## 📁 Data Setup

In [None]:
cleaned_df = feature_engineer(cleaned_df)

  linear_acc_df = df.groupby("sequence_id").apply(get_linear_acc).droplevel(0)


In [36]:
# fmt: off
FEATURE_NAMES = [
    'acc_x', 'acc_y', 'acc_z',
    'linear_acc_x', 'linear_acc_y', 'linear_acc_z',
    'rot_x', 'rot_y', 'rot_z', 'rot_w',
]
# fmt: on

le = LabelEncoder()
cleaned_df['gesture_int'] = le.fit_transform(cleaned_df['gesture'])

scalar = StandardScaler()
scaler = scalar.fit(cleaned_df[FEATURE_NAMES].values)

## 🏋️ Train

In [None]:
epochs = 50
criterion = nn.CrossEntropyLoss()

skf = StratifiedKFold(n_splits=FOLDS, shuffle=True)

for fold, (train_indices, test_indices) in enumerate(skf.split(uhhh)):
    train_loader = DataLoader()
    model = BaseNet(C, n_classes)
    opt = optim.Adam(model.parameters(), lr=LR_INIT, weight_decay=WD)
    
    for epoch in range(epochs):
        avg_loss = 0
        for x, y in train_loader:
            x = x.to(device)
            y = y.to(device)
            
            y_pred = model(x)
            loss = criterion(y_pred, y)
            avg_loss += loss
            
            opt.zero_grad()
            loss.backward()
            opt.step()
            
        avg_loss /= len(train_loader)
        print(f"{epoch} - avg loss {avg_loss}")