In [46]:
import torch
import torch.nn as nn
import torchtext
from torchtext.data.functional import generate_sp_model, load_sp_model, sentencepiece_tokenizer, sentencepiece_numericalizer
from torchtext.vocab import build_vocab_from_iterator
import torchtext.transforms as T
from torch.nn import functional as F
import torch.optim as optim

import sys
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import io
import math
from torch.utils.data import Dataset, DataLoader
import torch.utils.data.dataloader as dataloader
import os
import re

In [47]:
# Locations of the artefacts this notebook reads (nothing is loaded in this cell):
#   model_path     - checkpoint passed to torch.load when the model is restored
#   tokenizer_path - SentencePiece model handed to T.SentencePieceTokenizer
#   vocab_path     - SentencePiece vocabulary file (tab-separated, token in the first column)
model_path = "../models/gpt.pth"
tokenizer_path = "../SentencePiece/transformer.model"
vocab_path = "../SentencePiece/transformer.vocab"

In [48]:
def yield_tokens(file_path):
    """Lazily yield the tokens listed in a tab-separated vocabulary file.

    Each line contributes one single-element list holding the text that
    precedes the first tab. A line without any tab is yielded whole,
    including its trailing newline.

    Args:
        file_path: Path to a UTF-8 encoded vocabulary file.

    Yields:
        list[str]: A one-item list containing the token for the current line.
    """
    with open(file_path, encoding='utf-8') as vocab_file:
        for entry in vocab_file:
            # Everything before the first tab is the token; the rest of the line is ignored.
            token, _, _ = entry.partition("\t")
            yield [token]

# Build the vocabulary from the tokens produced by yield_tokens.
# build_vocab_from_iterator (torchtext) consumes that generator like any other iterator.
# The special tokens are inserted ahead of the file's tokens, in this order:
#   <cls> is prepended to every sequence; the classifier reads its prediction from that position
#   <pad> is the padding token used to bring all sequences in a batch to the same length
#   <eos> signals the "End-Of-Sentence", i.e. the end of the sequence
#   <unk> ("unknown") stands in for any token that is not contained in the vocab
# The path comes from the vocab_path constant so the file location is defined in one place only.
vocab = build_vocab_from_iterator(
    yield_tokens(vocab_path),
    specials=['<cls>', '<pad>', '<eos>', '<unk>'],
    special_first=True,
)

# Out-of-vocabulary lookups fall back to the index of the '<unk>' token
vocab.set_default_index(vocab['<unk>'])

In [49]:
# Preprocessing pipeline applied to a list of raw strings before they reach the model.
# The steps run in the order listed; the result is padded out to 256 positions per text.
text_transform = T.Sequential(
    # Split each string into SentencePiece pieces using the model at tokenizer_path
    T.SentencePieceTokenizer(tokenizer_path),
    # Map pieces to integer ids through the vocabulary
    T.VocabTransform(vocab),
    # Prepend the <cls> id; predict_single_text reads the prediction from position 0
    T.AddToken(vocab['<cls>'], begin=True),
    # Cap at 254 ids (including <cls>) so that <eos> still fits inside 256 positions
    T.Truncate(max_seq_len=254),
    # Append the <eos> id
    T.AddToken(vocab['<eos>'], begin=False),
    # Convert to a tensor; padding_value is used when the texts in a batch differ in length
    T.ToTensor(padding_value=vocab['<pad>']),
    # Pad up to 256 positions.
    # NOTE(review): pad_value=0 is not vocab['<pad>']; with the specials placed first, 0 is
    # presumably the id of '<cls>'. Confirm what the training pipeline used before changing
    # this, since the saved weights were fitted to whatever padding id was used there.
    T.PadTransform(max_length=256, pad_value=0),
)


In [50]:
# Define the model architecture (Transformer) and hyperparameters if needed
class FeedForward(nn.Module):
    """Per-position MLP: widen to 4x the embedding size, ReLU, project back, dropout.

    The dropout probability is read from the notebook-level `dropout` variable
    at construction time.

    Args:
        n_embd: Size of the last dimension of the input and of the output.
    """

    def __init__(self, n_embd):
        super().__init__()
        hidden_dim = 4 * n_embd
        # Layer order (and therefore the `net.<index>` state_dict keys) must stay as is
        # for saved checkpoints to keep loading.
        layers = [
            nn.Linear(n_embd, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, n_embd),
            nn.Dropout(dropout),
        ]
        self.net = nn.Sequential(*layers)

    def forward(self, x):
        """Apply the MLP to `x`; the output has the same shape as the input."""
        transformed = self.net(x)
        return transformed

