# Fall Detection using ResNet50 with Data Augmentation and F1 Thresholding

In [None]:
import os
import numpy as np
import pandas as pd
import torch
from torch.utils.data import DataLoader, Dataset
from torchvision.transforms import Compose, Resize, ToTensor, Normalize, RandomHorizontalFlip, RandomRotation, ColorJitter
from torchvision import models
import torch.nn as nn
import torch.optim as optim
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report, f1_score, confusion_matrix
import matplotlib.pyplot as plt
import seaborn as sns
from tqdm import tqdm
from PIL import Image

In [None]:
# ---------------- STEP 1: Configuration ---------------- #
# Root of the labelled training data; scanned by preprocess_dataset().
TRAIN_DIR = "/kaggle/input/dataslayer2/train"
# Directory whose image files are listed for the test predictions.
TEST_DIR = "/kaggle/input/dataslayer2/test"
# CSV file the thresholded test predictions are written to.
SUBMISSION_FILE = "submission.csv"
# Target size passed to Resize() in both the train and validation transforms.
IMG_SIZE = (224, 224)
# Batch size shared by the train, validation and test DataLoaders.
BATCH_SIZE = 16
# Number of epochs handed to train_model().
EPOCHS = 20
# Learning rate of the Adam optimizer.
LEARNING_RATE = 2e-5
# Seed for the train/validation split.
RANDOM_STATE = 42

In [None]:
# ---------------- STEP 2: Data Preparation ---------------- #
def preprocess_dataset(directory):
    """Collect image paths and binary labels from a per-subject directory tree.

    The expected layout is ``directory/<subject>/{fall,non_fall}/...``; every
    image file found anywhere below a category folder is included. Entries
    directly under ``directory`` that are not folders, and category folders
    that do not exist, are skipped.

    Args:
        directory: Root folder containing one sub-folder per subject.

    Returns:
        A ``(images, labels)`` pair of equally long lists: image file paths
        and the matching labels (1 for ``fall``, 0 for ``non_fall``). The
        order is deterministic (sorted by subject, category, then path).
    """
    image_extensions = (".jpg", ".png", ".jpeg")
    images, labels = [], []
    # os.listdir / os.walk yield entries in arbitrary, filesystem-dependent
    # order; sorting keeps the sample order (and therefore any seeded split
    # derived from it) reproducible across machines.
    for subject in sorted(os.listdir(directory)):
        subject_path = os.path.join(directory, subject)
        if not os.path.isdir(subject_path):
            continue
        for category, label in (("fall", 1), ("non_fall", 0)):
            category_path = os.path.join(subject_path, category)
            if not os.path.isdir(category_path):
                continue
            for root, dirs, files in os.walk(category_path):
                dirs.sort()  # in-place sort steers os.walk's traversal order
                for file in sorted(files):
                    # Compare lower-cased names so ".JPG" / ".PNG" are kept.
                    if file.lower().endswith(image_extensions):
                        images.append(os.path.join(root, file))
                        labels.append(label)
    return images, labels

# Gather every labelled image path, then hold out 20% for validation while
# keeping the fall / non_fall ratio equal in both parts (stratify=y).
# NOTE(review): the split is made over individual image paths, so images from
# the same subject folder can land in both train and validation — confirm
# that this is intended.
X, y = preprocess_dataset(TRAIN_DIR)
X_train, X_val, y_train, y_val = train_test_split(
    X,
    y,
    test_size=0.2,
    random_state=RANDOM_STATE,
    stratify=y,
)
print(f"Training samples: {len(X_train)}, Validation samples: {len(X_val)}")

In [None]:
# ---------------- STEP 3: Dataset Definition ---------------- #
# Channel statistics documented by torchvision for its ImageNet-pretrained
# models. The ResNet50 backbone used below starts from those weights, so the
# inputs are normalised the same way instead of with a generic 0.5 / 0.5.
IMAGENET_MEAN = [0.485, 0.456, 0.406]
IMAGENET_STD = [0.229, 0.224, 0.225]

# Training pipeline: resize, then light random augmentation (flip, small
# rotation, colour jitter) before tensor conversion and normalisation.
train_transform = Compose([
    Resize(IMG_SIZE),
    RandomHorizontalFlip(),
    RandomRotation(10),
    ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2, hue=0.1),
    ToTensor(),
    Normalize(mean=IMAGENET_MEAN, std=IMAGENET_STD),
])

