# Setting up sapiens

# 1| What is HAR?
* Human activity recognition, or HAR for short, is a broad field of study concerned with identifying the specific movement or action of a person based on sensor data.
* Movements are often typical activities performed indoors, such as walking, talking, standing, and sitting


# Why is it important?
* Human activity recognition plays a significant role in human-to-human interaction and interpersonal relations.
* Because an activity conveys information about a person's identity, personality, and psychological state, that information is difficult to extract automatically.
* The human ability to recognize another person’s activities is one of the main subjects of study of the scientific areas of computer vision and machine learning. As a result of this research, many applications, including video surveillance systems, human-computer interaction, and robotics for human behavior characterization, require a multiple activity recognition system.

# dataset
12k training images


# Best CNN Model  

## Model Architecture  
| Layer | Type | Output Shape | Parameters |
|--------|--------------|----------------|-------------|
| **EfficientNetB7** | Functional | (None, 2560) | 64,097,687 |
| **Flatten** | Flatten | (None, 2560) | 0 |
| **Dense** | Fully Connected | (None, 512) | 1,311,232 |
| **Dense_1** | Fully Connected | (None, 15) | 7,695 |

---

## **Total Parameters:** 65,416,614 (≈ 249.54 MB)  
- **Trainable Parameters:** 1,318,927 (≈ 5.03 MB)  
- **Non-trainable Parameters:** 64,097,687 (≈ 244.51 MB)  



# 2| Importing libraries

In [None]:
import os
import glob
import random
import numpy as np
import pandas as pd

from tqdm import tqdm

from PIL import Image

import seaborn as sns
import matplotlib.image as img
import matplotlib.pyplot as plt
import torch
torch.manual_seed(0)

# 3| Getting the path and Loading the data

In [None]:
# Load the training and testing CSVs of the HAR dataset.
# The paths are relative ("../input/..."), so they resolve against the notebook's working directory.
# NOTE(review): the training CSV is later consumed through its "filename" and "label" columns;
# the testing CSV is presumably unlabeled (see the note below about missing ground truth) — confirm.
train_data = pd.read_csv("../input/human-action-recognition-har-dataset/Human Action Recognition/Training_set.csv")
test_data = pd.read_csv("../input/human-action-recognition-har-dataset/Human Action Recognition/Testing_set.csv")

In [None]:
test_data.head()

In [None]:
# Distinct class names found in the "label" column of the training CSV.
labels = train_data.label.unique()
# Integer id -> class name, numbered in the order the names appear in `labels`.
idx2label = dict(enumerate(labels))
# Inverse lookup: class name -> integer id (used as the training target).
label2idx = {class_name: class_id for class_id, class_name in idx2label.items()}
label2idx

### skip test because it does not have ground truth values

In [None]:
train_data.label.value_counts()

In [None]:
import os
import pandas as pd
from sklearn.model_selection import train_test_split
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
from PIL import Image
from typing import List, Tuple

# -----------------------------
# Preprocessing functions
# -----------------------------
def create_preprocessor(input_size: Tuple[int, int],
                        mean: List[float] = (0.485, 0.456, 0.406),
                        std: List[float] = (0.229, 0.224, 0.225)):
    """
    Build the deterministic image pipeline (no random augmentation).

    Args:
        input_size: Target size handed to ``transforms.Resize``.
        mean: Per-channel mean passed to ``transforms.Normalize``.
        std: Per-channel standard deviation passed to ``transforms.Normalize``.

    Returns:
        A ``transforms.Compose`` that resizes, converts to a tensor and
        normalizes, in that order.
    """
    resize_step = transforms.Resize(input_size)
    tensor_step = transforms.ToTensor()
    normalize_step = transforms.Normalize(mean=mean, std=std)
    # No unsqueeze step is appended here: the DataLoader is expected to add
    # the batch dimension when it collates samples.
    pipeline = [resize_step, tensor_step, normalize_step]
    return transforms.Compose(pipeline)

