<a href="https://colab.research.google.com/github/shubham151/ImageCaptioning/blob/main/Ai.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [4]:
from torchvision import datasets, transforms, models
from torchvision.datasets import ImageFolder
to_tensor = transforms.ToTensor()
from torch.utils.data import DataLoader
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from tqdm.auto import tqdm
import itertools
import numpy as np


In [4]:
# !pip install torch torchvision
# # !pip install torch==1.10.0 torchvision==0.11.0
# !nvcc --version


# # import torch
# print(torch.__version__)
# import torchvision
# print(torchvision.__version__)



import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from tqdm.auto import tqdm
import itertools
import numpy as np

from torchvision import datasets, transforms, models
from torchvision.datasets import ImageFolder
to_tensor = transforms.ToTensor()
from torch.utils.data import DataLoader

In [5]:
import zipfile
from google.colab import drive

drive.mount('/content/drive/')

!unzip -q "/content/drive/MyDrive/Ai_proj/seg_pred.zip"
!unzip -q "/content/drive/MyDrive/Ai_proj/seg_test.zip"
!unzip -q "/content/drive/MyDrive/Ai_proj/seg_train.zip"


Drive already mounted at /content/drive/; to attempt to forcibly remount, call drive.mount("/content/drive/", force_remount=True).
replace seg_pred/10004.jpg? [y]es, [n]o, [A]ll, [N]one, [r]ename: replace seg_test/buildings/20057.jpg? [y]es, [n]o, [A]ll, [N]one, [r]ename: replace seg_train/buildings/0.jpg? [y]es, [n]o, [A]ll, [N]one, [r]ename: 

In [1]:
traindata_path = "/content/seg_train"
valdata_path = "/content/seg_test"
testset_path = "/content/seg_pred"

In [None]:
train_transforms = transforms.Compose([
    transforms.RandomHorizontalFlip(p = 0.5),
    transforms.ToTensor(),
    transforms.Resize([50,50], antialias = True),
])

val_transforms = transforms.Compose([
    transforms.ToTensor(),
    transforms.Resize([50,50], antialias = True)
])

train_data = ImageFolder(traindata_path, transform=train_transforms)
val_data = ImageFolder(valdata_path, transform = val_transforms)


train_loader = DataLoader(train_data, batch_size=64,num_workers = 4, shuffle=True, drop_last = True)
val_loader = DataLoader(val_data, batch_size=500, drop_last = False)


In [None]:
class FlexibleCNN(nn.Module):
    def __init__(self, num_blocks, channels, kernel_size=3, activation= nn.ReLU(), downsampling='maxpool', dropout_prob = 0.5):
        super(FlexibleCNN, self).__init__()

        self.blocks = nn.ModuleList()
        if isinstance(channels, list):
            assert len(channels) == num_blocks
        else:
            channels = [channels]*num_blocks

        channels = [3] + channels #at the beginning we have only 3

        for i in range(num_blocks):
            block = self.build_cnn_block(channels[i],channels[i+1], kernel_size, activation, downsampling, dropout_prob)
            self.blocks.append(block)

        self.global_avg_pooling = nn.AdaptiveAvgPool2d(1)
        self.fc = nn.Linear(channels[-1], 6)  # 6 output classes

    def build_cnn_block(self, in_channels, out_channels, kernel_size, activation, downsampling, dropout_prob):
        layers = []

        stride = 2 if downsampling == "stride" else 1
        # Batch normalization
        layers.append(nn.BatchNorm2d(in_channels))

        # Convolutional layer
        layers.append(nn.Conv2d(in_channels= in_channels, out_channels= out_channels, kernel_size=kernel_size, padding=1, stride = stride))

        # Dropout layer
        layers.append(nn.Dropout2d(p = dropout_prob))

        layers.append(activation)

        # Downsampling technique
        if downsampling == 'maxpool':
            layers.append(nn.MaxPool2d(2, 2))

        return nn.Sequential(*layers)

    def forward(self, x):
        for block in self.blocks:
            x = block(x)

        x = self.global_avg_pooling(x)
        x = x.view(x.size(0), -1)
        x = self.fc(x)

        return x