# Evaluation pipeline: no randomness, but the same resize and normalisation
# as training so that both see identically scaled inputs.
val_transform = Compose([
    Resize(IMG_SIZE),
    ToTensor(),
    Normalize(mean=IMAGENET_MEAN, std=IMAGENET_STD),
])

class ImageDataset(Dataset):
    """Dataset yielding RGB images (and optional labels) from file paths.

    Args:
        image_paths: Sequence of image file paths.
        labels: Optional sequence of integer class labels aligned with
            ``image_paths``; leave as ``None`` for unlabelled data.
        transform: Optional callable applied to each loaded PIL image.
    """

    def __init__(self, image_paths, labels=None, transform=None):
        self.image_paths = image_paths
        self.labels = labels
        self.transform = transform

    def __len__(self):
        """Return the number of image paths."""
        return len(self.image_paths)

    def __getitem__(self, idx):
        """Return ``{"image": ...}``, plus ``"label"`` when labels were given."""
        # Image.open is lazy and keeps the file handle open; convert() decodes
        # the pixels into a new image, so the handle can be closed right away
        # instead of waiting for garbage collection.
        with Image.open(self.image_paths[idx]) as source:
            image = source.convert("RGB")
        if self.transform:
            image = self.transform(image)
        data = {"image": image}
        if self.labels is not None:
            data["label"] = torch.tensor(self.labels[idx], dtype=torch.long)
        return data

# Wrap each split in a dataset with its own transform pipeline.
train_dataset = ImageDataset(image_paths=X_train, labels=y_train, transform=train_transform)
val_dataset = ImageDataset(image_paths=X_val, labels=y_val, transform=val_transform)

# Only the training batches are reshuffled every epoch; validation keeps a
# fixed order.
train_loader = DataLoader(dataset=train_dataset, batch_size=BATCH_SIZE, shuffle=True)
val_loader = DataLoader(dataset=val_dataset, batch_size=BATCH_SIZE, shuffle=False)


In [None]:
# ---------------- STEP 4: Model Definition ---------------- #
class CNN(nn.Module):
    """ResNet50 backbone whose final layer is replaced by a ``num_classes`` head.

    Args:
        num_classes: Number of output logits.
        pretrained: Load ImageNet weights into the backbone (default, the
            previous behaviour). Pass ``False`` when a checkpoint is loaded
            right after construction, to skip fetching weights that would be
            overwritten anyway.
    """

    def __init__(self, num_classes=2, pretrained=True):
        super().__init__()
        # torchvision >= 0.13 deprecates ``pretrained=`` in favour of the
        # ``weights=`` enum API; IMAGENET1K_V1 is the checkpoint the old flag
        # loaded. Fall back to the legacy keyword on older releases that do
        # not ship the enum.
        weights_enum = getattr(models, "ResNet50_Weights", None)
        if weights_enum is None:
            self.model = models.resnet50(pretrained=pretrained)
        else:
            self.model = models.resnet50(
                weights=weights_enum.IMAGENET1K_V1 if pretrained else None
            )
        # Swap the 1000-way ImageNet classifier for a task-specific head.
        self.model.fc = nn.Linear(self.model.fc.in_features, num_classes)

    def forward(self, x):
        """Return the backbone's logits for the batch ``x``."""
        return self.model(x)

# Build the two-class network and place it on the GPU when one is available.
model = CNN(num_classes=2)
model = model.to(torch.device("cuda" if torch.cuda.is_available() else "cpu"))

# Cross-entropy over the logits, optimised with Adam over all model parameters.
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(params=model.parameters(), lr=LEARNING_RATE)


