In [None]:
!unzip liar_dataset.zip

Archive:  liar_dataset.zip
  inflating: README                  
  inflating: test.tsv                
  inflating: train.tsv               
  inflating: valid.tsv               


## Data Analysis and Preprocessing

In [None]:
# Column names applied to the header-less TSV files, in file order.
dataset_columns = [
    'id',
    'label',
    'text',
    'subject',
    'speaker',
    'job title',
    'state info',
    'party',
    'barely true',
    'false',
    'half true',
    'mostly true',
    'pants on fire',
    'context',
]

In [None]:
import pandas as pd
import numpy as np

In [None]:
# Read the three tab-separated splits; the files carry no header row, so the
# column names come from dataset_columns.
train, test, val = (
    pd.read_csv(tsv_path, sep='\t', header=None, names=dataset_columns)
    for tsv_path in ('train.tsv', 'test.tsv', 'valid.tsv')
)

In [None]:
# Count of missing values per column in the training split.
train_na_counts = train.isna().sum()
print(train_na_counts)


id                  0
label               0
text                0
subject             2
speaker             2
job title        2897
state info       2208
party               2
barely true         2
false               2
half true           2
mostly true         2
pants on fire       2
context           102
dtype: int64


In [None]:
def handle_nas(df):
  """Return a cleaned copy of ``df`` with missing values handled.

  Missing "job title" and "state info" entries are replaced by the
  placeholder string "unk"; any row that still contains a missing value in
  another column is dropped, and the index is renumbered from 0.

  The input frame is left untouched, so the cell calling this function can be
  re-run safely.

  Args:
    df: DataFrame holding one dataset split.

  Returns:
    A new DataFrame without missing values and with a fresh RangeIndex.
  """
  # Fill only the two columns where a missing value is tolerated ...
  filled = df.fillna({"job title": "unk", "state info": "unk"})

  # ... then drop every row that still has a missing value elsewhere.
  return filled.dropna().reset_index(drop=True)


In [None]:
# Clean every split with the same NA policy and rebind the names to the results.
train, test, val = map(handle_nas, (train, test, val))

## LSTM With Count Vectorizer

# Data Pre-Processing

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
from sklearn.metrics import accuracy_score
import pandas as pd
from sklearn.feature_extraction.text import CountVectorizer
import joblib

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

le = LabelEncoder()
# NOTE: these are aliases, not copies -- writing to train_df/val_df/test_df
# below also overwrites the 'label' column of train/val/test.
train_df = train
test_df = test
val_df = val

# Fit the label mapping on the training split only and reuse that same mapping
# for validation and test, so a class gets the same integer id in every split.
train_df['label'] = le.fit_transform(train_df['label'])
val_df['label'] = le.transform(val_df['label'])
test_df['label'] = le.transform(test_df['label'])

# Bag-of-words features: the vocabulary (capped at 5000 terms, English stop
# words removed) is learned from the training text and applied to the others.
vectorizer = CountVectorizer(stop_words='english', max_features=5000)
X_train_text = vectorizer.fit_transform(train_df['text']).toarray()
X_val_text = vectorizer.transform(val_df['text']).toarray()
X_test_text = vectorizer.transform(test_df['text']).toarray()

# Persist the fitted vectorizer so new text can be transformed later.
joblib.dump(vectorizer, 'count_vectorizer.pkl')

# Class ids as int64 tensors (the dtype expected by nn.CrossEntropyLoss targets).
y_train = torch.tensor(train_df['label'].values, dtype=torch.long)
y_val = torch.tensor(val_df['label'].values, dtype=torch.long)
y_test = torch.tensor(test_df['label'].values, dtype=torch.long)

# Dense count matrices as float32 tensors, one row per statement.
X_train = torch.tensor(X_train_text, dtype=torch.float32)
X_val = torch.tensor(X_val_text, dtype=torch.float32)
X_test = torch.tensor(X_test_text, dtype=torch.float32)

train_dataset = TensorDataset(X_train, y_train)
val_dataset = TensorDataset(X_val, y_val)
test_dataset = TensorDataset(X_test, y_test)

# Only the training batches are shuffled; evaluation order stays fixed.
batch_size = 64
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)


# Model

