In [None]:
%pip install transformers

In [1]:
import os
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
import pandas as pd
from transformers import DistilBertTokenizer
from sklearn.metrics import precision_recall_fscore_support, accuracy_score
import numpy as np

In [2]:
print(torch.version.cuda)

11.8


In [3]:
# check for GPU
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

Using device: cuda


In [5]:
def download_extract_imdb(base_dir='./data'):
    """
    Download and extract the IMDB dataset

    Parameters:
    base_dir (str): Base directory to store the dataset

    Returns:
    str: Path to the extracted dataset
    """
    # Create data directory if it doesn't exist
    os.makedirs(base_dir, exist_ok=True)

    # Dataset URL and expected SHA-1 hash
    url = "https://ai.stanford.edu/~amaas/data/sentiment/aclImdb_v1.tar.gz"
    filename = os.path.join(base_dir, "aclImdb_v1.tar.gz")
    expected_sha1 = '01ada507287d82875905620988597833ad4e0903'

    # Download the file if it doesn't exist or has wrong hash
    if not os.path.exists(filename) or hashlib.sha1(open(filename, 'rb').read()).hexdigest() != expected_sha1:
        print(f"Downloading IMDB dataset from {url}...")
        urllib.request.urlretrieve(url, filename)

        # Verify downloaded file
        sha1 = hashlib.sha1(open(filename, 'rb').read()).hexdigest()
        if sha1 != expected_sha1:
            raise ValueError(f"Downloaded file has incorrect SHA-1 hash. Expected {expected_sha1}, got {sha1}")

    # Extract the dataset if not already extracted
    extract_path = os.path.join(base_dir, 'aclImdb')
    if not os.path.exists(extract_path):
        print("Extracting dataset...")
        with tarfile.open(filename, 'r:gz') as tar:
            def is_within_directory(directory, target):
                abs_directory = os.path.abspath(directory)
                abs_target = os.path.abspath(target)
                prefix = os.path.commonprefix([abs_directory, abs_target])
                return prefix == abs_directory

            def safe_extract(tar, path):
                for member in tar.getmembers():
                    member_path = os.path.join(path, member.name)
                    if not is_within_directory(path, member_path):
                        raise Exception("Attempted Path Traversal in Tar File")

                tar.extractall(path)

            safe_extract(tar, base_dir)
        print("Extraction complete!")

    return extract_path

if __name__ == "__main__":
    try:
        data_dir = download_extract_imdb()
        print(f"\nDataset downloaded and extracted to: {data_dir}")

        # Print dataset structure
        print("\nDataset structure:")
        for root, dirs, files in os.walk(data_dir):
            level = root.replace(data_dir, '').count(os.sep)
            indent = ' ' * 4 * level
            print(f"{indent}{os.path.basename(root)}/")
            if level < 2:  # Only show files up to 2 levels deep
                subindent = ' ' * 4 * (level + 1)
                for f in files[:5]:  # Show only first 5 files
                    print(f"{subindent}{f}")
                if len(files) > 5:
                    print(f"{subindent}...")
    except Exception as e:
        print(f"Error: {str(e)}")

Error: name 'hashlib' is not defined


In [4]:
def load_data_from_directory(base_dir):
    data = []
    for label, sentiment in [("pos", 1), ("neg", 0)]:  # assign 1 to positive, 0 to negative
        dir_path = os.path.join(base_dir, label)
        for file in os.listdir(dir_path):
            with open(os.path.join(dir_path, file), encoding='utf-8') as f:
                data.append({"text": f.read().strip(), "label": sentiment})
    return pd.DataFrame(data)

# load train and test datasets
train_dir = "./data/aclImdb/train"
test_dir = "./data/aclImdb/test"

train_data = load_data_from_directory(train_dir)
test_data = load_data_from_directory(test_dir)

print(train_data.head())

                                                text  label
