In [1]:
import torch
import torch.nn as nn
import torch.nn.functional as F

from torch.optim import Adam
from torch.utils.data import TensorDataset, DataLoader


In [2]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
#device = torch.device("cpu")
device

device(type='cuda')

In [3]:
class PositionalEncoding(nn.Module):
    def __init__(self, d_model, max_len):
        super().__init__()
        pe = torch.zeros(max_len, d_model)
        position = torch.arange(start=0, end=max_len, step=1).float().unsqueeze(1)
        embedding_index = torch.arange(start=0, end=d_model, step=2).float()
        div_term = 1 / torch.tensor(10000.0)**(embedding_index / d_model)
        #print(div_term)
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        self.register_buffer('pe',pe)

    def forward(self, word_embeddings):
        pe_temp = self.pe[:word_embeddings.size(0), :]
        pe_temp_expanded = pe_temp.unsqueeze(1)
        #print(f"word_embeddings.shape: {word_embeddings.shape}, self.pe.shape: {pe_temp_expanded.shape}, ")
        return word_embeddings + pe_temp_expanded

In [4]:
class Attention(nn.Module):
    def __init__(self, d_model=2):
        super().__init__()
        self.W_q = nn.Linear(in_features=d_model, out_features=d_model, bias=False)
        self.W_k = nn.Linear(in_features=d_model, out_features=d_model, bias=False)
        self.W_v = nn.Linear(in_features=d_model, out_features=d_model, bias=False)
        self.row_dim = 0
        self.col_dim = 1

    def forward(self, encodings, mask=None):
        q = self.W_q(encodings)
        k = self.W_k(encodings)
        v = self.W_v(encodings)

        # Ensure k has the same shape as q before transpose
        assert k.shape == q.shape

        # Transpose k to align with q for dot product
        k_transposed = k.transpose(-1, -2)

        # Check shapes
        #print("Shape of q:", q.shape)  # [1, 5, 2]
        #print("Shape of k_transposed:", k_transposed.shape)  # [1, 2, 5]
        sims = torch.matmul(q, k_transposed)
        scaled_sims = sims / torch.tensor(k.size(1)**0.5)

        if mask is not None:
            mask = mask.to(device)
            scaled_sims = scaled_sims.masked_fill(mask=mask, value=-1e9)

        attention_percents = F.softmax(scaled_sims)
        attention_scores = torch.matmul(attention_percents, v)
        return attention_scores

In [5]:
class MultiHeadAttention(nn.Module):
    def __init__(self, d_model=2,heads=2):
        super().__init__()

        self.W_qs = []
        self.W_ks = []
        self.W_vs = []

        for index in range(heads):
            W_q = nn.Linear(in_features=d_model, out_features=d_model, bias=False)
            W_k = nn.Linear(in_features=d_model, out_features=d_model, bias=False)
            W_v = nn.Linear(in_features=d_model, out_features=d_model, bias=False)

            self.W_qs.append(W_q)
            self.W_ks.append(W_k)
            self.W_vs.append(W_v)

        self.unify_heads = nn.Linear(d_model * heads, d_model)

        self.row_dim = 0
        self.col_dim = 1
        self.heads = heads

    def forward(self, encodings, mask=None):
        attentionscores = []
        #encodings.to(device)
        for index in range(self.heads):
            W_q = self.W_qs[index].to(device)
            W_k = self.W_ks[index].to(device)
            W_v = self.W_vs[index].to(device)

            q = W_q(encodings.to(device))
            k = W_k(encodings.to(device))
            v = W_v(encodings.to(device))

            k_transposed = k.transpose(-1, -2)
            sims = torch.matmul(q, k_transposed)
            scaled_sims = sims / torch.tensor(k.size(1)**0.5)

            if mask is not None:
                mask = mask.to(device)
                scaled_sims = scaled_sims.masked_fill(mask=mask, value=-1e9)

            attention_percents = F.softmax(scaled_sims)
            attention_scores = torch.matmul(attention_percents, v)
            attentionscores.append(attention_scores)

        combined_attention_scores = torch.cat(attentionscores, dim=-1)
        combined_output = self.unify_heads(combined_attention_scores)

        return combined_output

In [6]:
class DecoderBlock(nn.Module):
    def __init__(self, d_model, num_heads, num_tokens, using_mask=True):
        super(DecoderBlock, self).__init__()
        self.self_attention = MultiHeadAttention(d_model=d_model, heads=num_heads)
        self.layer_norm1 = nn.LayerNorm(d_model)
        self.relu = nn.ReLU()
        self.fc_layer = nn.Linear(in_features=d_model, out_features=d_model)
        self.layer_norm2 = nn.LayerNorm(d_model)
        self.fc_layer2 = nn.Linear(in_features=d_model, out_features=d_model)
        dropout=0.1
        self.dropout = nn.Dropout(dropout)
        self.using_mask = using_mask

    def forward(self, position_encoded, mask=None):
        if self.using_mask:
            self_attention_values = self.self_attention(position_encoded, mask=mask)
        else:
            self_attention_values = self.self_attention(position_encoded)

        residual_connection_values = position_encoded + self_attention_values
        normalized_values1 = self.layer_norm1(residual_connection_values)

        fc_layer_output_relu = self.relu(self.fc_layer(normalized_values1))
        #fc_layer_output_dropout = self.dropout(fc_layer_output_relu)
        #fc_layer_output = self.fc_layer2(fc_layer_output_dropout)
        #final_output = self.layer_norm2(normalized_values1 + fc_layer_output)
        #fc_layer_output = self.fc_layer2(self.dropout(self.relu(self.fc_layer(normalized_values1))))
        #return final_output
        #fc_layer_output = self.fc_layer(normalized_values1)
        fc_layer_output = self.relu(self.fc_layer(normalized_values1))
        return fc_layer_output