class Block(nn.Module):
    """One Transformer layer: self-attention, then a feed-forward network.

    Both sub-layers are applied to a layer-normalised copy of their input and
    the result is added back to that input (pre-norm residual connections).

    Args:
        n_embd: Embedding width of the tokens flowing through the block.
        n_head: Number of attention heads; each head gets n_embd // n_head dims.
    """

    def __init__(self, n_embd, n_head):
        super().__init__()
        per_head_dim = n_embd // n_head
        self.sa = MultiHeadAttention(n_head, per_head_dim)
        self.ln1 = nn.LayerNorm(n_embd)
        self.ffwd = FeedForward(n_embd)
        self.ln2 = nn.LayerNorm(n_embd)

    def forward(self, x):
        """Run attention and feed-forward, each wrapped in a residual connection."""
        after_attention = x + self.sa(self.ln1(x))
        after_ffwd = after_attention + self.ffwd(self.ln2(after_attention))
        return after_ffwd


class Head(nn.Module):
    """A single self-attention head.

    No mask is applied to the attention scores, so every position attends to
    every other position. The input width and the dropout probability are
    read from the notebook-level `n_embd` and `dropout` variables.

    Args:
        head_size: Output width of the key, query and value projections.
    """

    def __init__(self, head_size):
        super().__init__()
        self.key = nn.Linear(n_embd, head_size, bias=False)
        self.query = nn.Linear(n_embd, head_size, bias=False)
        self.value = nn.Linear(n_embd, head_size, bias=False)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        """Attend over the sequence dimension of a 3-D input and return the mixed values."""
        _, _, channels = x.shape
        keys = self.key(x)
        queries = self.query(x)
        # NOTE(review): scores are scaled by the input width (last dim of x), not by head_size.
        # The saved weights were presumably trained with this scaling, so leave it untouched
        # unless the model is retrained.
        scores = queries @ keys.transpose(-2, -1) * channels ** -0.5
        attention = self.dropout(F.softmax(scores, dim=-1))
        return attention @ self.value(x)


class MultiHeadAttention(nn.Module):
    """Multi-head attention built from `num_heads` independent `Head` modules.

    The head outputs are concatenated along the last dimension, passed through
    a linear projection of width `n_embd`, and then through dropout. `n_embd`
    and `dropout` are read from the notebook-level variables.

    Args:
        num_heads: How many `Head` instances to create.
        head_size: Output width of each head.
    """

    def __init__(self, num_heads, head_size):
        super().__init__()
        head_modules = []
        for _ in range(num_heads):
            head_modules.append(Head(head_size))
        self.heads = nn.ModuleList(head_modules)
        self.proj = nn.Linear(n_embd, n_embd)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        """Run every head on `x`, concatenate the results, project, apply dropout."""
        per_head = [head(x) for head in self.heads]
        merged = torch.cat(per_head, dim=-1)
        projected = self.proj(merged)
        return self.dropout(projected)



class Transformer(nn.Module):
    """Token + position embeddings, a stack of `Block`s, and a linear output head.

    All sizes (`vocab_size`, `n_embd`, `block_size`, `n_head`, `n_layer`,
    `output_size`) are read from the notebook-level variables when the model
    is constructed, so they must be defined before `Transformer()` is called.
    Attribute names and shapes must stay as they are for saved checkpoints to
    load with `load_state_dict`.
    """

    def __init__(self):
        super().__init__()

        # Learned lookup tables: one vector per token id and one per position.
        self.token_embedding_table = nn.Embedding(vocab_size, n_embd)
        # block_size + 1 rows: this shape has to match the saved checkpoint.
        self.position_embedding_table = nn.Embedding(block_size + 1, n_embd)

        self.blocks = nn.Sequential(*[Block(n_embd, n_head = n_head) for _ in range(n_layer)])
        self.ln_f = nn.LayerNorm(n_embd)
        self.lm_head = nn.Linear(n_embd, output_size)

    def forward(self, idx, targets=None):
        """Compute logits for every position of every sequence.

        Args:
            idx: 2-D integer tensor of token ids, laid out as (batch, sequence).
            targets: Unused; kept so existing callers that pass it keep working.

        Returns:
            Tensor of shape (batch, sequence, output_size).
        """
        _, seq_len = idx.shape
        tok_emb = self.token_embedding_table(idx)  # (B, T, C)
        # Build the position ids on the same device as the input instead of relying on the
        # notebook-global `device`, so the model works wherever it and its input live.
        positions = torch.arange(seq_len, device=idx.device)
        pos_emb = self.position_embedding_table(positions)  # (T, C)
        x = tok_emb + pos_emb  # (B, T, C) via broadcasting over the batch dimension
        x = self.blocks(x)
        x = self.ln_f(x)
        logits = self.lm_head(x)  # (B, T, output_size)

        return logits

