In [None]:
import torch
import torch.nn as nn
from torch.nn import functional as F
import torch.optim as optim
import torchvision.transforms as transforms
from torchvision import datasets
from torchvision.datasets import ImageFolder
from torch.utils.data import DataLoader, random_split, Dataset
import matplotlib.pyplot as plt
import numpy as np
import torchvision
from PIL import Image

In [None]:
!wget https://raw.githubusercontent.com/karpathy/char-rnn/master/data/tinyshakespeare/input.txt
with open('input.txt', 'r', encoding='utf-8') as f:
    text = f.read()

--2023-10-17 18:04:53--  https://raw.githubusercontent.com/karpathy/char-rnn/master/data/tinyshakespeare/input.txt
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.109.133, 185.199.108.133, 185.199.110.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.109.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 1115394 (1.1M) [text/plain]
Saving to: ‘input.txt’


2023-10-17 18:04:53 (48.8 MB/s) - ‘input.txt’ saved [1115394/1115394]



In [None]:
# Vocabulary: every distinct character found in the corpus, in sorted order.
chars = sorted(set(text))
vocab_size = len(chars)

# Lookup tables between characters and their integer ids.
stoi = dict(zip(chars, range(vocab_size)))
itos = dict(enumerate(chars))


def encode(s):
    """Encoder: take a string, return the list of its character ids."""
    return [stoi[c] for c in s]


def decode(l):
    """Decoder: take a list of character ids, return the string they spell."""
    return ''.join(itos[i] for i in l)

In [None]:
# Encode the whole corpus once, then hold out the final 10% for validation.
data = torch.tensor(encode(text), dtype=torch.long)
n = int(0.9 * len(data))  # index at which the validation tail starts
train_data, val_data = data[:n], data[n:]

# data loading
def get_batch(split):
    """Sample a random mini-batch of input/target windows.

    Args:
        split: 'train' draws from ``train_data``; any other value draws from
            ``val_data``.

    Returns:
        Tuple ``(x, y)`` of tensors shaped (batch_size, block_size), both moved
        to ``device``. ``y`` holds the same windows as ``x`` shifted one
        position to the right, i.e. the next-token targets.
    """
    source = val_data if split != 'train' else train_data
    # One random start offset per sequence; the upper bound leaves room for the
    # extra trailing token that the shifted targets need.
    starts = torch.randint(len(source) - block_size, (batch_size,))
    inputs = torch.stack([source[s:s + block_size] for s in starts])
    targets = torch.stack([source[s + 1:s + block_size + 1] for s in starts])
    return inputs.to(device), targets.to(device)

@torch.no_grad()
def estimate_loss():
    """Estimate the mean loss on the train and validation splits.

    Puts ``model`` in eval mode, averages the loss over ``eval_iters`` random
    batches per split, then switches ``model`` back to train mode.

    Returns:
        dict mapping 'train' and 'val' to a 0-dim tensor holding the mean loss.
    """
    model.eval()
    averages = {}
    for split_name in ('train', 'val'):
        per_batch = torch.zeros(eval_iters)
        for step in range(eval_iters):
            inputs, targets = get_batch(split_name)
            _, batch_loss = model(inputs, targets)
            per_batch[step] = batch_loss.item()
        averages[split_name] = per_batch.mean()
    model.train()
    return averages

# Hyperparameters


In [None]:
# --- Reproducibility ---
# Seed PyTorch's random number generators so batch sampling, weight
# initialisation, dropout and text sampling repeat across Restart & Run All.
seed = 1337
torch.manual_seed(seed)

# --- Batching ---
batch_size = 64   # how many independent sequences are processed in parallel
block_size = 256  # maximum context length for predictions
# vocab_size = 50304  # disabled override; vocab_size is derived from the corpus

# --- Optimisation and evaluation schedule ---
max_iters = 3000
eval_interval = 500   # evaluate train/val loss every this many steps
learning_rate = 3e-4
eval_iters = 200      # batches averaged per split when estimating the loss

# --- Model size ---
n_embd = 384   # embedding width
n_head = 6     # attention heads per block (head size is n_embd // n_head)
n_layer = 6    # number of transformer blocks
dropout = 0.2

# --- Hardware ---
device = 'cuda' if torch.cuda.is_available() else 'cpu'

# Multi-Head Attention