In [None]:
class ResCNN(nn.Module):
    def __init__(self, num_blocks, channels, kernel_size=3, activation= nn.ReLU(), downsampling='maxpool', dropout_prob = 0.5, padding = 1):
        super(ResCNN, self).__init__()

        self.res_blocks = nn.ModuleList()
        self.embed_layer = nn.Conv2d(3, channels,kernel_size = 1)
        self.activation = activation
        for i in range(num_blocks):
            res_block = self.build_res_block(channels,channels, kernel_size, activation, downsampling, dropout_prob, padding)
            self.res_blocks.append(res_block)

        self.maxpool = nn.MaxPool2d(2,2)
        self.global_avg_pooling = nn.AdaptiveAvgPool2d(1)
        self.fc = nn.Linear(channels, 6)  # 6 output classes

    def build_res_block(self, in_channels, out_channels, kernel_size, activation, downsampling, dropout_prob, padding):
        layers = []
        layers.append(nn.BatchNorm2d(in_channels))
        layers.append(nn.Conv2d(in_channels= in_channels, out_channels= out_channels, kernel_size=kernel_size, padding=padding, stride = 1))
        layers.append(nn.Dropout2d(p = dropout_prob))
        layers.append(activation)
        return nn.Sequential(*layers)

    def forward(self, x):
        x = self.embed_layer(x)
        x = self.activation(x)
        for block in self.res_blocks:
            y = block(x)
            x = x + y
            x = self.maxpool(x)

        x = self.global_avg_pooling(x)
        x = x.view(x.size(0), -1)
        x = self.fc(x)

        return x

# Example usage:
num_blocks = 2
channels = 256
kernel_size = 3
padding = 1
activation = nn.ReLU()
dropout_prob = 0.3

In [None]:
# best_result = max(results, key=lambda x: x['final_val_accuracy'])
print("\nBest result:", "Best result: {'num_blocks': 3, 'channels': [128, 256, 512], 'kernel_size': 3, 'activation': ReLU(), 'downsampling': 'maxpool', 'dropout_prob': 0.3, 'final_train_loss': 0.4787631332874298, 'final_train_accuracy': 0.827917913638307, 'final_val_loss': 0.43733835220336914, 'final_val_accuracy': 0.847}")

In [None]:
# WARNING : This cell is VERY long to run (~2 hours). The output has been copy-pasted below for convenience. Feel free to skip / comment it.

device = "cuda"
epochs = 10

num_blocks_values = [3]
channels_values = [64, 128, 256]
kernel_size_values = [3]
activation_values = [nn.ReLU(), nn.Sigmoid()]
downsampling_values = ['maxpool', 'stride']
dropout_prob_values = [0.3, 0.5]

best_accuracy = 0
best_params = {}
best_model = None

grid_search = itertools.product(num_blocks_values, channels_values, kernel_size_values, activation_values, downsampling_values, dropout_prob_values)