In [51]:
# Hyperparameters

# Runtime: use the GPU when one is available, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = 'cuda'
else:
    device = 'cpu'

# Model shape. These values size the layers built by Transformer(), so they have to
# match the checkpoint that is loaded afterwards.
vocab_size = len(vocab)
block_size = 256
n_embd = 768
n_head = 12
n_layer = 2
output_size = 2
dropout = 0.3

# Training settings (not referenced by the inference cells shown in this notebook).
nepochs = 100
learning_rate = 5e-4

In [52]:
# Rebuild the architecture and restore the trained weights.
# map_location remaps the stored tensors onto `device`, so a checkpoint that was saved on a
# GPU also loads on a CPU-only machine instead of failing while deserialising CUDA tensors.
# NOTE: torch.load unpickles the file - only load checkpoints that come from a trusted source.
model = Transformer()
model.load_state_dict(torch.load(model_path, map_location=device))
model = model.to(device)
# Inference only: switch off dropout.
model.eval()
print("Model loaded successfully.")

Model loaded successfully.


In [61]:
def predict_single_text(model, text):
    """Classify one raw text string as 'negative' or 'positive'.

    The text is run through the notebook-level `text_transform`, fed to the
    model, and the logits at sequence position 0 are turned into class
    probabilities. The token tensor shape and the probabilities are printed.

    The model is put into eval mode for the duration of the call (dropout
    would otherwise randomise the prediction) and its previous train/eval
    state is restored afterwards. The input is moved to the device that holds
    the model's parameters.

    Args:
        model: Module mapping a (batch, sequence) id tensor to logits of shape
            (batch, sequence, num_classes).
        text: The raw string to classify.

    Returns:
        'negative' when class index 0 has the highest probability, otherwise
        'positive'.
        NOTE(review): assumes class 0 was the negative label during training -
        confirm against the training notebook.
    """
    model_device = next(model.parameters()).device
    tokens = text_transform([text]).to(model_device)
    print(tokens.shape)

    was_training = model.training
    model.eval()
    try:
        with torch.no_grad():
            logits = model(tokens)
            first_position_logits = logits[:, 0, :]
            probs = F.softmax(first_position_logits, dim=-1)
    finally:
        # Leave the model in the mode the caller had it in.
        model.train(was_training)

    label = torch.argmax(probs, dim=1)
    print(probs)
    if label.item() == 0:
        return 'negative'
    return 'positive'

# Example usage: classify a single movie review.
# NOTE(review): the recorded run below labelled this review (which ends "I give this one a ten")
# as negative with p ~ 0.98 - worth checking the label mapping and the preprocessing against
# the training setup.
text = "Meryl Streep as Kate, a woman dying of cancer, performs her role admirably. No wonder she was up for an Oscar. In the part she proves that caring and nurturing housewives are just as important as their sisters out in the business world. And the lesson she teaches about life's expectations and their lack of fulfillment as the relationship grows, that is the most important thing she teaches her daughter. We can expect too much of our mates. Realize that there are many slips and forgiveness or understanding are the main ingredients of a happy life. This is a sombre movie and the ending though sad, shows reconciliation between the father and daughter. I give this one a ten."


output = predict_single_text(model, text)


# predict_single_text returns the class label (the probabilities are printed inside it),
# so label the value accordingly.
print("Predicted label:", output)

torch.Size([1, 256])
tensor([[0.9797, 0.0203]], device='cuda:0')
Predicted probabilities: negative
