# CNN model

### Import Datasets

In [52]:
import pandas as pd

# Import Datasets
df1 = pd.read_csv(r'C:\Users\megan\Desktop\ML Project\processed_df1.csv')
df2 = pd.read_csv(r'C:\Users\megan\Desktop\ML Project\processed_df2.csv')
df3 = pd.read_csv(r'C:\Users\megan\Desktop\ML Project\processed_df3.csv')

###  Imports

In [53]:
import torchtext; torchtext.disable_torchtext_deprecation_warning()
from collections import Counter
from torch.utils.data import Dataset, DataLoader
from torch.nn.utils.rnn import pad_sequence
from sklearn.preprocessing import LabelEncoder
from sklearn.metrics import accuracy_score, recall_score, f1_score
from collections import Counter
from torchtext.vocab import Vocab
import torch
import torch.nn as nn
import torch.optim as optim
import re

### CNN Model

In [None]:
# Hyperparameters
learning_rate = 0.001    # Learning rate for the optimizer
batch_size = 32          # Batch size for DataLoader
embed_dim = 100          # Embedding dimension for word vectors
num_filters = 100        # Number of filters in the convolutional layer
kernel_size = 3          # Kernel size for the convolutional layer
epochs = 5               # Number of training epochs
dropout_rate = 0.2       # Dropout rate for regularization - prevent overfitting

# Tokenizer function
def tokenizer(text):
    return re.findall(r'\b\w+\b', text.lower())

# Label encoding
label_encoder = LabelEncoder()

# Encode sentiment columns that contains the labels (pos., neg., neutral)
df1['encoded_label'] = label_encoder.fit_transform(df1['sentiment'])  
df3['encoded_label'] = label_encoder.transform(df3['sentiment']) 

# Create token-to-index manually
counter = Counter()
for text in df1['cleaned_reviewText']:   
    counter.update(tokenizer(text))

# Add special tokens manually
counter.update(['<pad>', '<unk>'])

# Build vocab with special tokens
vocab = {word: idx for idx, (word, _) in enumerate(counter.items())}

# Manually map tokens to indices - <unk> if not found in vocab
def encode(tokens):
    unk_index = vocab['<unk>']
    return [vocab[token] if token in vocab else unk_index for token in tokens]

# Dataset class
class SentimentDataset(Dataset):
    def __init__(self, texts, labels, vocab, tokenizer):
        self.texts = texts
        self.labels = labels
        self.vocab = vocab
        self.tokenizer = tokenizer

    def __len__(self):
        return len(self.texts)

    def __getitem__(self, idx):
        tokens = self.tokenizer(self.texts[idx])
        indices = encode(tokens) 
        return torch.tensor(indices), self.labels[idx]

# Collate function for padding
def collate_fn(batch):
    texts, labels = zip(*batch)
    padded = pad_sequence(texts, batch_first=True, padding_value=vocab['<pad>']).long()
    return padded, torch.tensor(labels, dtype=torch.long)

# DataLoaders
train_dataset = SentimentDataset(df1['cleaned_reviewText'].tolist(), df1['encoded_label'].tolist(), vocab, tokenizer)
test_dataset = SentimentDataset(df3['review_description'].tolist(), df3['encoded_label'].tolist(), vocab, tokenizer)

train_loader_cnn = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, collate_fn=collate_fn)
test_loader_cnn = DataLoader(test_dataset, batch_size=batch_size, shuffle=False, collate_fn=collate_fn)

