In [None]:
import numpy as np
import os
from PIL import Image
from torch.utils.data import Dataset

import torch
import torchvision.transforms as transforms
import torch.optim as optim
import torchvision.transforms.functional as FT
from tqdm import tqdm
from torch.utils.data import DataLoader, WeightedRandomSampler

import torch 
import torch.nn as nn
from math import ceil

DEVICE = "cuda" if torch.cuda.is_available else "cpu"


In [None]:
class Face_dataset(Dataset):
    
    def __init__(self, image_dirs, transforms=None):
        self.num_classes = len(image_dirs)
        image_paths = []
        labels = []
        for i in range(self.num_classes):
            image_paths.extend([os.path.join(image_dirs[i], j) for j in os.listdir(image_dirs[i])])
            labels.extend([i]* len(os.listdir(image_dirs[i])))
#         for i in range(self.num_classes):
#             image_paths.extend([os.path.abspath(j) for j in os.listdir(image_dirs[i])])
#             labels.extend([i]* len(os.listdir(image_dirs[i])))
#         for i in range(self.num_classes):
#             image_paths.extend(os.listdir(image_dirs[i]))
#             labels.extend([i]* len(os.listdir(image_dirs[i])))
        self.image_dirs = image_paths
        self.labels = labels
        self.transforms = transforms
        
    def __len__(self):
        return len(self.image_dirs)
        
    def __getitem__(self, index):
        image = Image.open(self.image_dirs[index])
        label = self.labels[index]
        
        if self.transforms is not None:
            image = self.transforms(image)
        
        return image, label
    
class Compose(object):
    def __init__(self, transforms):
        self.transforms = transforms

    def __call__(self, img):
        for t in self.transforms:
            img = t(img)

        return img
    
def get_loader(weighted_sampling, train_split = 0.7):
    face_dir = '/kaggle/input/face-classify-v2/TRAINV2/FACES'
    face_mask_dir = '/kaggle/input/face-classify-v2/TRAINV2/MASKED'
    face_hat_dir = '/kaggle/input/face-classify-v2/TRAINV2/HAT'
    face_mask_sunglass_dir = '/kaggle/input/face-classify-v2/TRAINV2/MASKwithSUNGLASS'
    face_sunglass_dir = '/kaggle/input/face-classify-v2/TRAINV2/SUNGLASSES'
    
    BATCH_SIZE=10
    
    test_face_dir = '/kaggle/input/face-classify-test-data/TEST/FACE'
    test_face_mask_dir = '/kaggle/input/face-classify-test-data/TEST/MASK'
    test_face_hat_dir = '/kaggle/input/face-classify-test-data/TEST/HAT'
    test_face_mask_sunglass_dir = '/kaggle/input/face-classify-test-data/TEST/mask_sunglasses'
    test_face_sunglass_dir = '/kaggle/input/face-classify-test-data/TEST/SUNGLASSES'
    
    img_dirs = [face_dir,face_mask_dir,face_hat_dir,face_mask_sunglass_dir,face_sunglass_dir]
    test_img_dirs = [test_face_dir,test_face_mask_dir,test_face_hat_dir,test_face_mask_sunglass_dir,test_face_sunglass_dir]

    class_weights = [1,5578/2138,5578/363,5578/567,5578/2483]
    transform = Compose([transforms.Resize((224, 224)), transforms.ToTensor(),])
    dt = Face_dataset(img_dirs, transforms=transform)
    test_dt = Face_dataset(test_img_dirs, transforms=transform)
    
    sample_weights = [0]*len(dt)
    for idx, (data,label) in enumerate(dt):
        class_weight = class_weights[int(label)]
        sample_weights[idx] = class_weight
    
    if weighted_sampling:
        sampler = WeightedRandomSampler(sample_weights,num_samples = len(sample_weights), replacement=True)
        dataloader = DataLoader(dt,batch_size = BATCH_SIZE, sampler=sampler)
    
    else:
        train_size = int(train_split * len(dt))
        test_size = len(dt) - train_size
        trains_dataset, stest_dataset = torch.utils.data.random_split(dt, [train_size, test_size])
#         train_data_size = int(len(dt)*train_split)
        train_dataloader = DataLoader(trains_dataset, batch_size = BATCH_SIZE,shuffle=True)
        s_test_dataloader = DataLoader(stest_dataset, batch_size = BATCH_SIZE,shuffle=True)
        
    test_dataloader = DataLoader(test_dt, batch_size=BATCH_SIZE)
    
    return train_dataloader, s_test_dataloader, test_dataloader
    
    

                

In [None]:


