In [None]:
# custom multimodal dataloader
import matplotlib.pyplot as plt
import numpy as np
import torch
import torch.nn as nn
from PIL import Image
import pandas as pd
import os 
import math
from sklearn.model_selection import train_test_split
import tqdm


# NOTE(review): absolute, machine-specific path; consider a configurable data directory.
image_dir = r'\Users\jqvil\Desktop\jessica quenneville\images\images'
csv_file = 'UTKFaceAugmented.csv'

data = pd.read_csv(csv_file)

# Per-image tensors and their age labels, appended in CSV row order so that
# images[i] and labels[i] always describe the same file.
images = []
labels = []

# Load and preprocess every image listed in the CSV.
for filename, age_label in zip(data['filename'], data['age']):
    image_path = os.path.join(image_dir, filename)
    if not os.path.exists(image_path):
        # Missing files are reported and skipped (no label is recorded for them).
        print(f"File not found: {image_path}")
        continue

    # The context manager closes the underlying file handle as soon as the
    # pixel data has been converted; convert()/resize() return in-memory copies.
    with Image.open(image_path) as img:
        gray = img.convert('L').resize((64, 64))  # grayscale, 64x64

    # Scale pixel values from [0, 255] to [0, 1].
    img_tensor = torch.tensor(np.array(gray) / 255.0, dtype=torch.float32)
    images.append(img_tensor)
    labels.append(age_label)

if not images:
    raise FileNotFoundError(
        f"None of the files listed in {csv_file} were found under {image_dir}"
    )

# Stack the per-image (64, 64) tensors into a single (N, 64, 64) tensor.
image_tensor = torch.stack(images)
labels = torch.tensor(labels, dtype=torch.float32)


In [None]:

# Expected output size of a convolution, one entry per spatial dimension
def calculate_output_shape(image_size, kernel, stride, padding):
    """Return the output size of a convolution along each dimension.

    For every dimension the size is ``1 + (input + 2 * padding - kernel) / stride``.

    Args:
        image_size: input size per dimension.
        kernel: kernel size per dimension.
        stride: stride per dimension.
        padding: padding applied to each side, per dimension.

    Returns:
        A list of ints with one output size per dimension.

    Raises:
        AssertionError: if the four arguments differ in length, or if the
            formula does not produce a whole number for some dimension.
    """
    assert len(image_size) == len(kernel) == len(stride) == len(padding), "All values should have the same length"
    sizes = []
    for in_size, k, s, p in zip(image_size, kernel, stride, padding):
        out_size = 1 + (in_size + (2 * p) - k) / s
        # True division yields a float; a fractional part means the kernel
        # does not tile the padded input exactly.
        assert out_size.is_integer(), "Change kernel size, padding, and stride to get an integer value for the output"
        sizes.append(int(out_size))
    return sizes

# Hyper-parameters of the demo convolution. They are defined once and shared by
# the shape calculation and the layer so the two cannot drift apart.
demo_input_hw = (8, 8)
demo_kernel = (3, 3)
demo_stride = (1, 1)
demo_padding = (2, 2)

# Calculate output shape after convolution
output_shape = calculate_output_shape(demo_input_hw, demo_kernel, demo_stride, demo_padding)
print("Output Shape after Convolution:", output_shape)

# Create the convolutional layer described by the parameters above
conv = nn.Conv2d(
    in_channels=3,  # 3 channels RGB
    out_channels=64,  # Number of kernels to train
    kernel_size=demo_kernel,
    stride=demo_stride,
    padding=demo_padding
)

# Create a random input batch: 3 images, 3 channels, 8x8 pixels each.
# NOTE(review): this rebinds `image_tensor`, replacing the dataset tensor built
# in the data-loading cell; later cells see the random batch instead.
image_np = np.random.randn(3, 3, *demo_input_hw).astype(np.float32)
image_tensor = torch.from_numpy(image_np)
output = conv(image_tensor)

# The analytic shape must agree with what the layer actually produced.
assert list(output.shape[-2:]) == output_shape, (output.shape, output_shape)

In [None]:
# Locate every element of image_tensor that equals 1.
# With as_tuple=True the result is a tuple holding one index tensor per dimension.
indices = (image_tensor == 1).nonzero(as_tuple=True)

# Keep only the first 20 matches. Slicing the tuple itself would select
# dimensions rather than matches, so each index tensor is sliced individually.
first_matches = tuple(dim_indices[:20] for dim_indices in indices)
x = image_tensor[first_matches]
# NOTE(review): data and target are read from the same tensor with the same
# indices, so y is identical to x; confirm where the targets should come from.
y = image_tensor[first_matches]
print(x, y)