In [None]:
# Define the LSTM model
class LSTMModel(nn.Module):
    """LSTM classifier that maps a batch of sequences to class scores.

    ``forward`` returns raw, unnormalised logits. That is the input
    ``nn.CrossEntropyLoss`` expects (it applies log-softmax itself), and the
    arg-max of the logits equals the arg-max of the probabilities, so callers
    that only take the predicted class are unaffected. Use ``predict_proba``
    when normalised probabilities are needed.

    Args:
        input_size: Number of features per time step.
        hidden_size: Width of the LSTM hidden state.
        num_layers: Number of stacked LSTM layers.
        output_size: Number of target classes.
    """

    def __init__(self, input_size, hidden_size, num_layers, output_size):
        super(LSTMModel, self).__init__()
        self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_size, output_size)
        # Explicit class dimension; the implicit-dim form is deprecated and warns.
        self.softmax = nn.Softmax(dim=1)

    def forward(self, x):
        """Return class logits of shape (batch, output_size).

        Args:
            x: Tensor of shape (batch, seq_len, input_size).
        """
        out, _ = self.lstm(x)
        # Classify from the hidden output of the last time step.
        return self.fc(out[:, -1, :])

    def predict_proba(self, x):
        """Return class probabilities (each row sums to 1) for ``x``."""
        return self.softmax(self.forward(x))


# Model Instantiation and Training

In [None]:
# Instantiate the model
input_size = X_train.shape[1]
hidden_size = 128
num_layers = 2
output_size = len(le.classes_)
model_LSTM_CountVectorizer = LSTMModel(input_size, hidden_size, num_layers, output_size)
# Train on the selected device. The move happens before the optimizer is built
# so the optimizer tracks the parameters that are actually updated.
model_LSTM_CountVectorizer.to(device)

# Define loss function and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model_LSTM_CountVectorizer.parameters(), lr=0.001)

# Training the model
num_epochs = 3

for epoch in range(num_epochs):
    model_LSTM_CountVectorizer.train()
    for inputs, labels in train_loader:
        # Inputs and targets must live on the same device as the model.
        inputs, labels = inputs.to(device), labels.to(device)
        optimizer.zero_grad()
        # Each bag-of-words vector is presented as a sequence of length 1.
        outputs = model_LSTM_CountVectorizer(inputs.unsqueeze(1))
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

# Return the model to the CPU so cells that feed it CPU tensors keep working
# and the checkpoint can be loaded on machines without a GPU.
model_LSTM_CountVectorizer.cpu()

# Save only now, after training, so the file holds the trained weights.
torch.save(model_LSTM_CountVectorizer, 'model_LSTM_CountVectorizer.pth')


  return self._call_impl(*args, **kwargs)


# Validation and Testing

In [None]:

def evaluate_accuracy(model, loader):
    """Run ``model`` over every batch of ``loader`` and score its predictions.

    Args:
        model: Trained module; called on each input batch after a length-1
            sequence dimension has been inserted.
        loader: DataLoader yielding ``(inputs, labels)`` batches.

    Returns:
        Tuple ``(accuracy, predictions, targets)`` where the last two are
        flat lists of predicted and true class ids.
    """
    model.eval()
    # Feed inputs on whatever device the model currently lives on.
    model_device = next(model.parameters()).device
    predictions, targets = [], []
    with torch.no_grad():
        for inputs, labels in loader:
            outputs = model(inputs.unsqueeze(1).to(model_device))
            predictions.extend(outputs.argmax(dim=1).cpu().numpy())
            targets.extend(labels.cpu().numpy())
    return accuracy_score(targets, predictions), predictions, targets


# Evaluate the model on the validation set
accuracy_val, all_preds_val, all_labels_val = evaluate_accuracy(model_LSTM_CountVectorizer, val_loader)
print(f'Validation Accuracy: {accuracy_val}')

# Evaluate the model on the test set
accuracy_test, all_preds_test, all_labels_test = evaluate_accuracy(model_LSTM_CountVectorizer, test_loader)
print(f'Test Accuracy: {accuracy_test}')


Validation Accuracy: 0.22562893081761007
Test Accuracy: 0.2208


  return self._call_impl(*args, **kwargs)


## Hybrid Model With LSTM

# Data Pre-Processing

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
from sklearn.preprocessing import LabelEncoder
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score
from sklearn.ensemble import GradientBoostingClassifier

# Aliases (not copies) of the cleaned splits: the *_encoded columns created
# below are added to train / val / test themselves.
train_df = train
test_df = test
val_df = val

text_tfidf_vectorizer = TfidfVectorizer(analyzer='word', ngram_range=(1, 3))

# Source column -> name of the integer-coded column derived from it.
categorical_columns = {
    'subject': 'subject_encoded',
    'context': 'context_encoded',
    'speaker': 'speaker_encoded',
    'party': 'party_encoded',
    'state info': 'state_encoded',
}