def create_train_augmentations(input_size: Tuple[int, int],
                               mean: List[float] = (0.485, 0.456, 0.406),
                               std: List[float] = (0.229, 0.224, 0.225)):
    """
    Build the randomized image pipeline used for training.

    Steps, in order:
      1. RandomResizedCrop to ``input_size`` with a scale range of 0.8-1.0.
      2. RandomHorizontalFlip with its default settings.
      3. ColorJitter on brightness, contrast, saturation (0.2 each) and hue (0.1).
      4. ToTensor followed by Normalize with the given ``mean`` / ``std``.

    Args:
        input_size: Output size handed to ``transforms.RandomResizedCrop``.
        mean: Per-channel mean passed to ``transforms.Normalize``.
        std: Per-channel standard deviation passed to ``transforms.Normalize``.

    Returns:
        A ``transforms.Compose`` applying the steps above.
    """
    geometric_steps = [
        transforms.RandomResizedCrop(input_size, scale=(0.8, 1.0)),
        transforms.RandomHorizontalFlip(),
    ]
    photometric_steps = [
        transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2, hue=0.1),
    ]
    tensor_steps = [
        transforms.ToTensor(),
        transforms.Normalize(mean=mean, std=std),
    ]
    return transforms.Compose(geometric_steps + photometric_steps + tensor_steps)

# -----------------------------
# Custom Dataset Class
# -----------------------------
class ActionDataset(Dataset):
    """Dataset that serves (image, class id) pairs described by a DataFrame."""

    def __init__(self, df: pd.DataFrame, root_dir: str, transform=None, label2idx=label2idx):
        """
        Args:
            df (pd.DataFrame): Table with a "filename" and a "label" column.
            root_dir (str): Directory that the "filename" values are joined onto.
            transform: Optional callable applied to each loaded PIL image.
            label2idx: Mapping from label string to integer class id. The
                default is the module-level ``label2idx`` as it exists when
                this class statement is executed.
        """
        self.root_dir = root_dir
        self.transform = transform
        self.label2idx = label2idx
        # Work on a copy with a fresh 0..n-1 index.
        self.df = df.reset_index(drop=True)

    def __len__(self):
        """Return the number of rows (samples) in the table."""
        return self.df.shape[0]

    def __getitem__(self, idx):
        """Load the image for row ``idx`` and return ``(image, label_id)``."""
        record = self.df.iloc[idx]

        # Force three channels regardless of the mode stored on disk.
        full_path = os.path.join(self.root_dir, record['filename'])
        image = Image.open(full_path).convert('RGB')

        # Map the textual label to its integer class id.
        label = self.label2idx[record['label']]

        if not self.transform:
            return image, label
        return self.transform(image), label

# -----------------------------
# Main setup: Split DataFrames and create DataLoaders
# -----------------------------

# Split train_data into training and validation sets (80% train, 20% val).
# The split is stratified on the "label" column and seeded (random_state=42).
train_split_df, val_split_df = train_test_split(
    train_data, 
    test_size=0.2, 
    stratify=train_data['label'], 
    random_state=42
)

# Size every image is resized/cropped to before it reaches the model.
# NOTE(review): the original note says Sapiens expects 1024x1024 — confirm against the checkpoint in use.
input_size = (1024, 1024)

# Create transformation pipelines: random augmentations for training,
# deterministic resize + normalize for validation.
train_transform = create_train_augmentations(input_size)
val_transform = create_preprocessor(input_size)

# Set the directory where your images are stored.
train_data_root_dir = "/kaggle/input/human-action-recognition-har-dataset/Human Action Recognition/train"  # Replace with your actual image directory.
test_data_root_dir = "/kaggle/input/human-action-recognition-har-dataset/Human Action Recognition/test"
# Create datasets. Both splits read from the training image directory because
# both come from train_data; only their transforms differ.
train_dataset = ActionDataset(train_split_df, train_data_root_dir, transform=train_transform)
val_dataset = ActionDataset(val_split_df, train_data_root_dir, transform=val_transform)
# The test dataset stays disabled (see the note above about missing ground-truth values).
# test_dataset = ActionDataset(test_data, test_data_root_dir, transform=val_transform)