# prep the data for training
# NOTE(review): x is 1-D at this point, so reshaping to (len(x), 8, 1, 1) only
# succeeds when no element matched; confirm the intended sample layout.
x = x.reshape(len(x), 8, 1, 1).to(torch.float32)
y = y.to(torch.float32)

# NOTE(review): the layer expects 1 input channel and uses a 20x20 kernel, while
# the samples above have 8 channels and a 1x1 spatial size; the forward pass
# cannot run on such input, so confirm the intended kernel size / input layout.
conv_1kernel = nn.Conv2d(
    in_channels=1,
    out_channels=1,
    kernel_size=(20,20),
    stride=(1, 1),
    padding=(1, 1)
)

# lets set up a training loop, and save snapshots of each kernel every epoch
optimizer = torch.optim.Adam(conv_1kernel.parameters(), lr = 0.001)
loss_fn = torch.nn.BCEWithLogitsLoss(reduction = 'none')

# kernels[0] is the initial kernel; kernels[e] is the kernel after epoch e.
kernels = [conv_1kernel.weight.detach().numpy().copy()]

for _ in range(10):
    for i in range(len(x)):
        optimizer.zero_grad()
        yhat = torch.squeeze(conv_1kernel(x[i:i+1]))
        # BCEWithLogitsLoss takes (input logits, target), in that order.
        loss = loss_fn(yhat, y[i])
        loss.backward()
        optimizer.step()
    kernels.append(conv_1kernel.weight.detach().numpy().copy())


In [None]:
# Read a small sample of .jpg files from image_dir as NumPy arrays and show
# them in a grid as a visual sanity check.
num_rows = 4  # 20 images in 4 rows
num_cols = 5  # 5 images per row
max_images = num_rows * num_cols

image_data_list = []

# os.listdir returns entries in arbitrary order; sorting makes the sample
# reproducible between runs and machines.
for filename in sorted(os.listdir(image_dir)):
    # Only .jpg files are treated as images
    if not filename.endswith(".jpg"):
        continue

    image_path = os.path.join(image_dir, filename)

    # np.array() forces the pixel data to be read before the file is closed.
    with Image.open(image_path) as img:
        image_data_list.append(np.array(img))

    # Stop once the grid can be filled
    if len(image_data_list) >= max_images:
        break

count = len(image_data_list)

# Convert the list of image arrays to a single NumPy array.
# NOTE(review): this requires every image to have the same shape.
image_data = np.array(image_data_list, dtype=np.float32)
image_data_normalized = image_data / 255.0

# Create a subplot grid to display multiple images
fig, axes = plt.subplots(num_rows, num_cols, figsize=(50, 50))

# Flatten the axes array for easy iteration
axes = axes.flatten()

# Hide the frame of every cell, including cells that stay empty when fewer
# than max_images files were found.
for ax in axes:
    ax.axis('off')

# zip stops at the shorter sequence, so a short sample cannot index past the data.
for ax, image in zip(axes, image_data_normalized):
    ax.imshow(image, cmap='viridis')  #  colormap viridis

plt.tight_layout()
plt.show()



In [None]:
from networks import CNNClassifier
from dataloader import CustomImageDataloader

In [None]:
def one_hot_ages(ages, num_classes):
    """Clip ages to [0, num_classes - 1] and one-hot encode them.

    Args:
        ages: array-like of numeric age labels.
        num_classes: width of the one-hot encoding.

    Returns:
        An int64 tensor of shape (len(ages), num_classes).
    """
    # Ages above num_classes - 1 all fall into the last class.
    clipped = np.clip(np.array(ages), 0, num_classes - 1)
    class_indices = torch.tensor(clipped, dtype=torch.int64)
    return torch.nn.functional.one_hot(class_indices, num_classes=num_classes)


# NOTE(review): image_data was filled from a directory listing while labels
# follows the CSV row order, so labels[:n] only matches the images if both
# orders agree; confirm before trusting any metric.
sample_labels = labels[:len(image_data)]

# 80% train, then the remaining 20% is halved into validation and test.
x_train, x_holdout, y_train, y_holdout = train_test_split(
    image_data, sample_labels, test_size=0.2, random_state=42
)
x_val, x_test, y_val, y_test = train_test_split(
    x_holdout, y_holdout, test_size=0.5, random_state=42
)

# Convert data to PyTorch tensors
x_train = torch.tensor(x_train, dtype=torch.float32)
x_val = torch.tensor(x_val, dtype=torch.float32)
x_test = torch.tensor(x_test, dtype=torch.float32)

# One-hot encode the labels for training, validation, and test sets
num_classes = 32
y_train = one_hot_ages(y_train, num_classes)
y_val = one_hot_ages(y_val, num_classes)
y_test = one_hot_ages(y_test, num_classes)