In [7]:
class DecoderOnlyTransformerBlockTransformer(nn.Module):
    def __init__(self, num_tokens, d_model, max_len, using_mask=True):
        super(DecoderOnlyTransformerBlockTransformer, self).__init__()
        self.number_heads = 8
        self.we = nn.Embedding(num_embeddings=num_tokens, embedding_dim=d_model)
        self.pe = PositionalEncoding(d_model=d_model, max_len=max_len)
        self.decoder_block1 = DecoderBlock(d_model=d_model, num_heads=self.number_heads, num_tokens=num_tokens, using_mask=using_mask)
        #self.decoder_block2 = DecoderBlock(d_model=d_model, num_heads=self.number_heads, num_tokens=num_tokens, using_mask=using_mask)
        #self.decoder_block3 = DecoderBlock(d_model=d_model, num_heads=self.number_heads, num_tokens=num_tokens, using_mask=using_mask)
        #self.decoder_block4 = DecoderBlock(d_model=d_model, num_heads=self.number_heads, num_tokens=num_tokens, using_mask=using_mask)
        #self.decoder_block5 = DecoderBlock(d_model=d_model, num_heads=self.number_heads, num_tokens=num_tokens, using_mask=using_mask)
        #self.decoder_block6 = DecoderBlock(d_model=d_model, num_heads=self.number_heads, num_tokens=num_tokens, using_mask=using_mask)

        self.fc_layer = nn.Linear(in_features=d_model, out_features=1)
        self.loss = nn.CrossEntropyLoss()

    def forward(self, token_ids):
        word_embeddings = self.we(token_ids)
        position_encoded = self.pe(word_embeddings)

        if self.decoder_block1.using_mask:
            mask_ones = torch.ones((token_ids.size(dim=1), token_ids.size(dim=1)))
            mask = torch.tril(mask_ones)
            mask = mask == 0
        else:
            mask = None

        output_block1 = self.decoder_block1(position_encoded, mask=mask)
        #output_block2 = self.decoder_block2(output_block1, mask=mask)
        #output_block3 = self.decoder_block3(output_block2, mask=mask)
        #output_block4 = self.decoder_block4(output_block3, mask=mask)
        #output_block5 = self.decoder_block5(output_block4, mask=mask)
        #output_block6 = self.decoder_block6(output_block5, mask=mask)

        fc_layer_output = self.fc_layer(output_block1)
        #print(f"fc_layer_output.shape: {fc_layer_output.shape}")

        return fc_layer_output

In [8]:
import pandas as pd

# Load the CSV file
df = pd.read_csv('./datasets/IMDBDataset.csv')
df = df.head(4096)
df.shape[0]

4096

In [9]:
from collections import Counter

def build_vocab(texts, min_freq=1):
    counter = Counter()
    for text in texts:
        words = text.lower().split()
        counter.update(words)
    # Only keep words with a frequency greater than min_freq
    vocab = {word: idx + 2 for idx, (word, freq) in enumerate(counter.items()) if freq >= min_freq}
    vocab['<PAD>'] = 0
    vocab['<UNK>'] = 1
    return vocab

# Build vocabulary from reviews
vocab = build_vocab(df['review'].tolist())

In [10]:
def tokenize(text, vocab):
    return [vocab.get(word, vocab['<UNK>']) for word in text.lower().split()]

df['tokenized_review'] = df['review'].apply(lambda x: tokenize(x, vocab))

In [11]:
from sklearn.preprocessing import LabelEncoder

# Encode the sentiment labels
le = LabelEncoder()
df['sentiment'] = le.fit_transform(df['sentiment'])

In [12]:
import torch
from torch.utils.data import Dataset

class IMDBDataset(Dataset):
    def __init__(self, reviews, sentiments, vocab, max_length=100):
        self.reviews = reviews
        self.sentiments = sentiments
        self.vocab = vocab
        self.max_length = max_length

    def __len__(self):
        return len(self.reviews)

    def __getitem__(self, idx):
        review = self.reviews[idx]
        sentiment = self.sentiments[idx]

        # Pad or truncate the review to max_length
        if len(review) > self.max_length:
            review = review[:self.max_length]
        else:
            review = review + [self.vocab['<PAD>']] * (self.max_length - len(review))

        return {
            'input_ids': torch.tensor(review, dtype=torch.long),
            'label': torch.tensor(sentiment, dtype=torch.long)
        }

In [13]:
from torch.utils.data import DataLoader, random_split

# Create the dataset
dataset = IMDBDataset(df['tokenized_review'].tolist(), df['sentiment'].tolist(), vocab)

# Split the data into training and testing sets
train_size = int(0.8 * len(dataset))
test_size = len(dataset) - train_size
train_dataset, test_dataset = random_split(dataset, [train_size, test_size])

# Create DataLoaders
train_dataloader = DataLoader(train_dataset, batch_size=32, shuffle=True)
test_dataloader = DataLoader(test_dataset, batch_size=32, shuffle=False)