# finetuning SAPIENS


In [None]:
# # !wget https://huggingface.co/facebook/sapiens-pretrain-0.3b/resolve/main/sapiens_0.3b_epoch_1600_clean.pth
# -nc (no-clobber): skip the download when the checkpoint is already on disk.
# Without it, re-running this cell fetches the file again and stores the copy
# under a ".1" suffix instead of reusing the existing one.
!wget -nc https://huggingface.co/facebook/sapiens-pretrain-0.3b-torchscript/resolve/main/sapiens_0.3b_epoch_1600_torchscript.pt2

# Sample image; -nc keeps an existing man-horse-arrow.jpg untouched.
!wget -nc https://learnopencv.com/wp-content/uploads/2024/09/man-horse-arrow-scaled.jpg -O man-horse-arrow.jpg


In [None]:
from torch import nn
class ImageActionClassifier(nn.Module):
    """Frozen feature backbone followed by a trainable linear classifier.

    The backbone's parameters are frozen and it is always run in eval mode
    under ``torch.no_grad()``; only ``classifier_head`` is trained.

    Args:
        sapiens_model: Backbone module. Its output is indexed with ``[0]``;
            that element is pooled with ``AdaptiveAvgPool2d`` and must have
            ``embed_dim`` channels.
            NOTE(review): assumes the backbone returns a tuple/list whose
            first element is a (batch, embed_dim, H, W) feature map — confirm
            for the TorchScript export in use.
        num_classes: Number of output logits.
        embed_dim: Channel width of the backbone feature map. Defaults to
            1024, the value that was previously hard-coded.
    """

    def __init__(self, sapiens_model, num_classes, embed_dim=1024):
        super().__init__()
        self.sapiens_model = sapiens_model
        for param in self.sapiens_model.parameters():
            param.requires_grad = False
        # A frozen backbone should also behave deterministically.
        self.sapiens_model.eval()

        self.classifier_head = nn.Sequential(
            nn.AdaptiveAvgPool2d((1, 1)),
            nn.Flatten(),
            nn.Linear(embed_dim, num_classes),
        )

    def train(self, mode=True):
        """Set the training mode of the head while keeping the backbone in eval.

        ``nn.Module.train`` recurses into every child, so a plain
        ``model.train()`` would otherwise switch the frozen backbone back to
        training mode (affecting layers such as dropout or batch norm, if the
        backbone has any).
        """
        super().train(mode)
        self.sapiens_model.eval()
        return self

    def forward(self, x):
        """Return class logits of shape (batch, num_classes) for input ``x``."""
        # The backbone is frozen, so skip autograd bookkeeping for it.
        with torch.no_grad():
            features = self.sapiens_model(x)
        return self.classifier_head(features[0])


In [None]:
# Load the TorchScript backbone downloaded above and put it in eval mode.
sapiens_model_path = "sapiens_0.3b_epoch_1600_torchscript.pt2"
sapiens_model = torch.jit.load(sapiens_model_path)
sapiens_model.eval()


# One output logit per class found in the training labels, so the head always
# matches the targets produced through label2idx (instead of a hard-coded 15).
model = ImageActionClassifier(sapiens_model, len(label2idx))
# model(torch.rand(size=(1,3,1024, 1024)))

criterion = nn.CrossEntropyLoss()
# Hand the optimizer only the parameters that can receive gradients;
# the backbone is frozen inside ImageActionClassifier.
optim = torch.optim.AdamW([p for p in model.parameters() if p.requires_grad], lr=1e-3)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# Replicate the model across GPUs when more than one is visible.
if torch.cuda.device_count() > 1:
    print(f"Using {torch.cuda.device_count()} GPUs with DataParallel.")
    model = torch.nn.DataParallel(model)
model = model.to(device)
print("device ", device)


