In [None]:
!pip install torch

In [None]:
import torch
import torchvision
import torch.nn as nn
import torch.optim as optim
from torchvision import transforms
from torch.utils.data import DataLoader
from sklearn.metrics import f1_score
import numpy as np
import os
from PIL import Image
from torch.utils.data import Dataset
import matplotlib.pyplot as plt

In [None]:
class CustomDataset(Dataset):
  """Image-folder dataset in which every sub-directory of ``root_dir`` is one class.

  Expected layout::

      root_dir/
        class_a/img1.jpg
        class_b/img2.jpg

  Class indices are assigned 0..C-1 following the sorted sub-directory names, so
  separate datasets built over folders with the same class names agree on the
  label mapping.

  Attributes:
    root_dir: Directory that was scanned.
    transform: Optional callable applied to each PIL image in ``__getitem__``.
    images: List of image file paths.
    labels: List of integer class indices, parallel to ``images``.
    classes: Sorted list of class (sub-directory) names.
    class_to_idx: Mapping from class name to its integer index.
  """

  def __init__(self, root_dir, transform=None):
    """Scan ``root_dir`` and record one (path, label) pair per file found.

    Args:
      root_dir: Directory containing one sub-directory per class.
      transform: Optional callable applied to each loaded image.
    """
    self.root_dir = root_dir
    self.transform = transform
    self.images = []
    self.labels = []

    # os.listdir returns entries in arbitrary order; sort so the
    # name -> index mapping is deterministic across runs and machines.
    self.classes = sorted(
        entry for entry in os.listdir(root_dir)
        if os.path.isdir(os.path.join(root_dir, entry))
    )
    self.class_to_idx = {name: idx for idx, name in enumerate(self.classes)}

    for class_name in self.classes:
      class_dir = os.path.join(root_dir, class_name)
      # The label is the position of the class, not the number of samples
      # collected so far (which would give labels like 0, N1, N1+N2, ...).
      class_idx = self.class_to_idx[class_name]
      for image_name in sorted(os.listdir(class_dir)):
        image_path = os.path.join(class_dir, image_name)
        if not os.path.isfile(image_path):
          continue  # nested directories cannot be opened as images
        self.images.append(image_path)
        self.labels.append(class_idx)

  def __len__(self):
    """Return the number of samples."""
    return len(self.images)

  def __getitem__(self, idx):
    """Load sample ``idx`` and return ``(image, label)``.

    The image is converted to RGB and passed through ``self.transform`` when
    one was given; ``label`` is the integer class index.
    """
    image_path = self.images[idx]
    label = self.labels[idx]

    # The context manager closes the underlying file; convert() returns an
    # independent in-memory image that stays valid afterwards.
    with Image.open(image_path) as img:
      image = img.convert('RGB')

    if self.transform:
      image = self.transform(image)

    return image, label

In [None]:
from torchvision import transforms

# Data pipeline settings, hoisted so they can be tuned in one place.
IMAGE_SIZE = 256  # images are resized to IMAGE_SIZE x IMAGE_SIZE
BATCH_SIZE = 32

# NOTE(review): all three splits read the same placeholder directory. Point
# them at disjoint train/val/test folders before trusting validation metrics.
TRAIN_DIR = 'path/to/your/dataset'
VAL_DIR = 'path/to/your/dataset'
TEST_DIR = 'path/to/your/dataset'

# Resize -> tensor -> per-channel normalisation, shared by every split.
_resize = transforms.Resize((IMAGE_SIZE, IMAGE_SIZE))
_normalize = transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
transform = transforms.Compose([_resize, transforms.ToTensor(), _normalize])

# Only the training loader shuffles; validation and test keep a fixed order.
train_dataset = CustomDataset(root_dir=TRAIN_DIR, transform=transform)
train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)

val_dataset = CustomDataset(root_dir=VAL_DIR, transform=transform)
val_loader = DataLoader(val_dataset, batch_size=BATCH_SIZE, shuffle=False)

test_dataset = CustomDataset(root_dir=TEST_DIR, transform=transform)
test_loader = DataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=False)