def encode_categories(values, known_classes):
    """Map each entry of ``values`` to its position in ``known_classes``.

    Args:
        values: pandas Series of category values.
        known_classes: Ordered collection of the categories seen in training.

    Returns:
        int64 Series aligned with ``values``; categories that are not in
        ``known_classes`` are coded as -1.
    """
    code_by_class = {category: code for code, category in enumerate(known_classes)}
    return values.map(code_by_class).fillna(-1).astype('int64')


# Every column gets its own encoder, fitted on the training split only. The
# same mapping is then applied to all three splits, so a category has the same
# code everywhere and only values never seen in training fall back to -1.
for source_column, encoded_column in categorical_columns.items():
    label_encoder = LabelEncoder()
    label_encoder.fit(train_df[source_column])
    for split_df in (train_df, val_df, test_df):
        split_df[encoded_column] = encode_categories(split_df[source_column], label_encoder.classes_)

def combine_features(df):
    """Stack the integer-coded categorical columns into one float32 matrix.

    Args:
        df: DataFrame containing the columns 'speaker_encoded',
            'party_encoded', 'state_encoded', 'subject_encoded' and
            'context_encoded'.

    Returns:
        torch.float32 tensor of shape (len(df), 5) whose columns follow the
        order speaker, party, state, subject, context.

    Raises:
        KeyError: If any of the five encoded columns is missing from ``df``.
    """
    encoded_columns = [
        'speaker_encoded',
        'party_encoded',
        'state_encoded',
        'subject_encoded',
        'context_encoded',
    ]
    # Build the tensor straight from the selected columns; re-wrapping an
    # existing tensor in torch.tensor() only adds a copy and a UserWarning.
    return torch.tensor(df[encoded_columns].to_numpy(), dtype=torch.float32)

# TF-IDF text features: vocabulary and IDF weights are learned from the
# training text and reused for the validation and test text.
X_text_train = torch.tensor(text_tfidf_vectorizer.fit_transform(train_df['text']).toarray(), dtype=torch.float32)
X_text_val = torch.tensor(text_tfidf_vectorizer.transform(val_df['text']).toarray(), dtype=torch.float32)
X_text_test = torch.tensor(text_tfidf_vectorizer.transform(test_df['text']).toarray(), dtype=torch.float32)

# Combine features: text columns first, then the encoded categorical columns.
# torch.cat already returns a new float32 tensor, so no torch.tensor() wrapper
# is needed (wrapping a tensor triggers a UserWarning and an extra copy).
X_train_combined = torch.cat([X_text_train, combine_features(train_df)], dim=1)
X_val_combined = torch.cat([X_text_val, combine_features(val_df)], dim=1)
X_test_combined = torch.cat([X_text_test, combine_features(test_df)], dim=1)

