In [None]:
pip install transformers
pip install keras-tuner
pip install tensorflow-addons

# Import required packages
import pandas as pd
import tensorflow as tf
from tensorflow.keras import layers
from tensorflow.keras.layers import Bidirectional, LSTM
from tensorflow.keras.callbacks import EarlyStopping, Callback
from sklearn.model_selection import KFold
from transformers import BertTokenizer, BertModel
import torch
import numpy as np
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from kerastuner import RandomSearch
import tensorflow_addons as tfa
from tensorflow.keras import regularizers


# Locations of the three LIAR splits on the mounted Google Drive
train_path = '/content/drive/MyDrive/LIAR Dataset/train.tsv'
valid_path = '/content/drive/MyDrive/LIAR Dataset/valid.tsv'
test_path = '/content/drive/MyDrive/LIAR Dataset/test.tsv'

# The TSV files carry no header row, so the column labels are supplied here
column_names = [
    'JSON_ID',
    'Truth_Label',
    'Statement_Text',
    'Topic',
    'Speaker_Name',
    'Speaker_Title',
    'State_Info',
    'Party_Affiliation',
    'Total_Credit_History_Count',
    'False_Counts',
    'Half_True_Counts',
    'Mostly_True_Counts',
    'Pants_On_Fire_Counts',
    'Context',
]

# Read each tab-separated split, in train / valid / test order
liar_train_df, liar_valid_df, liar_test_df = (
    pd.read_csv(split_path, sep='\t', header=None)
    for split_path in (train_path, valid_path, test_path)
)

# Attach the readable column labels to every split
for _split_df in (liar_train_df, liar_valid_df, liar_test_df):
    _split_df.columns = column_names

# Data Visualization Libraries
import matplotlib.pyplot as plt
import seaborn as sns

# Bar chart of how many training rows carry each truth label.
# countplot returns the Axes it drew on, so the labels are set on that object.
plt.figure(figsize=(10, 6))
_label_ax = sns.countplot(x='Truth_Label', data=liar_train_df)
_label_ax.set_title('Distribution of Truth Labels in Training Dataset')
_label_ax.set_xlabel('Truth Label')
_label_ax.set_ylabel('Count')
plt.show()

# Identify numerical and categorical columns from the training split's dtypes
numerical_cols = liar_train_df.select_dtypes(include=['float64', 'int64']).columns
categorical_cols = liar_train_df.select_dtypes(include=['object']).columns

# Imputation values come from the training split only and are reused for the
# validation and test splits, so no statistics leak from held-out data.
fill_values = {}

# Numerical columns: training-split median
for col in numerical_cols:
    fill_values[col] = liar_train_df[col].median()

# Categorical columns: training-split mode (most frequent value)
for col in categorical_cols:
    modes = liar_train_df[col].mode()
    # A column that is entirely missing has no mode; leave it untouched
    # instead of failing on modes[0].
    if not modes.empty:
        fill_values[col] = modes.iloc[0]

# Assign the filled column back to the frame.  `df[col].fillna(..., inplace=True)`
# operates on a temporary Series (chained assignment): it raises a warning on
# recent pandas and does not modify the frame at all under Copy-on-Write.
for split_df in (liar_train_df, liar_valid_df, liar_test_df):
    for col, fill_value in fill_values.items():
        split_df[col] = split_df[col].fillna(fill_value)

# Model inputs, grouped by modality: one text column, five count columns,
# and one categorical column
textual_feature = 'Statement_Text'
numerical_features = [
    'Total_Credit_History_Count',
    'False_Counts',
    'Half_True_Counts',
    'Mostly_True_Counts',
    'Pants_On_Fire_Counts',
]
categorical_feature = 'Party_Affiliation'

# Restrict every split to the selected columns (text, then counts, then party)
_selected_columns = [textual_feature, *numerical_features, categorical_feature]
train_features_df = liar_train_df[_selected_columns]
valid_features_df = liar_valid_df[_selected_columns]
test_features_df = liar_test_df[_selected_columns]

# Pretrained uncased BERT: the tokenizer and the encoder share one checkpoint
tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')
model = BertModel.from_pretrained('bert-base-uncased')