In [None]:
class VisionTransformer(nn.Module):
    """Minimal Vision Transformer classifier.

    The image is cut into non-overlapping ``patch_size`` x ``patch_size``
    patches. Each patch (all channels together) is flattened and linearly
    embedded, fixed sinusoidal positional encodings are added, the token
    sequence passes through ``num_layers`` ``TransformerEncoderBlock`` modules,
    and the mean over tokens is fed to a linear classification head.

    Args:
        image_size: Height/width of the (square) input images.
        patch_size: Side length of each square patch.
        num_classes: Number of output logits.
        embedding_dim: Width of the patch embeddings.
        num_heads: Attention heads per encoder block.
        num_layers: Number of encoder blocks.
        in_channels: Number of image channels (3 for RGB input).
    """

    def __init__(self, image_size, patch_size, num_classes, embedding_dim=128, num_heads=4, num_layers=2,
                 in_channels=3):
        super(VisionTransformer, self).__init__()

        # Calculate the number of patches based on the image size and patch size
        self.num_patches = (image_size // patch_size) ** 2
        self.patch_size = patch_size
        self.in_channels = in_channels

        # Input embedding layer: one token is one flattened patch across ALL
        # channels, so the input width is C * p * p (not just p * p).
        self.input_embedding = nn.Linear(in_channels * patch_size * patch_size, embedding_dim)

        # Positional encoding
        self.positional_encoding = self.initialize_positional_encoding(embedding_dim, self.num_patches)

        # Transformer encoder blocks
        self.encoder_blocks = nn.ModuleList([
            TransformerEncoderBlock(embedding_dim, num_heads) for _ in range(num_layers)
        ])

        # Classification head
        self.classification_head = nn.Linear(embedding_dim, num_classes)

    def initialize_positional_encoding(self, embedding_dim, num_patches):
        """Build fixed sinusoidal position embeddings.

        Returns:
            A non-trainable ``nn.Parameter`` of shape
            ``(1, num_patches, embedding_dim)``; even feature indices hold
            sines and odd feature indices hold cosines.
        """
        positional_encoding = torch.zeros(1, num_patches, embedding_dim)
        positions = torch.arange(0, num_patches, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, embedding_dim, 2).float() * (-torch.log(torch.tensor(10000.0)) / embedding_dim))
        positional_encoding[0, :, 0::2] = torch.sin(positions * div_term)
        # For an odd embedding_dim there is one fewer cosine column than sine
        # columns, so trim div_term to match the 1::2 slice.
        positional_encoding[0, :, 1::2] = torch.cos(positions * div_term[: embedding_dim // 2])
        return nn.Parameter(positional_encoding, requires_grad=False)

    def forward(self, x):
        """Classify a batch of images.

        Args:
            x: Tensor of shape ``(batch, in_channels, H, W)``.

        Returns:
            Logits of shape ``(batch, num_classes)``.

        Raises:
            ValueError: If ``x`` does not have ``in_channels`` channels.
        """
        if x.size(1) != self.in_channels:
            raise ValueError(
                f"Expected input with {self.in_channels} channels, got {x.size(1)}"
            )

        batch_size = x.size(0)
        p = self.patch_size

        # (B, C, H, W) -> (B, C, H/p, W/p, p, p)
        patches = x.unfold(2, p, p).unfold(3, p, p)
        # -> (B, H/p, W/p, C, p, p) -> (B, num_patches, C*p*p): channels are
        # folded into the feature axis so the token count equals num_patches.
        patches = patches.permute(0, 2, 3, 1, 4, 5).contiguous().view(batch_size, -1, self.in_channels * p * p)

        # Apply input embedding
        patches = self.input_embedding(patches)

        # Add positional encoding (broadcast over the batch dimension)
        patches = patches + self.positional_encoding

        # Transformer encoder blocks
        for encoder_block in self.encoder_blocks:
            patches = encoder_block(patches)

        # Global average pooling over the token axis -> (B, embedding_dim)
        output = patches.mean(1)

        # No squeeze here: the pooled tensor is already 2-D, and squeezing
        # would drop the batch dimension whenever the batch size is 1.
        output = self.classification_head(output)

        return output



class MultiHeadAttentionLayer(nn.Module):
  """Scaled dot-product attention computed in ``num_heads`` parallel heads.

  Query, key and value are each passed through their own linear projection,
  reshaped to (batch, heads, sequence, head_dim), attended, merged back to
  (batch, sequence, embedding_dim) and sent through an output projection.
  Dropout is applied to the attention weights.
  """

  def __init__(self, embedding_dim, num_heads, dropout=0.1):
    super(MultiHeadAttentionLayer, self).__init__()
    self.embedding_dim = embedding_dim
    self.num_heads = num_heads

    # Every head receives an equal slice of the embedding, so the split must be exact
    assert embedding_dim % num_heads == 0, "Embedding dimension must be divisible by the number of heads"
    self.head_dim = embedding_dim // num_heads

    # Input projections
    self.fc_q = nn.Linear(embedding_dim, embedding_dim)
    self.fc_k = nn.Linear(embedding_dim, embedding_dim)
    self.fc_v = nn.Linear(embedding_dim, embedding_dim)

    self.dropout = nn.Dropout(dropout)

    # Output projection applied after the heads are merged
    self.fc_out = nn.Linear(embedding_dim, embedding_dim)

  def _split_heads(self, projected, batch_size):
    """Reshape (batch, seq, embedding_dim) to (batch, heads, seq, head_dim)."""
    return projected.view(batch_size, -1, self.num_heads, self.head_dim).transpose(1, 2)

  def forward(self, query, key, value, mask=None):
    """Attend ``query`` over ``key``/``value``.

    Positions where ``mask == 0`` are filled with a large negative score
    before the softmax. Returns a tensor of shape
    (batch, query_length, embedding_dim).
    """
    n_batch = query.shape[0]

    # Project, then split each projection into heads
    q_heads = self._split_heads(self.fc_q(query), n_batch)
    k_heads = self._split_heads(self.fc_k(key), n_batch)
    v_heads = self._split_heads(self.fc_v(value), n_batch)

    # Scores are scaled by sqrt(head_dim)
    scale = torch.sqrt(torch.tensor(self.head_dim, dtype=torch.float32))
    scores = torch.matmul(q_heads, k_heads.transpose(-2, -1)) / scale

    if mask is not None:
      scores = scores.masked_fill(mask == 0, float("-1e20"))

    weights = self.dropout(torch.softmax(scores, dim=-1))

    # Weighted sum of values, then merge the heads back into one embedding
    context = torch.matmul(weights, v_heads)
    merged = context.transpose(1, 2).contiguous().view(n_batch, -1, self.embedding_dim)

    return self.fc_out(merged)


class TransformerEncoderBlock(nn.Module):
  """One encoder layer: self-attention then an MLP, each with residual + LayerNorm.

  Layer normalisation is applied after each residual addition. The MLP widens
  the embedding by a factor of four, applies ReLU, projects back and ends
  with dropout.
  """

  def __init__(self, embedding_dim, num_heads, dropout=0.1):
    super(TransformerEncoderBlock, self).__init__()

    self.multihead_attention = MultiHeadAttentionLayer(embedding_dim, num_heads, dropout=dropout)

    # One LayerNorm per sub-layer
    self.norm1 = nn.LayerNorm(embedding_dim)
    self.norm2 = nn.LayerNorm(embedding_dim)

    hidden_dim = 4 * embedding_dim
    self.feedforward = nn.Sequential(
        nn.Linear(embedding_dim, hidden_dim),
        nn.ReLU(),
        nn.Linear(hidden_dim, embedding_dim),
        nn.Dropout(dropout)
    )

  def forward(self, x):
    """Apply the block to ``x`` and return a tensor of the same shape."""
    # Self-attention sub-layer: query, key and value are all x
    attended = self.norm1(x + self.multihead_attention(x, x, x))

    # Feedforward sub-layer
    return self.norm2(attended + self.feedforward(attended))

In [None]:
def train_model(model, train_loader, criterion, optimizer, num_epochs=10, val_loader=None):
    """Train ``model`` and, when ``val_loader`` is given, validate after every epoch.

    The model is moved to CUDA when available, otherwise it stays on the CPU.
    Per-epoch losses are printed; when validation is enabled the curves are
    also plotted once training has finished.

    Args:
        model: The ``nn.Module`` to optimise.
        train_loader: Iterable yielding ``(images, labels)`` batches.
        criterion: Loss callable taking ``(outputs, labels)``.
        optimizer: Optimizer already bound to ``model``'s parameters.
        num_epochs: Number of passes over ``train_loader``.
        val_loader: Optional iterable of validation batches.

    Returns:
        A dict with per-epoch lists under the keys ``"train_loss"``,
        ``"val_loss"`` and ``"f1"``; the last two are empty when
        ``val_loader`` is ``None``.
    """
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model.to(device)

    train_losses = []
    val_losses = []
    f1_scores = []

    for epoch in range(num_epochs):
        model.train()
        running_train_loss = 0.0
        samples_seen = 0
        for images, labels in train_loader:
            images, labels = images.to(device), labels.to(device)

            optimizer.zero_grad()
            outputs = model(images)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            # Weight the batch loss by its size so a smaller final batch
            # does not skew the epoch average.
            batch_size = images.size(0)
            running_train_loss += loss.item() * batch_size
            samples_seen += batch_size

        # Average over the samples actually processed, which stays correct
        # with drop_last or custom samplers; max() guards an empty loader.
        epoch_train_loss = running_train_loss / max(samples_seen, 1)
        train_losses.append(epoch_train_loss)

        print(f"Epoch [{epoch+1}/{num_epochs}] Train Loss: {epoch_train_loss:.4f}")

        if val_loader is not None:
            val_loss, f1 = evaluate_model(model, val_loader, criterion)
            val_losses.append(val_loss)
            f1_scores.append(f1)

            print(f"Validation Loss: {val_loss:.4f} F1 Score: {f1:.4f}")

    print("Training finished!")

    if val_loader is not None:
        plot_metrics(train_losses, val_losses, f1_scores)

    # Hand the history back so callers can inspect or persist it.
    return {"train_loss": train_losses, "val_loss": val_losses, "f1": f1_scores}

In [None]:
def evaluate_model(model, val_loader, criterion):
  """Compute the average loss and macro F1 of ``model`` over ``val_loader``.

  The model is switched to eval mode and gradients are disabled. Batches are
  moved to the device the model's parameters live on.

  Args:
    model: The ``nn.Module`` to evaluate.
    val_loader: Iterable yielding ``(images, labels)`` batches.
    criterion: Loss callable taking ``(outputs, labels)``.

  Returns:
    Tuple ``(val_loss, f1)``: the sample-weighted mean loss and the score
    returned by ``calculate_f1_score``.
  """
  # Follow the model's own device instead of re-deriving it from CUDA
  # availability, so a CPU model on a CUDA machine does not hit a device mismatch.
  try:
    device = next(model.parameters()).device
  except StopIteration:
    # Parameter-free model: fall back to the default device choice.
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
  model.eval()

  running_val_loss = 0.0
  samples_seen = 0
  all_predictions = []
  all_labels = []

  with torch.no_grad():
    for images, labels in val_loader:
      images, labels = images.to(device), labels.to(device)

      outputs = model(images)
      loss = criterion(outputs, labels)
      batch_size = images.size(0)
      running_val_loss += loss.item() * batch_size
      samples_seen += batch_size

      # Predicted class is the index of the largest logit
      predictions = torch.argmax(outputs, dim=1)
      all_predictions.extend(predictions.cpu().numpy())
      all_labels.extend(labels.cpu().numpy())  # Append true labels to all_labels list

  # Average over the samples actually processed; max() guards an empty loader.
  val_loss = running_val_loss / max(samples_seen, 1)
  f1 = calculate_f1_score(all_labels, all_predictions)

  return val_loss, f1

In [None]:
def calculate_f1_score(y_true, y_pred):
  """Return the macro-averaged F1 score of ``y_pred`` against ``y_true``."""
  macro_f1 = f1_score(y_true, y_pred, average='macro')
  return macro_f1


def plot_metrics(train_losses, val_losses, f1_scores):
  """Show the loss curves (left panel) and the F1 curve (right panel) per epoch."""
  fig, (loss_ax, f1_ax) = plt.subplots(1, 2, figsize=(10, 4))

  # Left panel: training and validation loss on shared axes
  loss_ax.plot(train_losses, label='Train Loss')
  loss_ax.plot(val_losses, label='Val Loss')
  loss_ax.set_xlabel('Epoch')
  loss_ax.set_ylabel('Loss')
  loss_ax.legend()

  # Right panel: validation F1
  f1_ax.plot(f1_scores, label='F1 Score', color='green')
  f1_ax.set_xlabel('Epoch')
  f1_ax.set_ylabel('F1 Score')
  f1_ax.legend()

  fig.tight_layout()
  plt.show()

In [None]:
def main():
    """Build the model, loss and optimizer, then train on the notebook-level loaders."""
    # Model geometry: 256x256 inputs cut into 16x16 patches, 10 output classes
    vit_config = dict(image_size=256, patch_size=16, num_classes=10)
    model = VisionTransformer(**vit_config)

    loss_fn = nn.CrossEntropyLoss()
    adam = optim.Adam(model.parameters(), lr=0.001)

    train_model(
        model,
        train_loader,
        loss_fn,
        adam,
        num_epochs=10,
        val_loader=val_loader,
    )


In [None]:
# Run the training pipeline only when this code executes as the top-level
# module, so importing it elsewhere does not start training.
if __name__ == "__main__":
    main()