In [14]:
import numpy as np
import random
import torch
import torch.nn as nn
import torch.nn.functional as F
from tqdm.auto import tqdm
import matplotlib.pyplot as plt
import torchvision.models as models
from torchvision.datasets import ImageFolder
from torch.utils.data import DataLoader, TensorDataset, ConcatDataset
import torchvision.transforms as transforms

In [15]:
def read_images(path):
    data_transform = transforms.Compose([transforms.Resize((224,224)), transforms.ToTensor()])
    dataset = ImageFolder(path, transform=data_transform)
        
    data = DataLoader(dataset, batch_size=32) 
    
    X = [] 
    y = []
    
    for image, label in tqdm(data):
        X.append(image) 
        y.append(label) 
        
    # Concatenate the lists of arrays along the batch dimension (axis=0)
    X = np.concatenate(X, axis=0)
    y = np.concatenate(y, axis=0)
        
    return X, y

In [16]:
def train_val_split(train_size:float = 0.2):    
    # Initialize lists to store validation and training data
    X_val = []
    y_val = []
    X = []
    y = []
    
    # 1000 because we have 1000 images of each class
    samples_per_class_val = (int) (1000 * train_size)
    
    for class_label in range(10):
        # extract indices corresponding to the current class
        class_indices = np.where(y_train==class_label)[0]
        
        # randomly select sample_per_class_val indices for validation
        val_indices = np.random.choice(class_indices, samples_per_class_val, replace=False)
        
        # append the selected val data to X_val and y_val
        X_val.extend(X_train[val_indices])
        y_val.extend(y_train[val_indices])
        
        # append the remaining data to X_train and y_train
        train_indices = np.setdiff1d(class_indices, val_indices)
        X.extend(X_train[train_indices])
        y.extend(y_train[train_indices])

    # convert python lists to np array
    X_val = np.array(X_val)
    y_val = np.array(y_val)
    X = np.array(X)
    y = np.array(y)
    
    return X, X_val, y, y_val

In [6]:
def shuffle_data(X, y):    
    # Combine X, y into a list of tuples
    data = list(zip(X, y))

    # Shuffle the combined data
    random.shuffle(data)

    # Unpack the shuffled data back into separate arrays
    X_shuffled, y_shuffled = zip(*data)

    # Convert the shuffled lists to NumPy arrays 
    X_shuffled = np.array(X_shuffled)
    y_shuffled = np.array(y_shuffled)
    
    return X_shuffled, y_shuffled

In [7]:
def create_dataloader(X, y, batch_size, shuffle=True):
    # Convert NumPy arrays to PyTorch tensors
    X_tensor = torch.from_numpy(X)
    y_tensor = torch.from_numpy(y)

    # Create a TensorDataset from X_train_tensor and y_train_tensor
    dataset = TensorDataset(X_tensor, y_tensor)

    # Define batch size and create DataLoader
    loader = DataLoader(dataset, batch_size=batch_size, shuffle=True)
    
    return loader

In [8]:
train_path = "/Users/pratikkadlak/Pratik/DeepLearning/DL_Assignment_2/inaturalist_12K/train"
test_path = "/Users/pratikkadlak/Pratik/DeepLearning/DL_Assignment_2/inaturalist_12K/val"

X_train, y_train = read_images(train_path)
X_test, y_test = read_images(test_path)

X_train, X_val, y_train, y_val = train_val_split(0.2) 

X_train, y_train = shuffle_data(X_train, y_train)
X_val, y_val = shuffle_data(X_val, y_val)

train_loader = create_dataloader(X_train, y_train, 32)
val_loader = create_dataloader(X_val, y_val, 32)
test_loader = create_dataloader(X_test, y_test, 32)

  0%|          | 0/313 [00:00<?, ?it/s]

  0%|          | 0/63 [00:00<?, ?it/s]

# GoogleNet

In [21]:
# Define GoogLeNet model
model = models.googlenet(pretrained=True)  # Load pre-trained weights
num_classes = 10
model.fc = nn.Linear(model.fc.in_features, num_classes)  # Modify final FC layer

# Define loss function and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(model.parameters(), lr=0.001, momentum=0.9)

# Train the model
device = torch.device("mps" if torch.backends.mps.is_available() else "cpu")
model.to(device)