In [None]:
# Define CNN model
class modelCNN(nn.Module):
    def __init__(self, vocab_size, embed_dim, num_classes, num_filters, kernel_size, dropout_rate):
        super(modelCNN, self).__init__()
        self.embedding = nn.Embedding(vocab_size, embed_dim, padding_idx=vocab['<pad>'])
        self.conv = nn.Conv1d(in_channels=embed_dim, out_channels=num_filters, kernel_size=kernel_size)
        self.pool = nn.AdaptiveMaxPool1d(1)
        self.fc = nn.Linear(num_filters, num_classes)
        self.dropout = nn.Dropout(dropout_rate)  

    def forward(self, x):
        x = self.embedding(x)           # (batch_size, seq_len, embed_dim)
        x = x.permute(0, 2, 1)          # (batch_size, embed_dim, seq_len)
        x = torch.relu(self.conv(x))    # (batch_size, num_filters, L_out)
        x = self.pool(x).squeeze(2)     # (batch_size, num_filters)
        x = self.dropout(x)             # Apply dropout
        return self.fc(x)               # (batch_size, num_classes)

# Instantiate model
model_cnn = modelCNN(vocab_size=len(vocab), embed_dim=embed_dim, num_classes=3, 
                     num_filters=num_filters, kernel_size=kernel_size, dropout_rate=dropout_rate)

# Criterion and Optimizer
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model_cnn.parameters(), lr=learning_rate)

# Training loop
for epoch in range(epochs):
    model_cnn.train()
    running_loss = 0.0
    correct_preds = 0
    total_preds = 0

    for inputs, labels in train_loader_cnn:
        optimizer.zero_grad()
        outputs = model_cnn(inputs)  # Forward pass
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        running_loss += loss.item()
        _, predicted = torch.max(outputs, 1)
        correct_preds += (predicted == labels).sum().item()
        total_preds += labels.size(0)

    accuracy = correct_preds / total_preds
    avg_loss = running_loss / len(train_loader_cnn)
    print(f"Epoch {epoch+1}/{epochs}, Loss: {round(avg_loss, 4)}, Accuracy: {round(accuracy, 4)}")   # Print Epoch and Loss/Accuracy associated with it

# Evaluate the model on the test set df3
model_cnn.eval()
all_preds = []
all_labels = []

with torch.no_grad():
    for inputs, labels in test_loader_cnn: 
        outputs = model_cnn(inputs)
        _, predicted = torch.max(outputs, 1)
        all_preds.extend(predicted.cpu().numpy())
        all_labels.extend(labels.cpu().numpy())

# Calculate metrics
accuracy = accuracy_score(all_labels, all_preds)
recall = recall_score(all_labels, all_preds, average='weighted')
f1 = f1_score(all_labels, all_preds, average='weighted')
print("")
print("Evaluation using Test Data(df3): ")
print(f"Accuracy: {round(accuracy, 4)}")
print(f"Recall: {round(recall, 4)}")
print(f"F1 Score: {round(f1, 4)}")

Epoch 1/5, Loss: 0.9063, Accuracy: 0.5943
Epoch 2/5, Loss: 0.7381, Accuracy: 0.6913
Epoch 3/5, Loss: 0.6603, Accuracy: 0.7306
Epoch 4/5, Loss: 0.5885, Accuracy: 0.761
Epoch 5/5, Loss: 0.5155, Accuracy: 0.7977

Evaluation using Test Data(df3): 
Accuracy: 0.8152
Recall: 0.8152
F1 Score: 0.8906


# Experimentation

### Hyperparameter Tuning

In [None]:
# New Hyperparameters
learning_rate = 0.005   # Increase lr from .001 to .005
batch_size = 16         # Smaller batch size
embed_dim = 100
hidden_dim = 64         # Add hidden dimension
num_layers = 2          # Add num layers
dropout_rate = 0.3      # Increase dropout_rate
epochs = 10             # Increase epochs

# Define tokenizer function
def tokenizer(text):
    return re.findall(r'\b\w+\b', text.lower())

# Label encoding
label_encoder = LabelEncoder()

# Encode sentiment columns that contains the labels (pos., neg., neutral)
df1['encoded_label'] = label_encoder.fit_transform(df1['sentiment'])  
df3['encoded_label'] = label_encoder.transform(df3['sentiment'])  