0  Bromwell High is a cartoon comedy. It ran at t...      1
1  Homelessness (or Houselessness as George Carli...      1
2  Brilliant over-acting by Lesley Ann Warren. Be...      1
3  This is easily the most underrated film inn th...      1
4  This is not the typical Mel Brooks film. It wa...      1


In [5]:
# Initialize tokenizer
tokenizer = DistilBertTokenizer.from_pretrained('distilbert-base-uncased')

# Check for GPU
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

Using device: cuda


In [6]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
from sklearn.metrics import precision_recall_fscore_support, accuracy_score
from transformers import AutoModel, AutoTokenizer, AdamW, get_scheduler
import numpy as np

class EnhancedSentimentAnalyzer(nn.Module):
    def __init__(self, pretrained_model="distilbert-base-uncased", num_classes=2, dropout_rate=0.3):
        super().__init__()
        
        # Load pre-trained transformer
        self.transformer = AutoModel.from_pretrained(pretrained_model)
        hidden_size = self.transformer.config.hidden_size  # Usually 768 for DistilBERT
        
        # Convolutional layers for n-gram feature extraction
        self.conv_layers = nn.ModuleList([
            nn.Conv1d(hidden_size, 128, kernel_size=k) 
            for k in [2, 3, 4, 5]
        ])
        
        # Layer normalization
        self.layer_norm1 = nn.LayerNorm(hidden_size)
        self.layer_norm2 = nn.LayerNorm(512)  # For concatenated conv outputs
        
        # Bidirectional LSTM
        self.lstm = nn.LSTM(
            input_size=hidden_size,
            hidden_size=256,
            num_layers=2,
            bidirectional=True,
            dropout=dropout_rate if dropout_rate > 0 else 0,
            batch_first=True
        )
        
        # Self-attention mechanism
        self.attention = nn.Sequential(
            nn.Linear(512, 512),  # 512 = hidden_size * 2 (bidirectional)
            nn.Tanh(),
            nn.Linear(512, 1),
            nn.Softmax(dim=1)
        )
        
        # Fully connected layers
        self.fc1 = nn.Linear(1024, 512)  # 1024 = 512 (conv) + 512 (lstm)
        self.fc2 = nn.Linear(512, num_classes)
        
        # Dropout
        self.dropout = nn.Dropout(dropout_rate)

    def forward(self, input_ids, attention_mask):
        # Get transformer outputs
        transformer_outputs = self.transformer(
            input_ids=input_ids,
            attention_mask=attention_mask
        )
        
        sequence_output = transformer_outputs[0]  # [batch_size, seq_len, hidden_size]
        sequence_output = self.layer_norm1(sequence_output)
        
        # CNN Feature Extraction
        conv_input = sequence_output.transpose(1, 2)  # [batch_size, hidden_size, seq_len]
        conv_outputs = []
        for conv in self.conv_layers:
            conv_out = F.relu(conv(conv_input))
            # Global max pooling
            conv_out = F.max_pool1d(conv_out, conv_out.size(2)).squeeze(2)
            conv_outputs.append(conv_out)
        
        # Concatenate all conv outputs
        conv_output = torch.cat(conv_outputs, dim=1)  # [batch_size, 512]
        conv_output = self.layer_norm2(conv_output)
        
        # LSTM processing
        lstm_output, _ = self.lstm(sequence_output)  # [batch_size, seq_len, 2*hidden_size]
        
        # Apply attention to LSTM output
        attention_weights = self.attention(lstm_output)  # [batch_size, seq_len, 1]
        attention_output = torch.bmm(attention_weights.transpose(1, 2), lstm_output)  # [batch_size, 1, 2*hidden_size]
        attention_output = attention_output.squeeze(1)  # [batch_size, 2*hidden_size]
        
        # Combine features
        combined = torch.cat([conv_output, attention_output], dim=1)
        
        # Final classification
        out = self.dropout(F.relu(self.fc1(combined)))
        out = self.fc2(out)
        
        return out

class SentimentDataset(torch.utils.data.Dataset):
    def __init__(self, texts, labels, tokenizer, max_length=128):
        self.texts = texts
        self.labels = labels
        self.tokenizer = tokenizer
        self.max_length = max_length
        
    def __len__(self):
        return len(self.labels)
    
    def __getitem__(self, idx):
        text = str(self.texts[idx])
        label = self.labels[idx]
        
        encoding = self.tokenizer(
            text,
            max_length=self.max_length,
            padding='max_length',
            truncation=True,
            return_tensors='pt'
        )
        
        return {
            'input_ids': encoding['input_ids'].flatten(),
            'attention_mask': encoding['attention_mask'].flatten(),
            'labels': torch.tensor(label, dtype=torch.long)
        }

def train_model(model, train_loader, valid_loader, criterion, optimizer, scheduler, n_epochs, device, 
                patience=3, min_delta=0.001):

    # Print CUDA information
    print(f"Using device: {device}")
    if device.type == 'cuda':
        print(f"CUDA device: {torch.cuda.get_device_name(0)}")
        print(f"Memory allocated: {torch.cuda.memory_allocated(0) / 1024**2:.2f} MB")
    
    # Initialize tracking variables
    best_valid_loss = float('inf')
    best_metrics = None
    epochs_without_improvement = 0
    training_history = []
    
    for epoch in range(n_epochs):
        print(f"\nEpoch {epoch+1}/{n_epochs}")
        print("-" * 50)
        
        # Training phase
        model.train()
        total_train_loss = 0
        train_predictions = []
        train_true_labels = []
        
        for batch_idx, batch in enumerate(train_loader):
            # Move batch to device
            input_ids = batch['input_ids'].to(device)
            attention_mask = batch['attention_mask'].to(device)
            labels = batch['labels'].to(device)
            
            # Forward pass
            optimizer.zero_grad()
            outputs = model(input_ids, attention_mask)
            loss = criterion(outputs, labels)
            
            # Backward pass
            loss.backward()
            torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
            optimizer.step()
            scheduler.step()  
            
            # Track metrics
            total_train_loss += loss.item()
            batch_predictions = torch.argmax(outputs, dim=1)
            train_predictions.extend(batch_predictions.cpu().numpy())
            train_true_labels.extend(labels.cpu().numpy())
            
        # Validation phase
        model.eval()
        total_valid_loss = 0
        valid_predictions = []
        valid_true_labels = []
        
        print("\nStarting validation...")
        with torch.no_grad():
            for batch in valid_loader:
                input_ids = batch['input_ids'].to(device)
                attention_mask = batch['attention_mask'].to(device)
                labels = batch['labels'].to(device)
                
                outputs = model(input_ids, attention_mask)
                loss = criterion(outputs, labels)
                
                total_valid_loss += loss.item()
                batch_predictions = torch.argmax(outputs, dim=1)
                valid_predictions.extend(batch_predictions.cpu().numpy())
                valid_true_labels.extend(labels.cpu().numpy())
        
        # Calculate epoch metrics
        avg_train_loss = total_train_loss / len(train_loader)
        avg_valid_loss = total_valid_loss / len(valid_loader)
        
        # Training metrics
        train_precision, train_recall, train_f1, _ = precision_recall_fscore_support(
            train_true_labels, train_predictions, average='binary'
        )
        train_accuracy = accuracy_score(train_true_labels, train_predictions)
        
        # Validation metrics
        valid_precision, valid_recall, valid_f1, _ = precision_recall_fscore_support(
            valid_true_labels, valid_predictions, average='binary'
        )
        valid_accuracy = accuracy_score(valid_true_labels, valid_predictions)
        
        # Store history
        epoch_metrics = {
            'epoch': epoch + 1,
            'train_loss': avg_train_loss,
            'valid_loss': avg_valid_loss,
            'train_accuracy': train_accuracy,
            'valid_accuracy': valid_accuracy,
            'train_f1': train_f1,
            'valid_f1': valid_f1,
            'learning_rate': optimizer.param_groups[0]['lr']  # Track learning rate
        }
        training_history.append(epoch_metrics)
        
        # Check for improvement
        if avg_valid_loss < best_valid_loss - min_delta:
            print("Validation loss improved! Saving model...")
            best_valid_loss = avg_valid_loss
            torch.save(model.state_dict(), 'best_model.pt')

    return training_history

# Configuration and hyperparameters
config = {
    'learning_rate': 2e-5,
    'n_epochs': 5,
    'batch_size': 32,
    'dropout_rate': 0.3,
    'max_length': 128,  # Maximum sequence length for tokenization
    'weight_decay': 0.01
}

In [7]:
# Create datasets
train_dataset = SentimentDataset(
    train_data['text'].tolist(),
    train_data['label'].tolist(),
    tokenizer
)

test_dataset = SentimentDataset(
    test_data['text'].tolist(),
    test_data['label'].tolist(),
    tokenizer
)

# Create data loaders
train_loader = DataLoader(train_dataset, batch_size=config['batch_size'], shuffle=True)
valid_loader = DataLoader(test_dataset, batch_size=config['batch_size'])

In [8]:
# Initialize the model
model = EnhancedSentimentAnalyzer(
    pretrained_model="distilbert-base-uncased",
    num_classes=2,
    dropout_rate=0.3
).to(device)

# Initialize optimizer with weight decay and learning rate
optimizer = torch.optim.AdamW(model.parameters(), 
                            lr=config['learning_rate'],
                            weight_decay=0.01)

# Initialize loss function
criterion = nn.CrossEntropyLoss()

# Optional: Add a learning rate scheduler for better training
scheduler = torch.optim.lr_scheduler.OneCycleLR(
    optimizer,
    max_lr=config['learning_rate'],
    epochs=config['n_epochs'],
    steps_per_epoch=len(train_loader),
    pct_start=0.1  # Use 10% of steps for warmup
)

In [None]:
history = train_model(
    model=model,
    train_loader=train_loader,
    valid_loader=valid_loader,
    criterion=criterion,
    optimizer=optimizer,
    scheduler=scheduler,  # Add scheduler parameter
    n_epochs=config['n_epochs'],
    device=device,
    min_delta=0.001
)

Using device: cuda
CUDA device: NVIDIA GeForce RTX 3060 Laptop GPU
Memory allocated: 278.04 MB

Epoch 1/5
--------------------------------------------------

Starting validation...
Validation loss improved! Saving model...

Epoch 2/5
--------------------------------------------------

Starting validation...

Epoch 3/5
--------------------------------------------------

Starting validation...
Validation loss improved! Saving model...

Epoch 4/5
--------------------------------------------------

Starting validation...


In [None]:
# Load best model
checkpoint = torch.load('best_model_checkpoint.pt')
model.load_state_dict(checkpoint['model_state_dict'])
model.eval()

# Evaluate on test set
predictions = []
true_labels = []
with torch.no_grad():
    for batch in valid_loader:
        input_ids = batch['input_ids'].to(device)
        attention_mask = batch['attention_mask'].to(device)
        labels = batch['labels'].to(device)
        
        outputs = model(input_ids, attention_mask)
        preds = torch.argmax(outputs, dim=1)
        
        predictions.extend(preds.cpu().numpy())
        true_labels.extend(labels.cpu().numpy())

# Calculate final metrics
precision, recall, f1, _ = precision_recall_fscore_support(true_labels, predictions, average='binary')
accuracy = accuracy_score(true_labels, predictions)

print(f"Final Results:")
print(f"Accuracy: {accuracy:.4f}")
print(f"Precision: {precision:.4f}")
print(f"Recall: {recall:.4f}")
print(f"F1 Score: {f1:.4f}")

  checkpoint = torch.load('best_model_checkpoint.pt')


Final Results:
Accuracy: 0.8612
Precision: 0.8168
Recall: 0.9314
F1 Score: 0.8703


### Test Data

In [19]:
import pandas as pd
from transformers import DistilBertTokenizer, DistilBertForSequenceClassification
import torch
from torch.utils.data import DataLoader, TensorDataset
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score


In [22]:
# Load the dataset
csv_path = r'C:\Users\Parthasarathy.Harini\Downloads\NLP\data\test_data_movie.csv'
test_data = pd.read_csv(csv_path)


In [23]:
# Extract text and labels
test_texts = test_data['text'].tolist()
test_labels = test_data['label'].tolist()

# Initialize the tokenizer
tokenizer = DistilBertTokenizer.from_pretrained('distilbert-base-uncased')

# Tokenize the text data
test_encodings = tokenizer(test_texts, truncation=True, padding=True, max_length=128)


In [24]:
# Convert to PyTorch tensors
test_inputs = torch.tensor(test_encodings['input_ids'])
test_masks = torch.tensor(test_encodings['attention_mask'])
test_labels = torch.tensor(test_labels)

# Create a DataLoader
test_dataset = TensorDataset(test_inputs, test_masks, test_labels)
test_loader = DataLoader(test_dataset, batch_size=16)


In [25]:
# Load the trained model
model = DistilBertForSequenceClassification.from_pretrained('./bert_sentiment_model')  # path to your trained model

In [26]:
# Move the model to GPU if available
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)