base_model = [
    # expand_ratio, channels, repeats, stride, kernel_size
    [1, 16, 1, 1, 3],
    [6, 24, 2, 2, 3],
    [6, 40, 2, 2, 5],
    [6, 80, 3, 2, 3],
    [6, 112, 3, 1, 5],
    [6, 192, 4, 2, 5],
    [6, 320, 1, 1, 3],
]

phi_values = {
    # tuple of: (phi_value, resolution, drop_rate)
    "b0": (0, 224, 0.2),  # alpha, beta, gamma, depth = alpha ** phi
    "b1": (0.5, 240, 0.2),
    "b2": (1, 260, 0.3),
    "b3": (2, 300, 0.3),
    "b4": (3, 380, 0.4),
    "b5": (4, 456, 0.4),
    "b6": (5, 528, 0.5),
    "b7": (6, 600, 0.5),
}


class CNNBlock(nn.Module):
    def __init__(
        self, in_channels, out_channels, kernel_size, stride, padding, groups=1
    ):
        super(CNNBlock, self).__init__()
        self.cnn = nn.Conv2d(
            in_channels,
            out_channels,
            kernel_size,
            stride,
            padding,
            groups=groups,
            bias=False,
        )
        self.bn = nn.BatchNorm2d(out_channels)
        self.silu = nn.SiLU()  # SiLU <-> Swish

    def forward(self, x):
        return self.silu(self.bn(self.cnn(x)))


class SqueezeExcitation(nn.Module):
    def __init__(self, in_channels, reduced_dim):
        super(SqueezeExcitation, self).__init__()
        self.se = nn.Sequential(
            nn.AdaptiveAvgPool2d(1),  # C x H x W -> C x 1 x 1
            nn.Conv2d(in_channels, reduced_dim, 1),
            nn.SiLU(),
            nn.Conv2d(reduced_dim, in_channels, 1),
            nn.Sigmoid(),
        )

    def forward(self, x):
        return x * self.se(x)


class InvertedResidualBlock(nn.Module):
    def __init__(
        self,
        in_channels,
        out_channels,
        kernel_size,
        stride,
        padding,
        expand_ratio,
        reduction=4,  # squeeze excitation
        survival_prob=0.8,  # for stochastic depth
    ):
        super(InvertedResidualBlock, self).__init__()
        self.survival_prob = 0.8
        self.use_residual = in_channels == out_channels and stride == 1
        hidden_dim = in_channels * expand_ratio
        self.expand = in_channels != hidden_dim
        reduced_dim = int(in_channels / reduction)

        if self.expand:
            self.expand_conv = CNNBlock(
                in_channels,
                hidden_dim,
                kernel_size=3,
                stride=1,
                padding=1,
            )

        self.conv = nn.Sequential(
            CNNBlock(
                hidden_dim,
                hidden_dim,
                kernel_size,
                stride,
                padding,
                groups=hidden_dim,
            ),
            SqueezeExcitation(hidden_dim, reduced_dim),
            nn.Conv2d(hidden_dim, out_channels, 1, bias=False),
            nn.BatchNorm2d(out_channels),
        )

    def stochastic_depth(self, x):
        if not self.training:
            return x

        binary_tensor = (
            torch.rand(x.shape[0], 1, 1, 1, device=x.device) < self.survival_prob
        )
        return torch.div(x, self.survival_prob) * binary_tensor

    def forward(self, inputs):
        x = self.expand_conv(inputs) if self.expand else inputs

        if self.use_residual:
            return self.stochastic_depth(self.conv(x)) + inputs
        else:
            return self.conv(x)


class EfficientNet(nn.Module):
    def __init__(self, version, num_classes):
        super(EfficientNet, self).__init__()
        width_factor, depth_factor, dropout_rate = self.calculate_factors(version)
        last_channels = ceil(1280 * width_factor)
        self.pool = nn.AdaptiveAvgPool2d(1)
        self.features = self.create_features(width_factor, depth_factor, last_channels)
        self.classifier = nn.Sequential(
            nn.Dropout(dropout_rate),
            nn.Linear(last_channels, num_classes),
        )

    def calculate_factors(self, version, alpha=1.2, beta=1.1):
        phi, res, drop_rate = phi_values[version]
        depth_factor = alpha**phi
        width_factor = beta**phi
        return width_factor, depth_factor, drop_rate

    def create_features(self, width_factor, depth_factor, last_channels):
        channels = int(32 * width_factor)
        features = [CNNBlock(3, channels, 3, stride=2, padding=1)]
        in_channels = channels

        for expand_ratio, channels, repeats, stride, kernel_size in base_model:
            out_channels = 4 * ceil(int(channels * width_factor) / 4)
            layers_repeats = ceil(repeats * depth_factor)

            for layer in range(layers_repeats):
                features.append(
                    InvertedResidualBlock(
                        in_channels,
                        out_channels,
                        expand_ratio=expand_ratio,
                        stride=stride if layer == 0 else 1,
                        kernel_size=kernel_size,
                        padding=kernel_size // 2,  # if k=1:pad=0, k=3:pad=1, k=5:pad=2
                    )
                )
                in_channels = out_channels

        features.append(
            CNNBlock(in_channels, last_channels, kernel_size=1, stride=1, padding=0)
        )

        return nn.Sequential(*features)

    def forward(self, x):
        x = self.pool(self.features(x))
        return self.classifier(x.view(x.shape[0], -1))

