In [50]:
import pandas as pd
import os
from glob import glob

def load(pose_path: str, labels_path: str):
    """
    Load every pose CSV found in ``pose_path`` into one labelled DataFrame.

    A file named ``<name>_clicked.csv`` is associated with the movie
    ``<name>.mp4``; its label is looked up in the labels file.

    Parameters
    ----------
    pose_path : str
        Directory containing the per-clip pose CSV files.
    labels_path : str
        CSV file with (at least) the columns ``movieName`` and ``SKIER_LEVEL``.

    Returns
    -------
    pandas.DataFrame
        The rows of all pose files, concatenated in sorted file order, with two
        extra columns: ``movie_name`` and ``style`` (the ``SKIER_LEVEL`` of the
        movie, or ``'Unknown'`` when the movie is not in the labels file).

    Raises
    ------
    ValueError
        If ``pose_path`` contains no ``*.csv`` files.
    """
    # get labels
    df_labels = pd.read_csv(labels_path)

    # Get all CSV files in the folder; sorted because glob order depends on the
    # filesystem and would otherwise make the row order vary between machines.
    csv_files = sorted(glob(os.path.join(pose_path, '*.csv')))
    if not csv_files:
        raise ValueError(f"No CSV files found in {pose_path!r}")

    # Derive the movie name from the file name only. Using basename (instead of
    # stripping the pose_path prefix and a literal '/') keeps this correct for
    # trailing separators, non-normalised paths and non-POSIX separators.
    movie_names = [
        os.path.basename(csv_file).removesuffix('_clicked.csv') + ".mp4"
        for csv_file in csv_files
    ]

    # get labels for the movies; movies missing from the labels file get 'Unknown'
    movie_labels_dict = df_labels.set_index('movieName')['SKIER_LEVEL'].to_dict()
    movie_labels = [movie_labels_dict.get(name, 'Unknown') for name in movie_names]

    # Load and concatenate all CSV files into one DataFrame
    frames = [
        pd.read_csv(csv_file).assign(movie_name=name, style=label)
        for csv_file, name, label in zip(csv_files, movie_names, movie_labels)
    ]
    return pd.concat(frames, ignore_index=True)

# Read all pose clips together with their labels, then preview the first rows.
df = load(
    pose_path='../pose_outputs_clicked',
    labels_path='../data/labeledFilms.csv',
)
print(df.head(n=5))


             0           1            2           3            4           5  \
0  1254.821777  266.308258  1253.617310  263.670959  1254.648926  263.740295   
1  1263.405518  266.933228  1261.811890  264.107697  1263.873291  264.101013   
2  1267.838501  269.231659  1269.913574  266.479858  1265.309082  267.033691   
3  1261.708252  270.450470  1261.983398  267.829529  1263.558594  268.043274   
4  1274.811523  275.577850  1272.380371  272.843079  1274.750732  272.592285   

             6           7            8           9  ...           26  \
0  1250.108398  265.144958  1258.710938  265.316528  ...  1247.501587   
1  1257.015259  266.034546  1269.713257  265.880768  ...  1256.756592   
2  1273.361572  267.323059  1262.130127  268.485596  ...  1285.250122   
3  1264.280518  269.696411  1274.004517  268.953156  ...  1267.253418   
4  1264.942627  274.395477  1277.333008  274.184326  ...  1269.195312   

           27           28          29           30          31           32  \


# LSTM
## Training

In [52]:
import numpy as np
from sklearn.preprocessing import LabelEncoder
from sklearn.model_selection import train_test_split
import torch
from torch.utils.data import Dataset, DataLoader
import torch.nn as nn

# load the data
df = load('../pose_outputs_clicked', '../data/labeledFilms.csv')
df['style'] = df['style'].astype(str)  # ensure it's str (for label encoding)

# Prepare the data
SEED = 42  # fixed seed so the train/validation split is reproducible across runs
SEQUENCE_LENGTH = 40  # truncate or pad to this length
FEATURE_SIZE = len(df.columns) - 2  # exclude 'style' and 'movie_name'
sequences = []
labels = []

# Build one fixed-length (SEQUENCE_LENGTH, n_features) array per clip.
for movie_name, group in df.groupby("movie_name"):
    label = group['style'].iloc[0]  # assume label same for the whole clip
    keypoints = group.drop(columns=["movie_name", "style"]).values.astype(np.float32)

    # Truncate long clips; zero-pad short clips at the END of the sequence.
    if len(keypoints) >= SEQUENCE_LENGTH:
        keypoints = keypoints[:SEQUENCE_LENGTH]
    else:
        pad_len = SEQUENCE_LENGTH - len(keypoints)
        padding = np.zeros((pad_len, keypoints.shape[1]), dtype=np.float32)
        keypoints = np.vstack((keypoints, padding))

    sequences.append(keypoints)
    labels.append(label)

# Encode labels (string class names -> integer class ids)
le = LabelEncoder()
y = le.fit_transform(labels)
X = np.array(sequences, dtype=np.float32)
y = np.array(y, dtype=np.int64)
print(f"X shape: {X.shape}, y shape: {y.shape}")  # (n_clips, seq_len, features)

# Split the data into training and validation sets.
# random_state pins the shuffle so that reported metrics can be reproduced.
X_train, X_val, y_train, y_val = train_test_split(
    X, y, test_size=0.2, stratify=y, random_state=SEED
)