In [None]:
# Snapshot (element count, requires_grad) for every parameter in one pass.
param_sizes = [(p.numel(), p.requires_grad) for p in model.parameters()]

# Total parameter count.
num_parameters = sum(size for size, _ in param_sizes)
print(f"Number of parameters: {num_parameters}")

# Parameters that will actually be updated by the optimizer.
num_parameters = sum(size for size, trainable in param_sizes if trainable)
print(f"Number of trainable parameters: {num_parameters}")

In [None]:
import wandb

# SECURITY: never commit a W&B API key to the notebook. The key that used to
# be hard-coded here is exposed and should be revoked in the W&B settings.
# Provide the key through the WANDB_API_KEY environment variable instead;
# when it is unset, key=None lets wandb fall back to its own credential lookup.
api_key = os.environ.get("WANDB_API_KEY")
wandb.login(key=api_key)


In [None]:
# Mini-batch size shared by both loaders.
batch_size = 96
# Settings common to the training and validation loaders.
_loader_kwargs = {"batch_size": batch_size, "num_workers": 4}
# Only the training loader shuffles; validation keeps the dataset order.
train_loader = DataLoader(train_dataset, shuffle=True, **_loader_kwargs)
val_loader = DataLoader(val_dataset, shuffle=False, **_loader_kwargs)
# test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False, num_workers=4)

In [None]:
import time
import wandb
from pathlib import Path

# Train the classifier for a fixed number of epochs, logging per-step and
# per-epoch metrics to Weights & Biases and keeping the checkpoint with the
# lowest validation loss.
name = "sapiens-backbone-image-action-epoch4-lre-3run2"
wandb.init(project="HAR", name=name)
Path("./checkpoints").mkdir(parents=True, exist_ok=True)

epochs = 4
best_val_loss = float('inf')
start = time.time()
global_step = 0  # Counts optimizer steps across all epochs; logged as "Step".

# try/finally: close the W&B run even when training raises or is interrupted.
try:
    for epoch in range(epochs):
        model.train()
        train_loss = 0
        train_correct = 0
        train_total = 0

        # Training loop
        for batch_idx, batch in enumerate(train_loader):
            pixel_values = batch[0].to(device)
            # Named `targets` rather than `labels` so the module-level `labels`
            # array of class names is not overwritten by a tensor.
            targets = batch[1].to(device)

            # Forward pass
            outputs = model(pixel_values)
            loss = criterion(outputs, targets)

            # Backward pass
            optim.zero_grad()
            loss.backward()

            # Global L2 norm over every parameter that received a gradient
            # (logged for monitoring only; no clipping is applied).
            total_norm = sum(
                param.grad.norm(2).item() ** 2
                for param in model.parameters()
                if param.grad is not None
            ) ** 0.5

            optim.step()

            # Read the loss once; every .item() call forces a device sync.
            step_loss = loss.item()

            # Compute batch accuracy
            preds = torch.argmax(outputs, dim=1)
            batch_correct = (preds == targets).sum().item()
            batch_total = targets.size(0)
            batch_accuracy = batch_correct / batch_total

            # Log every step
            wandb.log({
                "Step": global_step,
                "Step Train Loss": step_loss,
                "Step Train Accuracy": batch_accuracy,
                "Step Gradient Norm": total_norm,
                "Learning Rate": optim.param_groups[0]['lr'],
                "GPU Memory": torch.cuda.memory_allocated(device) / (1024 * 1024)
            })

            # Accumulate sample-weighted metrics for epoch-level logging, so a
            # smaller final batch does not skew the epoch average.
            train_loss += step_loss * batch_total
            train_correct += batch_correct
            train_total += batch_total

            global_step += 1

            if batch_idx % 100 == 0:  # Print every 100 steps
                print(f"Epoch {epoch+1}, Step {batch_idx}, "
                      f"Step Loss: {step_loss:.4f}, "
                      f"Step Accuracy: {batch_accuracy*100:.2f}%")

        # Calculate epoch-level metrics
        epoch_train_loss = train_loss / train_total
        epoch_train_accuracy = train_correct / train_total

        # Validation step
        model.eval()
        val_loss = 0
        val_correct = 0
        val_total = 0

        with torch.no_grad():
            for batch in val_loader:
                pixel_values = batch[0].to(device)
                targets = batch[1].to(device)

                outputs = model(pixel_values)
                batch_loss = criterion(outputs, targets).item()

                preds = torch.argmax(outputs, dim=1)
                val_correct += (preds == targets).sum().item()
                val_total += targets.size(0)
                val_loss += batch_loss * targets.size(0)

        # Calculate and log epoch-level validation metrics
        epoch_val_loss = val_loss / val_total
        epoch_val_accuracy = val_correct / val_total

        # Log epoch-level metrics
        wandb.log({
            "Epoch": epoch,
            "Epoch Train Loss": epoch_train_loss,
            "Epoch Train Accuracy": epoch_train_accuracy,
            "Epoch Validation Loss": epoch_val_loss,
            "Epoch Validation Accuracy": epoch_val_accuracy,
        })

        # Save best model (unwrap DataParallel so the saved keys carry no
        # "module." prefix).
        if epoch_val_loss < best_val_loss:
            best_val_loss = epoch_val_loss
            save_path = "./checkpoints"
            torch.save(
                model.module.state_dict() if isinstance(model, torch.nn.DataParallel) else model.state_dict(),
                f"{save_path}/model.pt"
            )
            print(f"Saved checkpoint at {save_path}")

        print(f"Epoch {epoch+1}/{epochs}, "
              f"Train Loss: {epoch_train_loss:.4f}, "
              f"Train Accuracy: {epoch_train_accuracy*100:.2f}%, "
              f"Val Loss: {epoch_val_loss:.4f}, "
              f"Val Accuracy: {epoch_val_accuracy*100:.2f}%")