In [None]:
def train_fn(train_loader, model, optimizer, loss_fn):
    loop = tqdm(train_loader, leave=True)
    mean_loss = []
    
    for batch_idx, (x, y) in enumerate(loop):
        x, y = x.to(DEVICE), y.to(DEVICE)
        out = model(x)
        
        loss = loss_fn(out, y)
        mean_loss.append(loss.item())
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        # update progress bar
        loop.set_postfix(loss=loss.item())

    print(f"Mean loss was {sum(mean_loss)/len(mean_loss)}")
    
def test_fn(test_loader, model):
    model.eval()
    loop = tqdm(test_loader, leave = True)
    accuracy=0.0
    for batch_idx, (x,y) in enumerate(loop):
        x, y = x.to(DEVICE), y.to(DEVICE)
        out = model(x)
        preds = torch.argmax(out,1)
        acc = torch.eq(preds,y).float()
        
        accuracy+=torch.mean(acc)
    return accuracy/(batch_idx+1)
        

In [None]:
epochs = 10

model = EfficientNet(version="b0", num_classes=5).to(DEVICE)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.0001, weight_decay=1e-4)
lr_scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, patience=5, verbose=True)
train_loader, stest_loader, real_testloader = get_loader(weighted_sampling=False)

for i in range(epochs):
    train_fn(train_loader, model, optimizer, criterion)

accuracy = test_fn(stest_loader, model)
print(accuracy.item())

accuracy = test_fn(real_testloader, model)
print(accuracy.item())

In [None]:
# from PIL import Image
# import matplotlib.pyplot as plt
# def predictions(dataloader, model):
#     model.eval()
#     imgs=[]
#     labels =[]
# #     loop = tqdm(test_loader, leave = True)
#     accuracy=0.0
#     for batch_idx, (x,y) in enumerate(test_loader):
#         x, y = x.to(DEVICE), y.to(DEVICE)
#         out = model(x)
#         preds = torch.argmax(out,1)
#         for j in range(9):
#             print(y[j].cpu())
#             transform = transforms.ToPILImage()
#             img = transform(x[j].cpu().detach())
#             imgs.append(img)
#             labels.append(preds[j].cpu())
            
#     return imgs, labels
# #             plt.imsave(f'/kaggle/working/id{batch_idx}j{j}.png', img)
# #             import time
# #             time.sleep(5)
# #             plt.imshow(x[j].cpu().detach().numpy())
from PIL import Image
import time
import matplotlib.pyplot as plt
def predictions(dataloader, model):
    model.eval()
    imgs=[]
    labels =[]
#     loop = tqdm(test_loader, leave = True)
    accuracy=0.0
    for batch_idx, (x,y) in enumerate(dataloader):
        x, y = x.to(DEVICE), y.to(DEVICE)
        out = model(x)
        preds = torch.argmax(out,1)
        start_time = time.time()
        for j in range(9):
            print(y[j].cpu())
            transform = transforms.ToPILImage()
            img = transform(x[j].cpu().detach())
            imgs.append(img)
            labels.append(preds[j].cpu())
        end_time = time.time()
        print(f'Time taken for {len(dataloader)} batchframes : {end_time-start_time}')
    return imgs, labels
#             plt.imsave(f'/kaggle/working/id{batch_idx}j{j}.png', img)
#             import time
#             time.sleep(5)
#             plt.imshow(x[j].cpu().detach().numpy())
    
    

In [None]:
imgs, labels = predictions(stest_loader, model)
results = '/kaggle/working/classify_results'
if not os.path.exists(results):
    os.makedirs(results)
import cv2
classes = ['FACE', 'MASK', 'HAT', 'MASK+SUNGLASS', 'SUNGLASS']
for i in range(len(labels)):
    im = cv2.putText(np.array(imgs[i]), classes[int(labels[i].item())], (50,170), cv2.FONT_HERSHEY_SIMPLEX, 1,(255, 0, 0),2,cv2.LINE_AA)
    plt.imsave(os.path.join(results, f'fid{i}.png'), im)
    
    


