In [27]:
import re
import random
import torch
import torch.nn as nn
from torch.utils.data import random_split, Dataset, DataLoader
from collections import Counter

In [28]:
class Tokenizer:
  def __init__(self, vocab):
    self.str_to_int = vocab
    self.int_to_str = {idx: word for word, idx in vocab.items()}

  def encode(self,text):
    return [self.str_to_int[token] for token in re.split(r'([,.:;?_!"()\']|--|\s)', text) if token.strip() != '']

  def decode(self,token_ids):
    return [self.int_to_str[id] for id in token_ids]

In [29]:
class SelfAttention(nn.Module):
  def __init__(self,d_in,d_out):
    super().__init__()
    self.d_out=d_out
    self.d_in=d_in


    self.w_k=nn.Linear(d_in,d_out)
    self.w_q=nn.Linear(d_in,d_out)
    self.w_v=nn.Linear(d_in,d_out)

  def forward(self,x):
    keys=self.w_k(x)
    values=self.w_v(x)
    queries=self.w_q(x)

    attn_scores=queries @ keys.transpose(-1, -2)
    scaled_weight=torch.softmax(attn_scores/torch.sqrt(torch.tensor(self.d_out)),dim=-1)
    attn_matrix=scaled_weight @ values
    return attn_matrix

In [30]:
class MultiHeadAttention(nn.Module):
  def __init__(self,num_heads,d_in,d_out):
    super().__init__()
    self.heads=nn.ModuleList(
        [SelfAttention(d_in,d_out) for _ in range(num_heads)]
    )
  def forward(self,x):
    return torch.cat([head(x) for head in self.heads],dim=-1)

In [31]:
class GELU(nn.Module):
  def __init__(self):
    super().__init__()

  def forward(self,x):
    return 0.5*x*(1+torch.tanh(torch.sqrt(torch.tensor(2/torch.pi))*(x+0.044715*x**3)))

In [32]:
class FeedForward(nn.Module):
  def __init__(self,emb_dim):
    super().__init__()
    self.layers=nn.Sequential(
        nn.Linear(emb_dim,4*emb_dim),
        GELU(),
        nn.Linear(4*emb_dim,emb_dim)
    )

  def forward(self,x):
    return self.layers(x)

In [33]:
class LayerNormalization(nn.Module):
  def __init__(self,emb_dim):
    super().__init__()
    self.scale = nn.Parameter(torch.ones(emb_dim))
    self.shift = nn.Parameter(torch.zeros(emb_dim))
    self.eps=1e-5

  def forward(self,x):
    mean=torch.mean(x,dim=-1,keepdim=True)
    variance=torch.var(x,dim=-1,keepdim=True, unbiased=False)
    norm=(x-mean)/torch.sqrt(variance+self.eps)
    return self.scale*norm+self.shift

In [34]:
class BERTEmbedding(nn.Module):
    def __init__(self, vocab_size, emb_dim, segment_token_type=2, max_token=512, dropout_prob=0.1):
        super().__init__()
        self.token_embedding = nn.Embedding(vocab_size, emb_dim)
        self.segment_embedding = nn.Embedding(segment_token_type, emb_dim)
        self.position_embedding = nn.Embedding(max_token, emb_dim)
        self.layer_norm = LayerNormalization(emb_dim)
        self.dropout = nn.Dropout(dropout_prob)
        self.register_buffer('position_ids', torch.arange(max_token).unsqueeze(0))
        self.apply(self._init_weights)

    def _init_weights(self, module):
        if isinstance(module, nn.Linear):
            nn.init.normal_(module.weight, mean=0.0, std=0.02)
            if module.bias is not None:
                nn.init.zeros_(module.bias)
        elif isinstance(module, nn.Embedding):
            nn.init.normal_(module.weight, mean=0.0, std=0.02)
        elif isinstance(module, LayerNormalization):
            nn.init.ones_(module.scale)
            nn.init.zeros_(module.shift)

    def forward(self, x, segment_ids=None, position_ids=None):
        batch_size, seq_length = x.shape
        token_embeds = self.token_embedding(x)

        if position_ids is None:
            position_ids = self.position_ids[:, :seq_length]
        position_embeds = self.position_embedding(position_ids)

        if segment_ids is None:
            segment_ids = torch.zeros_like(x, dtype=torch.long)
        segment_embeds = self.segment_embedding(segment_ids)

        embeddings = token_embeds + segment_embeds + position_embeds

        embeddings = self.layer_norm(embeddings)
        embeddings = self.dropout(embeddings)
        return embeddings