In [None]:
# ---------------- STEP 5: Training Loop ---------------- #
def train_model(model, train_loader, val_loader, criterion, optimizer, epochs, device,
                checkpoint_path="best_model.pth"):
    """Train ``model`` and checkpoint the epoch with the best validation F1.

    Args:
        model: Network to optimise. It is not moved by this function, so it
            must already be on ``device``.
        train_loader: Iterable of batches, each a dict with ``"image"`` and
            ``"label"`` tensors, used for the optimisation steps.
        val_loader: Iterable of batches in the same format, used for
            evaluation after every epoch.
        criterion: Loss callable taking ``(outputs, labels)``.
        optimizer: Optimizer stepping the model's parameters.
        epochs: Number of passes over ``train_loader``.
        device: Device the batches are moved to.
        checkpoint_path: File the best ``state_dict`` is saved to.

    Returns:
        ``(train_losses, val_losses, val_f1_scores)`` with one entry per
        epoch; losses are averaged over the batches of the respective loader
        and the F1 score is the weighted average over classes.
    """
    train_losses, val_losses, val_f1_scores = [], [], []
    # Start below any attainable F1 so the first epoch is always checkpointed,
    # even when its score is exactly 0. With a starting value of 0 and a
    # strict ">" no file would be written in that case and a later load of
    # the checkpoint would fail.
    best_f1 = float("-inf")
    for epoch in range(epochs):
        # ---- Training phase ----
        model.train()
        running_loss = 0.0
        for batch in tqdm(train_loader):
            images = batch["image"].to(device)
            labels = batch["label"].to(device)

            optimizer.zero_grad()
            outputs = model(images)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            running_loss += loss.item()

        train_loss = running_loss / len(train_loader)
        train_losses.append(train_loss)

        # ---- Validation phase (no gradients, eval-mode layers) ----
        model.eval()
        val_loss, val_preds, val_labels = 0.0, [], []
        with torch.no_grad():
            for batch in val_loader:
                images = batch["image"].to(device)
                labels = batch["label"].to(device)
                outputs = model(images)
                val_loss += criterion(outputs, labels).item()
                val_preds.extend(torch.argmax(outputs, dim=1).cpu().numpy())
                val_labels.extend(labels.cpu().numpy())

        val_loss /= len(val_loader)
        val_losses.append(val_loss)
        val_f1 = f1_score(val_labels, val_preds, average="weighted")
        val_f1_scores.append(val_f1)

        print(f"Epoch {epoch+1}/{epochs}, Train Loss: {train_loss:.4f}, Val Loss: {val_loss:.4f}, Val F1: {val_f1:.4f}")

        # Keep only the weights of the best-scoring epoch on disk.
        if val_f1 > best_f1:
            best_f1 = val_f1
            torch.save(model.state_dict(), checkpoint_path)
            print(f"Saved Best Model with F1: {best_f1:.4f}")

    return train_losses, val_losses, val_f1_scores

# Same device choice as the one used when the model was created.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# Run the full training schedule and keep the per-epoch histories for plotting.
train_losses, val_losses, val_f1_scores = train_model(
    model=model,
    train_loader=train_loader,
    val_loader=val_loader,
    criterion=criterion,
    optimizer=optimizer,
    epochs=EPOCHS,
    device=device,
)

In [None]:
# ---------------- STEP 6: Learning Curve ---------------- #
# The training log reports epochs 1-based, so plot against 1..N instead of the
# implicit 0-based list index; the x-axis then matches the printed epoch
# numbers.
epoch_axis = range(1, len(train_losses) + 1)
plt.figure(figsize=(10, 6))
plt.plot(epoch_axis, train_losses, label="Training Loss")
plt.plot(epoch_axis, val_losses, label="Validation Loss")
plt.title("Learning Curve")
plt.xlabel("Epochs")
plt.ylabel("Loss")
plt.legend()
plt.show()


In [None]:
# ---------------- STEP 7: Validation Metrics ---------------- #
# Rebuild the architecture and restore the best checkpoint written during
# training.
best_model = CNN(num_classes=2)
# map_location keeps the load working when the checkpoint was written on a
# different device than the one available now (e.g. saved on GPU, read on CPU).
best_model.load_state_dict(torch.load("best_model.pth", map_location=device))
best_model = best_model.to(device)
best_model.eval()

# Collect hard predictions, ground truth and the positive-class ("fall")
# probability for every validation sample; the probabilities are reused for
# threshold tuning later on.
val_preds, val_labels, val_probs = [], [], []
with torch.no_grad():
    for batch in val_loader:
        images = batch["image"].to(device)
        outputs = best_model(images)
        val_probs.extend(torch.softmax(outputs, dim=1)[:, 1].cpu().numpy())
        val_preds.extend(torch.argmax(outputs, dim=1).cpu().numpy())
        val_labels.extend(batch["label"].cpu().numpy())

# Pin the label set explicitly: without it sklearn infers the classes from the
# data, and a class that is absent from both labels and predictions would make
# target_names / tick labels mismatch the reported rows.
class_ids = [0, 1]
class_names = ["non_fall", "fall"]

