<a href="https://colab.research.google.com/github/tony3ynot/GPT-1/blob/main/GPT_1.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
import torch
import torch.nn as nn
from einops import rearrange

In [None]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


#1. Model Architecture




## 1-1. Transformer Decoder

In [2]:
### Multi-Head Attention
class MHA(nn.Module):
    def __init__(self, d_model, n_heads):
        super().__init__()
        self.n_heads = n_heads

        self.fc_q = nn.Linear(d_model, d_model) # Query
        self.fc_k = nn.Linear(d_model, d_model) # Key
        self.fc_v = nn.Linear(d_model, d_model) # Value

        self.fc = nn.Linear(d_model, d_model) # Linear Layer

        self.scale = torch.sqrt(torch.tensor(d_model/n_heads))

    def forward(self, Q, K, V, mask = None):
        Q = self.fc_q(Q)
        K = self.fc_k(K)
        V = self.fc_v(V)

        ## B = batch size / L = length / H = heads / D = dimension
        # rearrange to implement 'heads'
        Q = rearrange(Q, 'B L (H D) -> B H L D', H = self.n_heads)
        K = rearrange(K, 'B L (H D) -> B H L D', H = self.n_heads)
        V = rearrange(V, 'B L (H D) -> B H L D', H = self.n_heads)

        ## Self-Attention
        # 1. MatMul
        attention_score = Q @ K.transpose(-2, -1)

        # 2. Scale
        attention_score = attention_score / self.scale

        # 3. Masking
        if mask is not None:
            mask = mask.unsqueeze(1).repeat(1, self.n_heads, 1, 1)
            attention_score.masked_fill_(mask, -1e9)

        # 4. SoftMax
        attention_weights = torch.softmax(attention_score, dim=-1)

        # 5. MatMul
        attention = attention_weights @ V

        ## Concat & Linear
        # rearrange to concat
        x = rearrange(attention, 'B H L D -> B L (H D)')
        output = self.fc(x)

        return output


### Feed Forward Network
class FFN(nn.Module):
    def __init__(self, d_model, d_ff):
        super().__init__()

        self.linear1 = nn.Linear(d_model, d_ff)
        self.linear2 = nn.Linear(d_ff, d_model)
        self.gelu = nn.GELU()

        nn.init.xavier_normal_(self.linear1.weight)
        nn.init.xavier_normal_(self.linear2.weight)

    def forward(self, x):
        x = self.gelu(self.linear1(x))
        output = self.linear2(x)

        return output


### Decoder Layer
class DecoderLayer(nn.Module):
    def __init__(self, d_model, n_heads, d_ff, resid_drop):
        super().__init__()

        self.mha = MHA(d_model, n_heads)
        self.dropout1 = nn.Dropout(resid_drop)
        self.layernorm1 = nn.LayerNorm(d_model, eps=1e-5)

        self.ffn = FFN(d_model, d_ff)
        self.dropout2 = nn.Dropout(resid_drop)
        self.layernorm2 = nn.LayerNorm(d_model, eps=1e-5)

    def forward(self, x, attn_mask):
        # Masked-MHA layer (with residual shortcut connection)
        residual = self.mha(x, x, x, attn_mask)
        residual = self.dropout1(residual)
        x = self.layernorm1(x + residual)

        # FFN layer (with residual shortcut connection)
        residual = self.ffn(x)
        residual = self.dropout2(residual)
        output = self.layernorm2(x + residual)

        return output


### Decoder
class TransformerDecoder(nn.Module):
    def __init__(self, vocab_size, seq_len, d_model, n_layers, n_heads, d_ff, embd_drop, resid_drop, pad_id):
        super().__init__()

        self.pad_id = pad_id

        ## Decoder Input
        self.embedding = nn.Embedding(vocab_size, d_model)
        self.dropout = nn.Dropout(embd_drop)
        self.pos_embedding = nn.Embedding(seq_len+1, d_model) # learned positional embedding

        ## Decoder Layers
        self.layers = nn.ModuleList([DecoderLayer(d_model, n_heads, d_ff, resid_drop) for _ in range(n_layers)])

        nn.init.xavier_normal_(self.embedding.weight)

    def forward(self, x):
        ## padding mask for position embedding
        positions = torch.arange(x.size(1), device=x.device).repeat(x.size(0), 1) + 1
        position_pad_mask = x.eq(self.pad_id)
        positions.masked_fill_(position_pad_mask, 0)

        output = self.dropout(self.embedding(x)) + self.pos_embedding(positions)

        ## attention mask
        pad_mask = self.get_padding_mask(x, x, self.pad_id)
        future_mask = self.get_future_mask(x).to(device=pad_mask.device)
        attn_mask = torch.gt((pad_mask.to(dtype=future_mask.dtype) + future_mask), 0)

        for layer in self.layers:
            output = layer(output, attn_mask)

        return output

    ## padding mask : apply masking to padding tokens
    def get_padding_mask(self, q, k, pad_id):
        pad_mask = k.eq(pad_id).unsqueeze(1).repeat(1, q.size(1), 1)

        return pad_mask

    ## future token mask : apply masking to future tokens
    def get_future_mask(self, q):
        bs, q_len = q.size()
        future_mask = torch.ones(bs, q_len, q_len).triu(diagonal=1)

        return future_mask

