In [1]:
"""
In this cell, complete 
1) relu function
2) max_pooling_with_mask function
3) conv2d_multi_channel_forward function
4) forward method in the SimpleCNN class

There are comments in the exact locations where modifications should be made

DO NOT TOUCH ANYTHING ELSE IN THIS CELL
"""
import os
import numpy as np
from PIL import Image

def relu(x):
    """
    Rectified linear unit: element-wise max(x, 0).
    Negative entries become 0; everything else passes through unchanged.
    """
    floor = 0
    return np.maximum(x, floor)
    
def max_pooling_with_mask(x, pool_size=2, stride=2):
    """
    Max-pool a 3D volume of shape (H, W, C), channel by channel.

    Returns a tuple (pooled, mask):
      pooled: float array of shape (outH, outW, C) holding each window's maximum.
      mask:   array shaped like x with a 1 at the first (row-major) position
              of the maximum inside every window and 0 elsewhere.
    """
    H, W, C = x.shape
    outH = (H - pool_size) // stride + 1
    outW = (W - pool_size) // stride + 1
    pooled = np.zeros((outH, outW, C))
    mask = np.zeros_like(x)

    # np.ndindex walks (c, i, j) with the last index varying fastest,
    # i.e. channel-outermost, then output row, then output column.
    for c, i, j in np.ndindex(C, outH, outW):
        top = i * stride
        left = j * stride
        patch = x[top:top + pool_size, left:left + pool_size, c]

        peak = np.max(patch)
        pooled[i, j, c] = peak

        # Only the first occurrence of the maximum is marked, so ties do not
        # produce several 1s for the same window.
        first_hit = next(
            (
                (m, n)
                for m in range(pool_size)
                for n in range(pool_size)
                if patch[m, n] == peak
            ),
            None,
        )
        if first_hit is not None:
            m, n = first_hit
            mask[top + m, left + n, c] = 1

    return pooled, mask

def conv2d_multi_channel_forward(input_volume, kernels):
    """
    Forward pass for multi-channel convolution (stride 1, no padding).

    As in the usual CNN convention the kernel is NOT flipped, i.e. this is a
    cross-correlation:
        output[i, j, f] = sum over m, n, c of
                          input_volume[i + m, j + n, c] * kernels[m, n, c, f]

    input_volume: shape (H, W, C_in)
    kernels: shape (kH, kW, C_in, num_filters)
    Returns output of shape (H - kH + 1, W - kW + 1, num_filters)

    Raises:
      ValueError: if the kernels' input-channel count differs from the
                  channel count of input_volume.
    """
    H, W, C = input_volume.shape
    kH, kW, kernel_channels, num_filters = kernels.shape
    if kernel_channels != C:
        raise ValueError(
            f"Channel mismatch: input_volume has {C} channels "
            f"but kernels expect {kernel_channels}"
        )
    outH = H - kH + 1
    outW = W - kW + 1
    output = np.zeros((outH, outW, num_filters))

    # Iterate only over the (small) kernel footprint. For each offset (m, n)
    # the shifted view of the input lines up with every output position at
    # once, and the matrix product contracts the channel axis for all filters:
    # (outH, outW, C) @ (C, num_filters) -> (outH, outW, num_filters).
    for m in range(kH):
        for n in range(kW):
            shifted = input_volume[m:m + outH, n:n + outW, :]
            output += shifted @ kernels[m, n]
    return output

def relu_backward(x, d_out):
    """
    Backward pass for ReLU.
    x is the pre-activation input, d_out the upstream gradient.
    Returns a copy of d_out with the entries zeroed where x <= 0
    (d_out itself is left untouched).
    """
    inactive = x <= 0
    grad = d_out.copy()
    grad[inactive] = 0
    return grad