In [None]:
def save_model(model, model_arch, path):
    if not os.path.exists(f'/kaggle/working/saved_models/{model_arch}'):
        os.makedirs(f'/kaggle/working/saved_models/{model_arch}')
    torch.save(model.state_dict(), f"/kaggle/working/saved_models/{model_arch}/{path}.pth")
    
save_model(model, model_arch="effnetb0", path="effnetb0_train")

In [None]:
    
def visualize_predictions(model, test_loader, model_arch, sampling_type, num_images=15, quantized=False):
        if quantized:
            device = "cpu"
        else:
            device = "cuda" if torch.cuda.is_available() else "cpu"
        model.eval()
        
        count=0
        all_preds = []
        all_labels = []
        all_images = []

        fig, axes = plt.subplots(4,5, figsize=(15,8))

        with torch.no_grad():
            for inputs, labels in test_loader:
                inputs = inputs.to(device)
                labels = labels.to(device)

                outputs = model(inputs)
                _, preds = torch.max(outputs, 1)

                all_preds.extend(preds.cpu().numpy())
                all_labels.extend(labels.cpu().numpy())
                all_images.extend(inputs.cpu().numpy())
                count+=1
                if count==num_images:
                    break

        for i in range(num_images):
            image = all_images[i]
            label = all_labels[i]
            pred = all_preds[i]

            row = i//5
            col = i%5
            ax = axes[row, col]

            # Visualize the image with true label and predicted label
            # plt.figure()
            # plt.subplot(4,5, i+1)
            
            ax.imshow(np.transpose(image, (1, 2, 0)))
            ax.axis('off')
            ax.set_title(f"L{label} : P{pred}")
            # plt.xlabel(f"Pred: {pred}")

        fig.suptitle('Labels = [0:FACES, 1:MASK, 2:HAT, 3:MASKwithSUNGLASSES, 4:SUNGLASS]', fontsize=13)
        plt.tight_layout()
        plt.savefig(f"/kaggle/working/saved_models/{model_arch}/{model_arch}_{sampling_type}_prediction_images.png")
        plt.show()

def save_model_q(model, model_arch, path, sampling_type=None):
    # save with script
    torch.jit.save(torch.jit.script(model), f"/kaggle/working/saved_models/{model_arch}/{path}_{sampling_type}.pth")
    
def evaluate_model(model, test_loader):
        device = "cuda" if torch.cuda.is_available() else "cpu"
        model.eval()
        
        all_preds = []
        all_labels = []

        with torch.no_grad():
            start_time = time.time()
            for inputs, labels in test_loader:
                inputs = inputs.to(device)
                labels = labels.to(device)

                outputs = model(inputs)
                _, preds = torch.max(outputs, 1)

                all_preds.extend(preds.cpu().numpy())
                all_labels.extend(labels.cpu().numpy())
            end_time = time.time()
        accuracy = accuracy_score(all_labels, all_preds)
        time_taken = end_time - start_time
        print(f"Test Accuracy: {accuracy:.4f}")
        print(f'Time: {time_taken:.4f}')

        return accuracy

def evaluate_qmodel(model, test_loader):
        device = "cpu"
        model.eval()
        
        all_preds = []
        all_labels = []

        with torch.no_grad():
            start_time = time.time()
            for inputs, labels in test_loader:
                inputs = inputs.to(device)
                labels = labels.to(device)

                outputs = model(inputs)
                _, preds = torch.max(outputs, 1)

                all_preds.extend(preds.cpu().numpy())
                all_labels.extend(labels.cpu().numpy())
        end_time = time.time()
        accuracy = accuracy_score(all_labels, all_preds)
        print(f"Test Accuracy: {accuracy:.4f}")
        print(f'Took {end_time-start_time} secs for {len(test_loader)} loader points')

        return accuracy

 

In [None]:
def quantized_model (model_path, model_arch, label, dataloaders, sampling_type=None):
    
    model = EfficientNet(version="b0", num_classes=5)

        
    model.load_state_dict(torch.load(model_path))
    # new_m= copy.deepcopy(model)

    model.eval()

    qconfig = get_default_qconfig("x86")
    qconfig_mapping = QConfigMapping().set_global(qconfig)

    example_inputs = torch.randn(5,3,224,224)

    prepared_model = prepare_fx(model, qconfig_mapping, example_inputs)
    print(prepared_model.graph)

    def calibrate(model, data_loader):
        with torch.inference_mode():
            for image, target in data_loader:
                model(image)
    calibrate(prepared_model, dataloaders)  # run calibration on sample data

    quantized_model = convert_fx(prepared_model)
    print(quantized_model)


    mname = f"{model_arch}_{label}_FXGquant"
    save_model_q(quantized_model, model_arch, mname, sampling_type)
    acc = evaluate_qmodel(quantized_model, dataloaders)
    visualize_predictions(quantized_model, dataloaders,model_arch, sampling_type, 20, quantized=True)
    
