In [84]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

import lightning as L

from torch.utils.data import DataLoader
from torchvision import datasets, transforms
from lightning.pytorch.callbacks import ModelCheckpoint
from lightning.pytorch.loggers.tensorboard import TensorBoardLogger
from lightning.pytorch.callbacks.early_stopping import EarlyStopping
import torch
from facenet_pytorch import MTCNN,InceptionResnetV1
from facenet_pytorch.models.inception_resnet_v1 import BasicConv2d
from torchvision import models, transforms
import torch.nn as nn
import cv2
from PIL import Image
import matplotlib.pyplot as plt
import numpy as np
device = 'cuda' if torch.cuda.is_available() else 'cpu'


In [107]:
import os
import torch
import pandas as pd
import numpy as np
import PIL
from pathlib import Path
from torch.utils.data import Dataset, DataLoader
import PIL.ImageOps
image_size=128
class FaceLandmarksDataset(Dataset):
    def __init__(self, root_dir, landmarks_extension, transform=None):
        self.root_dir: Path = Path(root_dir)
        self.transform = transform
        self.landmarks_extension = landmarks_extension
        
        data = self.set_data_paths(root_dir, ["*.jpg", "*.png"])
        self.df = self.make_csv(data)
        self.num_of_classes = self.df['id'].nunique()
        print(f"num of classes: {self.num_of_classes}")

    def set_data_paths(self, root_dir, extensions: list = list) -> dict:
        all_names = os.listdir(root_dir)
        data_paths = {}
        for name in all_names:
            new_path = self.root_dir.joinpath(name)
            paths = []
            for ext in extensions:
                paths += list(new_path.rglob(ext))
            data_paths[name] = paths
        return data_paths

    def make_csv(self, data: dict):
        records = []
        id_count={}
        for name, paths in data.items():
            for path in paths:
                if path.exists():
                    new_path=str(path)
                    id=int(new_path.split("\\")[1].split("_")[0])
                    if id not in id_count:
                        id_count[id]=id
                    records.append({
                        "image_path":new_path ,
                        "name": name,
                        "id": id
                    })

        df = pd.DataFrame(records)
        print(id_count)
        return df

    def __len__(self):
        return len(self.df)

    def __getitem__(self, idx):
        if torch.is_tensor(idx):
            idx = idx.tolist()

        img_path = self.df.iloc[idx]["image_path"]
        id = self.df.iloc[idx]["id"]

        # Open the image
        image = PIL.Image.open(img_path)

        # Check if the image is grayscale, if so, convert it to RGB
        if image.mode != 'RGB':
            image = image.convert('RGB')

        # Get the dimensions of the image
        width, height = image.size

        # If the image dimensions are greater than 128x128, pad it
        if width > image_size or height > image_size:
            image = PIL.ImageOps.pad(image, (128, 128), color=(0, 0, 0))
            print(f"padded:{img_path}")

        # Convert the image to a NumPy array
        image = np.array(image)

        # Ensure that the image has been properly read
        assert image.size != 0, f"[ERROR] The image {img_path} is empty or cannot be read."

        # Create a sample dictionary
        sample = {'id': id, 'image': torch.tensor(image, dtype=torch.float32)}

        if self.transform:
            sample = self.transform(sample)

        return sample

# Initialize the dataset
face_dataset = FaceLandmarksDataset(
    root_dir='./dataset_not_for_upload_images',
    landmarks_extension="_landmarks.npy",
)

# Create the DataLoader
dataloader = DataLoader(face_dataset, batch_size=64, shuffle=True, num_workers=0)

# Print the dataset dataframe
face_dataset.df["id"]


{10: 10, 11: 11, 12: 12, 13: 13, 14: 14, 15: 15, 16: 16, 17: 17, 18: 18, 19: 19, 1: 1, 20: 20, 21: 21, 22: 22, 23: 23, 24: 24, 25: 25, 26: 26, 27: 27, 28: 28, 29: 29, 2: 2, 30: 30, 31: 31, 32: 32, 33: 33, 34: 34, 35: 35, 36: 36, 37: 37, 38: 38, 39: 39, 3: 3, 40: 40, 41: 41, 42: 42, 43: 43, 44: 44, 45: 45, 46: 46, 47: 47, 48: 48, 49: 49, 4: 4, 50: 50, 51: 51, 52: 52, 5: 5, 6: 6, 7: 7, 8: 8, 9: 9}
num of classes: 52


0       10
1       10
2       10
3       10
4       10
        ..
6714     9
6715     9
6716     9
6717     9
6718     9
Name: id, Length: 6719, dtype: int64

52

In [105]:

# Define the CNN architecture