# Output labels: mapping fitted on the training labels, applied to all splits.
label_encoder = LabelEncoder()
y_train = torch.tensor(label_encoder.fit_transform(train_df['label']), dtype=torch.long)
y_val = torch.tensor(label_encoder.transform(val_df['label']), dtype=torch.long)
y_test = torch.tensor(label_encoder.transform(test_df['label']), dtype=torch.long)


  return torch.tensor(
  X_train_combined = torch.tensor(
  X_val_combined = torch.tensor(
  X_test_combined = torch.tensor(


# Model

In [None]:
class HybridModel(nn.Module):
    """Two-branch classifier joining an LSTM text branch with a dense branch.

    The text input goes through a single-layer bidirectional LSTM; the second
    input goes through Linear -> ReLU -> Dropout. The two representations are
    concatenated and projected to ``output_size`` class scores.

    Args:
        text_input_size: Feature size per time step of the text input.
        other_input_size: Feature size of the second input.
        hidden_size: Width of the dense branch.
        lstm_hidden_size: Hidden size of each LSTM direction.
        output_size: Number of output classes.
        dropout_rate: Dropout probability used in the dense branch.
    """

    def __init__(self, text_input_size, other_input_size, hidden_size, lstm_hidden_size, output_size, dropout_rate=0.5):
        super().__init__()
        # Text branch
        self.lstm = nn.LSTM(
            input_size=text_input_size,
            hidden_size=lstm_hidden_size,
            batch_first=True,
            bidirectional=True,
        )
        # Dense branch for the non-sequential features
        dense_branch = [
            nn.Linear(other_input_size, hidden_size),
            nn.ReLU(),
            nn.Dropout(dropout_rate),
        ]
        self.fc_other = nn.Sequential(*dense_branch)
        # Classifier head; the LSTM contributes two directions' worth of features
        fused_width = hidden_size + 2 * lstm_hidden_size
        self.fc = nn.Linear(fused_width, output_size)

    def forward(self, x_text, x_other):
        """Return unnormalised class scores of shape (batch, output_size)."""
        sequence_states, _final_state = self.lstm(x_text)
        # Keep only the LSTM output at the last time step
        text_repr = sequence_states[:, -1, :]
        other_repr = self.fc_other(x_other)
        fused = torch.cat((text_repr, other_repr), dim=1)
        return self.fc(fused)


# Model Instantiation and Training

In [None]:
text_input_size = X_text_train.shape[1]
other_input_size = X_train_combined.shape[1]
hidden_size = 64
lstm_hidden_size = 64
output_size = len(label_encoder.classes_)

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Instantiate the model
model_Hybrid_Tfidf = HybridModel(text_input_size, other_input_size, hidden_size, lstm_hidden_size, output_size).to(device)

# Define the loss function and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model_Hybrid_Tfidf.parameters(), lr=0.001)

# Give the TF-IDF matrices a sequence dimension of length 1 for the LSTM.
# NOTE: this rebinds X_train_text / X_val_text / X_test_text, names that the
# count-vectorizer section above also assigns.
X_train_text = X_text_train.unsqueeze(1)
X_val_text = X_text_val.unsqueeze(1)
X_test_text = X_text_test.unsqueeze(1)

# Training loop
num_epochs = 10
batch_size = 32

train_dataset = TensorDataset(X_train_text, X_train_combined, y_train)
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)

for epoch in range(num_epochs):
    model_Hybrid_Tfidf.train()
    for batch_text, batch_other, labels in train_loader:
        # Move the whole batch to the model's device before the forward pass.
        batch_text = batch_text.to(device)
        batch_other = batch_other.to(device)
        labels = labels.to(device)
        optimizer.zero_grad()
        output = model_Hybrid_Tfidf(batch_text, batch_other)
        loss = criterion(output, labels)
        loss.backward()
        optimizer.step()

# Save only now, after training, so the file holds the trained weights.
torch.save(model_Hybrid_Tfidf,'model_Hybrid_Tfidf.pth')


# Validation and Testing

In [None]:
# Validation: score the whole validation split in one forward pass, with
# gradient tracking disabled.
model_Hybrid_Tfidf.eval()
with torch.no_grad():
    val_output = model_Hybrid_Tfidf(X_val_text.to(device), X_val_combined.to(device)).to(device)
    val_predictions = val_output.argmax(dim=1)

val_true_ids = y_val.cpu().numpy()
val_pred_ids = val_predictions.cpu().numpy()
val_accuracy = accuracy_score(val_true_ids, val_pred_ids)
print(f"Validation Accuracy: {val_accuracy * 100:.2f}%")

# Test: same procedure on the test split.
model_Hybrid_Tfidf.eval()
with torch.no_grad():
    test_output = model_Hybrid_Tfidf(X_test_text.to(device), X_test_combined.to(device)).to(device)
    test_predictions = test_output.argmax(dim=1)

test_true_ids = y_test.cpu().numpy()
test_pred_ids = test_predictions.cpu().numpy()
test_accuracy = accuracy_score(test_true_ids, test_pred_ids)
print(f"Test Accuracy: {test_accuracy * 100:.2f}%")


Validation Accuracy: 25.94%
Test Accuracy: 24.40%


In [None]:
import joblib

# Restore the fitted CountVectorizer from the file written during preprocessing
loaded_vectorizer = joblib.load('count_vectorizer.pkl')

# Example statement(s) to vectorize
new_corpus = ["A new sentence to transform."]

# Apply the stored vocabulary; the result is a sparse count matrix
X_new = loaded_vectorizer.transform(new_corpus)

# Densify only for display
X_new_dense = X_new.toarray()
print(X_new_dense)


[[0 0 0 ... 0 0 0]]


In [None]:
import torch

# Load the model.
# The checkpoint was written with torch.save(model), i.e. it is a fully pickled
# nn.Module, not a state_dict. Newer PyTorch versions refuse such files unless
# weights_only=False is passed explicitly.
# SECURITY: unpickling can execute arbitrary code -- only load checkpoint files
# you created yourself or otherwise trust.
loaded_model = torch.load('model_LSTM_CountVectorizer.pth', weights_only=False)

# Switch to evaluation mode before inference
loaded_model.eval()

# New data for prediction: the vectorized text as a float tensor
new_data = torch.tensor(X_new.toarray(),dtype=torch.float)  # Replace with your actual input data

# Make predictions; each row is fed as a sequence of length 1
with torch.no_grad():
    predictions = loaded_model(new_data.unsqueeze(1))

# Display the predicted class id for each input row
print(torch.argmax(predictions, dim=1))


tensor([3])
