In [1]:
# Uncomment and run the appropriate command for your operating system, if required
# No installation is reqiured on Google Colab / Kaggle notebooks

# Linux / Binder / Windows (No GPU)
# !pip install numpy matplotlib torch==1.7.0+cpu torchvision==0.8.1+cpu torchaudio==0.7.0 -f https://download.pytorch.org/whl/torch_stable.html

# Linux / Windows (GPU)
# pip install numpy matplotlib torch==1.7.1+cu110 torchvision==0.8.2+cu110 torchaudio==0.7.2 -f https://download.pytorch.org/whl/torch_stable.html
 
# MacOS (NO GPU)
# !pip install numpy matplotlib torch torchvision torchaudio

# Installing necessary packages

In [2]:
import subprocess
import sys

def install(package):
    subprocess.check_call([sys.executable, "-m", "pip", "install", package])

required_packages = ['numpy', 'matplotlib', 'torch', 'torchvision', 'torchaudio', 'transformers', 'pillow', 'json']

for package in required_packages:
    try:
        __import__(package)
    except ImportError:
        install(package)



In [3]:
import os
import torch
import json
import torch.nn as nn
import re
from torch.utils.data import DataLoader, Dataset
from torchvision import transforms, utils
from PIL import Image, ImageFilter, ImageEnhance
import shutil
import matplotlib.pyplot as plt

# Data Preprocessing

In [4]:
from PIL import UnidentifiedImageError

image_dir = 'dataset/images'
files = os.listdir(image_dir)  # list of all files in the directory

counter = 1

# pattern to match files already in the `artworkXXX.ext` format
pattern = re.compile(r'artwork\d{3}\.\w+')

for file_name in files:
    file_ext = os.path.splitext(file_name)[1]
    
    old_file_path = os.path.join(image_dir, file_name)
    new_file_path = old_file_path
    
    if not pattern.match(file_name) and file_name not in ['nomadic', 'cubism', 'gray', 'print']:
        new_file_name = f'artwork{counter:03d}{file_ext}'
        new_file_path = os.path.join(image_dir, new_file_name)
        os.rename(old_file_path, new_file_path)
        counter += 1
    
    try:
        # Open the image
        with Image.open(new_file_path) as img:
            # Check if the image size is not 128x128
            if img.size != (128, 128):
                img_resized = img.resize((128, 128))
                img_resized.save(new_file_path)
    except (UnidentifiedImageError, OSError) as e:
        print(f"Skipping invalid file: {file_name}, error: {e}")

print("Renaming and resizing completed.")

Skipping invalid file: gray, error: [Errno 21] Is a directory: 'dataset/images/gray'
Skipping invalid file: nomadic, error: [Errno 21] Is a directory: 'dataset/images/nomadic'
Skipping invalid file: print, error: [Errno 21] Is a directory: 'dataset/images/print'
Skipping invalid file: cubism, error: [Errno 21] Is a directory: 'dataset/images/cubism'
Renaming and resizing completed.


