model types:
tfidf -> ffnn
word2vec -> cnn
word2vec -> lstm
roberta -> ffnn

In [1]:
import kagglehub

# Download latest version
path = kagglehub.dataset_download("rtatman/deceptive-opinion-spam-corpus")

print("Path to dataset files:", path)

Path to dataset files: C:\Users\pokes\.cache\kagglehub\datasets\rtatman\deceptive-opinion-spam-corpus\versions\2


In [2]:
import numpy as np
import pandas as pd
import torch
from torch import nn
from sklearn.model_selection import KFold
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics import precision_score, recall_score, f1_score
from nltk import word_tokenize
import gensim.downloader

In [3]:
if torch.cuda.is_available():
  device = torch.device("cuda")
else:
  device = torch.device("cpu")

In [4]:
df = pd.read_csv(path + "\\deceptive-opinion.csv")
X_text = df["text"].values
y = df["deceptive"].apply(lambda x: 1 if x == "deceptive" else 0).values

In [5]:
kf = KFold(n_splits=5, shuffle=True, random_state=42)

In [28]:
accs, precisions, recalls, f1s = [], [], [], []

for train_index, test_index in kf.split(X_text):
  X_train, X_test = X_text[train_index], X_text[test_index]
  y_train, y_test = y[train_index], y[test_index]
  vectorizer = TfidfVectorizer()
  X_train_tfidf = vectorizer.fit_transform(X_train)
  X_test_tfidf = vectorizer.transform(X_test)

  model = nn.Sequential(
    nn.Linear(X_train_tfidf.shape[1], 256),
    nn.ReLU(),
    nn.Linear(256, 128),
    nn.ReLU(),
    nn.Linear(128, 64),
    nn.ReLU(),
    nn.Linear(64, 32),
    nn.ReLU(),
    nn.Linear(32, 1),
    nn.Sigmoid()
  )

  criterion = nn.BCELoss()
  optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
  X_train_tensor = torch.tensor(X_train_tfidf.toarray(), dtype=torch.float32)
  y_train_tensor = torch.tensor(y_train, dtype=torch.float32).unsqueeze(1)
  X_test_tensor = torch.tensor(X_test_tfidf.toarray(), dtype=torch.float32)
  y_test_tensor = torch.tensor(y_test, dtype=torch.float32).unsqueeze(1)
  for epoch in range(100):
    model.train()
    optimizer.zero_grad()
    outputs = model(X_train_tensor)
    loss = criterion(outputs, y_train_tensor)
    loss.backward()
    optimizer.step()

  with torch.no_grad():
    test_outputs = model(X_test_tensor)
    boundary = 0.5
    test_outputs = (test_outputs > boundary).float()
    accuracy = (test_outputs.squeeze() == y_test_tensor.squeeze()).float().mean().item()
    precision = precision_score(y_test, test_outputs.numpy())
    recall = recall_score(y_test, test_outputs.numpy())
    f1 = f1_score(y_test, test_outputs.numpy())
  accs.append(accuracy)
  precisions.append(precision)
  recalls.append(recall)
  f1s.append(f1)
  print(f"Fold accuracy: {accuracy:.4f}, Precision: {precision:.4f}, Recall: {recall:.4f}, F1 Score: {f1:.4f}")

Fold accuracy: 0.8719, Precision: 0.8447, Recall: 0.8947, F1 Score: 0.8690
Fold accuracy: 0.8250, Precision: 0.8409, Recall: 0.8409, F1 Score: 0.8409
Fold accuracy: 0.8687, Precision: 0.7989, Recall: 0.9521, F1 Score: 0.8688
Fold accuracy: 0.8344, Precision: 0.7943, Recall: 0.8910, F1 Score: 0.8399
Fold accuracy: 0.8562, Precision: 0.9079, Recall: 0.8118, F1 Score: 0.8571


In [29]:
print(f"Average accuracy: {np.mean(accs):.4f}, Precision: {np.mean(precisions):.4f}, Recall: {np.mean(recalls):.4f}, F1 Score: {np.mean(f1s):.4f}")

Average accuracy: 0.8512, Precision: 0.8373, Recall: 0.8781, F1 Score: 0.8551


In [44]:
w2v = gensim.downloader.load("glove-wiki-gigaword-300")



