In [3]:
# Install required libraries if necessary
# !pip install torch torchvision pillow

# Import libraries
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
import os
from PIL import Image
from enum import Enum


In [4]:
# Define the enum for prayer poses
class PrayerPose(Enum):
    QIYAM = "Qiyam"
    RUKU = "Ruku"
    SUJUD = "Sujud"
    JALSA = "Jalsa"
    TASHAHHUD = "Tashahhud"
    UNKNOWN = "Unknown"


In [5]:
# Define the custom dataset class
class NamazPoseDataset(Dataset):
    def __init__(self, root_dir, transform=None):
        self.root_dir = root_dir
        self.transform = transform
        self.classes = [pose.value for pose in PrayerPose if pose != PrayerPose.UNKNOWN]
        self.image_paths = []
        self.labels = []

        # Load dataset
        for class_idx, class_name in enumerate(self.classes):
            class_dir = os.path.join(root_dir, class_name)
            if not os.path.exists(class_dir):
                print(f"Warning: Directory not found for {class_name}")
                continue

            print(f"Loading images for {class_name}...")
            for img_name in os.listdir(class_dir):
                if img_name.endswith(('.jpg', '.jpeg', '.png')):
                    self.image_paths.append(os.path.join(class_dir, img_name))
                    self.labels.append(class_idx)

            print(f"Found {len([x for x in self.labels if x == class_idx])} images for {class_name}")

    def __len__(self):
        return len(self.image_paths)

    def __getitem__(self, idx):
        img_path = self.image_paths[idx]
        image = Image.open(img_path).convert('RGB')
        label = self.labels[idx]

        if self.transform:
            image = self.transform(image)

        return image, label


In [6]:
# Define the CNN model for prayer pose classification
class NamazPoseCNN(nn.Module):
    def __init__(self, num_classes):
        super(NamazPoseCNN, self).__init__()
        self.features = nn.Sequential(
            nn.Conv2d(3, 64, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=2, stride=2),

            nn.Conv2d(64, 128, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=2, stride=2),

            nn.Conv2d(128, 256, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=2, stride=2),

            nn.Conv2d(256, 512, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=2, stride=2),
        )

        self.classifier = nn.Sequential(
            nn.Dropout(0.5),
            nn.Linear(512 * 14 * 14, 4096),
            nn.ReLU(inplace=True),
            nn.Dropout(0.5),
            nn.Linear(4096, 1024),
            nn.ReLU(inplace=True),
            nn.Linear(1024, num_classes),
        )

    def forward(self, x):
        x = self.features(x)
        x = x.view(x.size(0), -1)
        x = self.classifier(x)
        return x


In [10]:
# Replace this with your dataset directory path
dataset_directory = r"C:\Users\mohum\OneDrive\Desktop\ML\salah_detection\dataset_directory"  # Update path for Colab

# Set up data transformations
transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

# Create dataset and dataloader
dataset = NamazPoseDataset(dataset_directory, transform=transform)
dataloader = DataLoader(dataset, batch_size=2, shuffle=True)

# Check if dataset is loaded
if len(dataset) == 0:
    print("No images found in the dataset! Check your data directory structure.")
else:
    print(f"Dataset loaded successfully with {len(dataset)} images.")


Loading images for Qiyam...
Found 307 images for Qiyam
Loading images for Ruku...
Found 226 images for Ruku
Loading images for Sujud...
Found 191 images for Sujud
Loading images for Jalsa...
Found 57 images for Jalsa
Loading images for Tashahhud...
Found 172 images for Tashahhud
Dataset loaded successfully with 953 images.


In [7]:
torch.cuda.empty_cache()


In [9]:
import torch

if torch.cuda.is_available():
    free_memory = torch.cuda.mem_get_info(0)[0]
    print("Free memory:", free_memory / (1024 ** 3), "GB")

Free memory: 1.652783203870058 GB


In [None]:
import torch
from torch import nn, optim
from torch.utils.data import DataLoader
from torch.cuda.amp import GradScaler

# Set up the device
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
if device.type == 'cuda':
    print(f"Using GPU: {torch.cuda.get_device_name(0)}")
else:
    print("Using CPU. Ensure GPU is enabled in Colab settings.")

# Initialize the model
num_classes = len([pose for pose in PrayerPose if pose != PrayerPose.UNKNOWN])
model = NamazPoseCNN(num_classes).to(device)

# Define the optimizer and loss criterion
optimizer = optim.Adam(model.parameters(), lr=0.001)
criterion = nn.CrossEntropyLoss()

# Set up GradScaler for mixed precision training
scaler = GradScaler()

# Optimized DataLoader
batch_size = 2  # Adjust based on GPU memory capacity
num_workers = 4  # Use multi-threaded data loading
dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=True, num_workers=num_workers, persistent_workers=True)

# Training loop
num_epochs = 25
print("Starting training...")

param_size = 0
for param in model.parameters():
    param_size += param.nelement() * param.element_size()
buffer_size = 0
for buffer in model.buffers():
    buffer_size += buffer.nelement() * buffer.element_size()

size_all_mb = (param_size + buffer_size) / 1024**2
print('model size: {:.3f}MB'.format(size_all_mb))