DistilBertForSequenceClassification(
  (distilbert): DistilBertModel(
    (embeddings): Embeddings(
      (word_embeddings): Embedding(30522, 768, padding_idx=0)
      (position_embeddings): Embedding(512, 768)
      (LayerNorm): LayerNorm((768,), eps=1e-12, elementwise_affine=True)
      (dropout): Dropout(p=0.1, inplace=False)
    )
    (transformer): Transformer(
      (layer): ModuleList(
        (0-5): 6 x TransformerBlock(
          (attention): DistilBertSdpaAttention(
            (dropout): Dropout(p=0.1, inplace=False)
            (q_lin): Linear(in_features=768, out_features=768, bias=True)
            (k_lin): Linear(in_features=768, out_features=768, bias=True)
            (v_lin): Linear(in_features=768, out_features=768, bias=True)
            (out_lin): Linear(in_features=768, out_features=768, bias=True)
          )
          (sa_layer_norm): LayerNorm((768,), eps=1e-12, elementwise_affine=True)
          (ffn): FFN(
            (dropout): Dropout(p=0.1, inplace=False)


In [27]:
# Evaluate the model
model.eval()
predictions, true_labels = [], []

with torch.no_grad():
    for batch in test_loader:
        inputs, masks, labels = [b.to(device) for b in batch]
        outputs = model(input_ids=inputs, attention_mask=masks)
        logits = outputs.logits
        preds = torch.argmax(logits, axis=1).cpu().numpy()
        predictions.extend(preds)
        true_labels.extend(labels.cpu().numpy())


In [10]:
import pandas as pd
from transformers import AutoTokenizer
import torch
from torch.utils.data import DataLoader
from sklearn.metrics import accuracy_score, precision_recall_fscore_support

# Load the dataset
csv_path = r'C:\Users\Parthasarathy.Harini\Downloads\NLP\data\test_data_movie.csv'
test_data = pd.read_csv(csv_path)

# Check for duplicates
print(f"Original dataset size: {len(test_data)}")
test_data = test_data.drop_duplicates(subset='text', keep='first').reset_index(drop=True)
print(f"Dataset size after removing duplicates: {len(test_data)}")

# Initialize tokenizer for DistilBERT
tokenizer = AutoTokenizer.from_pretrained('distilbert-base-uncased')

# Create test dataset
test_dataset = SentimentDataset(
    texts=test_data['text'].tolist(),
    labels=test_data['label'].tolist(),
    tokenizer=tokenizer,
    max_length=128
)

# Create test dataloader
test_loader = DataLoader(test_dataset, batch_size=16)

# Load the best model checkpoint
print("Loading best model checkpoint...")
checkpoint = torch.load('best_model_checkpoint.pt')

# Initialize model
model = EnhancedSentimentAnalyzer(
    pretrained_model="distilbert-base-uncased",
    num_classes=2,
    dropout_rate=0.3
).to(device)

# Load model weights
model.load_state_dict(checkpoint['model_state_dict'])
model.eval()

# Evaluate
print("Starting evaluation...")
predictions = []
true_labels = []

with torch.no_grad():
    for batch in test_loader:
        # Move batch to device
        input_ids = batch['input_ids'].to(device)
        attention_mask = batch['attention_mask'].to(device)
        labels = batch['labels'].to(device)
        
        # Get predictions
        outputs = model(input_ids, attention_mask)
        preds = torch.argmax(outputs, dim=1)
        
        # Store predictions and true labels
        predictions.extend(preds.cpu().numpy())
        true_labels.extend(labels.cpu().numpy())

accuracy = accuracy_score(true_labels, predictions)
precision, recall, f1, _ = precision_recall_fscore_support(true_labels, predictions, average='binary')

print("\nTest Set Results:")
print(f"Accuracy: {accuracy:.4f}")
print(f"Precision: {precision:.4f}")
print(f"Recall: {recall:.4f}")
print(f"F1 Score: {f1:.4f}")

output_df = pd.DataFrame({
    'text': test_data['text'],
    'true_label': test_data['label'],
    'predicted_label': predictions
})

output_path = 'enhanced_model_predictions.csv'
output_df.to_csv(output_path, index=False)
print(f"\nPredictions saved to: {output_path}")

print("\nExample Predictions:")
for i in range(5):
    print(f"\nText: {output_df['text'].iloc[i][:100]}...")
    print(f"True Label: {output_df['true_label'].iloc[i]}")
    print(f"Predicted Label: {output_df['predicted_label'].iloc[i]}")

Original dataset size: 40000
Dataset size after removing duplicates: 39723
Loading best model checkpoint...


  checkpoint = torch.load('best_model_checkpoint.pt')


Starting evaluation...

Test Set Results:
Accuracy: 0.8790
Precision: 0.8371
Recall: 0.9419
F1 Score: 0.8864

Predictions saved to: enhanced_model_predictions.csv

Example Predictions:

Text: I grew up (b. 1965) watching and loving the Thunderbirds. All my mates at school watched. We played ...
True Label: 0
Predicted Label: 0

Text: When I put this movie in my DVD player, and sat down with a coke and some chips, I had some expectat...
True Label: 0
Predicted Label: 1

Text: Why do people who do not know what a particular time in the past was like feel the need to try to de...
True Label: 0
Predicted Label: 0

Text: Even though I have great interest in Biblical movies, I was bored to death every minute of the movie...
True Label: 0
Predicted Label: 0

Text: Im a die hard Dads Army fan and nothing will ever change that. I got all the tapes, DVD's and audiob...
True Label: 1
Predicted Label: 1