In [None]:
class SelfAttention(nn.Module):
    """A single head of causal (masked) scaled dot-product self-attention."""

    def __init__(self, head_size):
        super().__init__()
        # Per-head projections from the embedding width down to head_size.
        self.key = nn.Linear(n_embd, head_size)
        self.query = nn.Linear(n_embd, head_size)
        self.value = nn.Linear(n_embd, head_size)

        # Lower-triangular matrix of ones used as the causal mask. It is
        # registered as a buffer: it belongs to the module's state (it follows
        # .to(device) and is stored in the state dict) but it is not a
        # parameter, so the optimizer never updates it.
        causal = torch.ones(block_size, block_size).tril()
        self.register_buffer('tril', causal)

        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        """Attend over ``x`` of shape (B, T, C); returns shape (B, T, head_size)."""
        _, seq_len, _ = x.shape
        queries = self.query(x)  # (B, T, hs)
        keys = self.key(x)       # (B, T, hs)

        # Dot-product scores scaled by 1/sqrt(head_size).
        head_dim = keys.shape[-1]
        scores = queries @ keys.transpose(-2, -1) * head_dim ** -0.5  # (B, T, T)

        # Entries above the diagonal are future positions: set them to -inf so
        # that softmax assigns them zero weight.
        future = self.tril[:seq_len, :seq_len] == 0
        scores = scores.masked_fill(future, float('-inf'))
        weights = self.dropout(F.softmax(scores, dim=-1))

        # Weighted aggregation of the values.
        return weights @ self.value(x)  # (B, T, hs)


In [None]:
class MultiHeadAttention(nn.Module):
    """Several SelfAttention heads run side by side, merged by a linear projection."""

    def __init__(self, num_heads, head_size):
        super().__init__()
        heads = [SelfAttention(head_size) for _ in range(num_heads)]
        self.headlist = nn.ModuleList(heads)
        # Maps the concatenated head outputs back to the embedding width.
        self.proj = nn.Linear(head_size * num_heads, n_embd)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        """Run every head on ``x`` and project the concatenation to n_embd."""
        concatenated = torch.cat([head(x) for head in self.headlist], dim=-1)
        return self.dropout(self.proj(concatenated))


# Decoder Block

In [None]:
class MLP(nn.Module):
    """Position-wise feed-forward network: expand 4x, GELU, project back, dropout."""

    def __init__(self, n_embd):
        super().__init__()
        hidden = 4 * n_embd
        self.c_fc = nn.Linear(n_embd, hidden)    # expansion layer
        self.gelu = nn.GELU()
        self.c_proj = nn.Linear(hidden, n_embd)  # projection back to the embedding width
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        """Apply the feed-forward network to ``x``; the last dimension is preserved."""
        hidden = self.gelu(self.c_fc(x))
        return self.dropout(self.c_proj(hidden))