def max_pooling_backward(mask, d_out, pool_size=2, stride=2):
    """
    Backward pass for max pooling.

    mask:  (H, W, C) array marking the position selected in each pooling window.
    d_out: (outH, outW, C) upstream gradient, one value per window.
    Each window's gradient is routed to the masked position(s) of that window;
    all other positions receive 0. Returns an array shaped like mask.
    """
    _, _, C = mask.shape
    outH, outW, _ = d_out.shape
    d_x = np.zeros_like(mask)

    # Same traversal order as nested loops over c, then i, then j.
    for c, i, j in np.ndindex(C, outH, outW):
        rows = slice(i * stride, i * stride + pool_size)
        cols = slice(j * stride, j * stride + pool_size)
        d_x[rows, cols, c] += mask[rows, cols, c] * d_out[i, j, c]

    return d_x

def conv2d_multi_channel_backward(input_volume, kernels, d_out):
    """
    Backward pass for multi-channel convolution.

    input_volume: (H, W, C) input that was fed to the forward pass.
    kernels:      (kH, kW, C, num_filters) weights used in the forward pass.
    d_out:        (outH, outW, num_filters) gradient w.r.t. the conv output.

    Returns (d_input, d_kernels): gradients with respect to input_volume and
    kernels, each shaped like the corresponding argument.
    """
    _, _, C = input_volume.shape
    kH, kW, _, num_filters = kernels.shape
    outH, outW, _ = d_out.shape

    d_input = np.zeros_like(input_volume)
    d_kernels = np.zeros_like(kernels)

    # np.ndindex iterates with the last index fastest, reproducing the nesting
    # f -> i -> j -> c -> m -> n.
    for f, i, j, c, m, n in np.ndindex(num_filters, outH, outW, C, kH, kW):
        upstream = d_out[i, j, f]
        row, col = i + m, j + n
        # Each (input pixel, kernel weight) pair contributed their product to
        # output[i, j, f]; the gradient of each is the other times upstream.
        d_kernels[m, n, c, f] += input_volume[row, col, c] * upstream
        d_input[row, col, c] += kernels[m, n, c, f] * upstream

    return d_input, d_kernels

