In [81]:
# Apply the suggested random number generation process set by the question for reproduceability sake:

import numpy as np
import torch

SEED = 42
np.random.seed(SEED)
torch.manual_seed(SEED)

if torch.cuda.is_available():
    torch.cuda.manual_seed(SEED)
    torch.cuda.manual_seed_all(SEED)

torch.backends.cudnn.deterministic = True
torch.backends.cudnn.benchmark = False


In [82]:
# Import necessary libraries
import os
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision
import torchvision.models as models
import torchvision.transforms as transforms
from torch.utils.data import DataLoader, Dataset
from torchvision.datasets import ImageFolder
import numpy as np
from tqdm import tqdm
from PIL import Image

# Try to import torchsummary, but it's optional
try:
    from torchsummary import summary
    HAS_TORCHSUMMARY = True
except ImportError:
    HAS_TORCHSUMMARY = False
    print("torchsummary not available. Install with: pip install torchsummary")



## Model Architecture

For this homework we will use Convolutional Neural Network (CNN). We'll use PyTorch.

Model structure:
* The shape for input should be `(3, 200, 200)` (channels first format in PyTorch)
* Next, create a convolutional layer (`nn.Conv2d`):
    * Use 32 filters (output channels)
    * Kernel size should be `(3, 3)` (that's the size of the filter)
    * Use `'relu'` as activation
* Reduce the size of the feature map with max pooling (`nn.MaxPool2d`)
    * Set the pooling size to `(2, 2)`
* Turn the multi-dimensional result into vectors using `flatten` or `view`
* Next, add a `nn.Linear` layer with 64 neurons and `'relu'` activation
* Finally, create the `nn.Linear` layer with 1 neuron - this will be the output
    * The output layer should have an activation - use the appropriate activation for the binary classification case

As optimizer use `torch.optim.SGD` with the following parameters:
* `torch.optim.SGD(model.parameters(), lr=0.002, momentum=0.8)`


In [83]:
# Define HairDataset

class HairDataset(Dataset):
    def __init__(self, data_dir, transform=None):
        self.data_dir = data_dir
        self.transform = transform
        self.image_paths = []
        self.labels = []
        self.classes = sorted(os.listdir(data_dir))
        self.class_to_idx = {cls: i for i, cls in enumerate(self.classes)}

        for label_name in self.classes:
            label_dir = os.path.join(data_dir, label_name)
            for img_name in os.listdir(label_dir):
                self.image_paths.append(os.path.join(label_dir, img_name))
                self.labels.append(self.class_to_idx[label_name])

    def __len__(self):
        return len(self.image_paths)

    def __getitem__(self, idx):
        img_path = self.image_paths[idx]
        image = Image.open(img_path).convert('RGB')
        label = self.labels[idx]

        if self.transform:
            image = self.transform(image)

        return image, label

## Question 1: Which loss function you will use?

For binary classification with sigmoid activation, we use **BCELoss** (Binary Cross Entropy Loss).

Answer: **nn.BCELoss()** (or **nn.BCEWithLogitsLoss()** if we didn't use sigmoid)


In [84]:
# Loss function for binary classification
criterion = nn.BCELoss()


## Question 2: Total number of parameters


In [85]:
# Question 2: Total number of parameters

# Use Method 2: Count manually
total_params = sum(p.numel() for p in model.parameters())

print(f"\nTotal parameters: {total_params:,}")


Total parameters: 20,073,473


## Data Preparation

We'll use the **straight/curly hair** dataset from
`http://github.com/SVizor42/ML_Zoomcamp/releases/download/straight-curly-data/data.zip`.

Once unzipped, the dataset has the following folder structure:
- `data/train/`
- `data/test/`

Each split contains two subfolders (e.g. `curly/` and `straight/`) with images.
We'll:
- Download and unzip the dataset (if not already present)
- Resize images to 200x200 as required
- Keep it as a **binary** problem (curly vs straight).


In [86]:
# Download and unzip straight/curly hair dataset if not already present
import urllib.request
import zipfile

DATA_URL = "http://github.com/SVizor42/ML_Zoomcamp/releases/download/straight-curly-data/data.zip"
DATA_ZIP_PATH = "./data.zip"
DATA_DIR = "./data"

if not os.path.exists(DATA_DIR):
    print("Downloading dataset...")
    urllib.request.urlretrieve(DATA_URL, DATA_ZIP_PATH)
    print("Unzipping dataset...")
    with zipfile.ZipFile(DATA_ZIP_PATH, "r") as zip_ref:
        zip_ref.extractall(".")
    print("Done!")
else:
    print("Dataset already present, skipping download.")

Dataset already present, skipping download.


## Generators and Training
- We don't need to do any additional pre-processing for the images.
- Use batch_size=20
- Use shuffle=True for both training, but False for test.

Now fit the model.

In [87]:
# @title
# Path to straight/curly hair dataset (after unzipping data.zip)
data_dir = "./data"

# Define transforms for training (without augmentation initially)
input_size = 200

mean = [0.485, 0.456, 0.406]
std = [0.229, 0.224, 0.225]

# Simple transforms - just resize and normalize
train_transforms = transforms.Compose([
    transforms.Resize((200, 200)),
    transforms.ToTensor(),
    transforms.Normalize(
        mean=mean,
        std=std
    ) # ImageNet normalization
])

test_transforms = transforms.Compose([
    transforms.Resize((input_size, input_size)),
    transforms.ToTensor(),
    transforms.Normalize(mean=mean, std=std)
])

# Load dataset splits using ImageFolder
# Expected structure:
# ./data/train/curly
# ./data/train/straight
# ./data/test/curly
# ./data/test/straight
train_dir = os.path.join(data_dir, "train")
test_dir = os.path.join(data_dir, "test")

#train_dataset = ImageFolder(train_dir, transform=transform_train)
#test_dataset = ImageFolder(test_dir, transform=transform_test)

#print(f"Classes: {train_dataset.classes}")

# Since this dataset is already binary (curly vs straight),
# we don't need to remap labels; ImageFolder will give 0/1 labels.

# Create data loaders
#train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
#test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)

#print(f"Training samples: {len(train_dataset)}")
#print(f"Test samples: {len(test_dataset)}")


In [88]:
from torch.utils.data import DataLoader

train_dataset = HairDataset(
    data_dir='./data/train',
    transform=train_transforms
)

test_dataset = HairDataset(
    data_dir='./data/test',
    transform=test_transforms
)

train_loader = DataLoader(train_dataset, batch_size=20, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=20, shuffle=False)

In [89]:
# Define the CNN model
class HairClassifierMobileNet(nn.Module):
    def __init__(self, num_classes=1):
        super(HairClassifierMobileNet, self).__init__()

        # Convolutional layer: (3, 200, 200) -> (32, 198, 198)
        self.conv1 = nn.Conv2d(in_channels=3, out_channels=32, kernel_size=(3, 3))
        self.relu1 = nn.ReLU()

        # Max pooling: (32, 198, 198) -> (32, 99, 99)
        self.pool = nn.MaxPool2d(kernel_size=(2, 2))

        # Flatten: (32, 99, 99) -> (32 * 99 * 99,)
        # First linear layer: 32 * 99 * 99 -> 64
        self.fc1 = nn.Linear(32 * 99 * 99, 64)
        self.relu2 = nn.ReLU()

        # Output layer: 64 -> 1 (binary classification)
        self.fc2 = nn.Linear(64, 1)
        self.sigmoid = nn.Sigmoid()

    def forward(self, x):
        # Convolution + ReLU
        x = self.conv1(x)
        x = self.relu1(x)

        # Max pooling
        x = self.pool(x)

        # Flatten
        x = x.view(x.size(0), -1)

        # First fully connected layer + ReLU
        x = self.fc1(x)
        x = self.relu2(x)

        # Output layer + Sigmoid
        x = self.fc2(x)
        x = self.sigmoid(x)

        return x

" # old version\nclass HairClassifierMobileNet(nn.Module):\n    def __init__(self, num_classes=2):\n        super(HairClassifierMobileNet, self).__init__()\n\n        # Load pre-trained MobileNetV2\n        self.base_model = models.mobilenet_v2(weights='IMAGENET1K_V1')\n\n        # Freeze base model parameters\n        for param in self.base_model.parameters():\n            param.requires_grad = False\n\n        # Remove original classifier\n        self.base_model.classifier = nn.Identity()\n\n        # Add custom layers\n        self.global_avg_pooling = nn.AdaptiveAvgPool2d((1, 1))\n        self.output_layer = nn.Linear(1280, num_classes)\n\n    def forward(self, x):\n        x = self.base_model.features(x)\n        x = self.global_avg_pooling(x)\n        x = torch.flatten(x, 1)\n        x = self.output_layer(x)\n        return x\n"

In [90]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Create model instance
model = HairClassifierMobileNet(num_classes=1)
model.to(device);

# Optimizer
optimizer = optim.SGD(model.parameters(), lr=0.002, momentum=0.8)


In [91]:
# Fitting model

def fit(model):
  num_epochs = 10
  history = {'acc': [], 'loss': [], 'test_acc': [], 'test_loss': []}

  for epoch in range(num_epochs):
      model.train()
      running_loss = 0.0
      correct_train = 0
      total_train = 0
      for images, labels in train_loader:
          images, labels = images.to(device), labels.to(device)
          labels = labels.float().unsqueeze(1) # Ensure labels are float and have shape (batch_size, 1)

          optimizer.zero_grad()
          outputs = model(images)
          loss = criterion(outputs, labels)
          loss.backward()
          optimizer.step()

          running_loss += loss.item() * images.size(0)
          # For binary classification with BCEWithLogitsLoss, apply sigmoid to outputs before thresholding for accuracy
          predicted = (torch.sigmoid(outputs) > 0.5).float()
          total_train += labels.size(0)
          correct_train += (predicted == labels).sum().item()

      epoch_loss = running_loss / len(train_dataset)
      epoch_acc = correct_train / total_train
      history['loss'].append(epoch_loss)
      history['acc'].append(epoch_acc)

      model.eval()
      test_running_loss = 0.0
      correct_test = 0
      total_test = 0
      with torch.no_grad():
          for images, labels in test_loader:
              images, labels = images.to(device), labels.to(device)
              labels = labels.float().unsqueeze(1)

              outputs = model(images)
              loss = criterion(outputs, labels)

              test_running_loss += loss.item() * images.size(0)
              predicted = (torch.sigmoid(outputs) > 0.5).float()
              total_test += labels.size(0)
              correct_test += (predicted == labels).sum().item()

      test_epoch_loss = test_running_loss / len(test_dataset)
      test_epoch_acc = correct_test / total_test
      history['test_loss'].append(test_epoch_loss)
      history['test_acc'].append(test_epoch_acc)

      print(f"Epoch {epoch+1}/{num_epochs}, "
            f"Loss: {epoch_loss:.4f}, Acc: {epoch_acc:.4f}, "
            f"Test Loss: {test_epoch_loss:.4f}, Test Acc: {test_epoch_acc:.4f}")

  return history


Epoch 1/10, Loss: 0.6500, Acc: 0.4869, Test Loss: 0.5966, Test Acc: 0.4876
Epoch 2/10, Loss: 0.5670, Acc: 0.4869, Test Loss: 0.6158, Test Acc: 0.4876
Epoch 3/10, Loss: 0.5171, Acc: 0.4869, Test Loss: 0.5932, Test Acc: 0.4876
Epoch 4/10, Loss: 0.4743, Acc: 0.4869, Test Loss: 0.6195, Test Acc: 0.4876
Epoch 5/10, Loss: 0.4134, Acc: 0.4869, Test Loss: 0.7173, Test Acc: 0.4876
Epoch 6/10, Loss: 0.4359, Acc: 0.4869, Test Loss: 0.6727, Test Acc: 0.4876
Epoch 7/10, Loss: 0.3327, Acc: 0.4869, Test Loss: 0.7522, Test Acc: 0.4876
Epoch 8/10, Loss: 0.2913, Acc: 0.4869, Test Loss: 1.1109, Test Acc: 0.4876
Epoch 9/10, Loss: 0.2779, Acc: 0.4869, Test Loss: 0.6870, Test Acc: 0.4876
Epoch 10/10, Loss: 0.4130, Acc: 0.4869, Test Loss: 0.6621, Test Acc: 0.4876


## Question 3: Median of training accuracy for all epochs


In [92]:
# Question 3: Median of training accuracy
median_train_acc = np.median(history['acc'])
print(f"Median training accuracy: {median_train_acc:.4f}")
print(f"\nTraining accuracies: {[f'{acc:.4f}' for acc in history['acc']]}")
print(f"\nAnswer: {median_train_acc:.2f}")


Median training accuracy: 0.4869

Training accuracies: ['0.4869', '0.4869', '0.4869', '0.4869', '0.4869', '0.4869', '0.4869', '0.4869', '0.4869', '0.4869']

Answer: 0.49


## Question 4: Standard deviation of training loss for all epochs


In [93]:
# Question 4: Standard deviation of training loss
std_train_loss = np.std(history['loss'])
print(f"Standard deviation of training loss: {std_train_loss:.4f}")
print(f"\nTraining losses: {[f'{loss:.4f}' for loss in history['loss']]}")
print(f"\nAnswer: {std_train_loss:.3f}")


Standard deviation of training loss: 0.1134

Training losses: ['0.6500', '0.5670', '0.5171', '0.4743', '0.4134', '0.4359', '0.3327', '0.2913', '0.2779', '0.4130']

Answer: 0.113


## Data Augmentation

Add the augmentations to your training data generator

In [96]:
transform_train_aug = transforms.Compose([
    transforms.Resize((200, 200)),
    transforms.ToTensor(),
    transforms.Normalize(
        mean=mean,
        std=std
    ),
    transforms.RandomRotation(50),
    transforms.RandomResizedCrop(200, scale=(0.9, 1.0), ratio=(0.9, 1.1)),
    transforms.RandomHorizontalFlip()
])

transform_test_aug = transforms.Compose([
    transforms.Resize((input_size, input_size)),
    transforms.ToTensor(),
    transforms.Normalize(mean=mean, std=std),
    transforms.RandomRotation(50),
    transforms.RandomResizedCrop(200, scale=(0.9, 1.0), ratio=(0.9, 1.1)),
    transforms.RandomHorizontalFlip()
])


# Reload training dataset with augmentation from straight/curly hair data
train_dataset_aug = HairDataset(
    data_dir='./data/train',
    transform=transform_train_aug
)

test_dataset_aug = HairDataset(
    data_dir='./data/test',
    transform=transform_test_aug
)

train_loader = DataLoader(train_dataset, batch_size=20, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=20, shuffle=False)

In [97]:
# Train for 10 more epochs with augmentation
# Note: We're continuing to train the same model, not creating a new one
num_epochs_aug = 10
train_losses_aug = []
train_accuracies_aug = []
test_losses_aug = []
test_accuracies_aug = []

for epoch in range(num_epochs_aug):
    print(f"\nEpoch {epoch+1}/{num_epochs_aug} (with augmentation)")

    # Train with augmented data
    train_loss, train_acc = train_epoch(model, train_loader_aug, criterion, optimizer, device)
    train_losses_aug.append(train_loss)
    train_accuracies_aug.append(train_acc)

    # Evaluate
    test_loss, test_acc = evaluate(model, test_loader, criterion, device)
    test_losses_aug.append(test_loss)
    test_accuracies_aug.append(test_acc)

    print(f"Train Loss: {train_loss:.4f}, Train Acc: {train_acc:.4f}")
    print(f"Test Loss: {test_loss:.4f}, Test Acc: {test_acc:.4f}")



Epoch 1/10 (with augmentation)


NameError: name 'train_epoch' is not defined

## Question 5: Mean of test loss for all epochs (with augmentation)


In [None]:
# Question 5: Mean of test loss for all epochs with augmentation
mean_test_loss_aug = np.mean(test_losses_aug)
print(f"Mean test loss (with augmentation): {mean_test_loss_aug:.4f}")
print(f"\nTest losses: {[f'{loss:.4f}' for loss in test_losses_aug]}")
print(f"\nAnswer: {mean_test_loss_aug:.3f}")


Mean test loss (with augmentation): 0.6167

Test losses: ['0.6139', '0.6105', '0.6203', '0.6327', '0.6119', '0.6223', '0.6214', '0.5955', '0.6254', '0.6130']

Answer: 0.617


## Question 6: Average of test accuracy for the last 5 epochs (epochs 6-10) with augmentation


In [None]:
# Question 6: Average of test accuracy for last 5 epochs (epochs 6-10, indices 5-9)
last_5_test_acc = test_accuracies_aug[5:10]  # Epochs 6-10 (0-indexed: 5-9)
avg_last_5_test_acc = np.mean(last_5_test_acc)

print(f"Test accuracies for epochs 6-10: {[f'{acc:.4f}' for acc in last_5_test_acc]}")
print(f"Average test accuracy (last 5 epochs): {avg_last_5_test_acc:.4f}")
print(f"\nAnswer: {avg_last_5_test_acc:.2f}")


Test accuracies for epochs 6-10: ['0.6766', '0.6766', '0.6965', '0.6766', '0.6915']
Average test accuracy (last 5 epochs): 0.6836

Answer: 0.68