finally:
    wandb.finish()

# Record the wall-clock training time (in minutes) next to the epoch count.
df = pd.DataFrame({"time taken": (time.time() - start)/60, "epochs": epochs}, index=[0])
df.to_csv("./time.csv")

In [None]:
# import torch
# import pandas as pd
# from collections import defaultdict

# # Ensure the model is in evaluation mode
# model.eval()

# all_preds = []
# all_labels = []

# # Evaluate the model on the test set.
# with torch.no_grad():
#     for batch in test_loader:
#         pixel_values = batch[0].to(device)
#         labels = batch[1].to(device)

#         outputs = model(pixel_values)
#         preds = torch.argmax(outputs, dim=1)

#         all_preds.append(preds.cpu())
#         all_labels.append(labels.cpu())
#         break

# # Concatenate all predictions and labels from the test set
# all_preds = torch.cat(all_preds)
# all_labels = torch.cat(all_labels)

# # Compute total accuracy.
# total_accuracy = (all_preds == all_labels).float().mean().item()

# # Determine the number of classes.
# # Here, we assume your test dataset (or training dataset) has an attribute 'labels'
# # that is a sorted list of class names.
# num_classes = len(test_dataset.labels)  # Adjust if your dataset stores this differently

# # Initialize counters for per-class accuracy.
# class_correct = defaultdict(int)
# class_total = defaultdict(int)

# # Compute per-class correct predictions.
# for true_label, pred in zip(all_labels, all_preds):
#     true_label = true_label.item()
#     pred = pred.item()
#     class_total[true_label] += 1
#     if true_label == pred:
#         class_correct[true_label] += 1

# # Prepare data for CSV output.
# results = []
# for cls in range(num_classes):
#     # Get the class name from the dataset; if you don't have names, you can simply use the class index.
#     class_name = test_dataset.labels[cls] if hasattr(test_dataset, 'labels') else str(cls)
#     total = class_total[cls]
#     accuracy = class_correct[cls] / total if total > 0 else 0
#     results.append({
#         'Class': class_name,
#         'Total Samples': total,
#         'Accuracy': accuracy
#     })