In [None]:
accs, precisions, recalls, f1s = [], [], [], []

for train_index, test_index in kf.split(X_text):
  X_train, X_test = X_text[train_index], X_text[test_index]
  y_train, y_test = y[train_index], y[test_index]
  X_train = [[w2v[word] for word in word_tokenize(text.lower()) if word in w2v] for text in X_train]
  X_test = [[w2v[word] for word in word_tokenize(text.lower()) if word in w2v] for text in X_test]
  max_len = max(len(x) for x in X_train)
  X_train = [x + [np.zeros(300)] * (max_len - len(x)) if len(x) < max_len else x[:max_len] for x in X_train]
  X_test = [x + [np.zeros(300)] * (max_len - len(x)) if len(x) < max_len else x[:max_len] for x in X_test]
  X_train = np.array(X_train).transpose((0, 2, 1))  # Shape: (num_samples, 300, max_len)
  X_test = np.array(X_test).transpose((0, 2, 1))  # Shape: (num_samples, 300, max_len)

  model = nn.Sequential(
    nn.Conv1d(in_channels=300, out_channels=128, kernel_size=3),
    nn.ReLU(),
    nn.MaxPool1d(kernel_size=2),
    nn.Conv1d(in_channels=128, out_channels=64, kernel_size=3),
    nn.ReLU(),
    nn.MaxPool1d(kernel_size=2),
    nn.Flatten(),
    nn.Linear(64 * (((max_len - 3 + 1 - 2) // 2 + 1 - 3 + 1 - 2) // 2 + 1), 32),
    nn.ReLU(),
    nn.Linear(32, 1),
    nn.Sigmoid()
  )

  criterion = nn.BCELoss()
  optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
  X_train_tensor = torch.tensor(X_train, dtype=torch.float32)
  y_train_tensor = torch.tensor(y_train, dtype=torch.float32).unsqueeze(1)
  X_test_tensor = torch.tensor(X_test, dtype=torch.float32)
  y_test_tensor = torch.tensor(y_test, dtype=torch.float32).unsqueeze(1)
  for epoch in range(100):
    model.train()
    optimizer.zero_grad()
    outputs = model(X_train_tensor)
    loss = criterion(outputs, y_train_tensor)
    loss.backward()
    optimizer.step()

  with torch.no_grad():
    test_outputs = model(X_test_tensor)
    boundary = 0.5
    test_outputs = (test_outputs > boundary).float()
    accuracy = (test_outputs.squeeze() == y_test_tensor.squeeze()).float().mean().item()
    precision = precision_score(y_test, test_outputs.numpy())
    recall = recall_score(y_test, test_outputs.numpy())
    f1 = f1_score(y_test, test_outputs.numpy())
  accs.append(accuracy)
  precisions.append(precision)
  recalls.append(recall)
  f1s.append(f1)
  print(f"Fold accuracy: {accuracy:.4f}, Precision: {precision:.4f}, Recall: {recall:.4f}, F1 Score: {f1:.4f}")

Fold accuracy: 0.8281, Precision: 0.8050, Recall: 0.8421, F1 Score: 0.8232
Fold accuracy: 0.8281, Precision: 0.8380, Recall: 0.8523, F1 Score: 0.8451
Fold accuracy: 0.8594, Precision: 0.8582, Recall: 0.8288, F1 Score: 0.8432
Fold accuracy: 0.8219, Precision: 0.8113, Recall: 0.8269, F1 Score: 0.8190
Fold accuracy: 0.8656, Precision: 0.8896, Recall: 0.8529, F1 Score: 0.8709


In [46]:
print(f"Average accuracy: {np.mean(accs):.4f}, Precision: {np.mean(precisions):.4f}, Recall: {np.mean(recalls):.4f}, F1 Score: {np.mean(f1s):.4f}")

Average accuracy: 0.8406, Precision: 0.8404, Recall: 0.8406, F1 Score: 0.8403


In [None]:
class LSTMModel(nn.Module):
  def __init__(self, input_size, hidden_size, num_layers):
    super(LSTMModel, self).__init__()
    self.lstm1 = nn.LSTM(input_size=input_size, hidden_size=hidden_size, num_layers=num_layers, batch_first=True)
    # self.lstm2 = nn.LSTM(input_size=hidden_size, hidden_size=hidden_size // 2, num_layers=num_layers, batch_first=True)
    self.fc1 =  nn.Linear(hidden_size, 1)

  def forward(self, x):
    assert not torch.isnan(x).any()
    assert not torch.isinf(x).any()

    out, _ = self.lstm1(x)
    out = out.mean(dim=1)  # Average over the sequence length
    out = self.fc1(out)
    return out

In [37]:
class CNNLSTMModel(nn.Module):
  def __init__(self, input_size, hidden_size, num_layers):
    super(CNNLSTMModel, self).__init__()
    self.conv1 = nn.Conv1d(in_channels=input_size, out_channels=hidden_size, kernel_size=3)
    self.lstm = nn.LSTM(input_size=hidden_size, hidden_size=hidden_size // 2, num_layers=num_layers, batch_first=True)
    self.fc1 = nn.Linear(hidden_size // 2, 1)

  def forward(self, x):
    assert not torch.isnan(x).any()
    assert not torch.isinf(x).any()

    x = x.transpose(1, 2)  # Change shape to (batch_size, input_size, sequence_length)
    out = self.conv1(x)
    out = out.transpose(1, 2)  # Change shape back to (batch_size, sequence_length, hidden_size)
    out, _ = self.lstm(out)
    out = out.mean(dim=1)  # Average over the sequence length
    out = self.fc1(out)
    return out

In [9]:
from torch.utils.data import DataLoader, TensorDataset
from tqdm import tqdm

In [36]:
accs, precisions, recalls, f1s = [], [], [], []

for train_index, test_index in kf.split(X_text):
  X_train, X_test = X_text[train_index], X_text[test_index]
  y_train, y_test = y[train_index], y[test_index]
  X_train = [[w2v[word] for word in word_tokenize(text.lower()) if word in w2v] for text in X_train]
  X_test = [[w2v[word] for word in word_tokenize(text.lower()) if word in w2v] for text in X_test]
  max_len = max(len(x) for x in X_train)
  X_train = [x + [np.zeros(300)] * (max_len - len(x)) if len(x) < max_len else x[:max_len] for x in X_train]
  X_test = [x + [np.zeros(300)] * (max_len - len(x)) if len(x) < max_len else x[:max_len] for x in X_test]

  model = CNNLSTMModel(input_size=300, hidden_size=128, num_layers=2).to(device)

  criterion = nn.BCEWithLogitsLoss()
  optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
  X_train_tensor = torch.tensor(X_train, dtype=torch.float32).to(device)
  y_train_tensor = torch.tensor(y_train, dtype=torch.float32).unsqueeze(1).to(device)
  X_test_tensor = torch.tensor(X_test, dtype=torch.float32).to(device)
  y_test_tensor = torch.tensor(y_test, dtype=torch.float32).unsqueeze(1).to(device)

  batch_size = 32

  train_dataset = TensorDataset(torch.tensor(X_train, dtype=torch.float32), torch.tensor(y_train, dtype=torch.float32))
  test_dataset = TensorDataset(torch.tensor(X_test, dtype=torch.float32), torch.tensor(y_test, dtype=torch.float32))
  train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)

  pbar = tqdm(range(200), desc="Training Epochs")
  for epoch in pbar:
    for batch_X, batch_y in train_loader:
      model.train()
      optimizer.zero_grad()
      outputs = model(batch_X.to(device))
      loss = criterion(outputs, batch_y.unsqueeze(1).to(device))
      loss.backward()
      optimizer.step()
    pbar.set_description(f"Epoch {epoch+1} Loss: {loss.item():.6f}")

  with torch.no_grad():
    model.eval()
    test_outputs = nn.Sigmoid()(model(X_test_tensor))
    boundary = 0.5
    test_outputs = (test_outputs > boundary).float()
    accuracy = (test_outputs.squeeze() == y_test_tensor.squeeze()).float().mean().item()
    precision = precision_score(y_test, test_outputs.cpu().numpy())
    recall = recall_score(y_test, test_outputs.cpu().numpy())
    f1 = f1_score(y_test, test_outputs.cpu().numpy())
  accs.append(accuracy)
  precisions.append(precision)
  recalls.append(recall)
  f1s.append(f1)
  print(f"Fold accuracy: {accuracy:.4f}, Precision: {precision:.4f}, Recall: {recall:.4f}, F1 Score: {f1:.4f}")

Epoch 200 Loss: 0.693212: 100%|██████████| 200/200 [03:46<00:00,  1.13s/it]


Fold accuracy: 0.4750, Precision: 0.4750, Recall: 1.0000, F1 Score: 0.6441


KeyboardInterrupt: 

In [49]:
print(f"Average accuracy: {np.mean(accs):.4f}, Precision: {np.mean(precisions):.4f}, Recall: {np.mean(recalls):.4f}, F1 Score: {np.mean(f1s):.4f}")

Average accuracy: 0.8425, Precision: 0.8324, Recall: 0.8607, F1 Score: 0.8447


In [6]:
from transformers import AutoModel, AutoTokenizer

In [7]:
class LLMEncoderClassifier(nn.Module):
  def __init__(self, encoder):
    super(LLMEncoderClassifier, self).__init__()
    self.encoder = encoder
    self.fc = nn.Linear(encoder.config.hidden_size, encoder.config.hidden_size // 2)
    self.fc2 = nn.Linear(encoder.config.hidden_size // 2, 1)
    self.sigmoid = nn.Sigmoid()

  def forward(self, input_ids, attention_mask):
    outputs = self.encoder(input_ids=input_ids, attention_mask=attention_mask)
    cls_output = outputs.last_hidden_state[:, 0, :]  # Get the CLS token output
    logits = self.fc2(self.fc(cls_output))
    return self.sigmoid(logits)
    

In [None]:
accs, precisions, recalls, f1s = [], [], [], []

tokenizer = AutoTokenizer.from_pretrained("distilbert-base-uncased")
encoder = AutoModel.from_pretrained("distilbert-base-uncased").to(device)
for param in encoder.parameters():
  param.requires_grad = False

for train_index, test_index in kf.split(X_text):
  X_train, X_test = X_text[train_index], X_text[test_index]
  y_train, y_test = y[train_index], y[test_index]
  X_train_encodings = tokenizer(X_train.tolist(), padding=True, truncation=True, return_tensors="pt").to(device)
  X_test_encodings = tokenizer(X_test.tolist(), padding=True, truncation=True, return_tensors="pt").to(device)
  model = LLMEncoderClassifier(encoder).to(device)

  criterion = nn.BCEWithLogitsLoss()
  optimizer = torch.optim.Adam(model.parameters(), lr=0.0001)

  batch_size = 32

  pbar = tqdm(range(100), desc="Training Epochs")
  for epoch in pbar:
    for i in range(0, len(X_train_encodings['input_ids']), batch_size):
      batch_input_ids = X_train_encodings['input_ids'][i:i+batch_size]
      batch_attention_mask = X_train_encodings['attention_mask'][i:i+batch_size]
      batch_y = torch.tensor(y_train[i:i+batch_size], dtype=torch.float32).unsqueeze(1).to(device)

      model.train()
      optimizer.zero_grad()
      outputs = model(batch_input_ids, batch_attention_mask)
      loss = criterion(outputs, batch_y)
      loss.backward()
      optimizer.step()
    pbar.set_description(f"Epoch {epoch+1} Loss: {loss.item():.6f}")

  with torch.no_grad():
    model.eval()
    test_outputs = model(X_test_encodings['input_ids'], X_test_encodings['attention_mask'])
    boundary = 0.5
    test_outputs = (test_outputs > boundary).float()
    accuracy = (test_outputs.squeeze() == torch.tensor(y_test).to(device)).float().mean().item()
    precision = precision_score(y_test, test_outputs.cpu().numpy())
    recall = recall_score(y_test, test_outputs.cpu().numpy())
    f1 = f1_score(y_test, test_outputs.cpu().numpy())
  accs.append(accuracy)
  precisions.append(precision)
  recalls.append(recall)
  f1s.append(f1)
  print(f"Fold accuracy: {accuracy:.4f}, Precision: {precision:.4f}, Recall: {recall:.4f}, F1 Score: {f1:.4f}")

Epoch 100 Loss: 0.420263: 100%|██████████| 100/100 [28:27<00:00, 17.08s/it]


NameError: name 'y_test_tensor' is not defined

In [18]:
(test_outputs.squeeze() == torch.tensor(y_test).to(device)).float().mean().item()

0.809374988079071