# The splits must stay aligned sample-for-sample.
assert len(x_train) == len(y_train)
assert len(x_val) == len(y_val)
assert len(x_test) == len(y_test)

In [None]:

train_dataloader = CustomImageDataloader(x = x_train, y = y_train, batch_size=16, randomize=True)
val_dataloader = CustomImageDataloader(x = x_val, y = y_val, batch_size=16, randomize=False)

# 32 matches the width of the one-hot labels built for y_train.
model = CNNClassifier(32)

# instantiate your optimizer
optimizer = torch.optim.Adam(model.parameters(), lr=1e-4)

# log your losses
# train_losses receives one value per optimisation step (not per epoch).
# val_losses is declared for later use; no validation pass fills it in this cell.
train_losses = []
val_losses = []

# define how many epochs to train on
epochs = 50

# define your loss function for multiclass classification task
# BCE does binary cross entropy automatically for each class
# NOTE(review): BCELoss expects model outputs already in [0, 1]; confirm that
# CNNClassifier ends with a sigmoid/softmax.
loss_fn = torch.nn.BCELoss(reduction='none')

# Make sure the model is in training mode even if an earlier run of the
# prediction cell left it in eval mode.
model.train()

for _ in tqdm.tqdm(range(epochs)):
    for _ in range(train_dataloader.num_batches_per_epoch):
        # training data forward pass
        optimizer.zero_grad()
        train_batch = train_dataloader.fetch_batch()
        yhat = model(train_batch['x_batch'])
        # reduction='none' keeps the per-element losses; average them into a scalar
        train_loss = torch.mean(loss_fn(yhat.float(), train_batch['y_batch'].float()))

        # training data backward pass
        train_loss.backward()
        optimizer.step()

        # personally, I like to visualize the loss per every iteration, rather than every epoch. I find it more useful to diagnose issues
        train_losses.append(train_loss.item())


In [None]:

def predict_age(model, dataloader):
    """Run the model over one epoch of batches and collect its raw outputs.

    The model is switched to evaluation mode and is left in that mode.

    Args:
        model: callable module providing ``eval()``.
        dataloader: object exposing ``num_batches_per_epoch`` and
            ``fetch_batch()``, which returns a dict with an ``'x_batch'`` entry.

    Returns:
        A list with one NumPy array per sample: the rows of every batch
        output, in the order the batches were fetched.
    """
    model.eval()

    # Gradients are not needed for inference.
    with torch.no_grad():
        batch_outputs = [
            model(dataloader.fetch_batch()['x_batch']).cpu().numpy()
            for _ in range(dataloader.num_batches_per_epoch)
        ]

    # Flatten the per-batch arrays into a single per-sample list.
    return [sample_output for batch in batch_outputs for sample_output in batch]

# Run the trained model over the training dataloader.
# NOTE(review): these are predictions on the training data, not on held-out data.
raw_predictions = np.asarray(predict_age(model, train_dataloader))

if raw_predictions.ndim > 1:
    # One row of per-class scores per sample (the model is trained against
    # one-hot age labels), so the index of the highest score is the predicted
    # age class.
    predicted_ages = raw_predictions.argmax(axis=1)
    # One bin per class index, centred on the integer value.
    age_bins = np.arange(raw_predictions.shape[1] + 1) - 0.5
else:
    predicted_ages = raw_predictions
    age_bins = 30

# Plot a histogram of predicted ages
fig, ax = plt.subplots(figsize=(8, 6))
ax.hist(predicted_ages, bins=age_bins, edgecolor='black')
ax.set_xlabel('Predicted Ages')
ax.set_ylabel('Frequency')
ax.set_title('Histogram of Predicted Ages')
ax.grid(True)
plt.show()

In [None]:

# Plotting training loss per iteration.
# train_losses holds one value per optimisation step, so the x-axis is derived
# from its length; pairing it with range(1, epochs + 1) fails as soon as an
# epoch contains more than one batch.
iterations = range(1, len(train_losses) + 1)

fig, ax = plt.subplots(figsize=(8, 6))
ax.plot(iterations, train_losses, label='Training Loss', marker='o', color='blue')
ax.set_xlabel('Iterations')
ax.set_ylabel('Loss')
ax.set_title('Training Loss per Iteration')
ax.legend()
ax.grid(True)
plt.show()

# Distribution of the recorded training losses.
# The histogram is drawn in the left cell of a 1x2 subplot grid; no right-hand
# panel is drawn in the lines shown here.
plt.figure(figsize=(10, 5))
plt.subplot(1, 2, 1)
# 20 bins over every value stored in train_losses
plt.hist(train_losses, bins=20, color='blue', alpha=0.7)
plt.title('Training Loss Distribution')
plt.xlabel('Loss')
plt.ylabel('Frequency')