# # Add an overall accuracy row.
# results.append({
#     'Class': 'Overall',
#     'Total Samples': len(all_labels),
#     'Accuracy': total_accuracy
# })

# # Create a DataFrame and save it to CSV.
# df_results = pd.DataFrame(results)
# csv_output_path = 'test_accuracy.csv'
# df_results.to_csv(csv_output_path, index=False)

# print("Test results saved to", csv_output_path)


# 4| Making a function that takes a random path and displays the image

In [None]:
# def displaying_random_images():
#     num = random.randint(1,10000)
#     imgg = "Image_{}.jpg".format(num)
#     train = "../input/human-action-recognition-har-dataset/Human Action Recognition/train/"
#     if os.path.exists(train+imgg):
#         testImage = img.imread(train+imgg)
#         plt.imshow(testImage)
#         plt.title("{}".format(train_data.loc[train_data['filename'] == "{}".format(imgg), 'label'].item()))

#     else:
#         #print(train+img)
#         print("File Path not found \nSkipping the file!!")

In [None]:
# displaying_random_images()

In [None]:
# displaying_random_images()

In [None]:
# displaying_random_images()

In [None]:
# displaying_random_images()

# 5| Data preprocessing

In [None]:
# img_data = []
# img_label = []
# length = len(train_fol)
# for i in (range(len(train_fol)-1)):
#     t = '../input/human-action-recognition-har-dataset/Human Action Recognition/train/' + filename[i]    
#     temp_img = Image.open(t)
#     img_data.append(np.asarray(temp_img.resize((160,160))))
#     img_label.append(situation[i])

In [None]:
# img_shape= (160,160,3)

In [None]:
# iii = img_data
# iii = np.asarray(iii)
# type(iii)

In [None]:
# y_train = to_categorical(np.asarray(train_data["label"].factorize()[0]))
# print(y_train[0])

# 6| Make a CNN model

In [None]:
# efficientnet_model = Sequential()

# model = tf.keras.applications.EfficientNetB7(include_top=False,
#                                             input_shape=(160,160,3),
#                                             pooling ="avg",classes=15,
#                                              weights="imagenet")

# for layer in model.layers:
#     layer.trainable=False
    

# efficientnet_model.add(model)
# efficientnet_model.add(Flatten())
# efficientnet_model.add(Dense(512,activation="relu"))
# efficientnet_model.add(Dense(15,activation="softmax"))

In [None]:
# efficientnet_model.compile(optimizer="adam",loss="categorical_crossentropy",metrics=["accuracy"])

In [None]:
# efficientnet_model.summary()

In [None]:
# history = efficientnet_model.fit(iii,y_train,epochs=40)

In [None]:
# losses = history.history["loss"]
# plt.plot(losses)

In [None]:
# acc = history.history['accuracy']
# plt.plot(acc)

# 7| Model predictions

In [None]:
# def read_img(fn):
#     img = Image.open(fn)
#     return np.asarray(img.resize((160,160)))

In [None]:
# def test_predict(test_image):
#     result = efficientnet_model.predict(np.asarray([read_img(test_image)]))

#     itemindex = np.where(result==np.max(result))
#     prediction = itemindex[1][0]
#     print("probability: "+str(np.max(result)*100) + "%\nPredicted class : ", prediction)

#     image = img.imread(test_image)
#     plt.imshow(image)
#     plt.title(prediction)

In [None]:
# test_predict("/kaggle/input/human-action-recognition-har-dataset/Human Action Recognition/test/Image_1001.jpg")

In [None]:
# test_predict("/kaggle/input/human-action-recognition-har-dataset/Human Action Recognition/test/Image_101.jpg")

In [None]:
# test_predict("/kaggle/input/human-action-recognition-har-dataset/Human Action Recognition/test/Image_1056.jpg")

In [None]:
# test_predict("/kaggle/input/human-action-recognition-har-dataset/Human Action Recognition/test/Image_1024.jpg")