In [14]:
for batch in train_dataloader:
    input_tokens = batch['input_ids']
    labels = batch['label']
    print(f"Input Tokens shape: {input_tokens.shape}")
    print(f"Labels shape: {labels.shape}")
    break

Input Tokens shape: torch.Size([32, 100])
Labels shape: torch.Size([32])


In [15]:
# Iterate through the DataLoader
for batch in train_dataloader:
    input_seq = batch['input_ids']
    target_seq = batch['label']
    
    print("Input Sequence:")
    print(input_seq)  # (batch_size, sequence_length)
    
    print("Target Sequence:")
    print(target_seq)  # (batch_size,)
    
    # Example break to print only one batch
    break

token_to_id = dataset.vocab
print(len(token_to_id))
id_to_token = dict(map(reversed, token_to_id.items()))
print(len(id_to_token))

Input Sequence:
tensor([[  535,    36, 66092,  ...,   708,   228,   776],
        [ 5114, 27542, 30346,  ...,     4, 27908,   196],
        [75035,  3219,    22,  ...,   752,    82,   736],
        ...,
        [   23,    24,     2,  ...,    65,   395,   647],
        [   54,  4155,     3,  ...,  3194,    46,     4],
        [  147,   225,   802,  ...,    40,  1072,   170]])
Target Sequence:
tensor([0, 1, 0, 1, 0, 1, 0, 0, 0, 1, 1, 0, 0, 1, 0, 0, 1, 1, 0, 0, 0, 1, 1, 0,
        1, 1, 1, 0, 1, 1, 1, 0])
80921
80921


In [16]:
from torch.optim.lr_scheduler import ReduceLROnPlateau
max_len = 100
token_to_id = dataset.vocab
#print(token_to_id)
id_to_token = dict(map(reversed, token_to_id.items()))
print(len(token_to_id))

#dimension_model = 768
dimension_model = 256

transformer_model = DecoderOnlyTransformerBlockTransformer(num_tokens=len(token_to_id), d_model=dimension_model, max_len=max_len)
transformer_model.to(device)
optimizer = Adam(transformer_model.parameters(), lr=0.001)
scheduler = ReduceLROnPlateau(optimizer, 'min', patience=5, factor=0.5)
#criterion = nn.CrossEntropyLoss()
criterion = nn.BCEWithLogitsLoss()

epochs = 5
for epoch in range(epochs):
    transformer_model.train()
    epoch_loss = 0
    total_loss = 0
    for data in train_dataloader:
        optimizer.zero_grad()
        input_tokens = batch['input_ids']
        labels = batch['label']
        input_tokens = input_tokens.to(device)  # Move inputs to GPU if available
        labels = labels.to(device)  # Move labels to GPU if available
        # Debugging: Ausgabe der maximalen und minimalen Werte von input_seq
        #print(f"Input Seq - Max Index: {input_seq.max().item()}, Min Index: {input_seq.min().item()}")
        #print(input_tokens.shape)
        prediction = transformer_model(input_tokens)
        #print(f"Original Prediction shape: {prediction.shape}")
        #print(f"Original Labels shape: {labels.shape}")
        labels = labels.view(-1)  # Flatten to [batch_size * seq_length]
        #print(f"labels views: {labels.shape}")
        #print(f"Labels: {labels}")

        # Flatten prediction and labels
        #prediction = prediction.view(-1, prediction.size(-1))  # [batch_size * seq_length, num_tokens]
        #labels = labels.view(-1)  # [batch_size * seq_length]
        #labels = torch.argmax(labels, dim=-1)
        
        #print(f"Prediction shape: {prediction.shape}")  # Should be [batch_size * seq_length, num_classes]
        #print(f"Labels shape: {labels.shape}")          # Should be [batch_size * seq_length]

        predictions_reduced = torch.mean(prediction, dim=1) 
        #print(f"predictions_reduced: {predictions_reduced.shape}")     

        lable_unsqueezed = labels.unsqueeze(-1).float()
        #print(f"lable_unsqueezed: {lable_unsqueezed.shape}")     
        #loss = criterion(prediction, labels)
        loss = criterion(predictions_reduced, lable_unsqueezed)
        loss.backward()
        optimizer.step()
        total_loss += loss.item() * input_tokens.size(0)
        epoch_loss += loss.item()
    scheduler.step(epoch_loss)
    average_loss = total_loss / len(train_dataloader.dataset)
    print(f"Epoch {epoch}, Train Loss: {average_loss}")

80921


  attention_percents = F.softmax(scaled_sims)


Epoch 0, Train Loss: 0.11577490750110768
Epoch 1, Train Loss: 3.115377515473122e-05
Epoch 2, Train Loss: 1.8296417652931227e-05
Epoch 3, Train Loss: 1.2198669145606408e-05
Epoch 4, Train Loss: 8.712462092525472e-06


In [17]:
import torch
import torch.nn as nn
import torch.nn.functional as F

# assuming 'predictions' is your transformer model's output with shape (32, 100, 1)
# and 'labels' is your tensor of labels with shape (32,)

# Define the loss function
criterion = nn.BCEWithLogitsLoss()

# Reduce the sequence dimension using mean or max (depending on your needs)
predictions_reduced = torch.mean(predictions, dim=1)  # or torch.max(predictions, dim=1)[0]

# Apply the loss function
loss = criterion(predictions_reduced, labels.unsqueeze(-1).float())

NameError: name 'predictions' is not defined

In [18]:
transformer_model.eval()
test_loss = 0
correct = 0
total = 0