In [5]:
class TextImageDataset(Dataset):
    def __init__(self, text_dir, image_dir, transform=None):
        self.text_dir = text_dir
        self.image_dir = image_dir
        self.transform = transform
        self.image_files = sorted(os.listdir(image_dir))
        self.text_files = sorted([f for f in os.listdir(text_dir) if f.endswith('.txt')])
        
        self.word_to_idx = {'nomadic': 0, 'cubism': 1, 'print': 2, 'gray': 3}
        self.manual_embeddings = {
            'nomadic': torch.tensor([1.0, 0.0, 0.0, 0.0], dtype=torch.float32),
            'cubism': torch.tensor([0.0, 1.0, 0.0, 0.0], dtype=torch.float32),
            'print': torch.tensor([0.0, 0.0, 1.0, 0.0], dtype=torch.float32),
            'gray': torch.tensor([0.0, 0.0, 0.0, 1.0], dtype=torch.float32),
        }
        
        self.valid_image_files = self.filter_valid_images()
        
        if len(self.text_files) != len(self.valid_image_files):
            raise ValueError("Number of text files does not match number of valid image files.")

    def filter_valid_images(self):
        valid_files = []
        for img_name in self.image_files:
            img_path = os.path.join(self.image_dir, img_name)
            try:
                img = Image.open(img_path)
                img.verify()
                valid_files.append(img_name)
            except (UnidentifiedImageError, IOError):
                print(f"Invalid image file: {img_path}")
        return valid_files

    def __len__(self):
        return len(self.valid_image_files)
    
    def __getitem__(self, idx):
        img_name = os.path.join(self.image_dir, self.valid_image_files[idx])
        image = Image.open(img_name)
        if image.mode != 'RGB':
            image = image.convert('RGB')
        if self.transform:
            image = self.transform(image)
        
        txt_name = os.path.join(self.text_dir, self.text_files[idx])
        description = self.read_text_file(txt_name)
        
        tokens = self.tokenize_description(description)
        return image, tokens

    def read_text_file(self, file_path):
        try:
            with open(file_path, 'r', encoding='utf-8') as f:
                return f.read()
        except UnicodeDecodeError:
            with open(file_path, 'r', encoding='latin1') as f:
                return f.read()
    
    def tokenize_description(self, description):
        words = re.split(r',\s*', description.lower())
        embeddings = [self.manual_embeddings[word] for word in words if word in self.manual_embeddings]
        return torch.stack(embeddings).sum(dim=0)

transform = transforms.Compose([
    transforms.Resize((128, 128)),
    transforms.ToTensor(),
    transforms.Normalize([0.5, 0.5, 0.5], [0.5, 0.5, 0.5])
])

dataset = TextImageDataset(text_dir='dataset/text', image_dir='dataset/images', transform=transform)
dataloader = DataLoader(dataset, batch_size=3, shuffle=True)

print("Dataset and DataLoader created successfully.")

Invalid image file: dataset/images/cubism
Invalid image file: dataset/images/gray
Invalid image file: dataset/images/nomadic
Invalid image file: dataset/images/print
Dataset and DataLoader created successfully.


# Feature Engineering
Defining the Generator and Discriminator networks

