In [1]:
import os
import cv2
import tqdm
import pickle
import numpy as np
from ultralytics import YOLO
import matplotlib.pyplot as plt
from utils.dataset_utils import *
from collections import defaultdict
from deep_sort_realtime.deepsort_tracker import DeepSort

In [2]:
video = "video_8min"
num_frames=1
total_detections=33

In [3]:
raw_video_folder = r'..\data\raw\videos'
video_path = raw_video_folder + "\\" + video + ".mp4"

yolo_path = r'..\models\costumized_yolo\costumized_yolo\costumized_yolo.pt'
processed_video_folder = rf'..\data\processed\{video}'

In [4]:
model = YOLO(yolo_path)
tracker = DeepSort(max_age=30)
cap = cv2.VideoCapture(video_path)
total_frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

  import pkg_resources


In [5]:
# total_frames
os.makedirs(os.path.join(processed_video_folder, "total_frames"), exist_ok=True)
tf_path = os.path.join(processed_video_folder, "total_frames", "total_frames.pkl")

if os.path.exists(tf_path):
    with open(tf_path, "rb") as f:
        total_frames = pickle.load(f)
else:
    total_frames = []
    for frame in tqdm.tqdm(range(total_frame_count), desc="Processing frames"):
        frame_records = process_frame(cap, model, tracker, frame)
        total_frames.extend(frame_records)
    cap.release()
    with open(tf_path, "wb") as f:
        pickle.dump(total_frames, f)

In [6]:
# filtered_frames
os.makedirs(os.path.join(processed_video_folder, "filtered_frames"), exist_ok=True)
ff_path = os.path.join(processed_video_folder, "filtered_frames", "filtered_frames.pkl")
ms_path = os.path.join(processed_video_folder, "filtered_frames", "max_speed.pkl")

if os.path.exists(ff_path) and os.path.exists(ms_path):
    with open(ff_path, "rb") as f:
        filtered_frames = pickle.load(f)

    with open(ms_path, "rb") as f:
        max_speed = pickle.load(f)
else:
    filtered_frames, max_speed = filter_frames(total_frames)
    with open(ff_path, "wb") as f:
        pickle.dump(filtered_frames, f)

In [7]:
# full_track_windows
os.makedirs(os.path.join(processed_video_folder, "full_track_windows"), exist_ok=True)
ftw_path = os.path.join(processed_video_folder, "full_track_windows", f"full_track_windows_{total_detections}.pkl")
vw_path = os.path.join(processed_video_folder, "full_track_windows", f"valid_windows_{total_detections}.pkl")

if os.path.exists(ftw_path) and os.path.exists(vw_path):
    with open(ftw_path, "rb") as f:
        full_track_windows = pickle.load(f)

    with open(vw_path, "rb") as f:
        valid_windows = pickle.load(f)
else:
    full_track_windows, valid_windows = find_valid_windows(filtered_frames, num_frames=num_frames, total_detections=total_detections)
    with open(ftw_path, "wb") as f:
        pickle.dump(full_track_windows, f)

    with open(vw_path, "wb") as f:
        pickle.dump(valid_windows, f)

print(f"Found {len(valid_windows)} windows with {total_detections} continuous detections.")

Found 742 windows with 33 continuous detections.


In [8]:
def get_expert_features(frame, width, height, max_speed):    
    frame = sorted(frame, key=lambda det: det['label'] != '0') # sort so that Pred is always first

    vscale = np.vectorize(scale)

    xs = np.array([det['x'] for det in frame])
    ys = np.array([det['y'] for det in frame])
    scaled_xs = vscale(xs, 0, width, 0, 1)
    scaled_ys = vscale(ys, 0, height, 0, 1)

    vxs = np.array([det['vx'] for det in frame])
    vys = np.array([det['vy'] for det in frame])

    thetas = np.array([det['angle'] for det in frame])
    scaled_thetas = vscale(thetas, -np.pi, np.pi, -1, 1)

    speed = np.array([det['speed'] for det in frame])
    scaled_speed = vscale(speed, 0, max_speed, 0, 1)

    cos_t = np.cos(thetas)                        
    sin_t = np.sin(thetas)

    # pairwise distances
    dx = scaled_xs[None, :] - scaled_xs[:, None]
    dy = scaled_ys[None, :] - scaled_ys[:, None]

    # relative velocities
    rel_vx = cos_t[:, None] * vxs[None, :] + sin_t[:, None] * vys[None, :]
    rel_vy = -sin_t[:, None] * vxs[None, :] + cos_t[:, None] * vys[None, :]
    scaled_rel_vx = vscale(rel_vx, -max_speed, max_speed, -1, 1)
    scaled_rel_vy = vscale(rel_vy, -max_speed, max_speed, -1, 1)

    n = scaled_xs.shape[0]
    thetas_mat = np.tile(scaled_thetas[:, None], (1, n))
    speed_mat = np.tile(scaled_speed[:, None], (1, n))
    features = np.stack([dx, dy, scaled_rel_vx, scaled_rel_vy, speed_mat, thetas_mat], axis=-1)

    mask = ~np.eye(n, dtype=bool) # shape (N, N)
    neigh = features[mask].reshape(n, n-1, 6)

    pred_tensor = torch.from_numpy(neigh[0]).unsqueeze(0)
    prey_tensor = torch.from_numpy(neigh[1:]) # shape (N-1, N-1, 5)

    return pred_tensor, prey_tensor