with torch.no_grad():
    for batch in test_dataloader:
        input_tokens = batch['input_ids']
        labels = batch['label']
        input_tokens = input_tokens.to(device)
        labels = labels.to(device)

        prediction = transformer_model(input_tokens)
        predictions_reduced = torch.mean(prediction, dim=1)
        labels_unsqueezed = labels.unsqueeze(-1).float()

        loss = criterion(predictions_reduced, labels_unsqueezed)
        test_loss += loss.item() * input_tokens.size(0)

        # Calculate accuracy
        predicted_labels = torch.round(torch.sigmoid(predictions_reduced))
        correct += (predicted_labels == labels_unsqueezed).sum().item()
        total += input_tokens.size(0)

average_test_loss = test_loss / len(test_dataloader.dataset)
accuracy = correct / total

print(f"Test Loss: {average_test_loss}")
print(f"Test Accuracy: {accuracy:.4f}")

  attention_percents = F.softmax(scaled_sims)


Test Loss: 2.959506682651799
Test Accuracy: 0.5000


In [150]:
data = [
    {"text": "I love this product!", "label": 1},
    {"text": "This product is terrible.", "label": 0},
    {"text": "I'm so happy with this purchase!", "label": 1},
    {"text": "I regret buying this.", "label": 0},
    {"text": "This is the best thing I've ever bought!", "label": 1},
    {"text": "I'm disappointed with this product.", "label": 0},
    {"text": "I would definitely recommend this!", "label": 1},
    {"text": "This is a waste of money.", "label": 0},
    {"text": "I'm so impressed with this!", "label": 1},
    {"text": "I don't like this at all.", "label": 0}
]

In [151]:
def build_vocab(texts, min_freq=1):
    counter = Counter()
    for text in texts:
        words = text.lower().split()
        counter.update(words)
    # Only keep words with a frequency greater than min_freq
    vocab = {word: idx + 2 for idx, (word, freq) in enumerate(counter.items()) if freq >= min_freq}
    vocab['<PAD>'] = 0
    vocab['<UNK>'] = 1
    return vocab

In [152]:
texts = [sample['text'] for sample in data]
vocab = build_vocab(texts)
print(vocab)
token_to_id = dataset.vocab
print(token_to_id)

{'i': 2, 'love': 3, 'this': 4, 'product!': 5, 'product': 6, 'is': 7, 'terrible.': 8, "i'm": 9, 'so': 10, 'happy': 11, 'with': 12, 'purchase!': 13, 'regret': 14, 'buying': 15, 'this.': 16, 'the': 17, 'best': 18, 'thing': 19, "i've": 20, 'ever': 21, 'bought!': 22, 'disappointed': 23, 'product.': 24, 'would': 25, 'definitely': 26, 'recommend': 27, 'this!': 28, 'a': 29, 'waste': 30, 'of': 31, 'money.': 32, 'impressed': 33, "don't": 34, 'like': 35, 'at': 36, 'all.': 37, '<PAD>': 0, '<UNK>': 1}
{'i': 2, 'love': 3, 'this': 4, 'product!': 5, 'product': 6, 'is': 7, 'terrible.': 8, "i'm": 9, 'so': 10, 'happy': 11, 'with': 12, 'purchase!': 13, 'regret': 14, 'buying': 15, 'this.': 16, 'the': 17, 'best': 18, 'thing': 19, "i've": 20, 'ever': 21, 'bought!': 22, 'disappointed': 23, 'product.': 24, 'would': 25, 'definitely': 26, 'recommend': 27, 'this!': 28, 'a': 29, 'waste': 30, 'of': 31, 'money.': 32, 'impressed': 33, "don't": 34, 'like': 35, 'at': 36, 'all.': 37, '<PAD>': 0, '<UNK>': 1}


In [153]:
import torch
from torch.utils.data import Dataset

class SimpleDataset(Dataset):
    def __init__(self, data, vocab):
        self.data = data
        self.vocab = vocab

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        text = self.data[idx]['text']
        label = self.data[idx]['label']

        # Convert text to indices
        indices = [self.vocab.get(word, self.vocab['<UNK>']) for word in text.lower().split()]

        # Convert indices to tensor
        input_ids = torch.tensor(indices)

        # Convert label to tensor
        label = torch.tensor(label)

        return {
            'input_ids': input_ids,
            'label': label
        }

In [None]:
dataset = SimpleDataset(data, vocab)
dataloader = torch.utils.data.DataLoader(dataset, batch_size=2, shuffle=True)
for data in dataloader:
    input_tokens = data['input_ids']
    labels = data['label']
    print(input_tokens)
    print(labels)

In [None]:
from torch.optim.lr_scheduler import ReduceLROnPlateau
max_len = 5
token_to_id = dataset.vocab
#print(token_to_id)
id_to_token = dict(map(reversed, token_to_id.items()))
print(len(token_to_id))

#dimension_model = 768
dimension_model = 256

transformer_model = DecoderOnlyTransformerBlockTransformer(num_tokens=len(token_to_id), d_model=dimension_model, max_len=max_len)
transformer_model.to(device)
optimizer = Adam(transformer_model.parameters(), lr=0.001)
scheduler = ReduceLROnPlateau(optimizer, 'min', patience=5, factor=0.5)
#criterion = nn.CrossEntropyLoss()
criterion = nn.BCEWithLogitsLoss()