In [35]:
class BERTEncoder(nn.Module):
    def __init__(self, emb_dim, num_heads, dropout_prob=0.1):
        super().__init__()

        self.emb_dim = emb_dim
        self.num_heads = num_heads

        self.multihead_attention = MultiHeadAttention(num_heads, emb_dim, emb_dim // num_heads)
        self.layer_normalization = LayerNormalization(emb_dim)
        self.feed_forward = FeedForward(emb_dim)
        self.dropout = nn.Dropout(dropout_prob)

    def forward(self, x):
        residual = x
        attention_weight = self.multihead_attention(x)
        x = self.layer_normalization(residual + self.dropout(attention_weight))
        residual = x
        ff_output = self.feed_forward(x)
        output = self.layer_normalization(residual + self.dropout(ff_output))
        return output

In [36]:
class CustomSentimentDataset(Dataset):
  def __init__(self, texts, labels, tokenizer, max_length=128):
    super().__init__()
    self.texts = texts
    self.labels = labels
    self.tokenizer = tokenizer
    self.max_length = max_length

  def __len__(self):
    return len(self.texts)

  def __getitem__(self, idx):
    text = self.texts[idx]
    label = self.labels[idx]

    tokens = self.tokenizer.encode(text)

    if len(tokens) > self.max_length - 2:
        tokens = tokens[:self.max_length - 2]

    input_ids = (
        [self.tokenizer.str_to_int['[CLS]']] +
        tokens +
        [self.tokenizer.str_to_int['[SEP]']]
    )

    padding_length = self.max_length - len(input_ids)
    input_ids = input_ids + [self.tokenizer.str_to_int['[PAD]']] * padding_length
    segment_ids = [0] * self.max_length

    return {
        'input_ids': torch.tensor(input_ids, dtype=torch.long),
        'segment_ids': torch.tensor(segment_ids, dtype=torch.long),
        'label': torch.tensor(label, dtype=torch.long)
    }

In [37]:
def build_vocab_from_custom_data(texts, vocab_size=10000):
    token_counter = Counter()

    for text in texts:
        tokens = re.split(r'([,.:;?_!"()\']|--|\s)', text)
        tokens = [token for token in tokens if token.strip() != '']
        token_counter.update(tokens)

    print(f"Total unique tokens found: {len(token_counter)}")

    special_tokens = ['[PAD]', '[UNK]', '[CLS]', '[SEP]', '[MASK]']
    vocab = {}

    for idx, token in enumerate(special_tokens):
        vocab[token] = idx

    for idx, (token, count) in enumerate(token_counter.most_common(vocab_size - len(special_tokens))):
        vocab[token] = idx + len(special_tokens)

    print(f"Final vocabulary size: {len(vocab)}")
    print(vocab)

    return vocab

In [38]:
class SentimentModel(nn.Module):
  def __init__(self, vocab_size, num_classes, emb_dim, num_heads=4, dropout_prob=0.1):
    super().__init__()
    self.embedding = BERTEmbedding(vocab_size, emb_dim, dropout_prob=dropout_prob)
    self.encoder = BERTEncoder(emb_dim, num_heads, dropout_prob=dropout_prob)
    self.classifier = nn.Linear(emb_dim, num_classes)

  def forward(self, input_ids, segment_ids=None):
    embeddings = self.embedding(input_ids, segment_ids)
    encoder_output = self.encoder(embeddings)
    cls_output = encoder_output[:,0,:]
    logits = self.classifier(cls_output)
    return logits

In [39]:
sample_data = [
    # Positive reviews
    ("This product is amazing! I love it.", 1),
    ("Excellent quality and fast delivery.", 1),
    ("Highly recommended, great value for money.", 1),
    ("Works perfectly, very satisfied with purchase.", 1),
    ("Outstanding service and product quality.", 1),
    ("Best purchase I've made this year!", 1),
    ("Fantastic product, exceeded my expectations.", 1),
    ("Great features and easy to use.", 1),
    ("Very happy with this product.", 1),
    ("Perfect! Exactly what I needed.", 1),

    # Negative reviews
    ("Terrible product, complete waste of money.", 0),
    ("Poor quality and stopped working after 2 days.", 0),
    ("Very disappointed with this purchase.", 0),
    ("Doesn't work as described, avoid this product.", 0),
    ("Worst product I've ever bought.", 0),
    ("Broken upon arrival, terrible quality.", 0),
    ("Not worth the money, very poor performance.", 0),
    ("Complete garbage, don't buy this.", 0),
    ("Extremely disappointed, faulty product.", 0),
    ("Awful experience, product doesn't work.", 0),

    # Neutral reviews
    ("The product is okay, nothing special.", 2),
    ("It works but could be better.", 2),
    ("Average product, does the job.", 2),
    ("Not bad, but not great either.", 2),
    ("Mediocre quality, expected more.", 2),
]

# Separate texts and labels
texts = [item[0] for item in sample_data]
labels = [item[1] for item in sample_data]



In [40]:
device= torch.device('cuda' if torch.cuda.is_available() else 'cpu')
epochs=100
num_classes = len(set(labels))
vocab = build_vocab_from_custom_data(texts, vocab_size=5000)
tokenizer = Tokenizer(vocab)

Total unique tokens found: 107
Final vocabulary size: 112
{'[PAD]': 0, '[UNK]': 1, '[CLS]': 2, '[SEP]': 3, '[MASK]': 4, '.': 5, ',': 6, 'product': 7, 'quality': 8, "'": 9, 'this': 10, 'I': 11, 'and': 12, '!': 13, 'money': 14, 'with': 15, 'purchase': 16, 't': 17, 'is': 18, 'great': 19, 'very': 20, 've': 21, 'Very': 22, 'disappointed': 23, 'work': 24, 'Not': 25, 'the': 26, 'but': 27, 'This': 28, 'amazing': 29, 'love': 30, 'it': 31, 'Excellent': 32, 'fast': 33, 'delivery': 34, 'Highly': 35, 'recommended': 36, 'value': 37, 'for': 38, 'Works': 39, 'perfectly': 40, 'satisfied': 41, 'Outstanding': 42, 'service': 43, 'Best': 44, 'made': 45, 'year': 46, 'Fantastic': 47, 'exceeded': 48, 'my': 49, 'expectations': 50, 'Great': 51, 'features': 52, 'easy': 53, 'to': 54, 'use': 55, 'happy': 56, 'Perfect': 57, 'Exactly': 58, 'what': 59, 'needed': 60, 'Terrible': 61, 'complete': 62, 'waste': 63, 'of': 64, 'Poor': 65, 'stopped': 66, 'working': 67, 'after': 68, '2': 69, 'days': 70, 'Doesn': 71, 'as': 72,

In [41]:
def evaluate_model(model, data_loader, device):
    model.eval()
    correct = 0
    total = 0

    with torch.no_grad():
        for batch in data_loader:
            input_ids = batch['input_ids'].to(device)
            segment_ids = batch['segment_ids'].to(device)
            labels = batch['label'].to(device)

            logits = model(input_ids, segment_ids)
            _, predicted = torch.max(logits, 1)

            correct += (predicted == labels).sum().item()
            total += labels.size(0)

    model.train()
    return 100 * correct / total if total > 0 else 0

In [42]:
dataset = CustomSentimentDataset(texts, labels, tokenizer)

train_size = int(0.8 * len(dataset))
val_size = len(dataset) - train_size
train_dataset, val_dataset = random_split(dataset, [train_size, val_size])

train_loader = DataLoader(train_dataset, batch_size=8, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=8, shuffle=False)

model = SentimentModel(
    vocab_size=len(vocab),
    num_classes=num_classes,
    emb_dim=128,
    num_heads=4,
    dropout_prob=0.1
).to(device)

print(f"Model parameters: {sum(p.numel() for p in model.parameters()):,}")

criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.AdamW(model.parameters(), lr=1e-4, weight_decay=0.01)

model.train()
best_accuracy = 0

for epoch in range(epochs):
    total_loss = 0
    correct = 0
    total = 0

    for batch_idx, batch in enumerate(train_loader):
        input_ids = batch['input_ids'].to(device)
        segment_ids = batch['segment_ids'].to(device)
        labels_batch = batch['label'].to(device)

        logits = model(input_ids, segment_ids)
        loss = criterion(logits, labels_batch)

        optimizer.zero_grad()
        loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
        optimizer.step()

        total_loss += loss.item()

        _, predicted = torch.max(logits, 1)
        correct += (predicted == labels_batch).sum().item()
        total += labels_batch.size(0)

        if batch_idx % 5 == 0:
            batch_accuracy = 100 * (predicted == labels_batch).sum().item() / labels_batch.size(0)
            print(f'Epoch {epoch+1}, Batch {batch_idx}, Loss: {loss.item():.4f}, Acc: {batch_accuracy:.1f}%')

    epoch_accuracy = 100 * correct / total
    avg_loss = total_loss / len(train_loader)
    val_accuracy = evaluate_model(model, val_loader, device)

    print(f'\nEpoch [{epoch+1}/{epochs}] completed:')
    print(f'  Training Loss: {avg_loss:.4f}')
    print(f'  Training Accuracy: {epoch_accuracy:.2f}%')
    print(f'  Validation Accuracy: {val_accuracy:.2f}%')


    if val_accuracy > best_accuracy:
        best_accuracy = val_accuracy
        torch.save({
            'model_state_dict': model.state_dict(),
            'tokenizer_vocab': tokenizer.str_to_int,
            'vocab_size': len(vocab),
            'num_classes': num_classes,
            'label_mapping': {
                0: 'Negative',
                1: 'Positive',
                2: 'Neutral'
            }
        }, 'best_custom_model.pth')
        print(f'New best model saved! Validation Accuracy: {val_accuracy:.2f}%')

    print('-' * 60)

print(f'Training completed! Best validation accuracy: {best_accuracy:.2f}%')




Model parameters: 262,275
Epoch 1, Batch 0, Loss: 1.1442, Acc: 25.0%

Epoch [1/100] completed:
  Training Loss: 1.2051
  Training Accuracy: 20.00%
  Validation Accuracy: 40.00%
New best model saved! Validation Accuracy: 40.00%
------------------------------------------------------------
Epoch 2, Batch 0, Loss: 1.1906, Acc: 37.5%

Epoch [2/100] completed:
  Training Loss: 1.1359
  Training Accuracy: 30.00%
  Validation Accuracy: 40.00%
------------------------------------------------------------
Epoch 3, Batch 0, Loss: 1.0590, Acc: 50.0%

Epoch [3/100] completed:
  Training Loss: 1.0244
  Training Accuracy: 35.00%
  Validation Accuracy: 40.00%
------------------------------------------------------------
Epoch 4, Batch 0, Loss: 1.0638, Acc: 62.5%

Epoch [4/100] completed:
  Training Loss: 1.0311
  Training Accuracy: 65.00%
  Validation Accuracy: 40.00%
------------------------------------------------------------
Epoch 5, Batch 0, Loss: 0.9419, Acc: 62.5%

Epoch [5/100] completed:
  Train

In [47]:
best_model= torch.load('best_custom_model.pth')
model.load_state_dict(best_model['model_state_dict'])
model.eval()

sample_review="This product is bad."
input_ids = tokenizer.encode(sample_review)
input_ids = [tokenizer.str_to_int['[CLS]']] + input_ids + [tokenizer.str_to_int['[SEP]']]
padding_length = 128 - len(input_ids)
input_ids = input_ids + [tokenizer.str_to_int['[PAD]']] * padding_length
input_tensor = torch.tensor([input_ids], dtype=torch.long).to(device)

with torch.no_grad():
    logits = model(input_tensor)
    _, predicted = torch.max(logits, 1)
    sentiment = best_model['label_mapping'][predicted.item()]
    print(f'Review: "{sample_review}"')
    print(f'Predicted Sentiment: {sentiment}')

Review: "This product is bad."
Predicted Sentiment: Positive


  best_model= torch.load('best_custom_model.pth')
