# Importing

In [1]:
import os  # For interacting with the file system
import zipfile  # For working with ZIP archives
import shutil  # For high-level file operations

import torch  # For deep learning
import torchvision  # For computer vision tasks
import torch.nn as nn  # For neural network components

import numpy as np  # For numerical operations
import pandas as pd  # For data manipulation and analysis
from tqdm import tqdm  # For creating progress bars

import torch.nn.functional as F  # For PyTorch's functional interface
# from torchvision.datasets.utils import download_url  # For downloading datasets
from torchvision.models import vgg19  # For the VGG19 model
from torchvision.datasets import ImageFolder  # For the ImageFolder dataset class
from torch.utils.data import DataLoader  # For loading data in PyTorch
import torchvision.transforms as T  # For data transformations
from torch.utils.data import random_split  # For randomly splitting datasets
from torchvision.utils import make_grid  # For creating grid images

import PIL  # For image processing
from PIL import Image  # For working with images
import random  # For generating random numbers
import matplotlib # For Plotting
import matplotlib.pyplot as plt  # For plotting
import matplotlib.image as mpimg  # For working with images
%matplotlib inline  # Enable inline plotting in Jupyter Notebooks

# Set the background color of the figures to white
matplotlib.rcParams['figure.facecolor'] = '#ffffff'


UsageError: unrecognized arguments: # Enable inline plotting in Jupyter Notebooks


In [2]:
!pip install torchinfo
from torchinfo import summary



# Data Download

## Style data download