epochs = 5
for epoch in range(epochs):
    transformer_model.train()
    epoch_loss = 0
    total_loss = 0
    for data in dataloader:
        optimizer.zero_grad()
        input_tokens = data['input_ids']
        labels = data['label']
        input_tokens = input_tokens.to(device)  # Move inputs to GPU if available
        labels = labels.to(device)  # Move labels to GPU if available
        # Debugging: Ausgabe der maximalen und minimalen Werte von input_seq
        #print(f"Input Seq - Max Index: {input_seq.max().item()}, Min Index: {input_seq.min().item()}")
        #print(input_tokens.shape)
        prediction = transformer_model(input_tokens)
        labels = labels.view(-1)  # Flatten to [batch_size * seq_length]
        predictions_reduced = torch.mean(prediction, dim=1)   
        lable_unsqueezed = labels.unsqueeze(-1).float()
        #print(f"lable_unsqueezed: {lable_unsqueezed.shape}")     
        loss = criterion(predictions_reduced, lable_unsqueezed)
        loss.backward()
        optimizer.step()
        total_loss += loss.item() * input_tokens.size(0)
        epoch_loss += loss.item()
    scheduler.step(epoch_loss)
    average_loss = total_loss / len(dataloader.dataset)
    print(f"Epoch {epoch}, Train Loss: {average_loss}")

## Using simple data for sentiment analysis

In [155]:
import torch
import torch.nn as nn
import torch.nn.functional as F

from torch.optim import Adam
from torch.utils.data import TensorDataset, DataLoader

In [156]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
#device = torch.device("cpu")
device

device(type='cuda')

In [157]:
token_to_id = {'what' : 0 ,'is' : 1 ,'statquest' : 2 ,'awesome' : 3 ,'<EOS>' : 4}
id_to_token = dict(map(reversed, token_to_id.items()))
inputs = torch.tensor([[token_to_id['what'],token_to_id['is'], token_to_id['statquest'], 
                        token_to_id['<EOS>'], token_to_id['awesome']],
                       [token_to_id['statquest'],token_to_id['is'], token_to_id['what'], 
                        token_to_id['<EOS>'], token_to_id['awesome']]])
labels = torch.tensor([[token_to_id['is'],token_to_id['statquest'], token_to_id['<EOS>'], 
                        token_to_id['awesome'], token_to_id['<EOS>']],
                       [token_to_id['is'],token_to_id['what'], token_to_id['<EOS>'], 
                        token_to_id['awesome'], token_to_id['<EOS>']]])
dataset = TensorDataset(inputs, labels)
dataloader = DataLoader(dataset)
inputs

tensor([[0, 1, 2, 4, 3],
        [2, 1, 0, 4, 3]])

In [158]:
labels = torch.tensor([[token_to_id['is'],token_to_id['statquest'], token_to_id['<EOS>'], 
                        token_to_id['awesome'], token_to_id['<EOS>']],
                       [token_to_id['is'],token_to_id['what'], token_to_id['<EOS>'], 
                        token_to_id['awesome'], token_to_id['<EOS>']]])
labels

tensor([[1, 2, 4, 3, 4],
        [1, 0, 4, 3, 4]])

In [159]:
dataset = TensorDataset(inputs, labels)
dataset

<torch.utils.data.dataset.TensorDataset at 0x212cb109be0>

In [160]:
dataloader = DataLoader(dataset)
dataloader

<torch.utils.data.dataloader.DataLoader at 0x212cb109460>

In [231]:
data = [
    {"text": "I love this product!", "label": 1},
    {"text": "This product is terrible.", "label": 0},
    {"text": "I'm so happy with this purchase!", "label": 1},
    {"text": "I regret buying this.", "label": 0},
    {"text": "This is the best thing I've ever bought!", "label": 1},
    {"text": "I'm disappointed with this product.", "label": 0},
    {"text": "I would definitely recommend this!", "label": 1},
    {"text": "This is a waste of money.", "label": 0},
    {"text": "I'm so impressed with this!", "label": 1},
    {"text": "I don't like this at all.", "label": 0}
]

In [232]:
def build_vocab(texts, min_freq=1):
    counter = Counter()
    for text in texts:
        words = text.lower().split()
        counter.update(words)
    # Only keep words with a frequency greater than min_freq
    vocab = {word: idx + 2 for idx, (word, freq) in enumerate(counter.items()) if freq >= min_freq}
    vocab['<PAD>'] = 0
    vocab['<UNK>'] = 1
    return vocab

In [233]:
texts = [sample['text'] for sample in data]
vocab = build_vocab(texts)
print(vocab)

{'i': 2, 'love': 3, 'this': 4, 'product!': 5, 'product': 6, 'is': 7, 'terrible.': 8, "i'm": 9, 'so': 10, 'happy': 11, 'with': 12, 'purchase!': 13, 'regret': 14, 'buying': 15, 'this.': 16, 'the': 17, 'best': 18, 'thing': 19, "i've": 20, 'ever': 21, 'bought!': 22, 'disappointed': 23, 'product.': 24, 'would': 25, 'definitely': 26, 'recommend': 27, 'this!': 28, 'a': 29, 'waste': 30, 'of': 31, 'money.': 32, 'impressed': 33, "don't": 34, 'like': 35, 'at': 36, 'all.': 37, '<PAD>': 0, '<UNK>': 1}


In [234]:
import torch
from torch.utils.data import Dataset

