<a href="https://colab.research.google.com/github/pascalghanimi/Ski-Classification-AI/blob/main/Validate_Classifier_Timing.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [21]:
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
from sklearn.preprocessing import StandardScaler, LabelEncoder
import pickle

# ─── Attention Klasse definieren ───────────────────────────────────────────────
class Attention(nn.Module):
    def __init__(self, hidden_size):
        super().__init__()
        self.attn = nn.Linear(hidden_size * 4, hidden_size)  # Korrigierte Dimension
        self.v = nn.Linear(hidden_size, 1, bias=False)

    def forward(self, hidden, outputs):
        hidden = hidden.unsqueeze(1).repeat(1, outputs.size(1), 1)
        combined = torch.cat((hidden, outputs), dim=2)
        energy = torch.tanh(self.attn(combined))
        attention = torch.softmax(self.v(energy).squeeze(2), dim=1)
        return torch.sum(attention.unsqueeze(2) * outputs, dim=1)

# ─── Modellklasse definieren ───────────────────────────────────────────────────
class SkiSwingLSTM(nn.Module):
    def __init__(self, input_size, hidden_size, num_classes):
        super().__init__()
        self.lstm = nn.LSTM(
            input_size,
            hidden_size,
            num_layers=2,
            bidirectional=True,
            batch_first=True,
            dropout=0.4
        )
        self.attention = Attention(hidden_size)
        self.dropout = nn.Dropout(0.6)
        self.fc = nn.Linear(hidden_size * 2, num_classes)

    def forward(self, x):
        outputs, (hidden, _) = self.lstm(x)
        hidden_combined = torch.cat((hidden[-2], hidden[-1]), dim=1)
        context = self.attention(hidden_combined, outputs)
        return self.fc(self.dropout(context))

# ─── Konfiguration ────────────────────────────────────────────────────────────
MAX_LENGTH = 400
FEATURE_FILE = "PHALP_AKatja_EKK_features.pkl"
SCALER_FILE  = "scaler_schwung.pkl"
ENCODER_FILE = "encoder_schwung.pkl"
INPUT_VIDEO = "PHALP_AAaron_1.mp4"
OUTPUT_VIDEO = "annotated_output.mp4"
MODEL = "ski_schwung_classifier.pt"

# ─── Scaler & Encoder laden ───────────────────────────────────────────────────
with open(SCALER_FILE, "rb") as f:
    scaler = pickle.load(f)

with open(ENCODER_FILE, "rb") as f:
    le = pickle.load(f)

# ─── Features laden ───────────────────────────────────────────────────────────
with open(FEATURE_FILE, "rb") as f:
    features = pickle.load(f)

# ─── Daten vorbereiten ────────────────────────────────────────────────────────
data = []
frames = sorted(f for f in features["COM_to_ground"] if isinstance(f, int))

for frame in frames:
    frame_features = [
        features["COM_to_ground"][frame],
        features["knee_angles_right"][frame],
        features["knee_angles_left"][frame]
    ]

    # Füge joint_angles hinzu
    for axis in features["joint_angles"][frame]:
        frame_features.extend(
            features["joint_angles"][frame][axis].values()
        )

    # Füge axis_angles hinzu
    for axis in features["axis_angles"][frame]:
        frame_features.extend(
            features["axis_angles"][frame][axis].values()
        )

    # Füge COM_angles hinzu
    frame_features.extend(features["COM_angles"][frame].values())

    data.append(frame_features)

# Konvertiere zu numpy array
data = np.array(data, dtype=np.float32)

# Padden oder trimmen auf MAX_LENGTH
if data.shape[0] < MAX_LENGTH:
    pad = np.zeros((MAX_LENGTH - data.shape[0], data.shape[1]), dtype=np.float32)
    data = np.vstack([data, pad])
else:
    data = data[:MAX_LENGTH]

# Skalieren
data_scaled = scaler.transform(data)

# Zu Tensor konvertieren
x = torch.tensor(data_scaled, dtype=torch.float32).unsqueeze(0)  # [1, seq_len, feat_dim]

