In [1]:
import os
import cv2
import torch
from torch.utils.data import Dataset
import numpy as np
from sklearn.preprocessing import LabelEncoder

class VideoDataset(Dataset):
    def __init__(self, data_dir, img_size=(128, 128), frames_per_clip=16, transform=None):
        self.img_size = img_size
        self.frames_per_clip = frames_per_clip
        self.transform = transform
        self.samples = []
        self.labels = []
        self.label_encoder = LabelEncoder()

        # Scan dataset
        all_labels = []
        for label in os.listdir(data_dir):
            label_path = os.path.join(data_dir, label)
            if not os.path.isdir(label_path):
                continue
            frames = sorted(os.listdir(label_path))
            if len(frames) < frames_per_clip:
                continue  # skip too short videos
            # save sequence of frames as one sample
            self.samples.append([os.path.join(label_path, f) for f in frames])
            all_labels.append(label)

        self.labels = self.label_encoder.fit_transform(all_labels)

    def __len__(self):
        return len(self.samples)

    def __getitem__(self, idx):
        frame_paths = self.samples[idx]
        label = self.labels[idx]

        # pick evenly spaced frames
        step = max(1, len(frame_paths) // self.frames_per_clip)
        selected = frame_paths[::step][:self.frames_per_clip]

        clip = []
        for f in selected:
            img = cv2.imread(f)
            img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
            img = cv2.resize(img, self.img_size)
            img = img / 255.0
            clip.append(img)

        clip = np.array(clip)  # (T, H, W, C)
        clip = np.transpose(clip, (3, 0, 1, 2))  # (C, T, H, W) for 3D CNN
        return torch.tensor(clip, dtype=torch.float32), torch.tensor(label, dtype=torch.long)


In [None]:
import torch.nn as nn

class ActionRecognitionModel(nn.Module):
    def __init__(self, num_classes, embed_dim=256, num_heads=4, num_layers=2):
        super().__init__()

        # 3D CNN backbone
        self.cnn3d = nn.Sequential(
            nn.Conv3d(3, 64, kernel_size=3, stride=1, padding=1),
            nn.ReLU(),
            nn.MaxPool3d((1,2,2)),

            nn.Conv3d(64, 128, kernel_size=3, stride=1, padding=1),
            nn.ReLU(),
            nn.MaxPool3d((2,2,2)),

            nn.Conv3d(128, embed_dim, kernel_size=3, stride=1, padding=1),
            nn.ReLU(),
            nn.AdaptiveAvgPool3d((None, 1, 1))  # keep T, collapse H,W
        )

        # Transformer encoder
        encoder_layer = nn.TransformerEncoderLayer(d_model=embed_dim, nhead=num_heads)
        self.transformer = nn.TransformerEncoder(encoder_layer, num_layers=num_layers)

        # Classifier
        self.fc = nn.Linear(embed_dim, num_classes)

    def forward(self, x):
        # x: (B, C, T, H, W)
        feats = self.cnn3d(x)           # (B, E, T, 1, 1)
        feats = feats.squeeze(-1).squeeze(-1)  # (B, E, T)

        feats = feats.permute(2, 0, 1)  # (T, B, E) for transformer
        out = self.transformer(feats)   # (T, B, E)

        out = out.mean(0)               # (B, E) temporal average
        return self.fc(out)


In [None]:
from torch.utils.data import DataLoader
import torch.optim as optim
import torch.nn.functional as F

def train_model(data_dir, epochs=10, batch_size=4, lr=1e-4):
    dataset = VideoDataset(data_dir)
    num_classes = len(np.unique(dataset.labels))

    loader = DataLoader(dataset, batch_size=batch_size, shuffle=True)

    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model = ActionRecognitionModel(num_classes=num_classes).to(device)
    optimizer = optim.Adam(model.parameters(), lr=lr)

    for epoch in range(epochs):
        model.train()
        total_loss, correct, total = 0, 0, 0
        for clips, labels in loader:
            clips, labels = clips.to(device), labels.to(device)

            optimizer.zero_grad()
            outputs = model(clips)
            loss = F.cross_entropy(outputs, labels)
            loss.backward()
            optimizer.step()

            total_loss += loss.item()
            _, pred = outputs.max(1)
            correct += pred.eq(labels).sum().item()
            total += labels.size(0)

        print(f"Epoch {epoch+1}/{epochs}, Loss: {total_loss/len(loader):.4f}, Acc: {correct/total:.4f}")

    return model, dataset.label_encoder


In [None]:
def predict(model, encoder, video_dir, frames_per_clip=16, img_size=(128,128)):
    model.eval()
    frames = sorted(os.listdir(video_dir))
    step = max(1, len(frames) // frames_per_clip)
    selected = frames[::step][:frames_per_clip]

    clip = []
    for f in selected:
        img = cv2.imread(os.path.join(video_dir, f))
        img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
        img = cv2.resize(img, img_size)
        img = img / 255.0
        clip.append(img)

    clip = np.array(clip)
    clip = np.transpose(clip, (3,0,1,2))
    clip = torch.tensor(clip, dtype=torch.float32).unsqueeze(0)  # (1,C,T,H,W)

    with torch.no_grad():
        outputs = model(clip.to(next(model.parameters()).device))
        pred = outputs.argmax(1).item()
    return encoder.inverse_transform([pred])[0]


In [3]:
import os
import cv2
import torch
import numpy as np
from torch.utils.data import Dataset, DataLoader
from sklearn.preprocessing import LabelEncoder
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F


# -------------------- Dataset --------------------
class VideoDataset(Dataset):
    def __init__(self, data_dir, img_size=(128, 128), frames_per_clip=16):
        self.img_size = img_size
        self.frames_per_clip = frames_per_clip
        self.samples = []
        self.labels = []
        self.label_encoder = LabelEncoder()

        all_labels = []
        for label in os.listdir(data_dir):
            label_path = os.path.join(data_dir, label)
            if not os.path.isdir(label_path):
                continue
            frames = sorted(os.listdir(label_path))
            if len(frames) < frames_per_clip:
                continue
            self.samples.append([os.path.join(label_path, f) for f in frames])
            all_labels.append(label)

        self.labels = self.label_encoder.fit_transform(all_labels)

    def __len__(self):
        return len(self.samples)

    def __getitem__(self, idx):
        frame_paths = self.samples[idx]
        label = self.labels[idx]

        step = max(1, len(frame_paths) // self.frames_per_clip)
        selected = frame_paths[::step][:self.frames_per_clip]

        clip = []
        for f in selected:
            img = cv2.imread(f)
            img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
            img = cv2.resize(img, self.img_size)
            img = img / 255.0
            clip.append(img)

        clip = np.array(clip)  # (T, H, W, C)
        clip = np.transpose(clip, (3, 0, 1, 2))  # (C, T, H, W)
        return torch.tensor(clip, dtype=torch.float32), torch.tensor(label, dtype=torch.long)


# -------------------- Model --------------------
class ActionRecognitionModel(nn.Module):
    def __init__(self, num_classes, embed_dim=256, num_heads=4, num_layers=2):
        super().__init__()

        self.cnn3d = nn.Sequential(
            nn.Conv3d(3, 64, kernel_size=3, stride=1, padding=1),
            nn.ReLU(),
            nn.MaxPool3d((1, 2, 2)),

            nn.Conv3d(64, 128, kernel_size=3, stride=1, padding=1),
            nn.ReLU(),
            nn.MaxPool3d((2, 2, 2)),

            nn.Conv3d(128, embed_dim, kernel_size=3, stride=1, padding=1),
            nn.ReLU(),
            nn.AdaptiveAvgPool3d((None, 1, 1))
        )

        encoder_layer = nn.TransformerEncoderLayer(d_model=embed_dim, nhead=num_heads)
        self.transformer = nn.TransformerEncoder(encoder_layer, num_layers=num_layers)

        self.fc = nn.Linear(embed_dim, num_classes)

    def forward(self, x):
        feats = self.cnn3d(x)           # (B, E, T, 1, 1)
        feats = feats.squeeze(-1).squeeze(-1)  # (B, E, T)
        feats = feats.permute(2, 0, 1)  # (T, B, E)
        out = self.transformer(feats)   # (T, B, E)
        out = out.mean(0)               # (B, E)
        return self.fc(out)


# -------------------- Train --------------------
def train_model(data_dir, epochs=5, batch_size=2, lr=1e-4):
    dataset = VideoDataset(data_dir)
    num_classes = len(np.unique(dataset.labels))
    loader = DataLoader(dataset, batch_size=batch_size, shuffle=True)

    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model = ActionRecognitionModel(num_classes=num_classes).to(device)
    optimizer = optim.Adam(model.parameters(), lr=lr)

    for epoch in range(epochs):
        model.train()
        total_loss, correct, total = 0, 0, 0
        for clips, labels in loader:
            clips, labels = clips.to(device), labels.to(device)

            optimizer.zero_grad()
            outputs = model(clips)
            loss = F.cross_entropy(outputs, labels)
            loss.backward()
            optimizer.step()

            total_loss += loss.item()
            _, pred = outputs.max(1)
            correct += pred.eq(labels).sum().item()
            total += labels.size(0)

        print(f"Epoch {epoch+1}/{epochs}, Loss: {total_loss/len(loader):.4f}, Acc: {correct/total:.4f}")

    return model, dataset.label_encoder


# -------------------- Inference --------------------
def predict(model, encoder, video_dir, frames_per_clip=16, img_size=(128, 128)):
    model.eval()
    frames = sorted(os.listdir(video_dir))
    step = max(1, len(frames) // frames_per_clip)
    selected = frames[::step][:frames_per_clip]

    clip = []
    for f in selected:
        img = cv2.imread(os.path.join(video_dir, f))
        img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
        img = cv2.resize(img, img_size)
        img = img / 255.0
        clip.append(img)

    clip = np.array(clip)
    clip = np.transpose(clip, (3, 0, 1, 2))
    clip = torch.tensor(clip, dtype=torch.float32).unsqueeze(0)

    with torch.no_grad():
        outputs = model(clip.to(next(model.parameters()).device))
        pred = outputs.argmax(1).item()
    return encoder.inverse_transform([pred])[0]


# -------------------- Main --------------------
def main():
    data_dir = "data/dataset"  # put your dataset path here
    model, encoder = train_model(data_dir, epochs=5, batch_size=2)

    # Save model
    torch.save(model.state_dict(), "activity_model.pth")

    # Inference example (use any video folder with frames)
    test_video_dir = os.path.join(data_dir, "walking")  # change to your test video path
    prediction = predict(model, encoder, test_video_dir)
    print(f"Predicted activity: {prediction}")


if __name__ == "__main__":
    main()




Epoch 1/5, Loss: 2.9996, Acc: 0.0000
Epoch 2/5, Loss: 1.4855, Acc: 0.3333
Epoch 3/5, Loss: 1.5639, Acc: 0.3333
Epoch 4/5, Loss: 1.2378, Acc: 0.0000
Epoch 5/5, Loss: 1.1873, Acc: 0.3333


FileNotFoundError: [Errno 2] No such file or directory: 'data/dataset/walking'