## 1-2. GPT-1

In [3]:
### GPT-1
class GPT(nn.Module):
    def __init__(self,
                 vocab_size,
                 seq_len = 512,
                 d_model = 768,
                 n_layers = 12,
                 n_heads = 12,
                 d_ff = 3072,
                 embd_drop = 0.1,
                 resid_drop = 0.1,
                 pad_id = 0):
        super().__init__()

        self.decoder = TransformerDecoder(vocab_size, seq_len, d_model, n_layers, n_heads,
                                          d_ff, embd_drop, resid_drop, pad_id)

    def forward(self, x):
        outputs = self.decoder(x)

        return outputs


### Language Model (pre-training)
class GPTLMHead(nn.Module):
    def __init__(self, gpt):
        super().__init__()
        vocab_size, d_model = gpt.decoder.embedding.weight.size()

        self.gpt = gpt
        self.linear = nn.Linear(d_model, vocab_size, bias = False)
        self.linear.weight = gpt.decoder.embedding.weight

    def forward(self, x):
        x = self.gpt(x)

        lm_logits = self.linear(x)

        return lm_logits


### Classification Model (fine-tuning)
class GPTClsHead(nn.Module):
    def __init__(self, gpt, n_class, cls_token_id, cls_drop=0.1):
        super().__init__()
        vocab_size, d_model = gpt.decoder.embedding.weight.size()
        self.cls_token_id = cls_token_id

        self.gpt = gpt

        # LM
        self.linear1 = nn.Linear(d_model, vocab_size, bias=False)
        self.linear1.weight = gpt.decoder.embedding.weight
        # Cls
        self.linear2 = nn.Linear(d_model, n_class)
        self.dropout = nn.Dropout(cls_drop)

        nn.init.normal_(self.linear2.weight, std=0.02)
        nn.init.normal_(self.linear2.bias, 0)

    def forward(self, x):
        outputs = self.gpt(x)

        lm_logits = self.linear1(outputs)

        outputs = outputs[x.eq(self.cls_token_id)]
        cls_logits = self.linear2(self.dropout(outputs))

        return lm_logits, cls_logits

# 2. Training

## 2-1. Pre-training

In [4]:
!pip install transformers datasets tokenizers

import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
from datasets import load_dataset
from tokenizers import Tokenizer
from tokenizers.models import BPE
from tokenizers.trainers import BpeTrainer
from tokenizers.pre_tokenizers import Whitespace
import numpy as np
from tqdm import tqdm

device = "cuda" if torch.cuda.is_available() else "cpu"

Collecting datasets
  Downloading datasets-3.2.0-py3-none-any.whl.metadata (20 kB)
Collecting dill<0.3.9,>=0.3.0 (from datasets)
  Downloading dill-0.3.8-py3-none-any.whl.metadata (10 kB)
Collecting xxhash (from datasets)
  Downloading xxhash-3.5.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (12 kB)
Collecting multiprocess<0.70.17 (from datasets)
  Downloading multiprocess-0.70.16-py311-none-any.whl.metadata (7.2 kB)
Collecting fsspec<=2024.9.0,>=2023.1.0 (from fsspec[http]<=2024.9.0,>=2023.1.0->datasets)
  Downloading fsspec-2024.9.0-py3-none-any.whl.metadata (11 kB)
