In [31]:
import torch.nn as nn
import torch.nn.functional as F
import torch
from nltk.tokenize import word_tokenize
import json
# from datasets import Dataset
from torch.utils.data import DataLoader
from torch.nn import CrossEntropyLoss
from torch.nn.utils.rnn import pad_sequence
from torch.utils.data import Dataset





In [71]:
batch_size = 256
block_size = 200 
learning_rate = 1e-3  
n_embd = 64  
# n_embd = 206808
n_head = 2  
n_layer = 4  
dropout = 0.5
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
n_input = 64  
n_hidden = 100
n_output = 1
n_epoch = 20

In [4]:
class Head(nn.Module):
    def __init__(self, head_size):
        super().__init__()
        self.key = nn.Linear(n_embd, head_size, bias = False)
        self.query = nn.Linear(n_embd, head_size, bias = False)
        self.value = nn.Linear(n_embd, head_size, bias = False)
        self.dropout = nn.Dropout(dropout)


    
    def forward(self, x):
        B, T, C = x.shape
        
        k = self.key(x)
        q = self.query(x)
        
        wei = q @ k.transpose(-2,-1) * C**-0.5 
        wei_soft = F.softmax(wei, dim=-1) 
        wei = self.dropout(wei_soft)
        v = self.value(x) 
        output = wei @ v
        return output, wei_soft

In [5]:
class MultipleHeadAttention(nn.Module):
    def __init__(self, num_heads, head_size):
        super().__init__()
        self.heads = nn.ModuleList([Head(head_size) for _ in range(num_heads)])
        self.proj = nn.Linear(n_embd, n_embd)
        self.dropout = nn.Dropout(dropout)
        
    def forward(self, x):
        head_outputs, attention_maps = [], []
        for head in self.heads:
            out, attn_map = head(x)
            head_outputs.append(out)
            attention_maps.append(attn_map[0])
        multi_head_output = torch.cat(head_outputs, dim=-1)
        output = self.dropout(self.proj(multi_head_output))
        return output, attention_maps

In [6]:
class FeedForward(nn.Module):
    def __init__(self):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(n_embd, 4 * n_embd),
            nn.ReLU(),
            nn.Linear(4 * n_embd, n_embd),
            nn.Dropout(dropout),
        )
    def forward(self, x):
        return self.net(x)

In [7]:
class Block(nn.Module):
    def __init__(self, n_embd, n_head):
        super().__init__()
        head_size = n_embd // n_head
        self.sa = MultipleHeadAttention(n_head, head_size)
        self.ffwd = FeedForward()
        self.ln1 = nn.LayerNorm(n_embd)
        self.ln2 = nn.LayerNorm(n_embd)
    def forward(self, x):
        x_out, attentionMaps = self.sa(self.ln1(x))
        x = x + x_out
        x = x + self.ffwd(self.ln2(x))
        return x, attentionMaps

In [8]:
class EncoderModel(nn.Module):
    def __init__(self, vocab_size):
        super().__init__()
        self.token_embedding_table = nn.Embedding(vocab_size, n_embd)
        self.position_embedding_table = nn.Embedding(block_size, n_embd)
        self.blocks = nn.Sequential(*[Block(n_embd, n_head) for _ in range(n_layer)])
        self.ln_f = nn.LayerNorm(n_embd)



        
    def forward(self, idx):
        B, T = idx.shape        # T: sequence length
        tok_emb = self.token_embedding_table(idx)
        pos_emb = self.position_embedding_table(torch.arange(T, device=device).unsqueeze(0).expand(B,T)) # (T,C)
        
        x = tok_emb + pos_emb
        # x = self.blocks(x)
        # x = self.ln_f(x)
        attention_maps = []

        
        for block in self.blocks:
            x, attention_map = block(x)
            attention_maps.append(attention_map)
        
        x = self.ln_f(x)
        
        # x = torch.mean(x, dim=1) 
        x = x.mean(dim=1)
        
        return x, attention_maps

In [9]:
class FeedForwardClassifier(nn.Module):
    def __init__(self, vocab_size):
        super().__init__()
        self.encoder = EncoderModel(vocab_size)
        self.classifier = nn.Sequential(
            nn.Linear(n_embd, n_hidden),
            nn.ReLU(),
            nn.Linear(n_hidden, n_output),
        )
    def forward(self, idx):
        x, attention_maps = self.encoder(idx)
        logits = self.classifier(x)
        return logits, attention_maps
        
        
        