# Manual token-to-index
counter = Counter()
for text in df1['cleaned_reviewText']: 
    counter.update(tokenizer(text))

# Add special tokens
counter.update(['<pad>', '<unk>'])

# Build vocab with special tokens
vocab = {word: idx for idx, (word, _) in enumerate(counter.items())}

# Manually map tokens to indices - <unk> if not in vocab
def encode(tokens):
    unk_index = vocab['<unk>']
    return [vocab[token] if token in vocab else unk_index for token in tokens]

# Dataset class 
class SentimentDataset(Dataset):
    def __init__(self, texts, labels, vocab, tokenizer):
        self.texts = texts
        self.labels = labels
        self.vocab = vocab
        self.tokenizer = tokenizer

    def __len__(self):
        return len(self.texts)

    def __getitem__(self, idx):
        tokens = self.tokenizer(self.texts[idx])
        indices = encode(tokens)  
        return torch.tensor(indices), self.labels[idx]

# Collate function for padding
def collate_fn(batch):
    texts, labels = zip(*batch)
    padded = pad_sequence(texts, batch_first=True, padding_value=vocab['<pad>']).long()
    return padded, torch.tensor(labels, dtype=torch.long)

# DataLoaders
train_dataset = SentimentDataset(df1['cleaned_reviewText'].tolist(), df1['encoded_label'].tolist(), vocab, tokenizer)
test_dataset = SentimentDataset(df3['review_description'].tolist(), df3['encoded_label'].tolist(), vocab, tokenizer)

train_loader_cnn = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, collate_fn=collate_fn)
test_loader_cnn = DataLoader(test_dataset, batch_size=batch_size, shuffle=False, collate_fn=collate_fn)

class modelCNN(nn.Module):
    def __init__(self, vocab_size, embed_dim, num_classes, num_filters, kernel_size, dropout_rate):
        super(modelCNN, self).__init__()
        self.embedding = nn.Embedding(vocab_size, embed_dim, padding_idx=vocab['<pad>'])
        self.conv = nn.Conv1d(in_channels=embed_dim, out_channels=num_filters, kernel_size=kernel_size)
        self.pool = nn.AdaptiveMaxPool1d(1)
        self.fc = nn.Linear(num_filters, num_classes)
        self.dropout = nn.Dropout(dropout_rate)  # Dropout layer

    def forward(self, x):
        x = self.embedding(x)           # (batch_size, seq_len, embed_dim)
        x = x.permute(0, 2, 1)          # (batch_size, embed_dim, seq_len)
        x = torch.relu(self.conv(x))    # (batch_size, num_filters, L_out)
        x = self.pool(x).squeeze(2)     # (batch_size, num_filters)
        x = self.dropout(x)             # Apply dropout
        return self.fc(x)               # (batch_size, num_classes)

# Instantiate model
model_cnn = modelCNN(vocab_size=len(vocab), embed_dim=embed_dim, num_classes=3, 
                     num_filters=num_filters, kernel_size=kernel_size, dropout_rate=dropout_rate)

# Criterion and Optimizer
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model_cnn.parameters(), lr=learning_rate)

# Training loop
for epoch in range(epochs):
    model_cnn.train()
    running_loss = 0.0
    correct_preds = 0
    total_preds = 0

    for inputs, labels in train_loader_cnn:
        optimizer.zero_grad()
        outputs = model_cnn(inputs)  # Forward pass
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        running_loss += loss.item()
        _, predicted = torch.max(outputs, 1)
        correct_preds += (predicted == labels).sum().item()
        total_preds += labels.size(0)

    accuracy = correct_preds / total_preds
    avg_loss = running_loss / len(train_loader_cnn)
    print(f"Epoch {epoch+1}/{epochs}, Loss: {round(avg_loss, 4)}, Accuracy: {round(accuracy, 4)}")

# Evaluate the model on the test set df3
model_cnn.eval()
all_preds = []
all_labels = []

