In [None]:
!pip install keras-self-attention
!pip install scikeras



In [None]:
import os
import random
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import tensorflow as tf
import random as python_random
# NLP Libraries
import nltk
import re
from nltk.corpus import stopwords
from nltk.stem.porter import PorterStemmer


# Sklearn
from sklearn.model_selection import train_test_split, KFold, StratifiedKFold
from sklearn.metrics import confusion_matrix, accuracy_score, f1_score, classification_report

# TensorFlow and Keras
import tensorflow as tf
from tensorflow.keras.models import Sequential, Model, load_model
from tensorflow.keras.layers import (Input, Dense, LSTM, Bidirectional, Embedding,
                                     Dropout, GlobalAveragePooling2D, GlobalMaxPool1D)
from tensorflow.keras.layers import Layer, Concatenate

from tensorflow.keras.preprocessing.text import hashing_trick, one_hot
from tensorflow.keras.preprocessing.sequence import pad_sequences
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint
from tensorflow.keras.applications.resnet50 import ResNet50
from tensorflow.keras.utils import to_categorical

# Image Processing
from PIL import Image
from tensorflow.keras.preprocessing.image import ImageDataGenerator

# Scikit-learn Keras Wrapper
from scikeras.wrappers import KerasClassifier
# Seed every random number generator used in this notebook with one value.
SEED = 42
for seed_fn in (random.seed, np.random.seed, tf.random.set_seed):
    seed_fn(SEED)

# Environment switches related to reproducibility.
# NOTE(review): the Python docs describe PYTHONHASHSEED as being read at
# interpreter start-up, so assigning it here is expected to influence child
# processes only, not the hash() of the already-running kernel -- confirm.
os.environ.update({
    'PYTHONHASHSEED': str(SEED),
    'TF_DETERMINISTIC_OPS': '1',
})


In [None]:
!pip install -U path.py
import os
from path import Path
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
def reset_random_seeds(seed=42):
    """Re-seed the NumPy, Python and TensorFlow random number generators.

    Args:
        seed: Integer seed applied to all three generators. Defaults to 42,
            which is the value this notebook used before the parameter existed.
    """
    np.random.seed(seed)
    python_random.seed(seed)
    tf.random.set_seed(seed)


reset_random_seeds()

# Loading Text data

In [None]:
# Folders holding one transcript file per clip.
path_lie = Path("/content/drive/MyDrive/Tien-297/Real-life_Deception_Detection_2016/Transcription/Deceptive")
path_truth = Path("/content/drive/MyDrive/Tien-297/Real-life_Deception_Detection_2016/Transcription/Truthful")

# Size of the hashing space used to turn words into integer ids.
vocab_size = 5000
# Length every encoded transcript is padded / truncated to.
MAX_SEQ_LEN = 221


def _read_transcripts(directory):
    """Read every non-hidden file in `directory` as UTF-8 text.

    Files are visited in sorted name order: `os.listdir` returns entries in
    arbitrary order, and the training cell pairs text row i with image row i,
    so the order has to be deterministic.

    Returns:
        (names, contents): the sorted file names and the text of each file.
    """
    names = sorted(name for name in os.listdir(directory) if not name.startswith('.'))
    contents = []
    for name in names:
        # Text mode already translates '\r\n' to '\n', so no manual newline
        # stripping is needed (the later whitespace split discards newlines).
        with open(os.path.join(directory, name), 'r', encoding="utf8") as handle:
            contents.append(handle.read())
    return names, contents


def _normalize(speech):
    """Lower-case `speech`, split on whitespace and Porter-stem every token."""
    return ' '.join(stemmer.stem(token) for token in speech.lower().split())


files_lie, speeches_lie = _read_transcripts(path_lie)
files_truth, speeches_truth = _read_transcripts(path_truth)

# Build the frame in one go: deceptive transcripts first, then truthful ones.
df = pd.DataFrame({
    'Speech': speeches_lie + speeches_truth,
    'label': ['Lie'] * len(speeches_lie) + ['Truth'] * len(speeches_truth),
})

# Features and labels.
# NOTE: here Truth -> 0 and Lie -> 1, whereas the image-loading cell uses
# 0 for 'Deceptive' and 1 for 'Truthful' -- the two encodings are inverted.
x = df.drop('label', axis=1)
y = df['label'].map({'Truth': 0, 'Lie': 1})
var = x.reset_index()