# import sys; sys.exit()


model.train()
for epoch in range(num_epochs):
    epoch_start_time = torch.cuda.Event(enable_timing=True)
    epoch_end_time = torch.cuda.Event(enable_timing=True)
    epoch_start_time.record()

    running_loss = 0.0
    correct = 0
    total = 0

    for batch_idx, (inputs, labels) in enumerate(dataloader):
        batch_start_time = torch.cuda.Event(enable_timing=True)
        batch_end_time = torch.cuda.Event(enable_timing=True)
        batch_start_time.record()

        try:
            # Move inputs and labels to the device
            inputs, labels = inputs.to(device, non_blocking=True), labels.to(device, non_blocking=True)

            optimizer.zero_grad()

            # Forward and backward passes
            with torch.cuda.amp.autocast(enabled=torch.cuda.is_available()):
                outputs = model(inputs)
                loss = criterion(outputs, labels)

            scaler.scale(loss).backward()
            scaler.step(optimizer)
            scaler.update()

            # Update metrics
            running_loss += loss.item()
            _, predicted = outputs.max(1)
            total += labels.size(0)
            correct += predicted.eq(labels).sum().item()

        except Exception as e:
            print(f"Error in batch {batch_idx+1}: {e}")
            continue

        batch_end_time.record()
        torch.cuda.synchronize()
        print(f"Batch {batch_idx+1} processed in {batch_start_time.elapsed_time(batch_end_time):.2f} ms")

    # Calculate and print epoch metrics
    epoch_loss = running_loss / len(dataloader)
    accuracy = 100. * correct / total
    print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {epoch_loss:.4f}, Accuracy: {accuracy:.2f}%')

    # Save checkpoints less frequently
    if (epoch + 1) % 5 == 0:
        checkpoint = {
            'epoch': epoch + 1,
            'model_state_dict': model.state_dict(),
            'optimizer_state_dict': optimizer.state_dict(),
            'loss': epoch_loss,
        }
        torch.save(checkpoint, f'model_checkpoint_epoch_{epoch+1}.pth')
        print(f"Checkpoint saved after epoch {epoch+1}")

    epoch_end_time.record()
    torch.cuda.synchronize()
    print(f"Epoch {epoch+1} completed in {epoch_start_time.elapsed_time(epoch_end_time) / 1000:.2f} seconds")

print("Training completed!")


In [None]:
# Initialize the model
model = NamazPoseCNN(num_classes).to(device)

# Load the best checkpoint
checkpoint_path = '/content/model_checkpoint_epoch_25.pth'  # Path to your saved model
# Load the checkpoint dictionary
checkpoint = torch.load(checkpoint_path)
# Extract the model's state_dict from the checkpoint
model_state_dict = checkpoint['model_state_dict']
# Load the state_dict into the model
model.load_state_dict(model_state_dict)
model.eval()  # Set model to evaluation mode

  checkpoint = torch.load(checkpoint_path)


NamazPoseCNN(
  (features): Sequential(
    (0): Conv2d(3, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (1): ReLU(inplace=True)
    (2): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
    (3): Conv2d(64, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (4): ReLU(inplace=True)
    (5): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
    (6): Conv2d(128, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (7): ReLU(inplace=True)
    (8): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
    (9): Conv2d(256, 512, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (10): ReLU(inplace=True)
    (11): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  )
  (classifier): Sequential(
    (0): Dropout(p=0.5, inplace=False)
    (1): Linear(in_features=100352, out_features=4096, bias=True)
    (2): ReLU(inplace=True)
    (3): Dropout(p=0.5, inplace=False)
  

In [None]:
pip install opencv-python ipywidgets


Collecting jedi>=0.16 (from ipython>=4.0.0->ipywidgets)
  Downloading jedi-0.19.2-py2.py3-none-any.whl.metadata (22 kB)
Downloading jedi-0.19.2-py2.py3-none-any.whl (1.6 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.6/1.6 MB[0m [31m17.0 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: jedi
Successfully installed jedi-0.19.2


In [None]:
import cv2

cap = cv2.VideoCapture(0)  # Open webcam (use 0 for default camera)

while True:
    ret, frame = cap.read()
    if not ret:
        break

    # Convert frame to PIL image and preprocess
    image = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
    input_tensor = transform(image).unsqueeze(0).to(device)

    # Make prediction
    with torch.no_grad():
        output = model(input_tensor)
        predicted_class = output.argmax(dim=1).item()

    # Display the prediction
    cv2.putText(frame, f"Pose: {class_names[predicted_class]}", (10, 30),
                cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2, cv2.LINE_AA)

    cv2.imshow("Live Pose Detection", frame)

    # Press 'q' to quit
    if cv2.waitKey(1) & 0xFF == ord('q'):
        break

cap.release()
cv2.destroyAllWindows()


In [None]:
!du /content/model_checkpoint_epoch_25.pth

4884368	/content/model_checkpoint_epoch_25.pth


In [None]:
!cp /content/model_checkpoint_epoch_25.pth /content/drive/MyDrive/model_checkpoint_epoch_25.pth