print("\nClassification Report:")
print(classification_report(val_labels, val_preds, labels=class_ids, target_names=class_names))

conf_matrix = confusion_matrix(val_labels, val_preds, labels=class_ids)
plt.figure(figsize=(10, 8))
sns.heatmap(conf_matrix, annot=True, fmt="d", cmap="Blues", xticklabels=class_names, yticklabels=class_names)
plt.title("Confusion Matrix")
plt.xlabel("Predicted")
plt.ylabel("True")
plt.show()

In [None]:
# ---------------- STEP 8: Test Predictions with Threshold Optimization ---------------- #
class ImageDataset(Dataset):
    """Dataset yielding RGB images, their paths and optional labels.

    Re-declares the ``ImageDataset`` name for the inference stage: every item
    additionally carries its source path, and an image that cannot be loaded
    is replaced by a blank one instead of aborting the run.

    Args:
        image_paths: Sequence of image file paths.
        labels: Optional sequence of integer class labels aligned with
            ``image_paths``; leave as ``None`` for unlabelled data.
        transform: Optional callable applied to each loaded PIL image.
    """

    def __init__(self, image_paths, labels=None, transform=None):
        self.image_paths = image_paths
        self.labels = labels
        self.transform = transform

    def __len__(self):
        """Return the number of image paths."""
        return len(self.image_paths)

    def __getitem__(self, idx):
        """Return ``{"image", "path"}``, plus ``"label"`` when labels were given."""
        path = self.image_paths[idx]
        try:
            # Image.open is lazy and keeps the file handle open; convert()
            # decodes into a new image, so the handle is closed on leaving
            # the ``with`` block rather than at garbage collection.
            with Image.open(path) as source:
                image = source.convert("RGB")
        except Exception as e:
            # Deliberate best-effort: report the failure and continue so one
            # unreadable file does not stop the whole prediction run.
            print(f"Error loading image: {path}, {e}")
            image = Image.new("RGB", IMG_SIZE)  # Use a blank image as a fallback

        if self.transform:
            image = self.transform(image)
        data = {"image": image, "path": path}  # Store path for later use
        if self.labels is not None:
            data["label"] = torch.tensor(self.labels[idx], dtype=torch.long)
        return data

# Sorted so the submission rows come out in a stable order (os.listdir order
# is filesystem-dependent); the extension match is case-insensitive so files
# such as "x.JPG" are not silently left out of the submission.
test_images = sorted(
    os.path.join(TEST_DIR, img)
    for img in os.listdir(TEST_DIR)
    if img.lower().endswith((".jpg", ".png", ".jpeg"))
)
test_dataset = ImageDataset(test_images, transform=val_transform)
test_loader = DataLoader(test_dataset, batch_size=BATCH_SIZE)

test_probs, test_ids = [], []
with torch.no_grad():
    for batch in test_loader:
        images = batch["image"].to(device)
        outputs = best_model(images)
        probs = torch.softmax(outputs, dim=1)[:, 1].cpu().numpy()  # Probability for 'fall'
        test_probs.extend(probs)
        test_ids.extend(os.path.basename(path) for path in batch["path"])  # Access paths from dataset

# Find optimal threshold using validation probabilities.
# Start from the neutral 0.5 cut-off: if no candidate achieves an F1 above 0
# (or there is no validation data at all) the predictions fall back to 0.5
# rather than to a threshold of 0, which would label every image as "fall".
best_threshold, best_f1 = 0.5, 0.0
if len(val_probs) > 0:  # Ensure val_probs is not empty
    val_probs_arr = np.asarray(val_probs)  # converted once, outside the loop
    for threshold in np.linspace(0.1, 0.9, 50):
        val_preds_threshold = (val_probs_arr > threshold).astype(int)
        f1 = f1_score(val_labels, val_preds_threshold)
        if f1 > best_f1:
            best_f1, best_threshold = f1, threshold

    print(f"Best Threshold: {best_threshold:.2f}, Best F1 Score: {best_f1:.4f}")
else:
    print("Validation probabilities are empty. Skipping threshold optimization.")

# Apply the chosen threshold to the test data; the submission is written in
# either case so the inference work above is never discarded.
test_preds = (np.asarray(test_probs) > best_threshold).astype(int)

submission = pd.DataFrame({"id": test_ids, "label": test_preds})
submission.to_csv(SUBMISSION_FILE, index=False)
print(f"Submission saved with threshold {best_threshold:.2f}")