In [10]:
class Tokenier:
    def __init__(self, text):
        self.vocab = set()
        self.sti = {}
        self.its = {}
        self.build(text)
    
    def build(self, text):
        tokens = word_tokenize(text)
        self.vocab = set(tokens)
        self.vocab_size = len(self.vocab) + 2
        self.sti = {word: i for i, word in enumerate(self.vocab, start=2)}
        self.sti['<pad>'] = 0
        self.sti['<unk>'] = 1
        self.itos = {i: word for word, i in self.sti.items()}
    
    def encode(self, text):
        tokens = word_tokenize(text)
        return [self.sti.get(word, self.sti['<unk>']) for word in tokens]
    

    

In [11]:
review_train = []
with open("./dataset/review_train.json", "r") as file:
    for line in file:
        review_train.append(json.loads(line))

review_test = []
with open("./dataset/review_test.json", "r") as file:
    for line in file:
        review_test.append(json.loads(line))

rating_train = [review['stars'] for review in review_train]
rating_test = [review['stars'] for review in review_test]

In [12]:
all_text = " ".join([review['text'] for review in review_train])
tokenizer = Tokenier(all_text)

In [51]:
class myDataset(Dataset):
    def __init__(self, tokenizer, reviews, labels):
        self.tokenizer = tokenizer
        self.samples = []
        for i in range(len(reviews)):
            self.samples.append((int(labels[i]), reviews[i]['text']))
    
    def __len__(self):
        return len(self.samples)
    
    def __getitem__(self, index):
        label, text = self.samples[index]
        input_ids = torch.tensor(self.tokenizer.encode(text), dtype=torch.long)
        label_tensor = torch.tensor(label, dtype=torch.float)
        
        return input_ids, label_tensor

In [72]:
dataset_train = myDataset(tokenizer, review_train, rating_train)
dataset_test = myDataset(tokenizer, review_test, rating_test)

In [73]:
def collate_batch(batch):
    data, labels = zip(*batch) 
    
    pad_seqs = pad_sequence(data, batch_first=True, padding_value=0)
    pad_seqs = pad_seqs[:, :block_size]
    
    pad_seqs = F.pad(pad_seqs, (0, max(0, block_size - pad_seqs.shape[1])), "constant", 0)
    
    labels = torch.stack(labels)  
    
    return pad_seqs, labels
    

    
    
   

In [74]:
train_loader = DataLoader(dataset_train, batch_size=batch_size, collate_fn=collate_batch, shuffle=True)
test_loader = DataLoader(dataset_test, batch_size=batch_size, collate_fn=collate_batch, shuffle=False)

In [75]:
model = FeedForwardClassifier(tokenizer.vocab_size).to(device)
optimizer = torch.optim.AdamW(model.parameters(), lr=learning_rate)
criterion = nn.MSELoss()

In [76]:
def compute_classifier_accuracy(classifier, data_loader):
    classifier.eval()
    total_correct = 0
    total_samples = 0
    with torch.no_grad():
        for X, Y in data_loader:
            X, Y = X.to(device), Y.to(device)
            outputs, atten_maps = classifier(X)
            _, predicted = torch.max(outputs.data, 1)
            total_correct += (predicted == Y).sum().item()
            total_samples += Y.size(0)
        accuracy = (100 * total_correct / total_samples)
        classifier.train()
        return accuracy

In [None]:
accuracies_train = []
accuracies_test = []

for epoch in range(n_epoch):
    for xb, yb in train_loader:
        xb, yb = xb.to(device), yb.to(device)    
        output, _ = model(xb)
        # print(output.shape)
        # print(yb.shape)
        loss = criterion(output, yb)
                
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        # print(f"Epoch {epoch+1}/{n_epoch}, Loss: {loss.item()}")
        
    accuracies_train.append(compute_classifier_accuracy(model, train_loader))
    accuracies_test.append(compute_classifier_accuracy(model, train_loader))
    print(f"Epoch {epoch+1}/{n_epoch}, Loss: {loss.item()}")
    print(f"Epoch {epoch+1}, train_accuracy: {compute_classifier_accuracy(model, train_loader)}")
    print(f"Epoch {epoch+1}, test_accuracy: {compute_classifier_accuracy(model, test_loader)}")


  return F.mse_loss(input, target, reduction=self.reduction)