class SimpleDataset(Dataset):
    def __init__(self, data, vocab, min_len):
        self.data = data
        self.vocab = vocab
        self.min_len = min_len

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        #print(f"idx: {idx}")
        #print(f"self.data: {self.data}")
        text = self.data[idx]['text']
        label = self.data[idx]['label']

        # Convert text to indices
        indices = [self.vocab.get(word, self.vocab['<UNK>']) for word in text.lower().split()]

        if len(indices) < self.min_len:
            indices += [0] * (self.min_len - len(indices))  # Use 0 instead of self.vocab['<PAD>']

        # Convert indices to tensor
        input_ids = torch.tensor(indices)

        # Convert label to tensor
        label = torch.tensor(label)

        return {
            'input_ids': input_ids,
            'label': label
        }

In [235]:
dataset = SimpleDataset(data, vocab, 10)
dataloader = torch.utils.data.DataLoader(dataset, batch_size=1, shuffle=True)
token_to_id = dataset.vocab
print(token_to_id)
for data in dataloader:
    input_tokens = data['input_ids']
    labels = data['label']
    #print(input_tokens)
    #print(labels)
    break

{'i': 2, 'love': 3, 'this': 4, 'product!': 5, 'product': 6, 'is': 7, 'terrible.': 8, "i'm": 9, 'so': 10, 'happy': 11, 'with': 12, 'purchase!': 13, 'regret': 14, 'buying': 15, 'this.': 16, 'the': 17, 'best': 18, 'thing': 19, "i've": 20, 'ever': 21, 'bought!': 22, 'disappointed': 23, 'product.': 24, 'would': 25, 'definitely': 26, 'recommend': 27, 'this!': 28, 'a': 29, 'waste': 30, 'of': 31, 'money.': 32, 'impressed': 33, "don't": 34, 'like': 35, 'at': 36, 'all.': 37, '<PAD>': 0, '<UNK>': 1}


In [236]:
class PositionalEncoding(nn.Module):
    def __init__(self, d_model, max_len):
        super().__init__()
        pe = torch.zeros(max_len, d_model)
        position = torch.arange(start=0, end=max_len, step=1).float().unsqueeze(1)
        #print(position)
        embedding_index = torch.arange(start=0, end=d_model, step=2).float()
        #print(embedding_index)
        div_term = 1 / torch.tensor(10000.0)**(embedding_index / d_model)
        #print(div_term)
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        #print(pe)
        self.register_buffer('pe',pe)

    def forward(self, word_embeddings):
        #print("pe shape")
        #print(self.pe.shape)
        #print(self.pe[:word_embeddings.size(0), :].shape)
        #print("word embeddings shape")
        #print(word_embeddings.shape)
        return word_embeddings + self.pe[:word_embeddings.size(0), :]

In [237]:
positionalEncoding = PositionalEncoding(d_model=2, max_len=10)
positionalEncoding

PositionalEncoding()

In [238]:
class Attention(nn.Module):
    def __init__(self, d_model=2):
        super().__init__()
        self.W_q = nn.Linear(in_features=d_model, out_features=d_model, bias=False)
        self.W_k = nn.Linear(in_features=d_model, out_features=d_model, bias=False)
        self.W_v = nn.Linear(in_features=d_model, out_features=d_model, bias=False)
        self.row_dim = 0
        self.col_dim = 1

    def forward(self, encodings, mask=None):
        q = self.W_q(encodings)
        k = self.W_k(encodings)
        v = self.W_v(encodings)

        # Ensure k has the same shape as q before transpose
        assert k.shape == q.shape
        
        # Transpose k to align with q for dot product
        k_transposed = k.transpose(-1, -2)
        
        # Check shapes
        #print("Shape of q:", q.shape)  # [1, 5, 2]
        #print(q)
        #print("Shape of k_transposed:", k_transposed.shape)  # [1, 2, 5]
        #print(k_transposed)
        sims = torch.matmul(q, k_transposed)
        scaled_sims = sims / torch.tensor(k.size(1)**0.5)

        if mask is not None:
            #mask = mask.to(device)
            scaled_sims = scaled_sims.masked_fill(mask=mask, value=-1e9)

        attention_percents = F.softmax(scaled_sims)

        attention_scores = torch.matmul(attention_percents, v)

        return attention_scores

In [239]:
attention = Attention(d_model=2)
attention

Attention(
  (W_q): Linear(in_features=2, out_features=2, bias=False)
  (W_k): Linear(in_features=2, out_features=2, bias=False)
  (W_v): Linear(in_features=2, out_features=2, bias=False)
)

In [245]:
class DecoderOnlyTransformer(nn.Module):

    def __init__(self, num_tokens, d_model, max_len, using_mask=True):
        super().__init__()
        self.we = nn.Embedding(num_embeddings=num_tokens, embedding_dim=d_model)
        self.pe = PositionalEncoding(d_model=d_model, max_len=max_len)
        self.self_attention = Attention(d_model=d_model)
        self.fc_layer = nn.Linear(in_features=d_model, out_features=1)
        self.loss = nn.CrossEntropyLoss()
        self.using_mask = using_mask

    def forward(self, token_ids):
        # calculate word embeddings out of the tokens
        word_embeddings = self.we(token_ids)
        #print(word_embeddings)
        # apply positional encoding (using the PositionalEncoder layer) to the word embeddings
        position_encoded = self.pe(word_embeddings)
        #print(position_encoded)
        # create mask for decoder only transformer so it can not cheat
        if (self.using_mask == True):
            mask_ones = torch.ones((token_ids.size(dim=0), token_ids.size(dim=0)))
            #print(mask_ones)
            mask = torch.tril(mask_ones)
            #print(mask)
            mask = mask == 0
            #print(mask)
            # calculate self attention with the Attention Layer
            self_attention_values = self.self_attention(position_encoded, mask=mask)
        else:
            self_attention_values = self.self_attention(position_encoded)    
        # add original position_encoded values to the calculated self attention values (residual connection)
        residual_connection_values = position_encoded + self_attention_values
        # use the final linear layer to calculate the output probabilities
        return self.fc_layer(residual_connection_values)
        #fc_layer_output = self.fc_layer(residual_connection_values)
        #return fc_layer_output