# ---------------------- SimpleCNN Class ----------------------
class SimpleCNN:
    """
    Small two-class CNN (ferrari vs. jeep) for 50x50 single-channel images.

    Architecture: conv(3x3, 4 filters) -> ReLU -> 2x2 max-pool ->
                  conv(3x3, 8 filters) -> ReLU -> 2x2 max-pool -> fully connected (2 outputs).
    forward() stores every intermediate result in self.cache; backward()
    reads that cache, so it must be called after forward() on the same sample.
    """

    def __init__(self):
        init_scale = 0.1  # Standard deviation of the Gaussian weight initialisation.

        # The parameters are drawn from the global NumPy RNG in this order:
        # conv1 kernels, conv2 kernels, FC weights, FC bias.
        # Layer 1: 4 filters of size 3x3 over the single input channel.
        self.conv1_kernels = np.random.randn(3, 3, 1, 4) * init_scale
        # Layer 2: 8 filters of size 3x3 over the 4 channels produced by layer 1.
        self.conv2_kernels = np.random.randn(3, 3, 4, 8) * init_scale

        # Spatial sizes through the network:
        # (50,50) -> Conv1: (48,48,4) -> Pool1: (24,24,4)
        #         -> Conv2: (22,22,8) -> Pool2: (11,11,8) => 11*11*8 features.
        self.fc_input_dim = 11 * 11 * 8
        # Two output classes: ferrari and jeep.
        self.fc_weights = np.random.randn(self.fc_input_dim, 2) * init_scale
        self.fc_bias = np.random.randn(2) * init_scale

        self.lr = 0.005  # SGD learning rate.
        self.cache = {}  # Intermediate forward-pass results needed by backward().

    def forward(self, x):
        """
        Run one image through the network.
        x: 2D numpy array (50x50)
        Returns: logits (2-dimensional vector) for the 2 classes.
        Side effect: fills self.cache with 'x', 'conv1', 'relu1', 'pool1',
        'mask1', 'conv2', 'relu2', 'pool2', 'mask2', 'flattened' and 'fc'.
        """
        cache = self.cache

        # Add the channel axis: (50,50) -> (50,50,1).
        image = x.reshape(50, 50, 1)
        cache['x'] = image

        # Stage 1: conv -> ReLU -> max-pool. Shapes: (48,48,4) -> (48,48,4) -> (24,24,4).
        conv1 = conv2d_multi_channel_forward(image, self.conv1_kernels)
        cache['conv1'] = conv1
        relu1 = relu(conv1)
        cache['relu1'] = relu1
        pool1, mask1 = max_pooling_with_mask(relu1, pool_size=2, stride=2)
        cache['pool1'] = pool1
        cache['mask1'] = mask1

        # Stage 2: conv -> ReLU -> max-pool. Shapes: (22,22,8) -> (22,22,8) -> (11,11,8).
        conv2 = conv2d_multi_channel_forward(pool1, self.conv2_kernels)
        cache['conv2'] = conv2
        relu2 = relu(conv2)
        cache['relu2'] = relu2
        pool2, mask2 = max_pooling_with_mask(relu2, pool_size=2, stride=2)
        cache['pool2'] = pool2
        cache['mask2'] = mask2

        # Classifier head: flatten to (11*11*8,) and apply the affine layer -> (2,).
        flattened = pool2.flatten()
        cache['flattened'] = flattened
        fc = flattened.dot(self.fc_weights) + self.fc_bias
        cache['fc'] = fc

        return fc

    def backward(self, d_fc):
        """
        Backpropagate through the network and apply one SGD step.
        d_fc: Gradient with respect to the FC output (shape: (2,))
        Uses the activations cached by the most recent forward() call.
        Returns a dict with the parameter gradients ('d_fc_weights',
        'd_fc_bias', 'd_conv2_kernels', 'd_conv1_kernels') for debugging.
        """
        cache = self.cache

        # --- Fully connected layer ---
        d_fc_weights = np.outer(cache['flattened'], d_fc)  # (flattened_dim, 2)
        d_fc_bias = d_fc                                   # (2,)
        # Uses the weights BEFORE the update below.
        d_flattened = self.fc_weights.dot(d_fc)            # (flattened_dim,)

        # --- Stage 2, walked in reverse: pool -> ReLU -> conv ---
        d_pool2 = d_flattened.reshape(cache['pool2'].shape)  # back to (11,11,8)
        d_relu2 = max_pooling_backward(cache['mask2'], d_pool2, pool_size=2, stride=2)  # (22,22,8)
        d_conv2 = relu_backward(cache['conv2'], d_relu2)
        d_pool1_from_conv2, d_conv2_kernels = conv2d_multi_channel_backward(
            cache['pool1'], self.conv2_kernels, d_conv2
        )

        # --- Stage 1, walked in reverse: pool -> ReLU -> conv ---
        d_relu1 = max_pooling_backward(cache['mask1'], d_pool1_from_conv2, pool_size=2, stride=2)  # (48,48,4)
        d_conv1 = relu_backward(cache['conv1'], d_relu1)
        # The gradient w.r.t. the input image is computed but not needed.
        _, d_conv1_kernels = conv2d_multi_channel_backward(
            cache['x'], self.conv1_kernels, d_conv1
        )

        # --- SGD step: applied only after every gradient has been computed ---
        step = self.lr
        self.fc_weights -= step * d_fc_weights
        self.fc_bias -= step * d_fc_bias
        self.conv2_kernels -= step * d_conv2_kernels
        self.conv1_kernels -= step * d_conv1_kernels

        return {
            'd_fc_weights': d_fc_weights,
            'd_fc_bias': d_fc_bias,
            'd_conv2_kernels': d_conv2_kernels,
            'd_conv1_kernels': d_conv1_kernels,
        }

def mse_loss(output, target):
    """
    Squared-error loss used for training.
    Computes 0.5 * sum((output - target)^2) (a half sum of squares, not a mean)
    and its gradient with respect to output, which is simply output - target.
    Returns (loss, d_loss).
    """
    residual = output - target
    loss = 0.5 * np.sum(residual ** 2)
    return loss, residual