def train_and_evaluate_model(model, train_loader, val_loader, criterion, optimizer, epochs, device):
    for epoch in range(epochs):
        model.train()
        train_losses = []
        correct = 0
        total = 0

        for inputs, labels in train_loader:
            inputs, labels = inputs.to(device), labels.to(device)

            optimizer.zero_grad()

            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            train_losses.append(loss.item())
            _, predicted = torch.max(outputs.data, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

        train_loss = np.mean(train_losses)
        train_accuracy = correct / total

        # Validation
        model.eval()
        val_losses = []
        correct = 0
        total = 0
        with torch.no_grad():
            for inputs, labels in val_loader:
                inputs, labels = inputs.to(device), labels.to(device)
                outputs = model(inputs)
                loss = criterion(outputs, labels)
                val_losses.append(loss.item())
                _, predicted = torch.max(outputs, 1)
                total += labels.size(0)
                correct += (predicted == labels).sum().item()

        val_loss = np.mean(val_losses)
        val_accuracy = correct / total

        #print(f"Epoch {epoch+1}/{epochs}, Train Loss: {train_loss}, Train Accuracy: {train_accuracy}, Val Loss: {val_loss}, Val Accuracy: {val_accuracy}")


    return model, train_loss, train_accuracy, val_loss, val_accuracy

for num_blocks, channel, kernel_size, activation, downsampling, dropout_prob in grid_search:
    model = ResCNN(num_blocks, channels, kernel_size, activation, downsampling, dropout_prob)
    model = model.to(device)

    optimizer = optim.AdamW(model.parameters(), lr=1e-4)
    criterion = nn.CrossEntropyLoss()

    trained_model, train_loss, train_accuracy, val_loss, val_accuracy = train_and_evaluate_model(model, train_loader, val_loader, criterion, optimizer, epochs, device)

    print(f"num_blocks: {num_blocks}, channel: {channel}, kernel_size: {kernel_size}, activation: {activation}, downsampling: {downsampling}, dropout_prob: {dropout_prob}, Train Loss: {train_loss}, Train Accuracy: {train_accuracy}, Val Loss: {val_loss}, Val Accuracy: {val_accuracy}")

    # Compare and update the best model
    if val_accuracy > best_accuracy:
        best_accuracy = val_accuracy
        best_params = {'num_blocks': num_blocks, 'channel': channel, 'kernel_size': kernel_size, 'activation': activation, 'downsampling': downsampling, 'dropout_prob': dropout_prob}
        best_model = trained_model

print(f"Best parameters: {best_params} with accuracy {best_accuracy}")


num_blocks: 3, channel: 64, kernel_size: 3, activation: ReLU(), downsampling: maxpool, dropout_prob: 0.3, Train Loss: 0.641472888183376, Train Accuracy: 0.7664098173515982, Val Loss: 0.6117745985587438, Val Accuracy: 0.7786666666666666
num_blocks: 3, channel: 64, kernel_size: 3, activation: ReLU(), downsampling: maxpool, dropout_prob: 0.5, Train Loss: 0.7652883809995433, Train Accuracy: 0.7180365296803652, Val Loss: 0.7157183388868967, Val Accuracy: 0.7446666666666667
num_blocks: 3, channel: 64, kernel_size: 3, activation: ReLU(), downsampling: stride, dropout_prob: 0.3, Train Loss: 0.6443619552540453, Train Accuracy: 0.7685502283105022, Val Loss: 0.6104919662078222, Val Accuracy: 0.7816666666666666
num_blocks: 3, channel: 64, kernel_size: 3, activation: ReLU(), downsampling: stride, dropout_prob: 0.5, Train Loss: 0.7684658118034606, Train Accuracy: 0.7169663242009132, Val Loss: 0.6985585490862528, Val Accuracy: 0.7536666666666667
num_blocks: 3, channel: 64, kernel_size: 3, activation:

In [None]:
import pickle

# Save the best model using pickle
with open('best_model.pkl', 'wb') as file:
    pickle.dump(best_model, file)

**Alexnet**

In [3]:
transform = transforms.Compose([
    transforms.Resize(256),
    transforms.CenterCrop(224),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])


NameError: ignored

In [2]:
train_transforms = transforms.Compose([
    transforms.Resize(256),
    transforms.CenterCrop(224),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

val_transforms = transforms.Compose([
    transforms.Resize(256),
    transforms.CenterCrop(224),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

train_data = ImageFolder(traindata_path, transform=train_transforms)
val_data = ImageFolder(valdata_path, transform = val_transforms)


train_loader = DataLoader(train_data, batch_size=64,num_workers = 4, shuffle=True, drop_last = True)
val_loader = DataLoader(val_data, batch_size=500, drop_last = False)


NameError: ignored

In [None]:
train_dataset = datasets.CIFAR10(root='./data', train=True, download=True, transform=transform)
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)

Downloading https://www.cs.toronto.edu/~kriz/cifar-10-python.tar.gz to ./data/cifar-10-python.tar.gz


100%|██████████| 170498071/170498071 [00:01<00:00, 90493468.19it/s]


Extracting ./data/cifar-10-python.tar.gz to ./data


In [14]:
# Define the AlexNet model
model = models.alexnet(pretrained=False)  # Set pretrained=True if you want to use a pre-trained model

# Define loss function and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(model.parameters(), lr=0.001, momentum=0.9)

# Training loop
num_epochs = 15
for epoch in range(num_epochs):
    for i, (inputs, labels) in enumerate(train_loader):
        # Forward pass
        outputs = model(inputs)
        loss = criterion(outputs, labels)

        # Backward and optimize
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        if (i+1) % 100 == 0:
            print(f'Epoch [{epoch+1}/{num_epochs}], Step [{i+1}/{len(train_loader)}], Loss: {loss.item()}')



KeyboardInterrupt: ignored

In [2]:
# Define the AlexNet model
model = models.alexnet(pretrained=False)  # Set pretrained=True if you want to use a pre-trained model

# Define loss function and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(model.parameters(), lr=0.001, momentum=0.9)

# Training loop
num_epochs = 30
for epoch in range(num_epochs):
    correct = 0
    total = 0
    for i, (inputs, labels) in enumerate(train_loader):
        # Forward pass
        outputs = model(inputs)
        loss = criterion(outputs, labels)

        # Backward and optimize
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        # Calculate accuracy
        _, predicted = torch.max(outputs.data, 1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()

        if (i+1) % 100 == 0:
            print(f'Epoch [{epoch+1}/{num_epochs}], Step [{i+1}/{len(train_loader)}], Loss: {loss.item()}')

    # Calculate and print epoch accuracy
    epoch_accuracy = 100 * correct / total
    print(f'Accuracy of the model after epoch {epoch+1}: {epoch_accuracy}%')


NameError: ignored

# New Section

In [None]:
import pickle

# Save the best model using pickle
with open('alexnet_model.pkl', 'wb') as file:
    pickle.dump(model, file)

# New Section