Downloading the [Painters by numbers](https://www.kaggle.com/competitions/painter-by-numbers/data) train dataset from WIKIArt using the Kaggle API.
(Ignore the error in the output below; it was caused by a mistake in the file path.)

In [3]:
# Check the details of the 'kaggle.json' file
!ls -lha /home/ec2-user/SageMaker/kaggle.json

# Install the 'kaggle' package
!pip install -q kaggle

# Create the directory '~/.kaggle' if it doesn't exist
!mkdir -p ~/.kaggle

# Copy the 'kaggle.json' file to the '~/.kaggle' directory
!cp kaggle.json ~/.kaggle/

# Set the appropriate permissions for the 'kaggle.json' file
!chmod 600 /home/ec2-user/SageMaker/kaggle.json

# Download the 'train.zip' file from the 'painter-by-numbers' competition
!kaggle competitions download -f train.zip -p '/home/ec2-user/SageMaker' -o painter-by-numbers

# Set the path to the downloaded ZIP file
local_zip = '/home/ec2-user/SageMaker/train.zip'

# Open the ZIP file
zip_ref = zipfile.ZipFile(local_zip, 'r')

# Create the directory '/home/ec2-user/SageMaker/style-data' if it doesn't exist
!mkdir /home/ec2-user/SageMaker/style-data

# Extract the contents of the ZIP file to the '/home/ec2-user/SageMaker/style-data' directory
zip_ref.extractall('/home/ec2-user/SageMaker/style-data')

# Close the ZIP file
zip_ref.close()

# Remove the downloaded ZIP file
os.remove(local_zip)

# Count the number of images in the '/home/ec2-user/SageMaker/train' directory and print the count
print('The number of images present in the WikiArt dataset are:', len(os.listdir('/home/ec2-user/SageMaker/train')))

'ls' is not recognized as an internal or external command,
operable program or batch file.
The syntax of the command is incorrect.
'cp' is not recognized as an internal or external command,
operable program or batch file.
'chmod' is not recognized as an internal or external command,
operable program or batch file.
Traceback (most recent call last):
  File "C:\Users\yashs\AppData\Local\Programs\Python\Python310\lib\runpy.py", line 196, in _run_module_as_main
    return _run_code(code, main_globals, None,
  File "C:\Users\yashs\AppData\Local\Programs\Python\Python310\lib\runpy.py", line 86, in _run_code
    exec(code, run_globals)
  File "C:\Users\yashs\AppData\Local\Programs\Python\Python310\Scripts\kaggle.exe\__main__.py", line 4, in <module>
  File "C:\Users\yashs\AppData\Local\Programs\Python\Python310\lib\site-packages\kaggle\__init__.py", line 7, in <module>
    api.authenticate()
  File "C:\Users\yashs\AppData\Local\Programs\Python\Python310\lib\site-packages\kaggle\api\kaggle_api

FileNotFoundError: [Errno 2] No such file or directory: '/home/ec2-user/SageMaker/train.zip'

In [None]:
print('The number of images present in WikiArt dataset are:',len(os.listdir('/home/ec2-user/SageMaker/style-data/train')))

## Content data download

Downloading the COCO 2014 dataset from [COCOdataset.org](https://cocodataset.org/#download).

In [None]:
# Download the COCO dataset ZIP file
!wget --no-check-certificate \
    "http://images.cocodataset.org/zips/train2014.zip" \
    -O "/home/ec2-user/SageMaker/coco.zip"

# Set the path to the downloaded ZIP file
local_zip = '/home/ec2-user/SageMaker/coco.zip'

# Open the ZIP file
zip_ref = zipfile.ZipFile(local_zip, 'r')

# Create the directory '/home/ec2-user/SageMaker/content-data' if it doesn't exist
!mkdir /home/ec2-user/SageMaker/content-data

# Extract the contents of the ZIP file to the '/home/ec2-user/SageMaker/content-data' directory
zip_ref.extractall('/home/ec2-user/SageMaker/content-data')

# Close the ZIP file
zip_ref.close()

# Remove the downloaded ZIP file
os.remove(local_zip)

# Count the number of images in the '/home/ec2-user/SageMaker/content-data/train2014' directory and print the count
print('The number of images present in the COCO dataset are:', len(os.listdir('/home/ec2-user/SageMaker/content-data/train2014')))


## Downloading additional data for the Style Dataset.

In order to address the discrepancy in the number of images between the COCO and WIKIArt datasets, I have downloaded additional images from the test file of the WIKIArt dataset and moved them to the training dataset. This approach was taken to increase the size of the training dataset and improve the overall performance of the model.

By doing so, the model will be exposed to a more diverse set of images, which should help it better generalize to new, unseen data. Additionally, the increased number of images will allow the model to learn more robust features, which should also lead to better performance.

It is important to note that this process was undertaken with careful consideration. The additional images were selected based on their relevance to the task at hand and were thoroughly vetted to ensure that they were appropriate for use in the training dataset.

Overall, this approach represents a promising strategy for improving the performance of the model and enhancing its ability to accurately classify and identify images from the COCO and WIKIArt datasets.

In [None]:
# Path to the ZIP file holding the additional images
local_zip = '/home/ec2-user/SageMaker/IMG.zip'

# Extract the archive to '/home/ec2-user/SageMaker/content-data-extra';
# the context manager closes it even if extraction fails
with zipfile.ZipFile(local_zip, 'r') as zip_ref:
    zip_ref.extractall('/home/ec2-user/SageMaker/content-data-extra')

# Count the extracted images. Despite the 'content-data-extra' directory name,
# these are the additional images that are later moved into the style
# (WikiArt) training folder, so the message refers to WikiArt rather than COCO.
print('The number of additional images present for the WikiArt (style) dataset are:', len(os.listdir('/home/ec2-user/SageMaker/content-data-extra/IMG')))


I manually deleted 5 randomly chosen images from the folder.

In [None]:
# These extra images are later moved into the style (WikiArt) training folder,
# so the message refers to WikiArt rather than COCO.
print('The number of additional images present for the WikiArt (style) dataset are:',len(os.listdir('/home/ec2-user/SageMaker/content-data-extra/IMG')))

In [None]:
# Folder holding the additional images and the style training folder they are moved into
src_folder = '/home/ec2-user/SageMaker/content-data-extra/IMG'
source = src_folder  # same folder, kept under its original name
dst_folder = '/home/ec2-user/SageMaker/style-data/train'

# Gather all files
allfiles = os.listdir(src_folder)

# Move every file to the destination folder
for i in allfiles:
    # Full path to the source file
    src_path = os.path.join(src_folder, i)

    # Insert the '-moved' tag BEFORE the extension ('img.jpg' -> 'img-moved.jpg').
    # Appending it after the extension ('img.jpg-moved') leaves the file without a
    # recognised image extension, so ImageFolder would skip it when building the dataset.
    stem, ext = os.path.splitext(i)
    dst_path = os.path.join(dst_folder, f'{stem}-moved{ext}')
    shutil.move(src_path, dst_path)

As we can see, the number of images in both training sets is now equal.

In [None]:
print('The number of images present in WIKIART dataset are:',len(os.listdir('/home/ec2-user/SageMaker/style-data/train')))

In [None]:
print('The number of images present in COCO dataset are:',len(os.listdir('/home/ec2-user/SageMaker/content-data/train2014')))

In [None]:
# Create the folders that store the testing dataset.
# os.makedirs creates the parent folders as well, works on any OS, and with
# exist_ok=True does not fail when the cell is re-run.
for test_dir in ('/home/ec2-user/SageMaker/test-content/test',
                 '/home/ec2-user/SageMaker/test-style/test'):
    os.makedirs(test_dir, exist_ok=True)

## Visualizing the dataset

Every time the 'show_images' function is run, it plots a grid of 20 different images randomly selected from the provided dataset. This allows for a diverse set of images to be displayed with each run, making it useful for exploring the dataset and identifying any potential issues or anomalies.

In [None]:
def show_images(dataset, nrows=5, ncols=4):
    """
    Display a grid of randomly chosen images from a directory.

    Args:
        dataset (str): Path to the directory containing the images.
        nrows (int): Number of rows in the grid. Default is 5.
        ncols (int): Number of columns in the grid. Default is 4.

    Returns:
        None: The function only displays the images.

    Raises:
        IndexError: If the directory contains no files.
    """
    # List the directory once instead of once per subplot
    filenames = os.listdir(dataset)
    if not filenames:
        raise IndexError(f'No files found in {dataset!r}')

    # Draw distinct images when the directory is large enough; otherwise fall
    # back to sampling with replacement so the grid is still filled
    n_images = nrows * ncols
    if len(filenames) >= n_images:
        chosen = random.sample(filenames, n_images)
    else:
        chosen = random.choices(filenames, k=n_images)

    # squeeze=False keeps `axes` two-dimensional even for a 1xN or Nx1 grid
    fig, axes = plt.subplots(nrows, ncols, figsize=(ncols * 5, nrows * 5), squeeze=False)

    for ax, filename in zip(axes.flat, chosen):
        ax.imshow(mpimg.imread(os.path.join(dataset, filename)))  # Display the image
        ax.axis('off')  # Don't show axes (or gridlines)


In [None]:
show_images('/home/ec2-user/SageMaker/content-data/train2014')

In [None]:
show_images('/home/ec2-user/SageMaker/style-data/train')

# Data Transformation and Normalization Pipeline for Image Processing

In [None]:
# Number of images per training batch
batch_size = 8

# Per-channel (mean, std) pair used for normalization
stats = ((0.485, 0.456, 0.406), (0.229, 0.224, 0.225))
channel_mean, channel_std = stats

# Stand-alone normalization transform built from the same statistics
normalize = T.Normalize(mean=list(channel_mean), std=list(channel_std))

# Training pipeline: resize to 512x512, take a random 256x256 crop,
# convert to a tensor, then normalize in place
tfms = T.Compose([
    T.Resize((512, 512)),
    T.RandomCrop((256, 256)),
    T.ToTensor(),
    T.Normalize(channel_mean, channel_std, inplace=True),
])

# Testing pipeline: resize to 512x512 (no cropping), convert to a tensor,
# then normalize in place
test_tfms = T.Compose([
    T.Resize((512, 512)),
    T.ToTensor(),
    T.Normalize(channel_mean, channel_std, inplace=True),
])


## Data Loading and Processing Pipeline

In [None]:
# Training datasets: every image found under the root folder, with the training transforms applied
content_dataset = ImageFolder('/home/ec2-user/SageMaker/content-data', transform=tfms)
style_dataset = ImageFolder('/home/ec2-user/SageMaker/style-data', transform=tfms)

# Test datasets use the test transforms
test_content_dataset = ImageFolder('/home/ec2-user/SageMaker/test-content', transform=test_tfms)
test_style_dataset = ImageFolder('/home/ec2-user/SageMaker/test-style', transform=test_tfms)

# Training loaders: shuffled, 2 workers, incomplete final batch dropped
train_loader_options = dict(batch_size=batch_size, shuffle=True, num_workers=2, drop_last=True)
content_dl = DataLoader(content_dataset, **train_loader_options)
style_dl = DataLoader(style_dataset, **train_loader_options)

# Test loaders: one image per batch, 2 workers, original order
test_loader_options = dict(batch_size=1, num_workers=2)
test_content_dl = DataLoader(test_content_dataset, **test_loader_options)
test_style_dl = DataLoader(test_style_dataset, **test_loader_options)

The denormalize function and show_batch function are designed to visualize the images in a batch after being processed by the normalization pipeline defined earlier.

The denormalize function takes as input a batch of normalized images, along with the means and standard deviations used for normalization, and returns the corresponding denormalized images. This is useful for visualizing the images in the pipeline after processing.

The show_batch function displays a batch of images using matplotlib and the make_grid function from the torchvision.utils module. It uses the denormalize function to convert the normalized images back to their original scale before displaying them.

The function creates a new figure with the specified size and removes the axis ticks to produce a cleaner visual display. The break statement is used to ensure that only one batch of data is displayed, making it easier to review the output.

In [None]:
def denormalize(images, means, stds):
    """
    Denormalize the images using the provided mean and standard deviation values.

    Args:
        images (Tensor): Input tensor of images of shape [batch_size, c, h, w].
        means (tuple): Per-channel mean values used for normalization (length c).
        stds (tuple): Per-channel standard deviation values used for normalization (length c).

    Returns:
        Tensor: Denormalized images, i.e. images * stds + means, on the same device as `images`.
    """
    # Build the statistics on the same device as the images so the function also
    # works for batches that already live on the GPU. reshape(1, -1, 1, 1) lets the
    # values broadcast over batch, height and width for any number of channels.
    means = torch.as_tensor(means, device=images.device).reshape(1, -1, 1, 1)
    stds = torch.as_tensor(stds, device=images.device).reshape(1, -1, 1, 1)
    return images * stds + means  # Undo (x - mean) / std

def show_batch(dl, nrow=8):
    """
    Display the first batch of images from the given data loader.

    Args:
        dl (DataLoader): Data loader yielding (images, labels) batches.
        nrow (int): Number of images per row of the displayed grid. Default is 8.

    Returns:
        None: The function only displays the images.
    """
    # Fetch only the first batch; an empty loader simply displays nothing
    first_batch = next(iter(dl), None)
    if first_batch is None:
        return
    images, _labels = first_batch

    # Work on a CPU copy so batches coming from a device-wrapped loader can also
    # be denormalized and plotted
    images = images.detach().cpu()

    # Create a large figure and remove the ticks on the x and y axes
    _fig, ax = plt.subplots(figsize=(30, 30))
    ax.set_xticks([])
    ax.set_yticks([])

    # Denormalize the images using the provided stats
    denorm_images = denormalize(images, *stats)

    # Build the image grid, permute it to (H, W, C) as expected by imshow, and
    # clamp the values to [0, 1] to ensure valid image display
    ax.imshow(make_grid(denorm_images, nrow=nrow).permute(1, 2, 0).clamp(0, 1))

In [None]:
show_batch(content_dl)

In [None]:
show_batch(style_dl)

In [None]:
show_batch(test_content_dl)

In [None]:
show_batch(test_style_dl)

### Device and Data Loader Utility Functions for Model Training

In [None]:
def get_default_device():
    """Return the CUDA device when one is available, otherwise the CPU device."""
    device_name = 'cuda' if torch.cuda.is_available() else 'cpu'
    return torch.device(device_name)

def to_device(data, device):
    """Move a tensor, or every element of a (nested) list/tuple of tensors, to `device`.

    Lists and tuples are both returned as lists.
    """
    # Leaf case: a single object exposing `.to`
    if not isinstance(data, (list, tuple)):
        return data.to(device, non_blocking=True)

    # Container case: recurse into each element
    moved = []
    for item in data:
        moved.append(to_device(item, device))
    return moved

class DeviceDataLoader():
    """Wrapper around a data loader that moves every batch to a device as it is yielded."""

    def __init__(self, dl, device):
        self.dl, self.device = dl, device

    def __len__(self):
        """Return the number of batches of the wrapped loader."""
        return len(self.dl)

    def __iter__(self):
        """Iterate over the wrapped loader, yielding each batch after moving it to the device."""
        yield from (to_device(batch, self.device) for batch in self.dl)

In [None]:
device = get_default_device()
device

In [None]:
# Wrap the training loaders so that every batch is moved to `device` when it is yielded
content_dl, style_dl = [DeviceDataLoader(loader, device) for loader in (content_dl, style_dl)]

# Wrap the test loaders the same way
test_content_dl, test_style_dl = [DeviceDataLoader(loader, device) for loader in (test_content_dl, test_style_dl)]

# Model

In [None]:
# Load VGG19 with its default pre-trained weights. The `weights` keyword is used
# (as in VGGEncoder) instead of the deprecated positional `pretrained` flag.
vg19 = vgg19(weights='DEFAULT')
print(vg19)

## Encoder

In [None]:
class VGGEncoder(nn.Module):
    """Frozen feature extractor built from four consecutive slices of VGG19's `features` stack."""

    def __init__(self):
        super().__init__()

        # Convolutional part of VGG19 with the default pre-trained weights
        backbone = vgg19(weights='DEFAULT').features

        # Consecutive slices of the backbone; the output of each is one feature level
        self.slice1 = backbone[:2]
        self.slice2 = backbone[2:7]
        self.slice3 = backbone[7:12]
        self.slice4 = backbone[12:21]

        # Freeze the pre-trained weights
        self.requires_grad_(False)

    def forward(self, images, output_last_feature=False):
        """
        Encode images with the VGG slices.

        Args:
            images (Tensor): Input images to be encoded.
            output_last_feature (bool): If True, return only the output of the last slice.
                Otherwise return the outputs of all four slices.

        Returns:
            Tensor or Tuple[Tensor]: The last feature tensor when output_last_feature is True,
            otherwise a tuple with the feature tensor produced by each slice, in order.
        """
        collected = []
        h = images
        # Feed the output of each slice into the next one, keeping every intermediate result
        for stage in (self.slice1, self.slice2, self.slice3, self.slice4):
            h = stage(h)
            collected.append(h)

        if output_last_feature:
            return collected[-1]
        return tuple(collected)


In [None]:
enc = VGGEncoder()
enc

In [None]:
summary(enc, input_size=(batch_size, 3, 512, 512))

## ADAIN

In [None]:
def calc_mean_std(features):
    """
    Compute the per-sample, per-channel mean and standard deviation over the spatial positions.

    Args:
        features (Tensor): Input features of shape [batch_size, c, h, w].

    Returns:
        features_mean (Tensor): Mean of the features of shape [batch_size, c, 1, 1].
        features_std (Tensor): Standard deviation of the features (plus 1e-6)
            of shape [batch_size, c, 1, 1].
    """
    n, c = features.size()[:2]
    stat_shape = (n, c, 1, 1)

    # Flatten the spatial dimensions so the statistics reduce over a single axis
    flat = features.reshape(n, c, -1)

    features_mean = flat.mean(dim=2).reshape(stat_shape)

    # The small constant keeps the std strictly positive when it is used as a divisor
    features_std = flat.std(dim=2).reshape(stat_shape) + 1e-6

    return features_mean, features_std


def adain(content_features, style_features):
    """
    Adaptive Instance Normalization (AdaIN): re-scale the content features so that their
    per-channel mean and standard deviation match those of the style features.

    Args:
        content_features (Tensor): Content features of shape [batch_size, c, h, w].
        style_features (Tensor): Style features of shape [batch_size, c, h, w].

    Returns:
        normalized_features (Tensor): Normalized features of shape [batch_size, c, h, w].
    """
    style_mean, style_std = calc_mean_std(style_features)
    content_mean, content_std = calc_mean_std(content_features)

    # Remove the content mean, swap the content std for the style std, then add the style mean
    centered = content_features - content_mean
    rescaled = style_std * centered / content_std
    return rescaled + style_mean


## Decoder

In [None]:
class RC(torch.nn.Module):
    """
    Reflection padding followed by a 2D convolution, with an optional ReLU.

    The input is reflection-padded, convolved, and (when `activated` is True)
    passed through ReLU.

    Args:
        in_channels (int): Number of input channels.
        out_channels (int): Number of output channels.
        kernel_size (int): Size of the convolution kernel. Default is 3.
        pad_size (int): Size of the reflection padding on every side. Default is 1.
        activated (bool): Whether to apply ReLU after the convolution. Default is True.
    """

    def __init__(self, in_channels, out_channels, kernel_size=3, pad_size=1, activated=True):
        super().__init__()
        self.activated = activated
        # A single int pads left, right, top and bottom by the same amount
        self.pad = nn.ReflectionPad2d(pad_size)
        self.conv = nn.Conv2d(in_channels, out_channels, kernel_size)

    def forward(self, x):
        """
        Apply padding, convolution and the optional activation.

        Args:
            x: Input tensor of shape (batch_size, in_channels, height, width).

        Returns:
            Tensor with `out_channels` channels. With the default kernel_size=3 and
            pad_size=1 the height and width of the input are preserved.
        """
        convolved = self.conv(self.pad(x))
        return F.relu(convolved) if self.activated else convolved


class Decoder(nn.Module):
    """
    Decoder network for image reconstruction.

    Takes encoder features (after adaptive instance normalization with the style
    features) and generates an image from them. It is a stack of nine RC
    (ReflectionPad2d + Conv2d) layers that reduce the channels from 512 to 3,
    interleaved with three 2x upsampling steps, so the output is 8x larger than
    the input in height and width. The last RC layer has no activation.
    """
    def __init__(self):
        super().__init__()
        self.rc1 = RC(512, 256, 3, 1)
        self.rc2 = RC(256, 256, 3, 1)
        self.rc3 = RC(256, 256, 3, 1)
        self.rc4 = RC(256, 256, 3, 1)
        self.rc5 = RC(256, 128, 3, 1)
        self.rc6 = RC(128, 128, 3, 1)
        self.rc7 = RC(128, 64, 3, 1)
        self.rc8 = RC(64, 64, 3, 1)
        self.rc9 = RC(64, 3, 3, 1, False)  # Output layer: 3 channels, no ReLU

    def forward(self, features):
        """
        Forward pass of the Decoder module.

        Args:
            features (torch.Tensor): Input features from the encoder module
                (512 channels, as expected by the first RC layer).

        Returns:
            torch.Tensor: Output tensor representing the reconstructed image
            (3 channels, 8x the spatial size of `features`).
        """
        h = self.rc1(features)
        h = F.interpolate(h, scale_factor=2)      # First 2x upsampling
        h = self.rc2(h)
        h = self.rc3(h)
        h = self.rc4(h)
        h = self.rc5(h)
        h = F.interpolate(h, scale_factor=2)      # Second 2x upsampling
        h = self.rc6(h)
        h = self.rc7(h)
        h = F.interpolate(h, scale_factor=2)      # Third 2x upsampling
        h = self.rc8(h)
        h = self.rc9(h)
        return h

In [None]:
dec = Decoder()
dec

# Computing Loss and Image Generation

In [None]:
class Model(nn.Module):
    """
    AdaIN style-transfer model: a VGG encoder, the AdaIN feature transform and a decoder.

    `generate` produces stylized images; `forward` returns the training loss.
    """
    def __init__(self):
        super().__init__()
        self.vgg_encoder = VGGEncoder()  # Initialize the VGGEncoder to extract content and style features
        self.decoder = Decoder()        # Initialize the Decoder for image reconstruction

    @staticmethod
    def _blend(content_features, style_features, alpha):
        """Apply AdaIN and interpolate between the stylized (alpha=1) and original content (alpha=0) features."""
        t = adain(content_features, style_features)
        return alpha * t + (1 - alpha) * content_features

    def generate(self, content_images, style_images, alpha=1.0):
        """
        Generate stylized images.

        The feature maps of the content images are normalized with the mean and
        standard deviation of the corresponding style images' feature maps (AdaIN),
        blended with the original content features according to `alpha`, and decoded
        back into an image.

        Args:
            content_images (torch.Tensor): Input content images as tensors.
            style_images (torch.Tensor): Input style images as tensors.
            alpha (float, optional): Control the level of stylization. Default is 1.0.

        Returns:
            torch.Tensor: Stylized output image.
        """
        content_features = self.vgg_encoder(content_images, output_last_feature=True)  # Extract content features
        style_features = self.vgg_encoder(style_images, output_last_feature=True)      # Extract style features
        t = self._blend(content_features, style_features, alpha)
        return self.decoder(t)  # Generate the stylized output using the decoder

    @staticmethod
    def calc_content_loss(out_features, t):
        """
        Calculate the content loss between output features and target features.

        Args:
            out_features (torch.Tensor): Output features generated by the model.
            t (torch.Tensor): Target features (content features).

        Returns:
            torch.Tensor: Content loss value.
        """
        return F.mse_loss(out_features, t)  # Calculate Mean Squared Error (MSE) loss

    @staticmethod
    def calc_style_loss(content_middle_features, style_middle_features):
        """
        Calculate the style loss between two sets of multi-level features.

        For every level, the loss adds the MSE between the channel means and the
        MSE between the channel standard deviations.

        Args:
            content_middle_features (list of torch.Tensor): Features of the generated image at different layers.
            style_middle_features (list of torch.Tensor): Style features at different layers.

        Returns:
            torch.Tensor: Style loss value.
        """
        loss = 0
        for c, s in zip(content_middle_features, style_middle_features):
            c_mean, c_std = calc_mean_std(c)
            s_mean, s_std = calc_mean_std(s)
            loss += F.mse_loss(c_mean, s_mean) + F.mse_loss(c_std, s_std)
        return loss

    def forward(self, content_images, style_images, alpha=1.0, lam=10):
        """
        Forward pass of the Model: stylize the content images and compute the training loss.

        Args:
            content_images (torch.Tensor): Input content images as tensors.
            style_images (torch.Tensor): Input style images as tensors.
            alpha (float, optional): Style strength factor. Default is 1.0.
            lam (float, optional): Weight of the style loss. Default is 10.

        Returns:
            torch.Tensor: Total loss value (content loss + lam * style loss).
        """
        content_features = self.vgg_encoder(content_images, output_last_feature=True)

        # Encode the style images once: the last of the multi-level features is
        # exactly the tensor returned with output_last_feature=True
        style_middle_features = self.vgg_encoder(style_images, output_last_feature=False)
        style_features = style_middle_features[-1]

        t = self._blend(content_features, style_features, alpha)
        out = self.decoder(t)  # Generate the stylized output using the decoder

        # Likewise, encode the stylized output only once
        output_middle_features = self.vgg_encoder(out, output_last_feature=False)
        output_features = output_middle_features[-1]

        loss_c = self.calc_content_loss(output_features, t)
        loss_s = self.calc_style_loss(output_middle_features, style_middle_features)
        loss = loss_c + lam * loss_s  # Combine content and style loss with the specified lambda weight
        return loss


# Saving Samples

In [None]:
def denorm(tensor, device):
    """
    Undo the normalization of an image tensor with fixed per-channel mean/std values
    and clamp the result to [0, 1].

    Args:
        tensor (torch.Tensor): The input tensor to be denormalized. It should have shape (C, H, W).
        device (str or torch.device): The device on which the computations should be performed.

    Returns:
        torch.Tensor: The denormalized tensor with values clamped between 0 and 1.
    """
    # Per-channel (R, G, B) statistics, shaped (3, 1, 1) so they broadcast over H and W
    mean = torch.tensor([0.485, 0.456, 0.406], device=device).view(-1, 1, 1)
    std = torch.tensor([0.229, 0.224, 0.225], device=device).view(-1, 1, 1)

    # Denorm = (Input * STD) + Mean, limited to the valid image range
    restored = tensor * std + mean
    return restored.clamp(0, 1)

In [None]:
from torchvision.transforms.functional import resize as rez

new_height, new_width = 512, 512

The code below defines a function called save_sample, which is used to generate and save sample images during the training process. The function takes the current epoch and iteration as input parameters and uses them to create a unique file name for each saved image.

Inside the function, the model is used to generate output images from a pair of style and content images in the test dataset. The style, content and output images are first denormalized and all of them are resized to be of the exact same size that is (512,512) for concatenation purpose. The resulting images are then saved as a single image file using the save_image function provided by the torchvision library.

This function is useful for visualizing the progress of the model during training and can help identify any issues with the training process.

In [None]:
def save_sample(epoch, iter, output_dir='/home/ec2-user/SageMaker/test-images'):
    """
    Save stylized samples from the model to disk.

    For every (style, content) pair yielded by the test data loaders, the model
    generates a stylized output. The content, style and output images are
    denormalized, resized to (new_height, new_width) and saved side by side as
    one image file named '{epoch}_epoch_{iter}_iteration_{i}_content.png',
    where i is the 1-based index of the test pair.

    Relies on the notebook-level names `model`, `device`, `test_style_dl`,
    `test_content_dl`, `new_height`, `new_width` and `batch_size`.

    Args:
        epoch (int): The current epoch number of the training process.
        iter (int): The current iteration number within the epoch.
        output_dir (str): Directory the images are written to. It is created
            if it does not exist.

    Returns:
        None: The function saves the images to disk and does not return any value.

    Example:
        # Inside the training loop, e.g. every N iterations:
        save_sample(epoch, iteration)
    """
    # save_image does not create missing directories, so make sure the target exists
    os.makedirs(output_dir, exist_ok=True)

    for (i, (style, content)) in enumerate(zip(test_style_dl, test_content_dl), 1):
        # Each loader item is an (images, labels) pair; only the images are used
        content_images, style_images = content[0], style[0]

        with torch.no_grad():
            # Generate stylized output using the model
            out = model.generate(content_images, style_images)

        # Denormalize each image and resize it so all three can be concatenated
        panels = [
            rez(denorm(image, device), size=(new_height, new_width))
            for image in (content_images, style_images, out)
        ]

        # Concatenate content, style and output along the batch dimension
        res = torch.cat(panels, dim=0).to('cpu')

        # Save the grid of stylized samples to disk
        file_name = f'{epoch}_epoch_{iter}_iteration_{i}_content.png'
        torchvision.utils.save_image(res, os.path.join(output_dir, file_name), nrow=batch_size)

# Learning Rate Scheduler

This function adjusts the learning rate of the optimizer based on the number of iterations completed and a specified learning rate decay. The learning rate is decreased as the number of iterations increases, which helps the optimizer converge more effectively as it approaches the optimal solution.

In [None]:
def adjust_learning_rate(optimiser, iters, learning_rate_decay, base_lr=0.001):
    """
    Apply inverse-time decay to the optimizer's learning rate.

    The learning rate is set to ``base_lr / (1 + learning_rate_decay * iters)``, so it
    equals ``base_lr`` at iteration 0 and shrinks as ``iters`` grows.

    Args:
        optimiser (torch.optim.Optimizer): The optimizer whose learning rate needs to be adjusted.
        iters (int): The current number of iterations in the training process.
        learning_rate_decay (float): The learning rate decay factor, controlling the rate of learning rate reduction.
        base_lr (float, optional): The undecayed learning rate the schedule starts from.
            Defaults to 0.001, the value that was previously hard-coded, so existing
            three-argument calls behave exactly as before.

    Returns:
        None: The function modifies the learning rate in-place within the optimizer.

    Note:
        Only the first parameter group (``param_groups[0]``) is updated; optimizers
        built with several parameter groups keep their other groups untouched.

    Example:
        optimiser = torch.optim.Adam(model.parameters(), lr=0.0005)
        for epoch in range(num_epochs):
            for i, (inputs, labels) in enumerate(train_loader):
                # Perform training steps here
                iters = epoch * len(train_loader) + i
                adjust_learning_rate(optimiser, iters, learning_rate_decay=0.001, base_lr=0.0005)
    """
    # The value is recomputed from base_lr on every call (not compounded from the
    # current lr), so calling this repeatedly with the same `iters` is idempotent.
    optimiser.param_groups[0]['lr'] = base_lr / (1.0 + learning_rate_decay * iters)


# Training

The function takes arguments such as the number of epochs to train for, the optimizer to use, and the directory to save the trained model file. It also includes options for changing the number of iterations which is useful when resuming training, learning rate decay, and saving intermediate samples and model checkpoints.

The `try` block wraps the code that may raise an exception: an `OSError` caused by a truncated image file or a `PIL.Image.DecompressionBombError`, either of which may occur while reading and loading image files, and a `StopIteration` raised when there are no more style images in the dataset.

The except block catches the exception and performs an appropriate action. In the case of a truncated image file the error message will be printed, and the current batch of data will be skipped. If a StopIteration error is encountered, the style dataset iterator will be reset to the beginning, and training will continue with the next batch of content data.

The learning rate is decayed over time to improve convergence. The function prints the loss after every 500 iterations and saves the model checkpoint after every 1000 iterations. Finally, the function saves intermediate samples at the end of each epoch.

In [None]:
def fit(epochs, optimizer=torch.optim.SGD, model_state_dir='/home/ec2-user/SageMaker/trained-models', iters=1,
        epc=1, learning_rate_decay=5e-5):
    """
    Train the global ``model`` for ``epochs`` passes over ``content_dl``.

    Each content batch is paired with the next batch from ``style_dl``; the model's forward
    pass returns the loss, which is backpropagated and applied through ``optimizer``. After
    every optimizer step the learning rate is recomputed by ``adjust_learning_rate``. The loss
    is printed every 500 iterations, a checkpoint is written every 1000 iterations, and
    ``save_sample`` is called at the end of every epoch.

    The function relies on these notebook-level names: ``model``, ``content_dl``, ``style_dl``,
    ``adjust_learning_rate`` and ``save_sample``.

    Args:
        epochs (int): Number of epochs to run in this call.
        optimizer (torch.optim.Optimizer or type, optional): Optimizer instance used to update the
            model. If an optimizer *class* is given (as with the default, ``torch.optim.SGD``), it is
            instantiated over ``model.parameters()`` with a learning rate of 0.001.
        model_state_dir (str, optional): Directory for checkpoints; created if missing.
            Default is '/home/ec2-user/SageMaker/trained-models'.
        iters (int, optional): Starting value of the global iteration counter. It drives the learning
            rate decay and the checkpoint file names, so pass the last reached value when resuming.
            Default is 1.
        epc (int, optional): Label of the first epoch of this call, used in log messages, checkpoint
            names and sample names. Default is 1.
        learning_rate_decay (float, optional): Decay factor forwarded to ``adjust_learning_rate``.
            Default is 5e-5.

    Returns:
        list: Loss value (float) of every batch that was successfully processed in this call.

    Example:
        # Assuming you have a defined model, content_dl, and style_dl
        optim = torch.optim.Adam(model.parameters(), lr=0.001)
        loss_list = fit(epochs=20, optimizer=optim)
    """
    # The default argument is the SGD class, not an instance; calling zero_grad()/step() on the
    # class itself would fail, so build a usable optimizer from it.
    if isinstance(optimizer, type):
        optimizer = optimizer(model.parameters(), lr=0.001)

    # Make sure checkpoints have somewhere to go before any training time is spent
    os.makedirs(model_state_dir, exist_ok=True)

    # Initialize an empty list to record losses during training
    loss_list = []

    # Get the total number of iterations in the content dataloader
    iterations = len(content_dl)

    # Label of the last epoch this call will reach (epc is the label of the first one)
    last_epoch_label = epc + epochs - 1

    # Training loop for each epoch
    for epoch_idx in range(1, epochs + 1):
        print(f'Start {epoch_idx} epoch')

        # Create an iterator for the style dataloader
        style_iter = iter(style_dl)

        # Iterate over content images; tqdm wraps the dataloader directly so the bar knows its length
        for (i, content) in enumerate(tqdm(content_dl), 1):
            try:
                # Fetch a batch of style images from the style dataloader
                style = next(style_iter)

                # Calculate the loss and backpropagate to update model parameters
                loss = model(content[0], style[0])
                loss_list.append(loss.item())
                optimizer.zero_grad()
                loss.backward()
                optimizer.step()

                # Decay the Learning rate after each iteration
                adjust_learning_rate(optimizer, iters, learning_rate_decay)

            except (OSError, PIL.Image.DecompressionBombError) as err:
                # Best effort: report the actual problem and move on to the next content batch.
                # The exception is bound to `err` so it cannot clobber any loop variable.
                print(f"Skipping batch due to an unreadable image file: {err}")
                continue
            except StopIteration:
                # The style iterator is exhausted: restart it; this content batch is skipped
                style_iter = iter(style_dl)
                continue

            # Printing the loss for every 500 iterations
            if iters % 500 == 0:
                print(f'[{epc}/total {last_epoch_label} epoch],[{i} /'
                      f'total {iterations} iteration]: {loss.item()}')

            # Saving the model state every 1000 iterations
            if iters % 1000 == 0:
                torch.save(model.state_dict(), f'{model_state_dir}/{epc}_epoch_{iters}_iterations.pth')

            # Only successfully processed batches advance the global iteration counter
            iters = iters + 1

        # Saving the stylized samples at the end of each epoch
        save_sample(epc, iters)
        epc = epc + 1

    return loss_list

In [None]:
def modelbase(reuse, learning_rate):
    """
    Build the model and its Adam optimizer, optionally restoring saved weights.

    A new ``Model`` is created and moved to the notebook-level ``device``. When ``reuse`` is given,
    the state dict stored at that path is loaded into the model, which is how training is resumed
    from a checkpoint. The optimizer is always created fresh, so only the model weights are
    restored, not any optimizer state.

    Args:
        reuse (str or None): File path to the saved state of an existing model to be reused, or None to create a new model.
        learning_rate (float): The learning rate for the optimizer.

    Returns:
        tuple: ``(model, optimizer)`` where ``optimizer`` is a ``torch.optim.Adam`` over the model's parameters.

    Example:
        # Create a new model and optimizer
        model, optimizer = modelbase(reuse=None, learning_rate=0.001)

        # Reuse an existing model and specify a different learning rate
        model, optimizer = modelbase(reuse='/path/to/existing_model.pth', learning_rate=0.0005)
    """
    # Create a new instance of the model and move it to the appropriate device
    model = to_device(Model(), device)

    # If 'reuse' is provided, load the model's state from the specified file path
    if reuse is not None:
        # map_location remaps the stored tensors onto the current device, so a checkpoint written
        # on a GPU can still be restored on a CPU-only machine.
        # Assumes `device` is a torch.device or device string -- confirm against its definition.
        saved_state = torch.load(reuse, map_location=device)
        model.load_state_dict(saved_state)

    # Create the optimizer with the specified learning rate and model parameters
    optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)

    # Return the model and optimizer instances as a tuple
    return model, optimizer

# Training

In [None]:
# Fresh start: no checkpoint is loaded (reuse=None); the optimizer is created with lr 0.001
model, optimizer = modelbase(reuse=None, learning_rate=0.001)

In [None]:
# Number of epochs passed to the fit(...) calls in the training cells below (until reassigned)
epochs = 20

Please note that the errors in the code cells presented below have been rectified.

### Training 1st part.

In [None]:
# First run: the iteration counter and epoch label start at fit's defaults (both 1)
history = fit(epochs, optimizer)

### Training 2nd part.

In [None]:
# Continue with the iteration counter at 857; `history` is replaced by this run's losses
history = fit(epochs, optimizer, iters=857)

### Training 3rd part.

In [None]:
# Continue with the iteration counter at 1681; `history` is replaced by this run's losses
history = fit(epochs, optimizer, iters=1681)

### Training 4th part.

In [None]:
# Continue with the iteration counter at 1822; `history` is replaced by this run's losses
history = fit(epochs, optimizer, iters=1822)

### Training 5th part.

In [None]:
# Continue with the iteration counter at 2622; `history` is replaced by this run's losses
history = fit(epochs, optimizer, iters=2622)

### Training 6th part.

In [None]:
# Continue with the iteration counter at 3251; `history` is replaced by this run's losses
history = fit(epochs, optimizer, iters=3251)

### Training 7th part.

In [None]:
# Continue with the iteration counter at 3350; `history` is replaced by this run's losses
history = fit(epochs, optimizer, iters=3350)

### Training 8th part.

In [None]:
# Continue with the iteration counter at 4490; `history` is replaced by this run's losses
history = fit(epochs, optimizer, iters=4490)

### Training 9th part.

In [None]:
# Continue with the iteration counter at 4500; `history` is replaced by this run's losses
history = fit(epochs, optimizer, iters=4500)

### Training 10th part.

In [None]:
# Continue with the iteration counter at 5001; `history` is replaced by this run's losses
history = fit(epochs, optimizer, iters=5001)

### Training 11th part.

In [None]:
# Rebuild the model from a saved checkpoint and pair it with a new optimizer at a lower learning rate
model, optimizer = modelbase(
    reuse='/home/ec2-user/SageMaker/trained-models/1_epoch_15000_iterations.pth',
    learning_rate=0.00068,
)

In [None]:
epochs = 19

# Resume with the epoch label at 2 and the iteration counter at 11594.
# NOTE(review): the checkpoint loaded in the previous cell is named "..._15000_iterations.pth"
# while the counter restarts at 11594 -- confirm this offset is intentional.
history = fit(epochs, optimizer, iters=11594, epc=2)

## Model training last part

In [None]:
# Last stretch: restore the epoch-11 checkpoint and train on with a smaller learning rate
model, optimizer = modelbase(
    reuse='/home/ec2-user/SageMaker/trained-models/11_epoch_107000_iterations.pth',
    learning_rate=0.0002,
)

epochs = 10
# Epoch labels continue from 11 and the iteration counter from 107001
history = fit(epochs, optimizer, iters=107001, epc=11)

Saving the final trained model, creating a zip of all the trained models that are saved every 1000 iterations, and creating a zip of all the test image files that have been saved after every epoch.

In [None]:
# Persist the weights of the model as it stands after the last training cell
torch.save(
    model.state_dict(),
    '/home/ec2-user/SageMaker/final.pth',
)

In [None]:
# Zip the whole checkpoint directory into /home/ec2-user/SageMaker/models.zip
shutil.make_archive(
    '/home/ec2-user/SageMaker/models',
    'zip',
    '/home/ec2-user/SageMaker/trained-models',
)

In [None]:
# Zip the saved sample images into /home/ec2-user/SageMaker/testimages.zip
shutil.make_archive(
    '/home/ec2-user/SageMaker/testimages',
    'zip',
    '/home/ec2-user/SageMaker/test-images',
)

# Plotting The loss

In [None]:
def loss_plot(history):
    """
    Draw the recorded training losses as a line chart on the current pyplot figure.

    The x-axis is the position of each value in ``history`` (0, 1, 2, ...), and the
    y-axis is the loss value itself.

    Parameters:
        history (list): Training loss values, one per recorded iteration.

    Returns:
        None

    Example:
        loss_values = [0.5, 0.4, 0.3, 0.2, 0.1]
        loss_plot(loss_values)
    """
    # One x position per recorded loss value
    iteration_axis = range(len(history))
    plt.plot(iteration_axis, history)

    # Axis labels first, then the title, each applied through its pyplot setter
    annotations = (
        (plt.xlabel, 'iteration'),
        (plt.ylabel, 'loss'),
        (plt.title, 'train loss'),
    )
    for apply_text, text in annotations:
        apply_text(text)

In [None]:
# `history` is reassigned by every training cell above, so this plots only the losses
# returned by the most recent fit(...) call, not the whole training run.
loss_plot(history)