def test(cnn, x, y):
    """
    Evaluate cnn on the samples x with one-hot targets y.
    Prints one line "<average loss>, <accuracy>", both with 4 decimals.
    A sample counts as correct when the argmax of the network output equals
    the argmax of its target. Nothing is returned and no weights are updated.
    """
    n_samples = len(x)
    test_loss = 0
    correct = 0
    for idx in range(n_samples):
        sample = x[idx]
        expected = y[idx]
        logits = cnn.forward(sample)
        sample_loss, _ = mse_loss(logits, expected)
        test_loss += sample_loss
        if np.argmax(logits) == np.argmax(expected):
            correct += 1
    avg_loss = test_loss / n_samples
    accuracy = correct / n_samples
    print(f"{avg_loss:.4f}, {accuracy:.4f}")

In [6]:
"""
This cell contains the code for image loading and preprocessing.

You can do whatever you want with the GIVEN training dataset - preprocess and augment in any way.
At the end of the preprocessing step, however, the output MUST be resized to (50,50) and converted to grayscale.

DO NOT use any additional data from anywhere else; only work with the GIVEN training dataset.

"""
from PIL import ImageEnhance

def augment_image(img):
    """
    Return a randomly perturbed version of a PIL image (added for data augmentation).

    Applied in order, each drawing from the global NumPy RNG:
      1. rotation by an angle drawn uniformly from [-15, 15) degrees,
      2. translation via an affine transform with integer offsets in [-5, 5]
         drawn independently for x and y,
      3. brightness and then contrast enhancement, both with the same factor
         drawn uniformly from [0.8, 1.2).
    """
    angle = np.random.uniform(-15, 15)
    rotated = img.rotate(angle)

    offset_x = np.random.randint(-5, 6)
    offset_y = np.random.randint(-5, 6)
    # Affine coefficients (a, b, c, d, e, f): identity plus a pure translation.
    translation = (1, 0, offset_x, 0, 1, offset_y)
    shifted = rotated.transform(rotated.size, Image.AFFINE, translation)

    gain = np.random.uniform(0.8, 1.2)
    brightened = ImageEnhance.Brightness(shifted).enhance(gain)
    return ImageEnhance.Contrast(brightened).enhance(gain)

def preprocess_pipeline(path, image_size=(50, 50), augment=False):
    """
    Open an image from the given path and process it through a pipeline.

    Steps:
      - Convert to grayscale.
      - Resize to image_size.
      - (Optional) Apply random augmentation via augment_image.
      - Convert to a NumPy array and normalize pixel values to [0,1].

    Parameters:
      path (str): Path to the image file.
      image_size (tuple): Target size for resizing (width, height).
      augment (bool): If True, the resized grayscale image is passed through
        augment_image before conversion to an array.

    Returns:
      np.ndarray: Processed image as a float32 NumPy array, or None if the
      image could not be processed (the error is printed, not raised).
      - it is required that the output size is (50,50) and the images are converted to grayscale.
    """
    try:
        # The context manager closes the underlying file as soon as the pixel
        # data has been converted; without it one file handle per image stays
        # open until garbage collection. convert() and resize() return new,
        # independent images, so `img` remains valid after the file is closed.
        with Image.open(path) as source:
            img = source.convert("L").resize(image_size)

        if augment:
            # augment_image builds new images at every step, so no defensive
            # copy of `img` is needed.
            img = augment_image(img)

        # 8-bit grayscale values 0..255 -> float32 in [0, 1].
        return np.array(img).astype(np.float32) / 255.0
    except Exception as e:
        # Deliberate best-effort: one unreadable file must not abort loading
        # of the whole dataset; callers skip entries that come back as None.
        print(f"Error processing image {path}: {e}")
        return None