class FaceClassification(L.LightningModule):
    def __init__(self,num_classes):
        super().__init__()
        self.gray_scale_input = nn.Conv2d(in_channels=1, out_channels=3, kernel_size=3, stride=1, padding=1)  
        inception =InceptionResnetV1( classify=True, num_classes=num_classes)
                # Change the input layer to accept grayscale images
        
       
        self.classifier = inception.to(device)
        
        

    def forward(self,x):
       
        if len(x.size)==3:
            x = x.unsqueeze(1)
            x=self.gray_scale_input(x)
       
        x=self.classifier(x)
       
        return x

    def training_step(self, batch, batch_idx):
        #print(f"batchtest:{batch}")
        
        map = batch
        x=map["image"]
        y=map["id"]
        y_hat = self(x)
        loss = F.cross_entropy(y_hat, y)
        acc = (y_hat.argmax(1) == y).float().mean()

        self.log("train_loss", loss)
        self.log("train_acc", acc)
        print(f"test_loss:{loss}")
        print(f"test_acc:{ acc}")
        return loss

    def validation_step(self, batch, batch_idx):
        
        map = batch
        x=map["image"]
        y=map["id"]
        y_hat = self(x)
        loss = F.cross_entropy(y_hat, y)
        acc = (y_hat.argmax(1) == y).float().mean()
        self.log("val_loss", loss)
        self.log("val_acc", acc)
        print(f"test_loss:{loss}")
        print(f"test_acc:{ acc}")

    def test_step(self, batch, batch_idx):
        map = batch
        x=map["image"]
        y=map["id"]
        y_hat = self(x)
        loss = F.cross_entropy(y_hat, y)
        acc = (y_hat.argmax(1) == y).float().mean()
        self.log("test_loss", loss)
        self.log("test_acc", acc)
        print(f"test_loss:{loss}")
        print(f"test_acc:{ acc}")

    def configure_optimizers(self):
        
        optimizer = torch.optim.Adam(self.parameters(), lr=1e-3)
        scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(
            optimizer, mode="min", factor=0.1, patience=5
        )
        return {
            "optimizer": optimizer,
            "lr_scheduler": {
                "scheduler": scheduler,
                "monitor": "val_loss",
            },
        }

normalize=transforms.Normalize((0.5,0.5,0.5), (0.5,0.5,0.5))
# Data transformations
transform_train = transforms.Compose(
    [
  
        transforms.RandomRotation(10),
        transforms.ColorJitter(brightness=0.5, contrast=0.5),
        transforms.RandomHorizontalFlip(),
        transforms.ToTensor(),
        #normalize,
    ]
)

transform_test = transforms.Compose(
    [
        transforms.ToTensor(),
    # normalize 
     
    ]
)



train_loader = DataLoader(face_dataset, batch_size=64, shuffle=True, num_workers=0)
val_loader = DataLoader(face_dataset, batch_size=64, shuffle=False, num_workers=0)

# Initialize the model
model = FaceClassification(face_dataset.num_of_classes)

# Define callbacks
checkpoint_callback = ModelCheckpoint(
    dirpath="checkpoints",
    monitor="val_loss",
    filename="face-recognition-{epoch:02d}-{val_loss:.2f}-{val_acc:.2f}",
    save_top_k=3,
    mode="min",
)

early_stopping = EarlyStopping(
    monitor="val_loss", patience=5, mode="min", verbose=False
)

# Initialize the logger
logger = TensorBoardLogger(save_dir="lightning_logs", name="face_classification")

# Initialize the Trainer
trainer = L.Trainer(
    max_epochs=10,
    callbacks=[checkpoint_callback, early_stopping],
    logger=logger,
    accelerator="gpu" if torch.cuda.is_available() else "cpu",
    devices="auto",
)



GPU available: False, used: False
TPU available: False, using: 0 TPU cores
HPU available: False, using: 0 HPUs


In [87]:
checkpoint_path=""
trainer = L.Trainer(
    max_epochs=10,
    callbacks=[checkpoint_callback, early_stopping],
    logger=logger,
    accelerator="gpu" if torch.cuda.is_available() else "cpu",
    devices="auto",
    resume_from_checkpoint=checkpoint_path  # Resume from checkpoint
)

TypeError: Trainer.__init__() got an unexpected keyword argument 'resume_from_checkpoint'

In [77]:
sample_input = torch.randn(5, 128, 128)  # Batch size of 1, 1 channel, 128x128 size
output = model(sample_input)
print(output.shape)  # Should output (1, num_classes)

torch.Size([5, 25])


In [91]:
# Train the model
trainer.fit(model, train_loader, val_loader)



d:\anaconda\envs\per\Lib\site-packages\lightning\pytorch\callbacks\model_checkpoint.py:654: Checkpoint directory D:\AI\MS-DS-Practice\cv\face_recognition\checkpoints exists and is not empty.

  | Name             | Type              | Params | Mode 
---------------------------------------------------------------
0 | gray_scale_input | Conv2d            | 30     | train
1 | classifier       | InceptionResnetV1 | 23.5 M | train
---------------------------------------------------------------
23.5 M    Trainable params
0         Non-trainable params
23.5 M    Total params
93.982    Total estimated model params size (MB)
551       Modules in train mode
0         Modules in eval mode


Sanity Checking: |          | 0/? [00:00<?, ?it/s]