# The raw text is used as-is (punctuation kept); only case folding and
# stemming are applied.
stemmer = PorterStemmer()
corpus = [_normalize(speech) for speech in var['Speech']]
if not corpus:
    raise ValueError(f"No transcripts found under {path_lie} or {path_truth}")

# Find the sentence with the highest number of words
max_sentence = max(corpus, key=lambda sentence: len(sentence.split()))
print(f"The sentence '{max_sentence}' has the highest number of words ({len(max_sentence.split())}).")

# Hash every word into [1, vocab_size). `one_hot` relies on Python's built-in
# hash(), which is salted per interpreter session, so its ids are not stable
# across kernel restarts; the md5-based hashing below is.
encoding = [hashing_trick(words, vocab_size, hash_function='md5') for words in corpus]

# Pad the sequences (zeros in front) to ensure uniform input length
emb_doc = pad_sequences(encoding, padding='pre', maxlen=MAX_SEQ_LEN)

# Convert to NumPy arrays for further processing
text_features = np.array(emb_doc)
text_labels = np.array(y)


The sentence 'i do. i was, uh... in the office, and i got a call ... uh, mayb ... close to nine o' clock, i don't remember, somewher between eight and nine, from rusty. he said, uh ... someon wa just on the side of the house, and he wa sort of not complet coherent, and said ... someon was, like, sleep on the side of the house, and he call the police, and he was... oh and then he had to go realli quickli to talk to the police, and then he end up call me back, and told me the whole stori of what had happened, which wa that, um... he had put ian in the car as he doe everi day around that time, and ian' car seat is on the right passeng side in the back, and as he walk around he smell gas, um... as he walk -- 'caus he sort of exit the garag a littl bit to get to hi side of the car, he smell gas, and, uh... our ga meter and pipe is on the side of the left side of the hous if you'r look at the front. so he walk over to that spot, it' near our air conditioners, and he said that there wa a man 

# Loading audio data (spectrogram images)

In [None]:
# Define the directories for your two classes
dir_deceptive = Path("/content/drive/MyDrive/Tien-297/Real-life_Deception_Detection_2016/Images/Deceptive_v1/")
dir_truthful = Path("/content/drive/MyDrive/Tien-297/Real-life_Deception_Detection_2016/Images/Truthful_v1")

# Initialize lists to store the images and labels
images = []
labels = []  # 0 for 'Deceptive', 1 for 'Truthful'


def load_images_from_directory(directory, label, size=(224, 224)):
    """Append every PNG in `directory` to the global `images` / `labels` lists.

    Each image is converted to RGB, resized to `size` and stored as a NumPy
    array; `label` is appended once per image.

    Files are visited in sorted name order because `os.listdir` returns
    entries in arbitrary order and the training cell pairs image row i with
    text row i.
    NOTE(review): this only lines the two modalities up if image and
    transcript file names sort the same way -- confirm against the dataset.

    Args:
        directory: Folder to scan (non-recursive).
        label: Integer class label assigned to every image found.
        size: (width, height) passed to `Image.resize`. Defaults to (224, 224).
    """
    for filename in sorted(os.listdir(directory)):
        if not filename.endswith('.png'):  # or other file formats as needed
            continue
        img_path = os.path.join(directory, filename)
        # The context manager closes the underlying file once the pixels
        # have been copied into the array.
        with Image.open(img_path) as img:
            pixels = np.array(img.convert('RGB').resize(size))
        images.append(pixels)
        labels.append(label)


# Load 'Deceptive' images
load_images_from_directory(dir_deceptive, 0)

# Load 'Truthful' images
load_images_from_directory(dir_truthful, 1)

# Convert to numpy arrays
audio_features = np.array(images)
audio_labels = np.array(labels)


**LATE FUSION**