# Number of statements embedded per forward pass
batch_size = 100
# Token limit handed to the tokenizer (longer statements are truncated)
max_length = 256
# Function to get BERT embeddings for a batch of text
def get_bert_embeddings_for_batch(text_batch):
    """Return one mean-pooled BERT embedding per text in ``text_batch``.

    Uses the module-level ``tokenizer``, ``model`` and ``max_length``.

    Args:
        text_batch: List of strings to embed.

    Returns:
        A 2-D NumPy array with one row per input text and one column per
        hidden dimension.  The result is always 2-D, including for a
        single-text batch.
    """
    inputs = tokenizer(text_batch, return_tensors="pt", padding=True, truncation=True, max_length=max_length)

    # Inference only: without no_grad() every forward pass keeps an autograd
    # graph alive, which wastes memory for embeddings that are never trained.
    with torch.no_grad():
        outputs = model(**inputs)

    token_states = outputs.last_hidden_state

    # Average over real tokens only.  A plain mean over the sequence axis also
    # averages the padding positions, so a text's embedding would depend on
    # how long the longest text in its batch happens to be.
    token_mask = inputs["attention_mask"].unsqueeze(-1).to(token_states.dtype)
    token_counts = token_mask.sum(dim=1).clamp(min=1)  # guard against division by zero
    embeddings = (token_states * token_mask).sum(dim=1) / token_counts

    # No squeeze(): squeezing collapses a one-text batch to a 1-D vector,
    # which callers that iterate over rows would treat as many scalars.
    return embeddings.detach().cpu().numpy()

def _embed_statements(features_df):
    """Embed the 'Statement_Text' column of ``features_df`` in chunks of ``batch_size``.

    Args:
        features_df: DataFrame containing a 'Statement_Text' column.

    Returns:
        A 2-D NumPy array with one embedding row per DataFrame row, in order.
    """
    statements = features_df['Statement_Text']
    batch_embeddings = []
    for start in range(0, len(statements), batch_size):
        text_batch = statements.iloc[start:start + batch_size].tolist()
        embeddings_batch = get_bert_embeddings_for_batch(text_batch)
        # np.atleast_2d guards against a batch that comes back as a single
        # 1-D vector (e.g. a one-statement batch whose batch axis was squeezed
        # away); extending a list with such a vector would add one scalar per
        # hidden dimension instead of one embedding row.
        batch_embeddings.append(np.atleast_2d(embeddings_batch))

    if not batch_embeddings:
        # Empty split: still return a 2-D array so later hstack calls line up
        return np.empty((0, model.config.hidden_size))
    return np.concatenate(batch_embeddings, axis=0)


# BERT embeddings for the training, validation, and test sets, as NumPy arrays
bert_embeddings_train = _embed_statements(train_features_df)
bert_embeddings_valid = _embed_statements(valid_features_df)
bert_embeddings_test = _embed_statements(test_features_df)

# Work on copies so the scaled values are not written into slices of the
# original frames (avoids SettingWithCopyWarning)
train_features_df_copy = train_features_df.copy()
valid_features_df_copy = valid_features_df.copy()
test_features_df_copy = test_features_df.copy()

# Scale numerical features; the scaler is fit on the training split only
scaler = StandardScaler()
scaler.fit(train_features_df_copy[numerical_features])

# Replace the columns outright instead of assigning through .loc[:, cols]:
# the scaled values are floats, and writing them in place into a column that
# was read as integers triggers pandas' incompatible-dtype handling.
for features_copy in (train_features_df_copy, valid_features_df_copy, test_features_df_copy):
    features_copy[numerical_features] = scaler.transform(features_copy[numerical_features])

# One-hot encode the categorical feature; categories unseen during fit become
# all-zero rows.  The encoder's default sparse output is densified with
# .toarray(), which works across scikit-learn versions (the `sparse=` keyword
# was renamed to `sparse_output=` and later removed).
encoder = OneHotEncoder(handle_unknown='ignore')
encoder.fit(train_features_df_copy[[categorical_feature]])

train_categorical_encoded = encoder.transform(train_features_df_copy[[categorical_feature]]).toarray()
valid_categorical_encoded = encoder.transform(valid_features_df_copy[[categorical_feature]]).toarray()
test_categorical_encoded = encoder.transform(test_features_df_copy[[categorical_feature]]).toarray()

# Convert to DataFrame with the encoder's generated column names
encoded_column_names = encoder.get_feature_names_out([categorical_feature])
train_categorical_df = pd.DataFrame(train_categorical_encoded, columns=encoded_column_names)
valid_categorical_df = pd.DataFrame(valid_categorical_encoded, columns=encoded_column_names)
test_categorical_df = pd.DataFrame(test_categorical_encoded, columns=encoded_column_names)