d:\anaconda\envs\per\Lib\site-packages\lightning\pytorch\trainer\connectors\data_connector.py:424: The 'val_dataloader' does not have many workers which may be a bottleneck. Consider increasing the value of the `num_workers` argument` to `num_workers=7` in the `DataLoader` to improve performance.


Sanity Checking DataLoader 0:   0%|          | 0/2 [00:00<?, ?it/s]test_loss:3.2001161575317383
test_acc:0.0
Sanity Checking DataLoader 0:  50%|█████     | 1/2 [00:02<00:02,  0.44it/s]test_loss:3.1986441612243652
test_acc:0.0
                                                                           

d:\anaconda\envs\per\Lib\site-packages\lightning\pytorch\trainer\connectors\data_connector.py:424: The 'train_dataloader' does not have many workers which may be a bottleneck. Consider increasing the value of the `num_workers` argument` to `num_workers=7` in the `DataLoader` to improve performance.
d:\anaconda\envs\per\Lib\site-packages\lightning\pytorch\loops\fit_loop.py:298: The number of training batches (34) is smaller than the logging interval Trainer(log_every_n_steps=50). Set a lower value for log_every_n_steps if you want to see logs for the training epoch.


Epoch 0:   0%|          | 0/34 [00:00<?, ?it/s] test_loss:3.4437026977539062
test_acc:0.046875
Epoch 0:   3%|▎         | 1/34 [00:06<03:46,  0.15it/s, v_num=1]test_loss:2.7922780513763428
test_acc:0.25
Epoch 0:   6%|▌         | 2/34 [00:13<03:32,  0.15it/s, v_num=1]test_loss:2.8773763179779053
test_acc:0.15625
Epoch 0:   9%|▉         | 3/34 [00:19<03:21,  0.15it/s, v_num=1]test_loss:2.6082136631011963
test_acc:0.25
Epoch 0:  12%|█▏        | 4/34 [00:27<03:29,  0.14it/s, v_num=1]test_loss:2.197352170944214
test_acc:0.34375
Epoch 0:  15%|█▍        | 5/34 [00:36<03:31,  0.14it/s, v_num=1]test_loss:1.8622552156448364
test_acc:0.421875
Epoch 0:  18%|█▊        | 6/34 [00:45<03:34,  0.13it/s, v_num=1]test_loss:2.2014591693878174
test_acc:0.296875
Epoch 0:  21%|██        | 7/34 [00:53<03:27,  0.13it/s, v_num=1]test_loss:1.7775448560714722
test_acc:0.421875
Epoch 0:  24%|██▎       | 8/34 [01:00<03:17,  0.13it/s, v_num=1]test_loss:1.4139766693115234
test_acc:0.578125
Epoch 0:  26%|██▋       | 9/

`Trainer.fit` stopped: `max_epochs=10` reached.


Epoch 9: 100%|██████████| 34/34 [05:24<00:00,  0.10it/s, v_num=1]


In [82]:
# Test the model
trainer.test(model, val_loader)

d:\anaconda\envs\per\Lib\site-packages\lightning\pytorch\trainer\connectors\data_connector.py:424: The 'test_dataloader' does not have many workers which may be a bottleneck. Consider increasing the value of the `num_workers` argument` to `num_workers=7` in the `DataLoader` to improve performance.


Testing DataLoader 0: 100%|██████████| 34/34 [01:12<00:00,  0.47it/s]
────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────
       Test metric             DataLoader 0
────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────
        test_acc            0.9935064911842346
        test_loss           0.01975168287754059
────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────


[{'test_loss': 0.01975168287754059, 'test_acc': 0.9935064911842346}]

In [92]:
torch.save(model.state_dict(), './weights/face_classification_model_3.pth')

In [56]:
import torch
import torch.nn as nn
import pytorch_lightning as L

import torch.nn.functional as F
from torch.utils.data import DataLoader
from torchvision import transforms

# Define the CNN architecture
class FaceClassification(L.LightningModule):
    def __init__(self, num_classes):
        super().__init__()
        # Layer to convert grayscale to RGB
        self.gray_scale_input = nn.Conv2d(in_channels=1, out_channels=3, kernel_size=3, stride=1, padding=1)
        
        # Inception ResNet V1 model
        self.classifier = InceptionResnetV1( classify=True, num_classes=num_classes)

    def forward(self, x):
        #print(f"before rgb: {x.shape}")  # Expected: [batch_size, 1, height, width]
        x = self.gray_scale_input(x)  # Convert grayscale to RGB
        #print(f"after rgb: {x.shape}")  # Expected: [batch_size, 3, height, width]
        x = self.classifier(x)  # Forward through the classifier
        #print(f"classifier output: {x.shape}")  # Expected: [batch_size, num_classes]
        return x

    # ... (rest of your methods remain unchanged)

# Testing the model
num_classes = face_dataset.num_of_classes  # Make sure this is defined
model = FaceClassification(num_classes)

# Create a sample input tensor (1 image of size 128x128)
sample_input = torch.randn(3, 1, 128, 128)  # Batch size of 1, 1 channel, 128x128 size
output = model(sample_input)
print(output.shape)  # Should output (1, num_classes)


before rgb: torch.Size([3, 1, 128, 128])
after rgb: torch.Size([3, 3, 128, 128])
classifier output: torch.Size([3, 25])
torch.Size([3, 25])