Downloading datasets-3.2.0-py3-none-any.whl (480 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m480.6/480.6 kB[0m [31m10.5 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading dill-0.3.8-py3-none-any.whl (116 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m116.3/116.3 kB[0m [31m12.0 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading fsspec-2024.9.0-py3-none-any.whl 

In [5]:
### WikiText Dataset class
class WikiTextDataset(Dataset):
    def __init__(self, data, tokenizer, seq_len):
        self.data = data
        self.tokenizer = tokenizer
        self.seq_len = seq_len

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        text = self.data[idx]['text']
        encoded = self.tokenizer.encode(text)
        input_ids = encoded.ids

        # sequence length matching
        if len(input_ids) > self.seq_len:
            input_ids = input_ids[:self.seq_len]
        else:
            input_ids = input_ids + [0] * (self.seq_len - len(input_ids))

        # input & target (for next-word prediction)
        inputs = torch.tensor(input_ids[:-1])
        targets = torch.tensor(input_ids[1:])

        return inputs, targets

In [6]:
### Hyper-parameters
VOCAB_SIZE = 10000
SEQ_LEN = 512
BATCH_SIZE = 8
EPOCHS = 3
LEARNING_RATE = 1e-4

### Tokenizer Training
dataset = load_dataset('wikitext', 'wikitext-2-raw-v1')
tokenizer = Tokenizer(BPE())
trainer = BpeTrainer(vocab_size=VOCAB_SIZE, special_tokens=["<pad>", "<cls>"])
tokenizer.pre_tokenizer = Whitespace()

def get_training_corpus():
    for i in range(0, len(dataset['train'])):
        yield dataset['train'][i]['text']

tokenizer.train_from_iterator(get_training_corpus(), trainer)

### Dataset Setup
train_dataset = WikiTextDataset(dataset['train'], tokenizer, SEQ_LEN + 1)
train_dataloader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)

The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


README.md:   0%|          | 0.00/10.5k [00:00<?, ?B/s]

test-00000-of-00001.parquet:   0%|          | 0.00/733k [00:00<?, ?B/s]

train-00000-of-00001.parquet:   0%|          | 0.00/6.36M [00:00<?, ?B/s]

validation-00000-of-00001.parquet:   0%|          | 0.00/657k [00:00<?, ?B/s]

Generating test split:   0%|          | 0/4358 [00:00<?, ? examples/s]

Generating train split:   0%|          | 0/36718 [00:00<?, ? examples/s]

Generating validation split:   0%|          | 0/3760 [00:00<?, ? examples/s]

In [7]:
### Model Initialization
model = GPTLMHead(GPT(vocab_size=VOCAB_SIZE, seq_len=SEQ_LEN)).to(device)

optimizer = torch.optim.AdamW(model.parameters(), lr=LEARNING_RATE)
scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=len(train_dataloader) * EPOCHS)

In [None]:
### Pre-Training
for epoch in range(EPOCHS):
    model.train()
    total_loss = 0

    progress_bar = tqdm(train_dataloader, desc=f'Epoch {epoch+1}/{EPOCHS}')
    for batch_idx, (inputs, targets) in enumerate(progress_bar):
        inputs, targets = inputs.to(device), targets.to(device)

        logits = model(inputs)
        loss = F.cross_entropy(logits.view(-1, logits.size(-1)), targets.view(-1), ignore_index=0)

        optimizer.zero_grad()
        loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), 0.5)
        optimizer.step()
        scheduler.step()

        total_loss += loss.item()
        progress_bar.set_postfix({'loss': total_loss / (batch_idx + 1)})

    avg_loss = total_loss / len(train_dataloader)
    print(f"\nEpoch {epoch+1} Average Loss: {avg_loss:.4f}")

print("Training completed!")

torch.save({
    'model_state_dict': model.state_dict(),
    'optimizer_state_dict': optimizer.state_dict(),
    'scheduler_state_dict': scheduler.state_dict(),
    'final_loss': avg_loss
}, 'gpt1_pretrained.pt')

Epoch 1/3: 100%|██████████| 4590/4590 [15:52<00:00,  4.82it/s, loss=nan]



Epoch 1 Average Loss: nan


Epoch 2/3: 100%|██████████| 4590/4590 [15:53<00:00,  4.81it/s, loss=nan]



Epoch 2 Average Loss: nan


Epoch 3/3: 100%|██████████| 4590/4590 [15:52<00:00,  4.82it/s, loss=7.09]



Epoch 3 Average Loss: 7.0898
Training completed!


## 2-2. Fine-tuning

In [8]:
### IMDB Dataset class
class IMDBDataset(Dataset):
    def __init__(self, data, tokenizer, seq_len):
        self.data = data
        self.tokenizer = tokenizer
        self.seq_len = seq_len

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        text = self.data[idx]['text']
        label = self.data[idx]['label']

        # Add CLS token at the start
        encoded = self.tokenizer.encode("<cls> " + text)
        input_ids = encoded.ids

        # Truncate or pad sequence
        if len(input_ids) > self.seq_len:
            input_ids = input_ids[:self.seq_len]
        else:
            input_ids = input_ids + [0] * (self.seq_len - len(input_ids))

        return torch.tensor(input_ids), torch.tensor(label)

In [9]:
### Dataset Setup
from datasets import load_dataset
imdb_dataset = load_dataset('imdb')

train_dataset = IMDBDataset(imdb_dataset['train'], tokenizer, SEQ_LEN)
train_dataloader = DataLoader(train_dataset, batch_size=4, shuffle=True)
val_dataset = IMDBDataset(imdb_dataset['test'], tokenizer, SEQ_LEN)
val_dataloader = DataLoader(val_dataset, batch_size=4)

README.md:   0%|          | 0.00/7.81k [00:00<?, ?B/s]

train-00000-of-00001.parquet:   0%|          | 0.00/21.0M [00:00<?, ?B/s]

test-00000-of-00001.parquet:   0%|          | 0.00/20.5M [00:00<?, ?B/s]

unsupervised-00000-of-00001.parquet:   0%|          | 0.00/42.0M [00:00<?, ?B/s]

Generating train split:   0%|          | 0/25000 [00:00<?, ? examples/s]

Generating test split:   0%|          | 0/25000 [00:00<?, ? examples/s]

Generating unsupervised split:   0%|          | 0/50000 [00:00<?, ? examples/s]

In [11]:
premodel = GPTLMHead(GPT(vocab_size=VOCAB_SIZE, seq_len=SEQ_LEN)).to(device)
premodel.load_state_dict(torch.load('gpt1_pretrained.pt'))

### Fine-tuning Model Initialization
model = GPTClsHead(
    gpt=premodel.gpt,  # pretrained GPT
    n_class=2,
    cls_token_id=tokenizer.token_to_id("<cls>"),
    cls_drop=0.1
).to(device)

optimizer = torch.optim.AdamW(model.parameters(), lr=1e-5)

In [None]:
### Fine-tuning
EPOCHS = 3
auxiliary_ratio = 0.5
best_acc = 0

for epoch in range(EPOCHS):
    ## Training
    model.train()
    total_loss = 0

    progress_bar = tqdm(train_dataloader, desc=f'Epoch {epoch+1}/{EPOCHS}')
    for batch_idx, (inputs, labels) in enumerate(progress_bar):
        inputs, labels = inputs.to(device), labels.to(device)

        lm_logits, cls_logits = model(inputs)
        lm_logits = lm_logits[:, :-1].contiguous()

        ## Loss Function w/ Auxiliary Function
        lm_loss = F.cross_entropy(lm_logits.view(-1, lm_logits.size(-1)),
                                  inputs[:, 1:].contiguous().view(-1), ignore_index=0) # L1 (Auxiliary)
        cls_loss = F.cross_entropy(cls_logits, labels) # L2
        loss = cls_loss + (auxiliary_ratio * lm_loss) # L3

        optimizer.zero_grad()
        loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), 0.5)
        optimizer.step()

        total_loss += loss.item()
        progress_bar.set_postfix({'loss': total_loss / (batch_idx + 1)})

    ## Validation
    model.eval()
    correct = 0
    total = 0

    with torch.no_grad():
        for inputs, labels in val_dataloader:
            inputs, labels = inputs.to(device), labels.to(device)
            _, cls_logits, _ = model(inputs)

            predictions = torch.argmax(cls_logits, dim=-1)
            correct += (predictions == labels).sum().item()
            total += labels.size(0)

    accuracy = correct / total
    print(f"Epoch {epoch+1} Validation Accuracy: {accuracy:.4f}")

    # Save best model
    if accuracy > best_acc:
        best_acc = accuracy
        torch.save(model.state_dict(), 'gpt1_imdb_best.pt')