In [None]:
class LinearW(Layer):
    """Fuse several same-shaped tensors with a learned convex combination.

    One trainable scalar per input is kept in `W`; the scalars are passed
    through a softmax so the mixing weights are positive and sum to one, and
    the output is the weighted sum of the inputs.

    Args:
        num_inputs: Number of tensors the layer will be called with.
        **kwargs: Forwarded to `Layer` (e.g. `name`).
    """

    def __init__(self, num_inputs, **kwargs):
        super().__init__(**kwargs)
        self.num_inputs = num_inputs

    def build(self, input_shape):
        # One logit per input; the leading singleton axes broadcast over the
        # two leading axes of the stacked inputs built in `call`.
        self.W = self.add_weight(
            name="fusion_weights",
            shape=(1, 1, self.num_inputs),
            initializer='uniform',
            trainable=True
        )

    def call(self, inputs):
        if not isinstance(inputs, (list, tuple)) or len(inputs) != self.num_inputs:
            raise ValueError(
                f"LinearW expects a list of {self.num_inputs} tensors as input."
            )
        # Stack on a new trailing axis. This uses a plain op rather than
        # instantiating a fresh Concatenate layer on every call.
        stacked = tf.stack(list(inputs), axis=-1)
        weights = tf.nn.softmax(self.W, axis=-1)  # learned importance of each input
        return tf.reduce_sum(weights * stacked, axis=-1)  # weighted sum of inputs

    def get_config(self):
        """Return the layer config, including `num_inputs`, so a saved model
        can rebuild this custom layer."""
        config = super().get_config()
        config.update({"num_inputs": self.num_inputs})
        return config


# Audio Model Modification
def create_audio_model(input_tensor):
    """Build the image branch and return its 128-unit feature tensor.

    An ImageNet-initialised ResNet50 without its classification head is
    attached to `input_tensor`; all of its layers except the last 20 are
    frozen. The backbone output is globally average-pooled, passed through
    dropout (rate 0.5) and projected by a 128-unit ReLU dense layer.

    NOTE(review): no `resnet50.preprocess_input` is applied in this function;
    confirm whether the caller is expected to scale the images beforehand.

    Args:
        input_tensor: Keras input tensor the backbone is built on.

    Returns:
        The output tensor of the final dense layer.
    """
    backbone = ResNet50(weights='imagenet', include_top=False, input_tensor=input_tensor)

    # Only the last 20 backbone layers stay trainable.
    for frozen_layer in backbone.layers[:-20]:
        frozen_layer.trainable = False

    pooled = GlobalAveragePooling2D()(backbone.output)
    regularised = Dropout(0.5)(pooled)
    return Dense(128, activation='relu')(regularised)  # width used for feature fusion

def create_text_model(input_tensor):
    """Build the text branch and return its 128-unit feature tensor.

    Token ids are embedded into 40 dimensions (vocabulary size taken from the
    module-level `vocab_size`), summarised by a bidirectional LSTM with 100
    units per direction, and projected by a 128-unit ReLU dense layer.

    Args:
        input_tensor: Keras input tensor of integer token ids; its shape is
            fixed by the caller.

    Returns:
        The output tensor of the final dense layer.
    """
    embedded = Embedding(vocab_size, 40)(input_tensor)
    encoded = Bidirectional(LSTM(100))(embedded)
    return Dense(128, activation='relu')(encoded)

In [None]:
n_splits = 5
# Stratified folds keep the class ratio of every fold close to the full set.
kf = StratifiedKFold(n_splits=n_splits, shuffle=True, random_state=42)
accuracies = []
f1_scores = []
weights_across_folds = []

# Both modalities are indexed with the same fold indices, so they must have
# one row per sample in the same order.
if not (len(audio_features) == len(text_features) == len(audio_labels)):
    raise ValueError(
        "audio_features, text_features and audio_labels must have the same length, "
        f"got {len(audio_features)}, {len(text_features)} and {len(audio_labels)}"
    )
# text_labels uses Lie=1 while audio_labels uses Deceptive=0, so aligned rows
# satisfy text_label == 1 - audio_label.
if not np.array_equal(text_labels, 1 - audio_labels):
    raise ValueError("Text and image samples are not class-aligned row by row.")


def build_fusion_model(seq_len):
    """Create and compile a fresh two-branch fusion model.

    Args:
        seq_len: Length of the padded token-id sequences fed to the text branch.

    Returns:
        A compiled Keras `Model` taking `[images, token_ids]` and emitting a
        single sigmoid probability.
    """
    input_audio = Input(shape=(224, 224, 3))  # Audio spectrogram dimensions
    input_text = Input(shape=(seq_len,))  # Length of sequence for text

    audio_output = create_audio_model(input_audio)
    text_output = create_text_model(input_text)

    # Learned softmax-weighted sum of the two 128-unit branch outputs.
    combined_output = LinearW(num_inputs=2, name='fusion_layer')([audio_output, text_output])
    final_output = Dense(1, activation='sigmoid')(combined_output)

    model = Model(inputs=[input_audio, input_text], outputs=final_output)
    model.compile(optimizer=Adam(learning_rate=0.001), loss='binary_crossentropy', metrics=['accuracy'])
    return model