In [None]:
class Block(nn.Module):
    """Pre-norm transformer block: attention, then MLP, each wrapped in a residual."""

    def __init__(self, n_embd, n_head):
        super().__init__()
        # Each head gets an equal share of the embedding width.
        self.attn = MultiHeadAttention(n_head, n_embd // n_head)
        self.mlp = MLP(n_embd)
        self.ln1 = nn.LayerNorm(n_embd)
        self.ln2 = nn.LayerNorm(n_embd)

    def forward(self, x):
        """Return ``x`` after the attention and MLP residual updates."""
        # LayerNorm is applied *before* each sub-layer (pre-norm), unlike the
        # post-norm arrangement of the 2017 Transformer paper.
        attended = self.attn(self.ln1(x))
        x = x + attended
        transformed = self.mlp(self.ln2(x))
        return x + transformed

# GPT2


In [None]:
class GPT2(nn.Module):
    """Decoder-only, GPT-2 style language model over integer token ids.

    The architecture is read from the notebook-level hyperparameters
    ``vocab_size``, ``n_embd``, ``block_size``, ``n_head`` and ``n_layer``.
    """

    def __init__(self):
        super().__init__()
        self.wte = nn.Embedding(vocab_size, n_embd)  # token embedding
        self.wpe = nn.Embedding(block_size, n_embd)  # learned positional embedding

        # `blocklist` is a plain Python list kept for backward compatibility;
        # the blocks are registered as sub-modules through the nn.Sequential.
        self.blocklist = [Block(n_embd, n_head=n_head) for _ in range(n_layer)]
        self.blocks = nn.Sequential(*self.blocklist)

        self.ln_f = nn.LayerNorm(n_embd)  # final layer norm
        # Projects the final hidden states to one logit per vocabulary entry.
        self.lm_head = nn.Linear(n_embd, vocab_size)

        # Re-initialise every Linear / Embedding weight (see _init_weights).
        self.apply(self._init_weights)

    def _init_weights(self, module):
        """Initialise Linear/Embedding weights from N(0, 0.02) and zero Linear biases."""
        if isinstance(module, nn.Linear):
            torch.nn.init.normal_(module.weight, mean=0.0, std=0.02)
            if module.bias is not None:
                torch.nn.init.zeros_(module.bias)
        elif isinstance(module, nn.Embedding):
            torch.nn.init.normal_(module.weight, mean=0.0, std=0.02)

    def forward(self, idx, targets=None):
        """Compute next-token logits and, optionally, the cross-entropy loss.

        Args:
            idx: (B, T) tensor of token ids, with T at most ``block_size``.
            targets: optional (B, T) tensor of target token ids.

        Returns:
            ``(logits, loss)``. Without targets, ``logits`` is
            (B, T, vocab_size) and ``loss`` is None. With targets, ``logits``
            is flattened to (B*T, vocab_size) and ``loss`` is the mean
            cross-entropy.
        """
        B, T = idx.shape

        tok_emb = self.wte(idx)  # (B, T, C)
        # Build the position indices on the input's own device rather than the
        # notebook-level `device`, so the model keeps working after being
        # moved elsewhere or replicated across GPUs by DataParallel.
        pos_emb = self.wpe(torch.arange(T, device=idx.device))  # (T, C)
        x = tok_emb + pos_emb  # (B, T, C); pos_emb broadcasts over the batch
        x = self.blocks(x)
        x = self.ln_f(x)
        logits = self.lm_head(x)  # (B, T, vocab_size)

        if targets is None:
            loss = None
        else:
            # F.cross_entropy expects (N, C) logits and (N,) targets.
            B, T, C = logits.shape
            logits = logits.view(B * T, C)
            targets = targets.view(B * T)
            loss = F.cross_entropy(logits, targets)

        return logits, loss

    @torch.no_grad()
    def generate(self, idx, max_new_tokens):
        """Autoregressively sample ``max_new_tokens`` tokens after ``idx``.

        Sampling runs without gradient tracking and in eval mode, so dropout
        does not perturb the predicted distribution. The module's previous
        train/eval mode is restored before returning.

        Args:
            idx: (B, T) tensor of token ids forming the current context.
            max_new_tokens: number of tokens to append.

        Returns:
            (B, T + max_new_tokens) tensor: the context plus the sampled tokens.
        """
        was_training = self.training
        self.eval()
        try:
            for _ in range(max_new_tokens):
                # Crop the context to the last block_size tokens.
                idx_cond = idx[:, -block_size:]
                logits, _ = self(idx_cond)
                # Only the last time step predicts the next token.
                logits = logits[:, -1, :]  # (B, C)
                probs = F.softmax(logits, dim=-1)  # (B, C)
                idx_next = torch.multinomial(probs, num_samples=1)  # (B, 1)
                idx = torch.cat((idx, idx_next), dim=1)  # (B, T+1)
        finally:
            self.train(was_training)
        return idx

In [None]:
model = GPT2()
m = model.to(device)
# print the number of parameters in the model
print(sum(p.numel() for p in m.parameters())/1e6, 'M parameters')

# create a PyTorch optimizer
optimizer = torch.optim.AdamW(model.parameters(), lr=learning_rate)

# The loop variable is called `step`, not `iter`: a module-level `iter` would
# shadow the builtin iter() for every later cell in this kernel.
for step in range(max_iters):

    # every once in a while evaluate the loss on train and val sets
    if step % eval_interval == 0 or step == max_iters - 1:
        losses = estimate_loss()
        print(f"step {step}: train loss {losses['train']:.4f}, val loss {losses['val']:.4f}")

    # sample a batch of data
    xb, yb = get_batch('train')

    # evaluate the loss and take one optimisation step
    logits, loss = model(xb, yb)
    optimizer.zero_grad(set_to_none=True)
    loss.backward()
    optimizer.step()

# generate from the model
# Switch to eval mode first: estimate_loss() leaves the model in train mode,
# and sampling with dropout active degrades the generated text.
model.eval()
# Start from a single token with id 0.
context = torch.zeros((1, 1), dtype=torch.long, device=device)
with torch.no_grad():
    print(decode(m.generate(context, max_new_tokens=500)[0].tolist()))

10.795841 M parameters
step 0: train loss 4.2241, val loss 4.2273
step 500: train loss 2.0079, val loss 2.0978
step 1000: train loss 1.5078, val loss 1.7021
step 1500: train loss 1.3400, val loss 1.5693
step 2000: train loss 1.2479, val loss 1.5146
step 2500: train loss 1.1757, val loss 1.4864
step 2999: train loss 1.1138, val loss 1.4748

Yoru supreth him; a bridle king'd with his staff,
And you on of Sobtle's much her.

TrON:
The low now is great hrim it Juliht.

CATESBY:
And I am I many murdering.

BENVOLIO:
'Tis amos on a crown, with such a man's cheeks
thoughts them not what's biddle my soldier about,
Ladiests of themrotst with being readies: if and so
He excellent torm me, or trouble page against him
White'er the breat of all revenge as when them who the gate;
The Frence we to misgow his is at this Viel here.

First Soldier:


# GPT-2 image classifier: model and training loop

In [None]:
class GPT2ImageClassifier(nn.Module):
    """Image classifier: small CNN, then the GPT2 transformer stack, then a linear head.

    The CNN output is flattened and projected to a single ``n_embd``-wide
    token, which is passed through the transformer blocks and final layer
    norm of the notebook's ``GPT2`` model before classification.

    Args:
        num_channels: number of input image channels (3 for RGB, 1 for grayscale).
        num_classes: number of output classes.
        image_size: side length of the square input images. The default of
            128 reproduces the original hard-coded ``64 * 16 * 16`` feature size.
    """

    def __init__(self, num_channels=3, num_classes=24, image_size=128):
        super().__init__()

        # Convolutional feature extractor. The first layer takes the requested
        # channel count directly instead of special-casing 3 versus "anything else".
        self.conv1 = nn.Conv2d(num_channels, 16, 3, stride=1, padding=1)
        self.conv2 = nn.Conv2d(16, 32, 3, stride=1, padding=1)
        self.conv3 = nn.Conv2d(32, 64, 3, stride=1, padding=1)
        self.pool = nn.MaxPool2d(2, 2)

        # Only the positional embedding, transformer blocks and final layer
        # norm of GPT2 are used in forward(); the token embedding and language
        # model head stay untouched.
        self.gpt2 = GPT2()

        # forward() pools three times with a 2x2 window, shrinking each spatial
        # side by a factor of 8 (padding=1 keeps the size through the 3x3 convs).
        feature_side = image_size // 8
        self.fc1 = nn.Linear(64 * feature_side * feature_side, n_embd)

        # Classifier head.
        self.fc2 = nn.Linear(n_embd, num_classes)

    def forward(self, x):
        """Classify a batch of images.

        Args:
            x: image batch of shape (B, num_channels, image_size, image_size).

        Returns:
            (B, num_classes) tensor of unnormalised class scores.
        """
        # Convolutional layers
        x = self.pool(F.relu(self.conv1(x)))
        x = self.pool(F.relu(self.conv2(x)))
        x = self.pool(F.relu(self.conv3(x)))

        # Flatten and project to the transformer width.
        x = self.fc1(x.flatten(start_dim=1))  # (B, n_embd)

        # The local GPT2.forward only accepts token ids, and has neither an
        # `inputs_embeds` argument nor a `last_hidden_state` output. Feed the
        # embedding through its sub-modules directly as a one-token sequence.
        x = x.unsqueeze(1)  # (B, 1, n_embd)
        positions = torch.arange(x.size(1), device=x.device)
        x = x + self.gpt2.wpe(positions)
        x = self.gpt2.blocks(x)
        x = self.gpt2.ln_f(x)

        # Classify from the hidden state of the first (and only) token.
        return self.fc2(x[:, 0, :])

In [None]:
def trainandsave(dataset, train_loader, validation_loader, num_channels, num_classes, name,
                 num_epochs=5, lr=0.001):
    """Train a GPT2ImageClassifier and save its weights.

    Args:
        dataset: the full dataset; currently unused, kept for interface
            compatibility.
        train_loader: iterable of ``(images, labels)`` training batches.
        validation_loader: iterable of ``(images, labels)`` validation batches.
        num_channels: number of image channels passed to the model.
        num_classes: number of target classes passed to the model.
        name: file path the trained weights are saved to.
        num_epochs: number of passes over ``train_loader`` (default 5).
        lr: Adam learning rate (default 0.001).

    Side effects:
        Prints per-epoch loss and accuracy, enables cuDNN autotuning, and
        writes the state dict to ``name``. The state dict is taken from the
        underlying model, so its keys carry no DataParallel ``module.`` prefix
        and load directly into a plain ``GPT2ImageClassifier``.
    """
    # Instantiate the model
    model = GPT2ImageClassifier(num_channels=num_channels, num_classes=num_classes)
    model.to(device)
    model = torch.nn.DataParallel(model)
    # `cudnn` on its own is not an imported name; reach it through torch.backends.
    torch.backends.cudnn.benchmark = True

    # Define loss function and optimizer
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=lr)

    for epoch in range(num_epochs):
        # ---- training pass ----
        model.train()
        total_loss = 0.0
        correct_train = 0
        total_train = 0

        for images, labels in train_loader:
            images = images.to(device)
            labels = labels.to(device)

            optimizer.zero_grad()
            outputs = model(images)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            total_loss += loss.item()

            predicted_train = outputs.argmax(dim=1)
            total_train += labels.size(0)
            correct_train += (predicted_train == labels).sum().item()

        # max(..., 1) keeps an empty loader from raising ZeroDivisionError.
        train_accuracy = 100 * correct_train / max(total_train, 1)
        mean_train_loss = total_loss / max(len(train_loader), 1)

        # ---- validation pass ----
        model.eval()
        correct_test = 0
        total_test = 0
        with torch.no_grad():
            for images, labels in validation_loader:
                images = images.to(device)
                labels = labels.to(device)
                outputs = model(images)
                predicted_test = outputs.argmax(dim=1)
                total_test += labels.size(0)
                correct_test += (predicted_test == labels).sum().item()

        test_accuracy = 100 * correct_test / max(total_test, 1)

        # Implicit string concatenation replaces the backslash continuation,
        # which injected the next line's indentation into the printed message.
        print(
            f"Epoch {epoch+1}/{num_epochs}, Train Loss: {mean_train_loss}, "
            f"Train Accuracy: {train_accuracy}%, Val Accuracy: {test_accuracy}%"
        )

    print(f"Save {name}")
    # Save the wrapped module's weights rather than the DataParallel wrapper's.
    torch.save(model.module.state_dict(), name)