def get_expert_tensors(full_track_windows, valid_windows, width, height, max_speed, window_size=9):
    start_frames = [vw['start_frame'] for vw in valid_windows]
    pred_windows = []
    prey_windows = []

    for idx, start in enumerate(start_frames):
        window_detections = []
        for frame in range(start, start + window_size):
            dets = [det for det in full_track_windows[idx] if det['frame'] == frame]
            window_detections.append(dets)

        preds = []
        preys = []
        for dets in window_detections:
            pred_tensor, prey_tensor = get_expert_features(dets, width, height, max_speed)
            preds.append(pred_tensor)
            preys.append(prey_tensor)

        pred_windows.append(torch.stack(preds, dim=0))
        prey_windows.append(torch.stack(preys, dim=0))

    pred_tensor = torch.stack(pred_windows, dim=0)
    prey_tensor = torch.stack(prey_windows, dim=0)

    return pred_tensor, prey_tensor

In [9]:
pred_tensors, prey_tensors = get_expert_tensors(full_track_windows, valid_windows, width, height, max_speed, window_size=num_frames)

print("Pred Tensors Shape:", pred_tensors.shape)
print("Prey Tensors Shape:", prey_tensors.shape)

Pred Tensors Shape: torch.Size([742, 1, 1, 32, 6])
Prey Tensors Shape: torch.Size([742, 1, 32, 32, 6])


In [10]:
# Dein Tensor: pred_tensors shape (742,1,1,32,6)
data = pred_tensors.squeeze()  # -> shape (742,32,6)

# Features und Target extrahieren
X = data[..., :4]  # dx,dy,vx,vy (742,32,4)
y = data[..., 4]   # v (742,32)

# In 2D-Shape umformen (Samples, Features)
X = X.reshape(-1, 4)  # (742*32, 4)
y = y.reshape(-1)     # (742*32,)

print("X shape:", X.shape)
print("y shape:", y.shape)

X shape: torch.Size([23744, 4])
y shape: torch.Size([23744])


In [11]:
import torch
import numpy as np
from sklearn.linear_model import LinearRegression
from sklearn.metrics import r2_score, mean_squared_error

X_np = X.numpy()
y_np = y.numpy()

# Modell erstellen und fitten
reg = LinearRegression()
reg.fit(X_np, y_np)

# Vorhersagen
y_pred = reg.predict(X_np)

# Auswertung
print("Koeffizienten:", reg.coef_)
print("Intercept:", reg.intercept_)
print("R² Score:", r2_score(y_np, y_pred))
print("MSE:", mean_squared_error(y_np, y_pred))

Koeffizienten: [  0.0076661    0.028263    -0.10229    0.010941]
Intercept: 0.06784917995377002
R² Score: 0.038920663912849984
MSE: 0.0031988220544649022


In [13]:
import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.metrics import r2_score, mean_squared_error

class VelocityMLP(nn.Module):
    def __init__(self, input_dim=4, hidden_dim=32):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(input_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, 1)
        )

    def forward(self, x):
        return self.net(x)


# Stelle sicher, dass X und y Float32 sind
X = X.float()
y = y.float()

# Rest deines Codes bleibt gleich
model = VelocityMLP(input_dim=4, hidden_dim=32)
criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=1e-3)

epochs = 200
for epoch in range(epochs):
    model.train()
    optimizer.zero_grad()

    y_pred = model(X)
    loss = criterion(y_pred, y)
    loss.backward()
    optimizer.step()

    if epoch % 20 == 0:
        print(f"Epoch {epoch}/{epochs} - Loss: {loss.item():.6f}")

model.eval()
with torch.no_grad():
    y_pred = model(X)

r2 = r2_score(y.numpy(), y_pred.numpy())
mse = mean_squared_error(y.numpy(), y_pred.numpy())

print(f"R² Score: {r2:.4f}")
print(f"MSE: {mse:.6f}")


  return F.mse_loss(input, target, reduction=self.reduction)


Epoch 0/200 - Loss: 0.006297
Epoch 20/200 - Loss: 0.003539
Epoch 40/200 - Loss: 0.003354
Epoch 60/200 - Loss: 0.003343
Epoch 80/200 - Loss: 0.003339
Epoch 100/200 - Loss: 0.003337
Epoch 120/200 - Loss: 0.003335
Epoch 140/200 - Loss: 0.003333
Epoch 160/200 - Loss: 0.003332
Epoch 180/200 - Loss: 0.003332
R² Score: 0.0009
MSE: 0.003325