In [None]:
max_len = 10
transformer_model = DecoderOnlyTransformer(num_tokens=len(token_to_id), d_model=2, max_len=max_len, using_mask=False)
optimizer = Adam(transformer_model.parameters(), lr=0.01)
criterion = nn.CrossEntropyLoss()

epochs = 20
for epoch in range(epochs):
    epoch_loss = 0
    total_loss = 0
    for data in dataloader:
        optimizer.zero_grad()
        input_tokens = data['input_ids']
        labels = data['label']
        reduced_tensor = input_tokens.squeeze()
        prediction = transformer_model(reduced_tensor)

        prediction = prediction.view(-1, prediction.size(-1))  # [batch_size * seq_length, num_tokens]
        labels = labels.view(-1)  # [batch_size * seq_length
        print(prediction)
        print(labels)
        loss = criterion(prediction, labels)  
        
        #reduced_tensor = prediction.view(-1)
        #reduced_tensor = prediction.mean(dim=0)
        #prediction = prediction.view(-1, prediction.size(-1))  # [batch_size * seq_length, num_tokens]
        #labels = labels.view(-1).float()  # [batch_size * seq_length
        #print(reduced_tensor)
        #print(labels)
        #loss = criterion(reduced_tensor, labels)        
        loss.backward()
        optimizer.step()
        total_loss += loss.item() * input_tokens.size(0)
        epoch_loss += loss.item()
    average_loss = total_loss / len(dataloader.dataset)
    print(f"Epoch {epoch}, Train Loss: {average_loss}")

In [367]:
class SentimentModel(nn.Module):
    def __init__(self, vocab_size, embedding_dim, hidden_dim, output_dim):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, embedding_dim)
        self.fc1 = nn.Linear(embedding_dim, hidden_dim)
        self.fc = nn.Linear(hidden_dim, output_dim)

    def forward(self, text):
        embedding = self.embedding(text)
        layer1_output = self.fc1(embedding)
        #print(layer1_output.shape)
        pooled_output = layer1_output.mean(dim=1) 
        #print(pooled_output.shape)
        dense_outputs = self.fc(pooled_output)
        #output = F.log_softmax(dense_outputs, dim=1)
        return dense_outputs
        #print(output)
        #return dense_outputs
        # Apply a softmax for classification
        #output = F.log_softmax(dense_outputs, dim=1)
        #return output      

In [368]:
class SentimentRnnModel(nn.Module):
    def __init__(self, vocab_size, embedding_dim, hidden_dim, output_dim):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, embedding_dim)
        self.rnn = nn.LSTM(embedding_dim, hidden_dim)
        self.fc = nn.Linear(hidden_dim, output_dim)

    def forward(self, text):
        embedding = self.embedding(text)
        rnn_output, (hidden, cell) = self.rnn(embedding)
        # hidden: [1, batch size, hidden dim] -> [batch size, hidden dim]
        hidden = hidden[-1,:,:]
        # Pass through the fully connected layer
        print(hidden.shape)
        pooled_output = hidden.mean(dim=1) 
        print(pooled_output)
        dense_outputs = self.fc(pooled_output)
        #dense_outputs = self.fc(hidden)
        # Apply a softmax for classification
        #output = F.log_softmax(dense_outputs, dim=1)
        return dense_outputs

In [381]:
class SentimentLSTMModel(nn.Module):
    def __init__(self, vocab_size, embedding_dim, hidden_dim, output_dim):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, embedding_dim)
        self.rnn = nn.LSTM(embedding_dim, hidden_dim, batch_first=True)
        self.fc = nn.Linear(hidden_dim, output_dim)

    def forward(self, text):
        embedding = self.embedding(text)
        _, (hidden, _) = self.rnn(embedding)
        #print(hidden.shape)
        output = self.fc(hidden.squeeze(0))
        return output

In [398]:
class SentimentGRUModel(nn.Module):
    def __init__(self, vocab_size, embedding_dim, hidden_dim, output_dim):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, embedding_dim)
        self.rnn = nn.GRU(embedding_dim, hidden_dim, batch_first=True)
        self.fc = nn.Linear(hidden_dim, output_dim)

    def forward(self, text):
        embedding = self.embedding(text)
        _, hidden = self.rnn(embedding)
        #print(hidden.shape)
        output = self.fc(hidden.squeeze(0))
        return output

In [403]:
vocab_size = len(dataset.vocab)
print(vocab_size)
embedding_dim = 100
hidden_dim = 256
output_dim = 1