with torch.no_grad():
    for inputs, labels in test_loader_cnn: 
        outputs = model_cnn(inputs)
        _, predicted = torch.max(outputs, 1)
        all_preds.extend(predicted.cpu().numpy())
        all_labels.extend(labels.cpu().numpy())

# Calculate metrics
accuracy = accuracy_score(all_labels, all_preds)
recall = recall_score(all_labels, all_preds, average='weighted')
f1 = f1_score(all_labels, all_preds, average='weighted')
print("")
print(f"[New HP] Accuracy: {round(accuracy, 4)}")
print(f"[New HP] Recall: {round(recall, 4)}")
print(f"[New HP] F1 Score: {round(f1, 4)}")

Epoch 1/10, Loss: 0.9643, Accuracy: 0.6016
Epoch 2/10, Loss: 0.744, Accuracy: 0.7045
Epoch 3/10, Loss: 0.5931, Accuracy: 0.7688
Epoch 4/10, Loss: 0.4577, Accuracy: 0.8343
Epoch 5/10, Loss: 0.3355, Accuracy: 0.8793
Epoch 6/10, Loss: 0.2852, Accuracy: 0.9047
Epoch 7/10, Loss: 0.263, Accuracy: 0.9177
Epoch 8/10, Loss: 0.2197, Accuracy: 0.9353
Epoch 9/10, Loss: 0.2215, Accuracy: 0.9387
Epoch 10/10, Loss: 0.2073, Accuracy: 0.9457

[New HP] Accuracy: 0.7217
[New HP] Recall: 0.7217
[New HP] F1 Score: 0.8293


### Different Dataset Sizes

In [57]:
import pandas as pd
from sklearn.model_selection import train_test_split
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv1D, MaxPooling1D, Flatten, Dense, Dropout, Embedding
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.sequence import pad_sequences
from sklearn.preprocessing import LabelEncoder

In [None]:
# Load dataset - Using df1
df_ds = pd.read_csv(r'C:\Users\megan\Desktop\ML Project\processed_df1.csv')

# Split data into training and testing - 80% train, 20% test
train_data, test_data = train_test_split(df_ds, test_size=0.2, random_state=42)

# Tokenization and padding function
def preprocess_data(data, tokenizer=None, max_words=10000, max_len=100):
    if tokenizer is None:
        tokenizer = Tokenizer(num_words=max_words)
        tokenizer.fit_on_texts(data['cleaned_reviewText'])
    sequences = tokenizer.texts_to_sequences(data['cleaned_reviewText'])
    padded_sequences = pad_sequences(sequences, maxlen=max_len)
    return padded_sequences, tokenizer

# Convert labels to integers (sentiment)
def encode_labels(labels):
    label_encoder = LabelEncoder()
    return label_encoder.fit_transform(labels)

# CNN model
def ds_cnn_model(input_length, num_classes):
    model = Sequential()
    model.add(Embedding(input_dim=10000, output_dim=100, input_length=input_length))
    model.add(Conv1D(128, 5, activation='relu'))
    model.add(MaxPooling1D(pool_size=4))
    model.add(Flatten())
    model.add(Dense(128, activation='relu'))
    model.add(Dropout(0.2))
    model.add(Dense(num_classes, activation='softmax'))
    
    model.compile(optimizer='adam', loss='sparse_categorical_crossentropy', metrics=['accuracy'])
    return model

# Function to train and evaluate model
def train_and_evaluate(train_data, test_data, subset_size):
    # Subset the training data
    train_subset = train_data.sample(frac=subset_size, random_state=42)
    
    # Preprocess the data
    X_train, tokenizer = preprocess_data(train_subset, max_words=10000)
    X_test, _ = preprocess_data(test_data, tokenizer=tokenizer, max_words=10000)
    
    y_train = encode_labels(train_subset['sentiment'])
    y_test = encode_labels(test_data['sentiment'])
    
    # Build and train CNN model
    model = ds_cnn_model(input_length=X_train.shape[1], num_classes=len(set(y_train)))
    model.fit(X_train, y_train, epochs=5, batch_size=32, validation_data=(X_test, y_test))
    
    # Evaluate model
    loss, accuracy = model.evaluate(X_test, y_test)
    print(f"Dataset Size: {subset_size * 100}% | Accuracy: {round(accuracy, 4)}\n")