def load_images_from_folder(folder, label, image_size=(50, 50), augment=False, augment_factor=1):
    """
    Load every image in a folder through the preprocessing pipeline.

    Parameters:
      folder (str): Folder path.
      label (list): One-hot vector label (e.g., [1,0] or [0,1]) attached to every image.
      image_size (tuple): Target size for resizing.
      augment (bool): If True, augmented variants are added after each image.
      augment_factor (int): Number of augmented variants generated per image
        when augment is True.

    Returns:
      tuple: Two lists containing the processed images and their labels.
      Files without a .png/.jpg/.jpeg/.bmp extension and images that fail to
      load are skipped.
    """
    image_suffixes = ('.png', '.jpg', '.jpeg', '.bmp')
    images, labels = [], []

    for filename in os.listdir(folder):
        if not filename.lower().endswith(image_suffixes):
            continue

        path = os.path.join(folder, filename)
        original = preprocess_pipeline(path, image_size=image_size)
        if original is None:
            # Unreadable file: skip it together with its augmentations.
            continue
        images.append(original)
        labels.append(label)

        if not augment:
            continue
        for _ in range(augment_factor):
            variant = preprocess_pipeline(path, image_size=image_size, augment=True)
            if variant is not None:
                images.append(variant)
                labels.append(label)

    return images, labels

def add_noise(img_array, noise_factor=0.1):
    """
    Add zero-mean Gaussian noise to an image (added for data augmentation).
    noise_factor is the standard deviation of the noise; the result is
    clipped back to the [0, 1] range and returned as a new array.
    """
    perturbation = np.random.normal(0.0, noise_factor, img_array.shape)
    return np.clip(img_array + perturbation, 0., 1.)

# ---------------------- Dataset Loading ----------------------
# Builds x_train / y_train and x_test / y_test as NumPy arrays from four image folders.
# NOTE(review): no random seed is set in this cell, so the augmentation, the shuffles
# and the added noise below differ from run to run.
# Replace these with your actual folder paths.
train_ferrari_folder = "./ferrari"  # e.g., "/home/user/datasets/train_ferrari"
train_jeep_folder    = "./jeep"     # e.g., "/home/user/datasets/train_jeep"
test_ferrari_folder  = "./ferrari_test"      # e.g., "/home/user/datasets/test_ferrari"
test_jeep_folder     = "./jeep_test"         # e.g., "/home/user/datasets/test_jeep"

# Training data: every image is kept once unmodified and followed by
# augment_factor=2 randomly augmented variants. Labels are one-hot:
# [1, 0] = ferrari, [0, 1] = jeep.
images_ferrari_train, labels_ferrari_train = load_images_from_folder(train_ferrari_folder, [1, 0], image_size=(50, 50), augment=True, augment_factor=2)
images_jeep_train,    labels_jeep_train    = load_images_from_folder(train_jeep_folder,    [0, 1], image_size=(50, 50), augment=True, augment_factor=2)

# Combine training images.
images = np.array(images_ferrari_train + images_jeep_train)
labels = np.array(labels_ferrari_train + labels_jeep_train)

# Shuffle the training dataset (images and labels with the same permutation).
indices = np.arange(len(images))
np.random.shuffle(indices)
images = images[indices]
labels = labels[indices]

# For this example, we use all images as training samples.
x_train = images
y_train = labels

# Added: Gaussian noise (default noise_factor=0.1) is applied to EVERY training
# image, including the non-augmented originals; the test images are left clean.
x_train = np.array([add_noise(img) for img in x_train])

# Now load the test images (no augmentation).
images_ferrari_test, labels_ferrari_test = load_images_from_folder(test_ferrari_folder, [1, 0], image_size=(50, 50))
images_jeep_test,    labels_jeep_test    = load_images_from_folder(test_jeep_folder,    [0, 1], image_size=(50, 50))

# Combine the test images.
x_test = np.array(images_ferrari_test + images_jeep_test)
y_test = np.array(labels_ferrari_test + labels_jeep_test)

# Shuffle the test dataset (rebinds `indices`, which previously indexed the training set).
indices = np.arange(len(x_test))
np.random.shuffle(indices)
x_test = x_test[indices]
y_test = y_test[indices]