sentiment_model = SentimentModel(vocab_size, embedding_dim, hidden_dim, output_dim)
#sentiment_model = SentimentRnnModel(vocab_size, embedding_dim, hidden_dim, output_dim)
#sentiment_model = SentimentLSTMModel(vocab_size, embedding_dim, hidden_dim, output_dim)
sentiment_model = SentimentGRUModel(vocab_size, embedding_dim, hidden_dim, output_dim)
optimizer = Adam(sentiment_model.parameters(), lr=0.01)
criterion = nn.BCEWithLogitsLoss()
sentiment_model.train()
epochs = 10

for epoch in range(epochs):
    epoch_loss = 0
    total_loss = 0
    for data in dataloader:
        optimizer.zero_grad()
        input_tokens = data['input_ids']
        #print(f"input_ids: {input_tokens}")
        labels = data['label']
        prediction = sentiment_model(input_tokens).squeeze(1)
        labels = labels.view(-1).float()  # [batch_size * seq_length
        #print(prediction)
        #print(labels)
        loss = criterion(prediction, labels) 
        loss.backward()
        optimizer.step()
        total_loss += loss.item() * input_tokens.size(0)
        epoch_loss += loss.item()
    average_loss = total_loss / len(dataloader.dataset)
    print(f"Epoch {epoch}, Train Loss: {average_loss}")

38
Epoch 0, Train Loss: 1.4191549837589263
Epoch 1, Train Loss: 0.7171775668859481
Epoch 2, Train Loss: 0.1341601110994816
Epoch 3, Train Loss: 0.01307742353528738
Epoch 4, Train Loss: 0.0027392345247790216
Epoch 5, Train Loss: 0.001157200790476054
Epoch 6, Train Loss: 0.0007865868101362139
Epoch 7, Train Loss: 0.0006510086066555231
Epoch 8, Train Loss: 0.0005742598848883062
Epoch 9, Train Loss: 0.0005227021523751318


In [407]:
test_datas = [
    {"text": "I love this product!", "label": 1},
    {"text": "This product is terrible.", "label": 0},
    {"text": "I'm so happy with this purchase!", "label": 1},
    {"text": "I regret buying this.", "label": 0},
    {"text": "This is the best thing I've ever bought!", "label": 1},
    {"text": "I'm disappointed with this product.", "label": 0},
    {"text": "I would definitely recommend this!", "label": 1},
    {"text": "This is a waste of money.", "label": 0},
    {"text": "I'm so impressed with this!", "label": 1},
    {"text": "I don't like this at all.", "label": 0},
    {"text": "I don't like it.", "label": 0},
    {"text": "I love this.", "label": 1},
]

In [408]:
test_texts = [sample['text'] for sample in test_datas]
print(test_texts)

['I love this product!', 'This product is terrible.', "I'm so happy with this purchase!", 'I regret buying this.', "This is the best thing I've ever bought!", "I'm disappointed with this product.", 'I would definitely recommend this!', 'This is a waste of money.', "I'm so impressed with this!", "I don't like this at all.", "I don't like it.", 'I love this.']


In [409]:
min_len = 10
for test_data in test_datas:
    #print(test_text)
    test_input_text = test_data['text']
    test_label = test_data['label']
    #print(test_input_text)
    #print(test_label)
    test_indices = [vocab.get(word, vocab['<UNK>']) for word in test_input_text.lower().split()]
    if len(test_indices) < min_len:
        test_indices += [0] * (min_len - len(test_indices))  # Use 0 instead of self.vocab['<PAD>']
    test_input_ids = torch.tensor(test_indices)

    # Convert label to tensor
    test_label = torch.tensor(test_label)
    #print(test_input_ids)
    #print(test_label)

    # Adding a new dimension at the beginning to change the shape to [1, 10]
    reshaped_test_input_ids = test_input_ids.unsqueeze(0)

    #print(f"Original Tensor shape: {test_input_ids.shape}")  # Output: torch.Size([10])
    #print(f"Reshaped Tensor shape: {reshaped_test_input_ids.shape}")  # Output: torch.Size([1, 10])
    #print(reshaped_test_input_ids)
    
    test_prediction = sentiment_model(reshaped_test_input_ids).squeeze(1)
    threshold = 0
    binary_prediction = (test_prediction >= threshold).long()
    print(f"{test_input_text} - test_label: {test_label}, binary predition: {binary_prediction}, test_prediction: {test_prediction}")

I love this product! - test_label: 1, binary predition: tensor([1]), test_prediction: tensor([7.5183], grad_fn=<SqueezeBackward1>)
This product is terrible. - test_label: 0, binary predition: tensor([0]), test_prediction: tensor([-7.7729], grad_fn=<SqueezeBackward1>)
I'm so happy with this purchase! - test_label: 1, binary predition: tensor([1]), test_prediction: tensor([7.4441], grad_fn=<SqueezeBackward1>)
I regret buying this. - test_label: 0, binary predition: tensor([0]), test_prediction: tensor([-8.0550], grad_fn=<SqueezeBackward1>)
This is the best thing I've ever bought! - test_label: 1, binary predition: tensor([1]), test_prediction: tensor([7.2929], grad_fn=<SqueezeBackward1>)
I'm disappointed with this product. - test_label: 0, binary predition: tensor([0]), test_prediction: tensor([-7.2352], grad_fn=<SqueezeBackward1>)
I would definitely recommend this! - test_label: 1, binary predition: tensor([1]), test_prediction: tensor([7.5731], grad_fn=<SqueezeBackward1>)
This is a was

In [None]:
test_input_tokens = 
prediction = sentiment_model(test_input_tokens).squeeze(1)
print(prediction)