# Experiment with 25% and 50% dataset sizes
dataset_sizes = [0.25, 0.5]  # 25% and 50% dataset sizes
for size in dataset_sizes:
    train_and_evaluate(train_data, test_data, size)

Epoch 1/5
Epoch 2/5
Epoch 3/5
Epoch 4/5
Epoch 5/5
Dataset Size: 25.0% | Accuracy: 0.7133

Epoch 1/5
Epoch 2/5
Epoch 3/5
Epoch 4/5
Epoch 5/5
Dataset Size: 50.0% | Accuracy: 0.7262



In [None]:
# Load your dataset - Using df3
df_ds = pd.read_csv(r'C:\Users\megan\Desktop\ML Project\processed_df3.csv')

# Split data into training and testing - 80% train, 20% test
train_data, test_data = train_test_split(df_ds, test_size=0.2, random_state=42)

# Tokenization and padding function
def preprocess_data(data, tokenizer=None, max_words=10000, max_len=100):
    if tokenizer is None:
        tokenizer = Tokenizer(num_words=max_words)
        tokenizer.fit_on_texts(data['review_description'])
    sequences = tokenizer.texts_to_sequences(data['review_description'])
    padded_sequences = pad_sequences(sequences, maxlen=max_len)
    return padded_sequences, tokenizer

# Convert labels to integers (sentiment)
def encode_labels(labels):
    label_encoder = LabelEncoder()
    return label_encoder.fit_transform(labels)

# CNN model
def ds_cnn_model(input_length, num_classes):
    model = Sequential()
    model.add(Embedding(input_dim=10000, output_dim=100, input_length=input_length))
    model.add(Conv1D(128, 5, activation='relu'))
    model.add(MaxPooling1D(pool_size=4))
    model.add(Flatten())
    model.add(Dense(128, activation='relu'))
    model.add(Dropout(0.2))
    model.add(Dense(num_classes, activation='softmax'))
    
    model.compile(optimizer='adam', loss='sparse_categorical_crossentropy', metrics=['accuracy'])
    return model

# Function to train and evaluate model
def train_and_evaluate(train_data, test_data, subset_size):
    # Subset the training data
    train_subset = train_data.sample(frac=subset_size, random_state=42)
    
    # Preprocess the data
    X_train, tokenizer = preprocess_data(train_subset, max_words=10000)
    X_test, _ = preprocess_data(test_data, tokenizer=tokenizer, max_words=10000)
    
    y_train = encode_labels(train_subset['sentiment'])
    y_test = encode_labels(test_data['sentiment'])
    
    # Build and train CNN model
    model = ds_cnn_model(input_length=X_train.shape[1], num_classes=len(set(y_train)))
    model.fit(X_train, y_train, epochs=5, batch_size=32, validation_data=(X_test, y_test))
    
    # Evaluate model
    loss, accuracy = model.evaluate(X_test, y_test)
    print(f"Dataset Size: {subset_size * 100}% | Accuracy: {round(accuracy, 4)}\n")

# Experiment with 25% and 50% dataset sizes
dataset_sizes = [0.25, 0.5]  # 25% and 50% dataset sizes
for size in dataset_sizes:
    train_and_evaluate(train_data, test_data, size)

Epoch 1/5
Epoch 2/5
Epoch 3/5
Epoch 4/5
Epoch 5/5
Dataset Size: 25.0% | Accuracy: 0.9891

Epoch 1/5
Epoch 2/5
Epoch 3/5
Epoch 4/5
Epoch 5/5
Dataset Size: 50.0% | Accuracy: 0.0