In [6]:
class Discriminator(nn.Module):
    def __init__(self, img_channels, text_dim, hidden_dim, img_size=128):
        super(Discriminator, self).__init__()

        def conv_output_size(size, kernel_size=4, stride=2, padding=1):
            return (size - kernel_size + 2 * padding) // stride + 1

        self.img_size = img_size
        self.hidden_dim = hidden_dim

        current_size = img_size
        self.img_dis = nn.Sequential(
            nn.Conv2d(img_channels, hidden_dim // 4, 4, stride=2, padding=1),
            nn.LeakyReLU(0.2, inplace=True)
        )
        current_size = conv_output_size(current_size, kernel_size=4, stride=2, padding=1)

        self.img_dis.add_module("conv2", nn.Conv2d(hidden_dim // 4, hidden_dim // 2, 4, stride=2, padding=1))
        self.img_dis.add_module("lrelu2", nn.LeakyReLU(0.2, inplace=True))
        current_size = conv_output_size(current_size, kernel_size=4, stride=2, padding=1)

        self.img_dis.add_module("conv3", nn.Conv2d(hidden_dim // 2, hidden_dim, 4, stride=2, padding=1))
        self.img_dis.add_module("lrelu3", nn.LeakyReLU(0.2, inplace=True))
        current_size = conv_output_size(current_size, kernel_size=4, stride=2, padding=1)

        self.flattened_img_size = hidden_dim * (current_size * current_size)

        self.text_embedding = nn.Linear(text_dim, self.flattened_img_size)
        self.final = nn.Linear(self.flattened_img_size * 2, 1)

    def forward(self, img, text):
        img_out = self.img_dis(img).view(img.size(0), -1)
        text_embedding = self.text_embedding(text.float())
        x = torch.cat([img_out, text_embedding], dim=1)
        return self.final(x)

In [7]:
class Generator(nn.Module):
    def __init__(self, latent_dim, text_dim, img_channels):
        super(Generator, self).__init__()
        self.text_embedding = nn.Linear(text_dim, latent_dim)
        self.gen = nn.Sequential(
            nn.ConvTranspose2d(latent_dim * 2, 512, 4, 1, 0),
            nn.BatchNorm2d(512),
            nn.ReLU(True),
            nn.ConvTranspose2d(512, 256, 4, 2, 1),
            nn.BatchNorm2d(256),
            nn.ReLU(True),
            nn.ConvTranspose2d(256, 128, 4, 2, 1),
            nn.BatchNorm2d(128),
            nn.ReLU(True),
            nn.ConvTranspose2d(128, 64, 4, 2, 1),
            nn.BatchNorm2d(64),
            nn.ReLU(True),
            nn.ConvTranspose2d(64, img_channels, 4, 2, 1),
            nn.Tanh()
        )

    def forward(self, noise, text):
        text_embedding = self.text_embedding(text).sum(dim=1)
        x = torch.cat([noise, text_embedding], dim=1)
        x = x.unsqueeze(2).unsqueeze(3)
        return self.gen(x)

In [8]:
text_dim = 4  # Set the text dimension to the number of unique words
latent_dim = 100
hidden_dim = 512
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
generator = Generator(latent_dim, text_dim, 3).to(device)
discriminator = Discriminator(3, text_dim, hidden_dim, img_size=128).to(device)

# Training the Model
The generator and discriminator are trained here using a loop.
## Optimization
The optimization is embedded within the training loop. The code uses Adam optimizer to update the weights of both the generator and the discriminator.
## Evaluation
Evaluation happens within the training loop where loss values for both the generator and the discriminator are printed every 100 steps. Additionally, generated sample images are saved at the end of each epoch to visualize the generator’s progress.

In [9]:
import torch.optim as optim

epochs = 50  # number of epochs
lr = 0.0001  # learning rate

def train_gan(generator, discriminator, dataloader, epochs, lr, device):
    criterion = nn.BCEWithLogitsLoss()
    optim_g = optim.Adam(generator.parameters(), lr=lr, betas=(0.5, 0.999))
    optim_d = optim.Adam(discriminator.parameters(), lr=lr, betas=(0.5, 0.999))

    for epoch in range(epochs):
        for i, (images, text_ids) in enumerate(dataloader):
            batch_size = images.size(0)
            real_labels = torch.ones(batch_size, 1).to(device)
            fake_labels = torch.zeros(batch_size, 1).to(device)

            images = images.to(device)
            text_ids = text_ids.to(device)

            optim_d.zero_grad()
            outputs = discriminator(images, text_ids)
            real_loss = criterion(outputs, real_labels)
            real_loss.backward()

            noise = torch.randn(batch_size, latent_dim).to(device)
            fake_images = generator(noise, text_ids)
            outputs = discriminator(fake_images.detach(), text_ids)
            fake_loss = criterion(outputs, fake_labels)
            fake_loss.backward()
            optim_d.step()

            optim_g.zero_grad()
            outputs = discriminator(fake_images, text_ids)
            g_loss = criterion(outputs, real_labels)
            g_loss.backward()
            optim_g.step()

            if (i + 1) % 100 == 0:
                print(f'Epoch [{epoch + 1}/{epochs}], Step [{i + 1}/{len(dataloader)}], '
                      f'D Loss: {real_loss.item() + fake_loss.item():.4f}, G Loss: {g_loss.item():.4f}')

        with torch.no_grad():
            sample_noise = torch.randn(batch_size, latent_dim).to(device)
            sample_images = generator(sample_noise, text_ids)
            os.makedirs('samples', exist_ok=True)
            utils.save_image(sample_images, f'samples/sample_epoch_{epoch + 1}.png', nrow=8, normalize=True)

    torch.save(generator.state_dict(), 'generator.pth')
    torch.save(discriminator.state_dict(), 'discriminator.pth')

train_gan(generator, discriminator, dataloader, epochs, lr, device)

RuntimeError: Tensors must have same number of dimensions: got 2 and 1