fold_no = 1
for train_index, test_index in kf.split(audio_features, audio_labels):
    print('--------------- Fold {}------------'.format(fold_no))

    # Drop the previous fold's graph so five ResNet50 copies do not pile up.
    tf.keras.backend.clear_session()

    # Hold out part of the TRAINING fold for early stopping. Using the test
    # fold as validation data would select the restored weights on the very
    # samples the fold is scored on, and inflate the reported metrics.
    fit_index, val_index = train_test_split(
        train_index,
        test_size=0.2,
        shuffle=True,
        stratify=audio_labels[train_index],
        random_state=42,
    )

    # Split data for this fold
    audio_train, audio_val, audio_test = (
        audio_features[fit_index], audio_features[val_index], audio_features[test_index]
    )
    text_train, text_val, text_test = (
        text_features[fit_index], text_features[val_index], text_features[test_index]
    )
    # Targets follow the image-loading convention: 0 = Deceptive, 1 = Truthful.
    y_train, y_val, y_test = (
        audio_labels[fit_index], audio_labels[val_index], audio_labels[test_index]
    )

    # A new model per fold, so weights never carry over between folds.
    fusion_model = build_fusion_model(text_features.shape[1])

    # Train the model
    history = fusion_model.fit(
        [audio_train, text_train], y_train,
        validation_data=([audio_val, text_val], y_val),
        epochs=100,
        batch_size=20,
        callbacks=[EarlyStopping(monitor='val_accuracy', patience=5, restore_best_weights=True)],
        verbose=0
    )

    # After training, fetch the fusion logits and turn them into mixing weights
    weights = fusion_model.get_layer('fusion_layer').get_weights()[0]
    weights_normalized = tf.nn.softmax(weights, axis=-1).numpy()
    print('Weight: ', weights_normalized)
    weights_across_folds.append(weights_normalized)

    # Evaluate model on the untouched test fold
    test_loss, test_accuracy = fusion_model.evaluate([audio_test, text_test], y_test)
    print(f"Test Accuracy: {test_accuracy}")

    # Threshold the probabilities; flatten (n, 1) -> (n,) to match y_test
    y_pred = fusion_model.predict([audio_test, text_test])
    y_pred = (y_pred > 0.5).astype(int).ravel()
    accuracies.append(test_accuracy)
    f1 = f1_score(y_test, y_pred)
    f1_scores.append(f1)
    print(f"Test F1 Score: {f1}")

    fold_no += 1
    print('---------------------------')

# Calculate and print average metrics
average_accuracy = np.mean(accuracies)
average_f1 = np.mean(f1_scores)
print(f'Average Test Accuracy: {average_accuracy}')
print(f'Average Test F1 Score: {average_f1}')

# Analyze how weights vary across folds
weights_across_folds = np.array(weights_across_folds)
mean_weights = np.mean(weights_across_folds, axis=0)
print("Mean weights across folds:", mean_weights)

--------------- Fold 1------------
Weight:  [[[0.49265066 0.5073494 ]]]
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 122ms/step - accuracy: 1.0000 - loss: 0.0020
Test Accuracy: 1.0
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m2s[0m 2s/step
Test F1 Score: 1.0
---------------------------
--------------- Fold 2------------
Weight:  [[[0.50001985 0.49998015]]]
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 118ms/step - accuracy: 0.9583 - loss: 0.0966
Test Accuracy: 0.9583333134651184
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m2s[0m 2s/step
Test F1 Score: 0.9565217391304348
---------------------------
--------------- Fold 3------------
Weight:  [[[0.5136772  0.48632288]]]
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 122ms/step - accuracy: 1.0000 - loss: 5.4715e-36
Test Accuracy: 1.0
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m2s[0m 2s/step
Test F1 Score: 1.0
---------------------------
--------------- Fold 