class PoseSequenceDataset(Dataset):
    """Map-style dataset pairing each pose sequence with its class id."""

    def __init__(self, X, y):
        """Copy the feature array ``X`` and the label array ``y`` into tensors."""
        self.X = torch.tensor(X)
        self.y = torch.tensor(y)

    def __len__(self):
        """Number of samples, i.e. the size of the first axis of ``X``."""
        return self.X.shape[0]

    def __getitem__(self, idx):
        """Return the ``(sequence, label)`` pair stored at position ``idx``."""
        sample = self.X[idx]
        target = self.y[idx]
        return sample, target

# Wrap each split in a dataset, then batch it; only the training batches are shuffled.
train_dataset = PoseSequenceDataset(X_train, y_train)
val_dataset = PoseSequenceDataset(X_val, y_val)
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=32, shuffle=False)

class LSTMPoseClassifier(nn.Module):
    """LSTM over a pose sequence followed by a linear classification head."""

    def __init__(self, input_size, hidden_size, num_classes, num_layers=1):
        """
        Args:
            input_size: number of features per time step.
            hidden_size: size of the LSTM hidden state.
            num_classes: number of output logits.
            num_layers: number of stacked LSTM layers.
        """
        super().__init__()
        self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_size, num_classes)

    def forward(self, x, lengths=None):
        """
        Compute class logits for a batch of sequences.

        Args:
            x: tensor of shape (batch, seq_len, input_size).
            lengths: optional per-sample count of real (non-padded) time steps,
                as a sequence of ints or an integer tensor of shape (batch,).
                When given, the LSTM output at the last REAL step of each
                sequence is classified, so trailing padding does not influence
                the prediction. When omitted, the output at the final time
                step is used (the original behaviour).

        Returns:
            Tensor of shape (batch, num_classes) with unnormalised logits.
        """
        out, _ = self.lstm(x)
        if lengths is None:
            last = out[:, -1, :]  # last time step
        else:
            # Clamp to [1, seq_len] so empty or over-long lengths cannot index
            # outside the sequence; subtract 1 to get a 0-based step index.
            step = torch.as_tensor(lengths, device=out.device).long()
            step = step.clamp(min=1, max=out.size(1)) - 1
            batch_idx = torch.arange(out.size(0), device=out.device)
            last = out[batch_idx, step]
        return self.fc(last)

# Seed torch so weight initialisation and batch shuffling are reproducible.
torch.manual_seed(42)

# Initialize the model, loss function, and optimizer
input_size = X.shape[2]
model = LSTMPoseClassifier(input_size=input_size, hidden_size=64, num_classes=len(le.classes_))
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

NUM_EPOCHS = 10
for epoch in range(NUM_EPOCHS):
    model.train()
    running_loss = 0.0  # sum of per-sample losses seen in this epoch
    n_seen = 0          # number of samples seen in this epoch
    for X_batch, y_batch in train_loader:
        optimizer.zero_grad()
        preds = model(X_batch)
        loss = criterion(preds, y_batch)
        loss.backward()
        optimizer.step()

        # criterion returns the batch mean, so weight it by the batch size to
        # get a correct epoch mean even when the last batch is smaller.
        batch_size = y_batch.size(0)
        running_loss += loss.item() * batch_size
        n_seen += batch_size

    # Report the mean loss over the whole epoch rather than only the final
    # mini-batch; an empty loader yields NaN instead of a NameError.
    epoch_loss = running_loss / n_seen if n_seen else float("nan")
    print(f"Epoch {epoch+1}, Loss: {epoch_loss:.4f}")


X shape: (2, 40, 34), y shape: (2,)
Epoch 1, Loss: 0.0000
Epoch 2, Loss: 0.0000
Epoch 3, Loss: 0.0000
Epoch 4, Loss: 0.0000
Epoch 5, Loss: 0.0000
Epoch 6, Loss: 0.0000
Epoch 7, Loss: 0.0000
Epoch 8, Loss: 0.0000
Epoch 9, Loss: 0.0000
Epoch 10, Loss: 0.0000


## Evaluation

In [44]:
# Put the model in evaluation mode and count correct predictions on the validation set.
model.eval()
correct, total = 0, 0
with torch.no_grad():  # no gradients are needed for scoring
    for X_batch, y_batch in val_loader:
        logits = model(X_batch)
        predicted = torch.argmax(logits, dim=1)  # class with the highest logit
        correct += int(predicted.eq(y_batch).sum())
        total += len(y_batch)

print("Validation Accuracy:", correct / total)

Validation Accuracy: 1.0


# DTW + k-NN
## Train

In [57]:
from tslearn.metrics import dtw
from tslearn.neighbors import KNeighborsTimeSeriesClassifier

from tslearn.utils import to_time_series_dataset

# Each sample is a (T, D) array: T = timesteps, D = features (e.g. 34 keypoint values).
# Rows belonging to the same movie are grouped into one sequence.
X_sequences = []
y = []

for _, clip in df.groupby("movie_name"):
    clip_label = clip["style"].iloc[0]
    clip_coords = clip.drop(columns=["movie_name", "style"]).values  # shape (T, D)
    X_sequences.append(clip_coords)
    y.append(clip_label)

# Convert to the time series format (num_samples, T, D); handles padding internally
X_ts = to_time_series_dataset(X_sequences)

# Train a 1-nearest-neighbour classifier using the DTW distance
knn = KNeighborsTimeSeriesClassifier(n_neighbors=1, metric="dtw")
knn.fit(X_ts, y)

# Predict on a new clip (random values stand in for a real clip here)
new_clip_sequence = np.random.rand(SEQUENCE_LENGTH, FEATURE_SIZE).astype(np.float32)
y_pred = knn.predict([new_clip_sequence])
print("Predicted label for new clip:", y_pred)

Predicted label for new clip: ['intermediate']


## Evaluate

In [None]:
# TODO