# Concatenate BERT embeddings, scaled numerical features, and one-hot encoded
# categorical features column-wise (these lines were previously indented,
# which is a syntax error outside an interactive cell)
train_final_features = np.hstack([bert_embeddings_train, train_features_df_copy[numerical_features].values, train_categorical_df.values])
valid_final_features = np.hstack([bert_embeddings_valid, valid_features_df_copy[numerical_features].values, valid_categorical_df.values])
test_final_features = np.hstack([bert_embeddings_test, test_features_df_copy[numerical_features].values, test_categorical_df.values])

# Display the shape of the concatenated feature sets to confirm the operation
print(f"Shape of final training features: {train_final_features.shape}")
print(f"Shape of final validation features: {valid_final_features.shape}")
print(f"Shape of final test features: {test_final_features.shape}")

# Numeric regression target for each of the six truth-label strings
label_mapping = {
    'true': 1,
    'mostly-true': 0.7,
    'half-true': 0.5,
    'barely-true': 0.2,
    'false': 0,
    'pants-fire': -1
}

# Overwrite the string labels with their numeric targets in every split
for _labelled_df in (liar_train_df, liar_valid_df, liar_test_df):
    _labelled_df['Truth_Label'] = _labelled_df['Truth_Label'].map(label_mapping)

# Target arrays handed to Keras, one per split
train_labels, valid_labels, test_labels = (
    liar_train_df['Truth_Label'].values,
    liar_valid_df['Truth_Label'].values,
    liar_test_df['Truth_Label'].values,
)

# Keras Tuner Setup

## Width of the model input: the number of columns in the training feature matrix
feature_size = train_final_features.shape[1]

# Function to build model for Keras Tuner
def build_model(hp):
    """Build and compile the regression network for one hyperparameter set.

    The network is Dense -> Dropout -> Reshape -> Bidirectional LSTM -> Dense(1).
    The Dense width and the Reshape length both come from the same 'units'
    hyperparameter.  The input width is the module-level ``feature_size``.

    Args:
        hp: Keras Tuner hyperparameter container supplying 'units', 'dropout',
            'lstm_units' and 'learning_rate'.

    Returns:
        A compiled Keras model trained with mean squared error and reporting
        mean absolute error.
    """
    # Read the hyperparameters up front, in the order they are first used
    dense_units = hp.Int('units', min_value=128, max_value=512, step=32)
    dropout_rate = hp.Float('dropout', min_value=0.0, max_value=0.5, step=0.1)
    lstm_units = hp.Int('lstm_units', min_value=32, max_value=128, step=32)
    learning_rate = hp.Float('learning_rate', min_value=1e-4, max_value=1e-2, sampling='LOG')

    network = tf.keras.Sequential([
        layers.Input(shape=(feature_size,)),
        layers.Dense(units=dense_units, activation='relu'),
        layers.Dropout(rate=dropout_rate),
        # Turn the Dense output into a sequence of `dense_units` single-value steps
        layers.Reshape((dense_units, 1)),
        Bidirectional(LSTM(lstm_units)),
        layers.Dense(1),
    ])

    network.compile(
        optimizer=tfa.optimizers.AdamW(learning_rate=learning_rate, weight_decay=1e-5),
        loss='mean_squared_error',
        metrics=['mae'],
    )
    return network

# Random search over build_model's hyperparameters, ranked by validation loss
tuner = RandomSearch(
    build_model,
    objective='val_loss',
    max_trials=5,
    executions_per_trial=2,
)

# Each trial trains on the training split and is scored on the validation split
tuner.search(
    train_final_features,
    train_labels,
    epochs=5,
    validation_data=(valid_final_features, valid_labels),
)

# Keep the top-ranked hyperparameter set for the training runs that follow
best_hp = tuner.get_best_hyperparameters()[0]

# K-Fold Cross-Validation on the training split using the best hyperparameters

n_splits = 5
kf = KFold(n_splits=n_splits)

# Per-fold validation metrics
val_loss_scores = []
val_mae_scores = []