print(f"Training samples: {len(x_train)}, Test samples: {len(x_test)}")

Training samples: 456, Test samples: 20


In [7]:
"""
This cell contains the code for training the CNN.

DO NOT CHANGE ANYTHING IN THIS CELL, you are only allowed to run it and see the results.

"""

# Seed the global NumPy RNG so the weight initialisation in SimpleCNN() and the
# per-epoch shuffles below are repeatable for a given x_train / y_train.
np.random.seed(42)
cnn = SimpleCNN()

num_train = len(x_train)
num_test = len(x_test)  # Not used further in this cell.
print(f"Starting training on {num_train} samples.")

epochs = 10
for epoch in range(epochs):
    # Metrics are reported BEFORE the epoch's updates; test() prints
    # "<loss>, <accuracy>" on the line after each label.
    print(f"Current Train Loss: , Current Train Accuracy:")
    test(cnn,x_train,y_train)
    print(f"Current Test Loss: , Current Test Accuracy:")
    test(cnn,x_test,y_test)
    total_loss = 0
    # Shuffle training data at the beginning of each epoch.
    # Fancy indexing rebinds x_train / y_train to reordered copies, so the
    # module-level arrays change order every epoch (and on every re-run of this cell).
    indices = np.arange(num_train)
    np.random.shuffle(indices)
    x_train = x_train[indices]
    y_train = y_train[indices]
    # Plain SGD with one sample per step.
    for i in range(num_train):
        x = x_train[i]
        target = y_train[i]
        # Forward pass.
        output = cnn.forward(x)
        loss, d_loss = mse_loss(output, target)
        total_loss += loss
        # Backward pass and update weights.
        # cnn.backward applies the parameter update itself.
        cnn.backward(d_loss)
    # Average of the per-sample losses measured while the weights were changing,
    # so it differs from the loss test() reports at the start of the next epoch.
    avg_loss = total_loss / num_train
    print(f"Epoch {epoch+1}/{epochs}, Average Training Loss: {avg_loss:.4f}")

Starting training on 456 samples.
Current Train Loss: , Current Train Accuracy:
0.4257, 0.5482
Current Test Loss: , Current Test Accuracy:
0.4367, 0.5500
Epoch 1/10, Average Training Loss: 0.2620
Current Train Loss: , Current Train Accuracy:
0.2385, 0.6075
Current Test Loss: , Current Test Accuracy:
0.2612, 0.5500
Epoch 2/10, Average Training Loss: 0.2365
Current Train Loss: , Current Train Accuracy:
0.2190, 0.6908
Current Test Loss: , Current Test Accuracy:
0.2460, 0.6000
Epoch 3/10, Average Training Loss: 0.2169
Current Train Loss: , Current Train Accuracy:
0.2056, 0.6820
Current Test Loss: , Current Test Accuracy:
0.2193, 0.6500
Epoch 4/10, Average Training Loss: 0.2005
Current Train Loss: , Current Train Accuracy:
0.1859, 0.7368
Current Test Loss: , Current Test Accuracy:
0.2085, 0.7500
Epoch 5/10, Average Training Loss: 0.1869
Current Train Loss: , Current Train Accuracy:
0.1696, 0.7390
Current Test Loss: , Current Test Accuracy:
0.1792, 0.7000
Epoch 6/10, Average Training Loss: 0

In [8]:
def save_model(model, filename):
    """
    Write the trainable parameters of model to an uncompressed .npz archive.
    The archive holds four arrays named 'conv1_kernels', 'conv2_kernels',
    'fc_weights' and 'fc_bias', taken from the attributes of the same names.
    """
    parameters = {
        "conv1_kernels": model.conv1_kernels,
        "conv2_kernels": model.conv2_kernels,
        "fc_weights": model.fc_weights,
        "fc_bias": model.fc_bias,
    }
    np.savez(filename, **parameters)

In [9]:
# Persist the parameters of `cnn` to "my_cnn_model.npz" in the current working directory.
save_model(cnn, "my_cnn_model.npz")