#     r_acc = evaluate_qmodel(quantized_model, dataloaders)
#     visualize_predictions(quantized_model, dataloaders,model_arch, sampling_type, 20, quantized=True)
    print("#### Successfully saved quantized model ####")
    


In [None]:
from torch.ao.quantization import get_default_qconfig
from torch.ao.quantization.quantize_fx import prepare_fx, convert_fx
from torch.ao.quantization import QConfigMapping
from sklearn.metrics import accuracy_score
quantized_model('/kaggle/working/saved_models/effnetb0/effnetb0_train.pth', "effnetb0", "TEST", stest_loader)

In [None]:
!zip -r effnetb0_scratch_allresults.zip /kaggle/working

In [None]:
# def quantized_model (model_path, model_arch, label, dataloaders, sampling_type):
    
#     model = ResNet_18(image_channels=3, num_classes=5)

        
#     model.load_state_dict(torch.load(model_path))
#     # new_m= copy.deepcopy(model)

#     model.eval()

#     qconfig = get_default_qconfig("x86")
#     qconfig_mapping = QConfigMapping().set_global(qconfig)

#     example_inputs = torch.randn(5,3,224,224)

#     prepared_model = prepare_fx(model, qconfig_mapping, example_inputs)
#     print(prepared_model.graph)

#     def calibrate(model, data_loader):
#         with torch.inference_mode():
#             for image, target in data_loader:
#                 model(image)
#     calibrate(prepared_model, dataloaders['val'])  # run calibration on sample data

#     quantized_model = convert_fx(prepared_model)
#     print(quantized_model)


#     mname = f"{model_arch}_{label}_FXGquant"
#     save_model_q(quantized_model, model_arch, mname, sampling_type)
#     acc = evaluate_qmodel(quantized_model, dataloaders['test'])
#     visualize_predictions(quantized_model, dataloaders['test'],model_arch, sampling_type, 20, quantized=True)
    
#     r_acc = evaluate_qmodel(quantized_model, dataloaders['real_test'])
#     visualize_predictions(quantized_model, dataloaders['real_test'],model_arch, sampling_type, 20, quantized=True)
#     print("#### Successfully saved quantized model ####")
    


In [None]:
# results = '/kaggle/working/classify_results'
# if not os.path.exists(results):
#     os.makedirs(results)
# import cv2
# classes = ['FACE', 'MASK', 'HAT', 'MASK+SUNGLASS', 'SUNGLASS']
# for i in range(len(labels)):
#     im = cv2.putText(np.array(imgs[i]), classes[int(labels[i].item())], (50,170), cv2.FONT_HERSHEY_SIMPLEX, 1,(255, 0, 0),2,cv2.LINE_AA)
#     plt.imsave(os.path.join(results, f'fid{i}.png'), im)

In [None]:
# checkpoint = {
#            "state_dict": model.state_dict(),
#            "optimizer": optimizer.state_dict(),
#        }
# print('saving model checkpoint')
# torch.save(checkpoint, 'resnet18_faceclassify.pth.tar')

In [None]:
# model_scripted = torch.jit.script(model.eval())
# model_scripted.save('model_scripted.pt')
# torch.onnx.export(model, torch.zeros((1, 3, 224, 224)), 'effnetb0.onnx', opset_version=14)

# model_fp32_path = 'effnetb0_fp32.onnx'
# dummy_in = torch.randn(1, 3, 224, 224, requires_grad=True)
# torch.onnx.export(model,                                         # model
#                   dummy_in,                                         # model input
#                   model_fp32_path,                                  # path
#                   export_params=True,                               # store the trained parameter weights inside the model file
#                   opset_version=14,                                 # the ONNX version to export the model to
#                   do_constant_folding=True,                         # constant folding for optimization
#                   input_names = ['input'],                          # input names
#                   output_names = ['output'],                        # output names
#                   dynamic_axes={'input' : {0 : 'batch_size'},       # variable length axes
#                                 'output' : {0 : 'batch_size'}})

In [None]:
# !zip -r effnetb0_allresults.zip /kaggle/working