num_epochs = 10
for epoch in range(num_epochs):
    model.train()
    for images, labels in train_loader:
        images, labels = images.to(device), labels.to(device)
        optimizer.zero_grad()
        outputs = model(images)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

    # Evaluate the model
    model.eval()
    correct = 0
    total = 0
    with torch.no_grad():
        for images, labels in test_loader:
            images, labels = images.to(device), labels.to(device)
            outputs = model(images)
            _, predicted = torch.max(outputs.data, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

    accuracy = 100 * correct / total
    print(f'Epoch {epoch+1}/{num_epochs}, Test Accuracy: {accuracy:.2f}%')

# Save the trained model
torch.save(model.state_dict(), 'googlenet_model.pth')

Downloading: "https://download.pytorch.org/models/googlenet-1378be20.pth" to /Users/pratikkadlak/.cache/torch/hub/checkpoints/googlenet-1378be20.pth
100%|███████████████████████████████████████| 49.7M/49.7M [00:58<00:00, 890kB/s]


Epoch 1/10, Test Accuracy: 71.10%
Epoch 2/10, Test Accuracy: 74.60%
Epoch 3/10, Test Accuracy: 76.10%
Epoch 4/10, Test Accuracy: 76.85%
Epoch 5/10, Test Accuracy: 77.45%
Epoch 6/10, Test Accuracy: 77.80%
Epoch 7/10, Test Accuracy: 77.60%
Epoch 8/10, Test Accuracy: 77.15%
Epoch 9/10, Test Accuracy: 77.40%
Epoch 10/10, Test Accuracy: 77.70%


## Freezing all layers except the last layer:
- Freeze all layers except the final classification layer.
- Fine-tune only the weights of the last layer during training.

In [13]:
num_classes = 10

# Load pre-trained GoogLeNet
model = models.googlenet(pretrained=True)



# Freeze all layers except the last layer
for param in model.parameters():
    param.requires_grad = False

# Modify the last layer for your specific classification task
num_ftrs = model.fc.in_features
model.fc = nn.Linear(num_ftrs, num_classes)  # num_classes is the number of classes in your dataset

device = "mps" if torch.backends.mps.is_available() else "cpu"
# device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)

# Define optimizer and loss function
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
criterion = nn.CrossEntropyLoss()

# Train the model
def train_model(model, criterion, optimizer, train_loader, num_epochs=10):
    for epoch in range(num_epochs):
        model.train()
        running_loss = 0.0
        correct_predictions = 0
        total_predictions = 0
        for images, labels in tqdm(train_loader):  # assuming you have a DataLoader for training data
            images , labels = images.to(device), labels.to(device)
            optimizer.zero_grad()
            outputs = model(images)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            running_loss += loss.item()
            
            _, predicted = torch.max(outputs, 1)
            correct_predictions += (predicted == labels).sum().item()
            total_predictions += labels.size(0)
        
        epoch_loss = running_loss / len(train_loader)
        epoch_accuracy = correct_predictions / total_predictions
        print(f'Epoch {epoch+1}/{num_epochs}, Loss: {epoch_loss:.4f}, Accuracy: {epoch_accuracy:.4f}')


train_model(model, criterion, optimizer, train_loader)

  0%|          | 0/250 [00:00<?, ?it/s]

Epoch 1/10, Loss: 1.4668


  0%|          | 0/250 [00:00<?, ?it/s]

[E thread_pool.cpp:110] Exception in thread pool task: mutex lock failed: Invalid argument
[E thread_pool.cpp:110] Exception in thread pool task: mutex lock failed: Invalid argument
[E thread_pool.cpp:110] Exception in thread pool task: mutex lock failed: Invalid argument
[E thread_pool.cpp:110] Exception in thread pool task: mutex lock failed: Invalid argument
[E thread_pool.cpp:110] Exception in thread pool task: mutex lock failed: Invalid argument
[E thread_pool.cpp:110] Exception in thread pool task: mutex lock failed: Invalid argument


KeyboardInterrupt: 

In [None]:
def test_model(model, criterion, test_loader):
    model.eval()
    running_loss = 0.0
    correct_predictions = 0
    total_predictions = 0
    with torch.no_grad():
        for images, labels in tqdm(test_loader):  # assuming you have a DataLoader for test data
            images , labels = images.to(device), labels.to(device)
            outputs = model(images)
            loss = criterion(outputs, labels)
            running_loss += loss.item()
            
            _, predicted = torch.max(outputs, 1)
            correct_predictions += (predicted == labels).sum().item()
            total_predictions += labels.size(0)
        
    test_loss = running_loss / len(test_loader)
    test_accuracy = correct_predictions / total_predictions
    print(f'Test Loss: {test_loss:.4f}, Test Accuracy: {test_accuracy:.4f}')

test_model(model, criterion, test_loader)

## Fine-tuning up to a certain number of layers:

- Freeze the initial layers (e.g., convolutional layers) and fine-tune only the later layers (e.g., fully connected layers).
- Experiment with different values of 'k' to find the optimal number of layers to fine-tune.

In [None]:
# Load pre-trained GoogLeNet
model = models.googlenet(pretrained=True)

# Define the number of layers to fine-tune (k)
k = 5  # Example: Fine-tune the last 5 layers

# Freeze layers up to k
if k > 0:
    for i, child in enumerate(model.children()):
        if i < k:
            for param in child.parameters():
                param.requires_grad = False
        else:
            break

# Modify the classifier for your specific classification task
num_ftrs = model.fc.in_features
num_classes = 10  # Change this to your actual number of classes
model.fc = nn.Linear(num_ftrs, num_classes)

# Move the model to GPU if available
# device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
device = "mps" if torch.backends.mps.is_available() else "cpu"
model = model.to(device)

# Define optimizer and loss function
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
criterion = nn.CrossEntropyLoss()

# Train the model
def train_model(model, criterion, optimizer, train_loader, num_epochs=10):
    model.train()
    for epoch in range(num_epochs):
        running_loss = 0.0
        correct_preds = 0
        total_preds = 0
        for images, labels in tqdm(train_loader):
            images, labels = images.to(device), labels.to(device)
            optimizer.zero_grad()
            outputs = model(images)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            running_loss += loss.item()
            _, predicted = torch.max(outputs, 1)
            correct_preds += (predicted == labels).sum().item()
            total_preds += labels.size(0)
        
        epoch_loss = running_loss / len(train_loader)
        epoch_accuracy = correct_preds / total_preds
        print(f'Epoch {epoch+1}/{num_epochs}, Loss: {epoch_loss:.4f}, Accuracy: {epoch_accuracy:.4f}')

train_model(model, criterion, optimizer, train_loader)

In [None]:
def test_model(model, criterion, test_loader):
    model.eval()
    running_loss = 0.0
    correct_predictions = 0
    total_predictions = 0
    with torch.no_grad():
        for images, labels in tqdm(test_loader):  # assuming you have a DataLoader for test data
            images , labels = images.to(device), labels.to(device)
            outputs = model(images)
            loss = criterion(outputs, labels)
            running_loss += loss.item()
            
            _, predicted = torch.max(outputs, 1)
            correct_predictions += (predicted == labels).sum().item()
            total_predictions += labels.size(0)
        
    test_loss = running_loss / len(test_loader)
    test_accuracy = correct_predictions / total_predictions
    print(f'Test Loss: {test_loss:.4f}, Test Accuracy: {test_accuracy:.4f}')

test_model(model, criterion, test_loader)

## Feature extraction using pre-trained models:

- Use pre-trained models like GoogLeNet, InceptionV3, ResNet50, etc., as feature extractors.
- Remove the final classification layer and use the extracted features as inputs to a smaller model (e.g., a simple feedforward neural network).
- Train the smaller model on the extracted features to classify images.

In [38]:
# Load pre-trained GoogLeNet without classification head
model = models.googlenet(pretrained=True)
model.fc = nn.Identity()  # Remove the final classification layer

# Define a smaller model (simple feedforward neural network)
class SimpleModel(nn.Module):
    def __init__(self, input_size, hidden_size, num_classes):
        super(SimpleModel, self).__init__()
        self.fc1 = nn.Linear(input_size, hidden_size)
        self.relu = nn.ReLU()
        self.fc2 = nn.Linear(hidden_size, num_classes)

    def forward(self, x):
        x = self.fc1(x)
        x = self.relu(x)
        x = self.fc2(x)
        return x

# Parameters for the smaller model
input_size = 1024  # Input size from GoogLeNet's extracted features
hidden_size = 256  # Hidden layer size
num_classes = 10  # Number of classes in your dataset

# Move the model to GPU if available
# device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
device = "mps" if torch.backends.mps.is_available() else "cpu"
small_model = small_model.to(device)
model = model.to(device)

# Define optimizer and loss function for the smaller model
optimizer = torch.optim.Adam(small_model.parameters(), lr=0.001)
criterion = nn.CrossEntropyLoss()

# Function to extract features using GoogLeNet
def extract_features(model, data_loader):
    model.eval()
    features = []
    labels = []
    with torch.no_grad():
        print("Extracting Features")
        for images, targets in tqdm(data_loader):
            images, targets = images.to(device), targets.to(device)
            features.append(model(images).detach().cpu())
            labels.append(targets)
    features = torch.cat(features)
    labels = torch.cat(labels)
    return features, labels

# Extract features using GoogLeNet
extracted_features, extracted_labels = extract_features(model, train_loader)
# extracted_features, extracted_labels = extracted_features.to(device), extracted_labels.to(device)



Extracting Features


  0%|          | 0/250 [00:00<?, ?it/s]

In [48]:
input_size = extracted_features.size(1)

# Initialize the smaller model
small_model = SimpleModel(input_size, hidden_size, num_classes)
small_model = small_model.to(device)

In [47]:
def train_small_model(model, optimizer, criterion, features, labels, train_loader, num_epochs=10):
    for epoch in range(num_epochs):
        model.train()
        optimizer.zero_grad()
        features = features.to(device)
        outputs = model(features)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
        
        # Calculate accuracy
        correct_preds = 0
        total_preds = 0
        for images, targets in tqdm(train_loader):
            images, targets = images.to(features.device), targets.to(labels.device)
            with torch.no_grad():
                outputs = model(images)
                _, predicted = torch.max(outputs, 1)
                correct_preds += (predicted == targets).sum().item()
                total_preds += targets.size(0)
        
        epoch_accuracy = correct_preds / total_preds
        print(f'Epoch {epoch+1}/{num_epochs}, Loss: {loss.item()}, Accuracy: {epoch_accuracy:.4f}')

# Example usage
train_small_model(small_model, optimizer, criterion, extracted_features, extracted_labels, train_loader)

RuntimeError: linear(): input and weight.T shapes cannot be multiplied (7999x1024 and 224x256)

In [60]:
class CNN(nn.Module):
    def __init__(self, in_channels, out_channels, num_filters, kernel_size, activation_fn, apply_batchnorm, apply_dropout, prob, hidden_units):
        super(CNN, self).__init__()
        # Define the convolution layers
        self.conv1 = nn.Conv2d(in_channels=in_channels, out_channels=num_filters[0], kernel_size=kernel_size[0], stride=1, padding=0)
        self.batchnorm1 = nn.BatchNorm2d(num_filters[0])
        self.conv2 = nn.Conv2d(in_channels=num_filters[0],out_channels=num_filters[1], kernel_size=kernel_size[1], stride=1, padding=0)
        self.batchnorm2 = nn.BatchNorm2d(num_filters[1])
        self.conv3 = nn.Conv2d(in_channels=num_filters[1], out_channels=num_filters[2], kernel_size=kernel_size[2], stride=1, padding=0)
        self.batchnorm3 = nn.BatchNorm2d(num_filters[2])
        self.conv4 = nn.Conv2d(in_channels=num_filters[2], out_channels=num_filters[3], kernel_size=kernel_size[3], stride=1, padding=0)
        self.batchnorm4 = nn.BatchNorm2d(num_filters[3])
        self.conv5 = nn.Conv2d(in_channels=num_filters[3],out_channels=num_filters[4], kernel_size=kernel_size[4], stride=1, padding=0)
        self.batchnorm5 = nn.BatchNorm2d(num_filters[4])

        # Define activation function
        if activation_fn == "ReLU": self.activation = nn.ReLU()
        elif activation_fn == "GELU": self.activation = nn.GELU()
        elif activation_fn == "SiLU": self.activation = nn.SiLU()
        elif activation_fn == "Mish": self.activation = nn.Mish()
        
        # Define max pooling layer
        self.pool = nn.MaxPool2d(kernel_size=2)
        
        # calculating the size of the input of the first fc layer
        input_size = 227
        for i in range(5):
            input_size = input_size - kernel_size[i] + 1
            input_size = input_size // 2
        
        # Define dense layer
        self.fc1 = nn.Linear(input_size*input_size*num_filters[4], hidden_units)
        
        # Adding Dropout
        self.dropout = nn.Dropout(p=prob)
        
        # Define output layer
        self.fc2 = nn.Linear(hidden_units, 10)  # 10 output neurons for 10 classes
        
        self.apply_batchnorm = apply_batchnorm
        self.apply_dropout = apply_dropout
        

    def forward(self, x):
        # Apply convolution, activation, and max pooling layers
        x = self.pool(self.activation(self.batchnorm1(self.conv1(x)) if self.apply_batchnorm =="Yes" else self.conv1(x)))
        x = self.pool(self.activation(self.batchnorm2(self.conv2(x)) if self.apply_batchnorm =="Yes" else self.conv2(x)))
        x = self.pool(self.activation(self.batchnorm3(self.conv3(x)) if self.apply_batchnorm =="Yes" else self.conv3(x)))
        x = self.pool(self.activation(self.batchnorm4(self.conv4(x)) if self.apply_batchnorm =="Yes" else self.conv4(x)))
        x = self.pool(self.activation(self.batchnorm5(self.conv5(x)) if self.apply_batchnorm =="Yes" else self.conv5(x)))
        
        # Flatten the output for the dense layer
        x = x.view(x.size(0), -1)
        
        # Apply dense layer and output layer
        x = self.activation(self.fc1(x))
        
        if self.apply_dropout=="Yes": x = self.dropout(x)
        x = self.fc2(x)
        
        # Apply softmax activation to the output
        x = F.softmax(x, dim=1)  
        return x

In [50]:
# Load pre-trained GoogLeNet without the final classification layer
googlenet = models.googlenet(pretrained=True)
googlenet = nn.Sequential(*list(googlenet.children())[:-1])  # Remove the final layer

# Define a smaller feedforward neural network for classification
# class SimpleClassifier(nn.Module):
#     def __init__(self, input_size, hidden_size, num_classes):
#         super(SimpleClassifier, self).__init__()
#         self.fc1 = nn.Linear(input_size, hidden_size)
#         self.relu = nn.ReLU()
#         self.fc2 = nn.Linear(hidden_size, num_classes)

#     def forward(self, x):
#         x = self.fc1(x)
#         x = self.relu(x)
#         x = self.fc2(x)
#         return x

# Set device (GPU if available, otherwise CPU)
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Extract features using GoogLeNet
features_list = []
labels_list = []
with torch.no_grad():
    for images, labels in tqdm(train_loader):
        images = images.to(device)
        features = googlenet(images).squeeze()  # Remove the batch dimension
        features_list.append(features)
        labels_list.append(labels)

# Concatenate features and labels
features = torch.cat(features_list, dim=0)
labels = torch.cat(labels_list, dim=0)

  0%|          | 0/250 [00:00<?, ?it/s]

NameError: name 'dataset' is not defined

In [59]:
# Define the input size for the classifier based on the extracted features
input_size = features.size(1)

# Initialize the simple classifier and move it to the device
# classifier = SimpleClassifier(input_size, hidden_size=128, num_classes=10).to(device)

num_filters = [32,32,32,32,32]
kernel_size = [3,3,3,3,3]
activation_fn = "ReLU"
batch_norm = "No"
dropout = "No"
prob = 0.2
hidden_units = 256

model = CNN(
            in_channels=3,
            out_channels=10,
            num_filters=num_filters,
            kernel_size=kernel_size,
            activation_fn=activation_fn,
            apply_batchnorm=batch_norm,
            apply_dropout=dropout,
            prob=prob,
            hidden_units=hidden_units
        )

model.to(device)

# Define loss function and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(classifier.parameters(), lr=0.001)

# Train the classifier on the extracted features
num_epochs = 10
for epoch in range(num_epochs):
    optimizer.zero_grad()
    outputs = classifier(features)
    loss = criterion(outputs, labels)
    loss.backward()
    optimizer.step()

    # Calculate accuracy
    _, predicted = torch.max(outputs, 1)
    correct = (predicted == labels).sum().item()
    accuracy = correct / len(labels)

    print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {loss.item()}, Accuracy: {accuracy * 100:.2f}%')

# Save or use the trained classifier for inference

Epoch [1/10], Loss: 1.549080491065979, Accuracy: 62.75%
Epoch [2/10], Loss: 1.4785223007202148, Accuracy: 63.17%
Epoch [3/10], Loss: 1.4185056686401367, Accuracy: 62.07%
Epoch [4/10], Loss: 1.3637558221817017, Accuracy: 64.26%
Epoch [5/10], Loss: 1.3159065246582031, Accuracy: 64.70%
Epoch [6/10], Loss: 1.2735456228256226, Accuracy: 64.52%
Epoch [7/10], Loss: 1.2341926097869873, Accuracy: 64.66%
Epoch [8/10], Loss: 1.1994515657424927, Accuracy: 65.31%
Epoch [9/10], Loss: 1.1679999828338623, Accuracy: 65.87%
Epoch [10/10], Loss: 1.1391072273254395, Accuracy: 66.11%