# ─── Modell laden ─────────────────────────────────────────────────────────────
# Füge sichere Klassen hinzu für das Laden
torch.serialization.add_safe_globals([
    SkiSwingLSTM,
    Attention,
    nn.LSTM,
    nn.Linear,
    nn.Dropout
])

# Modell laden
model = torch.load(MODEL, map_location="cpu")
model.eval()

# ─── Vorhersage machen ────────────────────────────────────────────────────────
with torch.no_grad():
    logits = model(x)
    style_idx = logits.argmax(dim=1).item()

# Klassennamen dekodieren
style_str = le.inverse_transform([style_idx])[0]
print("Vorhergesagte Schwungart:", style_str)

Vorhergesagte Schwungart: EKK


In [7]:
print(features["schwung_labels"])

{0: 1, 1: 1, 2: 1, 3: 1, 4: 1, 5: 1, 6: 1, 7: 1, 8: 1, 9: 1, 10: 1, 11: 1, 12: 1, 13: 1, 14: 1, 15: 1, 16: 1, 17: 1, 18: 1, 19: 1, 20: 1, 21: 1, 22: 1, 23: 1, 24: 1, 25: 1, 26: 1, 27: 1, 28: 1, 29: 1, 30: 1, 31: 1, 32: 1, 33: 1, 34: 1, 35: 1, 36: 1, 37: 1, 38: 1, 39: 1, 40: 1, 41: 1, 42: 1, 43: 1, 44: 1, 45: 1, 46: 1, 47: 1, 48: 1, 49: 1, 50: 1, 51: 1, 52: 1, 53: 1, 54: 1, 55: 1, 56: 1, 57: 1, 58: 1, 59: 1, 60: 1, 61: 1, 62: 1, 63: 1, 64: 1, 65: 1, 66: 1, 67: 1, 68: 1, 69: 1, 70: 1, 71: 1, 72: 1, 73: 1, 74: 1, 75: 1, 76: 1, 77: 1, 78: 1, 79: 1, 80: 1, 81: 1, 82: 1, 83: 1, 84: 1, 85: 1, 86: 1, 87: 1, 88: 1, 89: 1, 90: 1, 91: 1, 92: 1, 93: 1, 94: 1, 95: 1, 96: 1, 97: 1, 98: 1, 99: 1, 100: 1, 101: 1, 102: 1, 103: 1, 104: 1, 105: 1, 106: 1, 107: 1, 108: 1, 109: 1, 110: 1, 111: 1, 112: 1, 113: 1, 114: 1, 115: 1, 116: 1, 117: 1, 118: 1, 119: 1, 120: 1, 121: 1, 122: 1, 123: 1, 124: 1, 125: 1, 126: 1, 127: 1, 128: 1, 129: 1, 130: 1, 131: 1, 132: 1, 133: 1, 134: 1, 135: 1, 136: 1, 137: 1, 138: 

In [22]:
import csv
import cv2

# ─── 2. Turn-Klassifikationsmodell ────────────────────────────────────────────
class FrameWiseLSTM(nn.Module):
    def __init__(self, input_size, hidden_size, num_classes):
        super().__init__()
        self.lstm = nn.LSTM(input_size, hidden_size,
                            num_layers=2, bidirectional=True,
                            batch_first=True, dropout=0.3)
        self.dropout = nn.Dropout(0.3)
        self.fc = nn.Linear(hidden_size * 2, num_classes)

    def forward(self, x):
        out, _ = self.lstm(x)
        return self.fc(self.dropout(out))

# Füge sichere Klassen für Deserialisierung hinzu
torch.serialization.add_safe_globals([FrameWiseLSTM, nn.LSTM])

# ─── 3. Features laden und verarbeiten ────────────────────────────────────────
with open(FEATURE_FILE, "rb") as f:
    features = pickle.load(f)

# Extrahiere Frame-Indizes
frame_indices = sorted([f for f in features["COM_to_ground"].keys() if isinstance(f, int)])

# Baue die Sequenz für die Turn-Klassifikation auf
seq_turn = []
for i in frame_indices:
    try:
        v = [
            features["COM_to_ground"][i],
            features["knee_angles_right"][i],
            features["knee_angles_left"][i]
        ]

        # Füge joint_angles hinzu
        for axis in features["joint_angles"][i]:
            v.extend(features["joint_angles"][i][axis].values())

        # Füge axis_angles hinzu
        for axis in features["axis_angles"][i]:
            v.extend(features["axis_angles"][i][axis].values())

        # Füge COM_angles hinzu
        v.extend(features["COM_angles"][i].values())

        # Füge style_idx als zusätzliches Feature hinzu
        v.append(style_idx)

        seq_turn.append(v)
    except KeyError as e:
        print(f"Fehlendes Feature in Frame {i}: {e}")
        continue

if not seq_turn:
    raise RuntimeError("Turn-Sequence ist leer – bitte Features prüfen!")

# Konvertiere zu Tensor
arr_turn = np.array(seq_turn, dtype=np.float32)
x_turn = torch.tensor(arr_turn).unsqueeze(0).float()  # [1, n_frames, feat_dim+1]

# ─── 4. Modell laden und Vorhersage treffen ───────────────────────────────────
# Modell laden
model_turn = torch.load("left_right_classifier.pt", map_location="cpu")
model_turn.eval()

# Vorhersage treffen
with torch.no_grad():
    logits = model_turn(x_turn)
    preds = logits.argmax(dim=-1).squeeze().numpy()

# Labels erstellen
labels_turn = ["Linksschwung" if p == 1 else "Rechtsschwung" for p in preds]
print("Schwung pro Frame:", labels_turn)

# ─── 5. Video annotieren ──────────────────────────────────────────────────────
# Video öffnen
cap = cv2.VideoCapture(INPUT_VIDEO)
if not cap.isOpened():
    raise RuntimeError(f"Konnte Video {INPUT_VIDEO} nicht öffnen")

# Video-Writer einrichten
fps = cap.get(cv2.CAP_PROP_FPS)
w = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
h = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
fourcc = cv2.VideoWriter_fourcc(*'mp4v')
out = cv2.VideoWriter(OUTPUT_VIDEO, fourcc, fps, (w, h))

# Frame-Zähler
frame_count = 0

while True:
    ret, frame = cap.read()
    if not ret:
        break

    # Nur annotieren, wenn wir ein Label für diesen Frame haben
    if frame_count < len(labels_turn):
        swing_txt = labels_turn[frame_count]

        # Annotationen hinzufügen
        cv2.putText(frame, swing_txt, (50, 80),
                    cv2.FONT_HERSHEY_SIMPLEX, 1.2, (0, 0, 255), 3)
        cv2.putText(frame, f"Fahrstil: {style_str}", (50, h-30),
                    cv2.FONT_HERSHEY_SIMPLEX, 1.2, (255, 255, 255), 2)

    # Frame schreiben
    out.write(frame)
    frame_count += 1

# Ressourcen freigeben
cap.release()
out.release()
print(f"Annotiertes Video wurde gespeichert als '{OUTPUT_VIDEO}'")

Schwung pro Frame: ['Rechtsschwung', 'Rechtsschwung', 'Rechtsschwung', 'Rechtsschwung', 'Rechtsschwung', 'Rechtsschwung', 'Rechtsschwung', 'Rechtsschwung', 'Rechtsschwung', 'Linksschwung', 'Linksschwung', 'Linksschwung', 'Linksschwung', 'Linksschwung', 'Linksschwung', 'Linksschwung', 'Linksschwung', 'Linksschwung', 'Linksschwung', 'Linksschwung', 'Linksschwung', 'Linksschwung', 'Linksschwung', 'Linksschwung', 'Linksschwung', 'Rechtsschwung', 'Rechtsschwung', 'Rechtsschwung', 'Rechtsschwung', 'Rechtsschwung', 'Rechtsschwung', 'Rechtsschwung', 'Rechtsschwung', 'Rechtsschwung', 'Rechtsschwung', 'Rechtsschwung', 'Rechtsschwung', 'Rechtsschwung', 'Rechtsschwung', 'Rechtsschwung', 'Rechtsschwung', 'Rechtsschwung', 'Rechtsschwung', 'Linksschwung', 'Linksschwung', 'Linksschwung', 'Linksschwung', 'Linksschwung', 'Linksschwung', 'Linksschwung', 'Linksschwung', 'Linksschwung', 'Linksschwung', 'Linksschwung', 'Linksschwung', 'Linksschwung', 'Linksschwung', 'Linksschwung', 'Linksschwung', 'Linkssch

In [None]:
print(features["schwung_labels"])

In [23]:
print(preds)
print(features["schwung_labels"].values())

[2 2 2 2 2 2 2 2 2 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 2 2 2 2 2 2 2 2 2 2 2 2
 2 2 2 2 2 2 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 2 2 2 2 2 2 2 2 2 2 2 2 2
 2 2 2 2 2 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2
 2 2 2 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2
 2 2 2 2 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2
 2]
dict_values([2, 2, 2, 2, 2, 2, 2, 2, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2])


In [None]:
abweichung = 0
schwünge = 0
prevValue = 0

for index, real_val in features["schwung_labels"].items():
  if (prevValue == real_val):
    print("passt")
  else:
    schwünge += 1
    if (real_val == preds[index]):
      # wenn der echte Wert der prediction entspricht, dann war Schwungwechsel entweder gleich oder zuvor von Modell erkannt
      counter = index
      while counter > 0:
        if (preds[counter] != real_val):
          # wenn der vorherige Wert sich vom momentanten unterscheidet, dann hat das Modell den Schwungwechsel auch genau auf diesen Frame vorhergesagt (Modell & Realität gleich)
          break
        else:
          abweichung += 1
          counter -= 1
    else:
      # in diesem Fall entspricht der reale Wert nicht dem Wert der Prediction, d.h. die Prediction ändert ihren Wert erst zu einem späteren Wert
      counter = index
      while counter <= len(preds):
        if (preds[counter] == real_val):
          # sobald die prediction mit dem realen Wert übereinstimmt, bricht man aus der Schleife
          break
        else:
          # solange der prediction Wert nicht mit dem realen übereinstimmt, wird die Abweichung erhöht und der counter ebenfalls
          abweichung += 1
          counter += 1
  prevValue = real_val

print(abweichung)
print(schwünge)
print("Durchschnittliche Abweichung in Frames: ", abweichung/schwünge)
print("Durchschnittliche Abweichung in Sekunden: ", (abweichung/schwünge)/25)

In [49]:
from glob import glob
import csv
import cv2

gesamtabweichung = 0
gesamtschwünge = 0

# ─── 2. Turn-Klassifikationsmodell ────────────────────────────────────────────
class FrameWiseLSTM(nn.Module):
    def __init__(self, input_size, hidden_size, num_classes):
        super().__init__()
        self.lstm = nn.LSTM(input_size, hidden_size,
                            num_layers=2, bidirectional=True,
                            batch_first=True, dropout=0.3)
        self.dropout = nn.Dropout(0.3)
        self.fc = nn.Linear(hidden_size * 2, num_classes)

    def forward(self, x):
        out, _ = self.lstm(x)
        return self.fc(self.dropout(out))

# Modell laden
model_turn = torch.load("left_right_classifier.pt", map_location="cpu")
model_turn.eval()


files = glob("*features.pkl")

for file in files:
  with open(file, "rb") as f:
    features = pickle.load(f)

  # Extrahiere Frame-Indizes
  frame_indices = sorted([f for f in features["COM_to_ground"].keys() if isinstance(f, int)])

  # Baue die Sequenz für die Turn-Klassifikation auf
  seq_turn = []
  for i in frame_indices:
      try:
          v = [
              features["COM_to_ground"][i],
              features["knee_angles_right"][i],
              features["knee_angles_left"][i]
          ]

          # Füge joint_angles hinzu
          for axis in features["joint_angles"][i]:
              v.extend(features["joint_angles"][i][axis].values())

          # Füge axis_angles hinzu
          for axis in features["axis_angles"][i]:
              v.extend(features["axis_angles"][i][axis].values())

          # Füge COM_angles hinzu
          v.extend(features["COM_angles"][i].values())

          # Füge style_idx als zusätzliches Feature hinzu
          v.append(style_idx)

          seq_turn.append(v)
      except KeyError as e:
          print(f"Fehlendes Feature in Frame {i}: {e}")
          continue

  if not seq_turn:
      raise RuntimeError("Turn-Sequence ist leer – bitte Features prüfen!")

  # Konvertiere zu Tensor
  arr_turn = np.array(seq_turn, dtype=np.float32)
  x_turn = torch.tensor(arr_turn).unsqueeze(0).float()  # [1, n_frames, feat_dim+1]

  # Vorhersage treffen
  with torch.no_grad():
      logits = model_turn(x_turn)
      preds = logits.argmax(dim=-1).squeeze().numpy()

  # Labels erstellen
  labels_turn = ["Linksschwung" if p == 1 else "Rechtsschwung" for p in preds]
  print("Schwung pro Frame:", labels_turn)




  abweichung = 0
  schwünge = 0
  prevValue = 0

  for index, real_val in features["schwung_labels"].items():
    if (prevValue == real_val):
      continue
    else:
      schwünge += 1
      if (real_val == preds[index]):
        # wenn der echte Wert der prediction entspricht, dann war Schwungwechsel entweder gleich oder zuvor von Modell erkannt
        counter = index
        while counter > 0:
          if (preds[counter] != real_val):
            # wenn der vorherige Wert sich vom momentanten unterscheidet, dann hat das Modell den Schwungwechsel auch genau auf diesen Frame vorhergesagt (Modell & Realität gleich)
            break
          else:
            abweichung += 1
            counter -= 1
      else:
        # in diesem Fall entspricht der reale Wert nicht dem Wert der Prediction, d.h. die Prediction ändert ihren Wert erst zu einem späteren Wert
        counter = index
        while counter <= len(preds):
          if (preds[counter] == real_val):
            # sobald die prediction mit dem realen Wert übereinstimmt, bricht man aus der Schleife
            break
          else:
            # solange der prediction Wert nicht mit dem realen übereinstimmt, wird die Abweichung erhöht und der counter ebenfalls
            abweichung += 1
            counter += 1
    prevValue = real_val

  print(abweichung)
  print(schwünge)
  print("Durchschnittliche Abweichung in Frames: ", abweichung/schwünge)
  print("Durchschnittliche Abweichung in Sekunden: ", (abweichung/schwünge)/25)

  gesamtschwünge += schwünge
  gesamtabweichung += abweichung


print("Durchschnittliche Gesamtabweichung in Frames: ", gesamtabweichung/gesamtschwünge)
print("Durchschnittliche Gesamtabweichung in Sekunden: ", (gesamtabweichung/gesamtschwünge)/25)

Schwung pro Frame: ['Linksschwung', 'Linksschwung', 'Linksschwung', 'Linksschwung', 'Linksschwung', 'Linksschwung', 'Linksschwung', 'Linksschwung', 'Linksschwung', 'Linksschwung', 'Linksschwung', 'Linksschwung', 'Linksschwung', 'Linksschwung', 'Linksschwung', 'Linksschwung', 'Linksschwung', 'Linksschwung', 'Linksschwung', 'Linksschwung', 'Linksschwung', 'Linksschwung', 'Rechtsschwung', 'Rechtsschwung', 'Rechtsschwung', 'Rechtsschwung', 'Rechtsschwung', 'Rechtsschwung', 'Rechtsschwung', 'Rechtsschwung', 'Rechtsschwung', 'Rechtsschwung', 'Rechtsschwung', 'Rechtsschwung', 'Rechtsschwung', 'Rechtsschwung', 'Rechtsschwung', 'Rechtsschwung', 'Rechtsschwung', 'Rechtsschwung', 'Rechtsschwung', 'Rechtsschwung', 'Rechtsschwung', 'Rechtsschwung', 'Rechtsschwung', 'Rechtsschwung', 'Rechtsschwung', 'Rechtsschwung', 'Rechtsschwung', 'Rechtsschwung', 'Rechtsschwung', 'Rechtsschwung', 'Rechtsschwung', 'Rechtsschwung', 'Rechtsschwung', 'Rechtsschwung', 'Rechtsschwung', 'Rechtsschwung', 'Rechtsschwung',