print(f"Fine-tuning completed! Best accuracy: {best_acc:.4f}")

Epoch 1/3:  84%|████████▍ | 5243/6250 [10:09<01:56,  8.65it/s, loss=4.04]

# Test

In [None]:
model = GPTClsHead(
    GPT(vocab_size=VOCAB_SIZE, seq_len=SEQ_LEN),
    n_class=2,
    cls_token_id=tokenizer.token_to_id("<cls>"),
    cls_drop=0.1
)

model.load_state_dict(torch.load('gpt1_imdb_best.pt'))

In [None]:
# Test
def predict_sentiment(text):
    model.eval()
    encoded = tokenizer.encode("<cls> " + text)
    input_ids = encoded.ids

    if len(input_ids) > SEQ_LEN:
        input_ids = input_ids[:SEQ_LEN]
    else:
        input_ids = input_ids + [0] * (SEQ_LEN - len(input_ids))

    inputs = torch.tensor([input_ids]).to(device)

    with torch.no_grad():
        _, cls_logits, _ = model(inputs)
        prediction = torch.argmax(cls_logits, dim=-1)

    return "Positive" if prediction.item() == 1 else "Negative"

# Example
test_text = "This movie was really great! I enjoyed every moment of it."
print(f"Text: {test_text}")
print(f"Sentiment: {predict_sentiment(test_text)}")