for train_index, val_index in kf.split(train_final_features):
    X_train_fold, X_val_fold = train_final_features[train_index], train_final_features[val_index]
    y_train_fold, y_val_fold = train_labels[train_index], train_labels[val_index]

    # Named fold_model rather than `model`: the module-level `model` holds the
    # BERT encoder that get_bert_embeddings_for_batch relies on, and rebinding
    # it here would break any later embedding call.
    fold_model = build_model(best_hp)

    early_stop = EarlyStopping(monitor='val_loss', patience=3, restore_best_weights=True)

    # Keep the History object: the loss plot below reads it.  Previously the
    # return value was discarded and the plot failed with a NameError.
    history = fold_model.fit(
        X_train_fold, y_train_fold,
        epochs=20,
        batch_size=64,
        validation_data=(X_val_fold, y_val_fold),
        callbacks=[early_stop]
    )

    # evaluate returns [loss, mae] because the model is compiled with metrics=['mae']
    val_loss, val_mae = fold_model.evaluate(X_val_fold, y_val_fold)
    val_loss_scores.append(val_loss)
    val_mae_scores.append(val_mae)

avg_val_loss = np.mean(val_loss_scores)
avg_val_mae = np.mean(val_mae_scores)

print(f"Average Validation Loss: {avg_val_loss}")
print(f"Average Validation MAE: {avg_val_mae}")

# Plot Training & Validation Loss Values vs Epoch.
# `history` is the History object of the last cross-validation fold.
plt.figure(figsize=(10, 6))
plt.plot(history.history['loss'])
plt.plot(history.history['val_loss'])
plt.title('Model Loss vs Epoch')
plt.ylabel('Loss')
plt.xlabel('Epoch')
plt.legend(['Train', 'Validation'], loc='upper right')
plt.show()

class TestCallback(Callback):
    """Keras callback that evaluates the model on a held-out set after every epoch.

    The per-epoch results are printed and accumulated in ``test_loss`` and
    ``test_mae`` so they can be inspected after training.
    """

    def __init__(self, test_data):
        """Store the evaluation data.

        Args:
            test_data: ``(features, labels)`` pair passed to ``model.evaluate``.
        """
        # Let the Keras base class initialise its own state before adding ours
        super().__init__()
        self.test_data = test_data
        # One entry is appended per completed epoch
        self.test_mae = []
        self.test_loss = []

    def on_epoch_end(self, epoch, logs=None):
        """Evaluate on the stored data and record the loss and MAE for this epoch."""
        x, y = self.test_data
        # Unpacking into two values assumes the model was compiled with exactly
        # one metric (MAE) in addition to the loss.
        loss, mae = self.model.evaluate(x, y, verbose=0)
        self.test_mae.append(mae)
        self.test_loss.append(loss)
        print(f'Test MAE: {mae}, Test Loss: {loss}')

# Callbacks for the final fit: stop once validation loss stalls for 3 epochs
# (restoring the best weights), and log test-set metrics after every epoch
early_stop = EarlyStopping(monitor='val_loss', patience=3, restore_best_weights=True)
test_callback = TestCallback((test_final_features, test_labels))

# Fit a fresh model with the tuned hyperparameters on the full training split
final_model = build_model(best_hp)
history = final_model.fit(
    train_final_features,
    train_labels,
    epochs=20,
    batch_size=64,
    validation_data=(valid_final_features, valid_labels),
    callbacks=[early_stop, test_callback],
)

# Report loss and MAE on the validation split
val_loss, val_mae = final_model.evaluate(valid_final_features, valid_labels, batch_size=64)
print(f"Validation Loss: {val_loss}")
print(f"Validation MAE: {val_mae}")

# Report loss and MAE on the test split
test_loss, test_mae = final_model.evaluate(test_final_features, test_labels, batch_size=64)
print(f"Test Loss: {test_loss}")
print(f"Test MAE: {test_mae}")

# One figure per tracked quantity, each with a training and a validation curve.
# Tuple layout: (train key, validation key, train label, validation label, title, y-axis label)
_curve_specs = (
    ('loss', 'val_loss', 'Training Loss', 'Validation Loss',
     'Training and Validation Loss over Epochs', 'Loss'),
    ('mae', 'val_mae', 'Training MAE', 'Validation MAE',
     'Training and Validation MAE over Epochs', 'MAE'),
)
for _train_key, _val_key, _train_label, _val_label, _title, _y_label in _curve_specs:
    plt.figure(figsize=(10, 5))
    plt.plot(history.history[_train_key], label=_train_label)
    plt.plot(history.history[_val_key], label=_val_label)
    plt.title(_title)
    plt.xlabel('Epoch')
    plt.ylabel(_y_label)
    plt.legend()
    plt.show()