In [None]:
def datasetload(path='./imagesample'):
    """Load an image-folder dataset and split it 80/20 into train and test parts.

    Args:
        path: root directory handed to ``ImageFolder``.

    Returns:
        Tuple ``(dataset, train_dataset, test_dataset, train_loader,
        test_loader, num_classes)``.
    """
    # Resize every image to 128x128 and convert it to a tensor.
    preprocessing = transforms.Compose([
        transforms.Resize((128, 128)),
        transforms.ToTensor(),
    ])
    dataset = ImageFolder(root=path, transform=preprocessing)

    # 80% of the samples go to training; the remainder is held out.
    n_total = len(dataset)
    n_train = int(0.8 * n_total)
    train_dataset, test_dataset = random_split(dataset, [n_train, n_total - n_train])

    # Only the training loader shuffles.
    train_loader = DataLoader(train_dataset, batch_size=4, shuffle=True)
    test_loader = DataLoader(test_dataset, batch_size=4, shuffle=False)

    return (
        dataset,
        train_dataset,
        test_dataset,
        train_loader,
        test_loader,
        len(dataset.classes),
    )

In [None]:
def trainfunction(path='./imagesample', num_channels=3, name='gpt2_image_classifier.pth'):
    """Load the image dataset under ``path``, train the classifier and save it.

    Args:
        path: dataset root directory, forwarded to ``datasetload``. The default
            matches ``datasetload``'s own default.
        num_channels: image channel count forwarded to ``trainandsave``.
            Defaults to 3; this assumes the loader yields RGB tensors, so
            confirm it if the transform pipeline changes.
        name: file path the trained weights are written to.
    """
    dataset1, train_dataset, test_dataset, train_loader, test_loader, num_classes = datasetload(path)
    # Pass the trailing arguments by keyword: trainandsave requires
    # num_channels, num_classes and name, and a positional num_classes in the
    # fourth slot would be taken as num_channels.
    trainandsave(
        dataset1,
        train_loader,
        test_loader,
        num_channels=num_channels,
        num_classes=num_classes,
        name=name,
    )

trainfunction()