# Library

In [None]:
import pandas as pd
import re
import numpy as np
import nltk
import spacy
import random
import pickle

from nltk.corpus import stopwords
from nltk.tokenize import word_tokenize
from textblob import TextBlob
from nltk.stem import WordNetLemmatizer
from nltk.tag import pos_tag
from sklearn.feature_extraction.text import CountVectorizer, TfidfTransformer
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split, GridSearchCV
from sklearn.pipeline import Pipeline
from sklearn.metrics import classification_report, confusion_matrix, ConfusionMatrixDisplay, accuracy_score

from joblib import dump, load
import matplotlib.pyplot as plt
import seaborn as sns
from collections import Counter
from wordcloud import WordCloud

import tensorflow as tf
from keras.models import Sequential
from keras.layers import LSTM, Dense, Embedding, SpatialDropout1D, Bidirectional

from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint, ReduceLROnPlateau
from tensorflow.keras.layers import Input, Dense
from tensorflow.keras.models import load_model

from gensim.models import Word2Vec

nltk.download('stopwords')
nltk.download('wordnet')
nltk.download('punkt')

%load_ext autotime

In [None]:
MODEL_PATH = os.path.join(GOOGLE_DRIVE_PATH, 'Models')

# Tools

In [None]:
def _lowercase(text):
    """
      Lowercase text
    """

    # Count the number of uppercase
    uppercase_count = sum(1 for s in text if s.isupper())

    return text.lower(), uppercase_count

In [None]:
def _remove_links(text):
    """
      Remove web links
    """

    # Count the number of links
    links = re.findall(r'https?://\S+', text)

    # Remove all links
    clean_text = re.sub(r'https?://\S+', '', text)

    return clean_text, len(links)

In [None]:
def _remove_noise(text):
    """
      Remove all special characters and number
    """

    # Count the number of digits in the text
    num_digits = len(re.findall(r'\d', text))

    # Count the number of special characters in the text
    num_special_chars = len(re.findall(r'[^a-zA-Z0-9\s]', text))

    # Remove escape characters from the text
    clean_text = re.sub(r'\\[rntbf]', '', text)

    # Remove all non-alphabetic characters from the text
    clean_text = re.sub(re.compile(r'[^a-zA-Z\s]'), '', clean_text)

    return clean_text, num_digits, num_special_chars

In [None]:
def _wordtokenize(sent):
    """
      Get word tokens from sentence
    """

    return word_tokenize(sent)

In [None]:
def _remove_stopwords(text):
    """
      Filter out and count stopwords in each sentence
    """

    # Set of English stopwords from NLTK
    stop_words = set(stopwords.words('english'))
    new_sentence = []
    stopwords_count = 0

    # Iterate through each token after splitting
    for token in text.split():

        # Increment count if token is a stopword
        if token in stop_words:
            stopwords_count += 1
        else:
            # Combine non-stopword to sentence
            new_sentence.append(token)

    return " ".join(new_sentence), stopwords_count

In [None]:
def _lemmatizer(text):
    """
      Lemmatize tokens
    """

    # Initialize WordNet Lemmatizer
    lemmatizer = WordNetLemmatizer()

    # Tokenize the input text
    tokens = word_tokenize(text)

    # Lemmatize each token
    lemmatized_tokens = [lemmatizer.lemmatize(token) for token in tokens]

    # Join the lemmatized tokens into a sentence
    lemmatized_sentence = " ".join(lemmatized_tokens)

    return lemmatized_sentence

In [None]:
def _spell_check(text):
    """
      Correct misspelled words
    """

    # Create a TextBlob object
    blob = TextBlob(text)

    # Correct misspelled words
    corrected_text = blob.correct()

    # Calculate the number of misspelled words
    num_misspelled = len(blob.words) - len(corrected_text.words)

    return str(corrected_text), num_misspelled

In [None]:
def _ner(text):
    """
      Perform  Named Entity Recognition
    """

    # Load the English model
    ner = spacy.load("en_core_web_sm")

    # Process using NER model
    doc = ner(text)

    # Extract named entities and their labels
    ner_features = {ent.text: ent.label_ for ent in doc.ents}

    return ner_features

In [None]:
def _pos_tagging(tokens):
    """
      Perform Part of Speech tagging
    """

    # Perform POS tagging on tokens
    pos_tags = pos_tag(tokens)

    # Store POS tags for each token
    pos_features = {word: pos for word, pos in pos_tags}

    return pos_features

In [None]:
def _import_data():
    """
      Import dataset
    """

    # Read all source files
    fin_df = pd.read_json(path_or_buf=f'{GOOGLE_DRIVE_PATH}/source/finance.jsonl', lines=True)
    med_df = pd.read_json(path_or_buf=f'{GOOGLE_DRIVE_PATH}/source/medicine.jsonl', lines=True)
    openqa_df = pd.read_json(path_or_buf=f'{GOOGLE_DRIVE_PATH}/source/open_qa.jsonl', lines=True)
    reddit_df = pd.read_json(path_or_buf=f'{GOOGLE_DRIVE_PATH}/source/reddit_eli5.jsonl', lines=True)
    wiki_df = pd.read_json(path_or_buf=f'{GOOGLE_DRIVE_PATH}/source/wiki_csai.jsonl', lines=True)

    # Assign source names
    fin_df['type'] = 'finance'
    med_df['type'] = 'medicine'
    openqa_df['type'] = 'open_qa'
    reddit_df['type'] = 'reddit_eli5'
    wiki_df['type'] = 'wiki_csai'

    # Combine all datasets
    df = pd.concat([fin_df, med_df, openqa_df, reddit_df, wiki_df], ignore_index=True)

    # Transform answer columns of human and AI to be in single column
    df_human = pd.DataFrame(df[['human_answers', 'type']].copy())
    df_human.columns = ['answer', 'type']
    df_human['ai-generated'] = 0

    df_gpt = pd.DataFrame(df[['chatgpt_answers', 'type']].copy())
    df_gpt.columns = ['answer', 'type']
    df_gpt['ai-generated'] = 1

    df = pd.concat([df_human, df_gpt])

    # Shuffle records to reduce bias
    df = df.sample(frac=1, random_state=42).reset_index(drop=True)

    # Split multiple answers into each record
    df = df.explode('answer')

    # Drop null answer
    df.dropna(subset=['answer'], inplace=True)

    # Reset Index
    df.reset_index(drop=True, inplace=True)

    return df

In [None]:
def _split_test_set(df, test_size=0.2, validation_size=0.2):
    """
      Split to train-validation-test set
    """

    X = df['answer']
    y = df[['ai-generated', 'type']]

    validation_pct = validation_size/(1-test_size)

    # Split into train and test set
    X_train, X_test, y_train, y_test = train_test_split(X,
                                                        y, test_size=test_size, random_state=42)


    # Split train set into valudation set
    X_train, X_val, y_train, y_val = train_test_split(X_train,
                                                      y_train, test_size=validation_pct, random_state=42)


    return X_train, X_val, y_train, y_val, X_test, y_test

In [None]:
def count_words(text):
    """
      Count number of words
    """

    return len(text.split())

In [None]:
def _data_cleaning(df):
    """
      Data Preparation, converting to lower case, removing all links, special characters, stopwords, and perform Lemmatization
    """

    # Convert to lowercase
    df['answer'], df['uppercase_count'] = zip(*df['answer'].apply(_lowercase))

    # Remove all links
    df['answer'], df['link_count'] = zip(*df['answer'].apply(_remove_links))

    # Count number of words
    df['tokens_count'] = df['answer'].apply(count_words)

    # Remove all special characters
    df['answer'], df['num_digits'], df['num_special_chars'] = zip(*df['answer'].apply(_remove_noise))

    # Remove stopwords
    # df['answer'], df['stopwords_count'] = zip(*df['answer'].apply(_remove_stopwords))

    # Lemmatization
    df['answer'] = df['answer'].apply(_lemmatizer)

    return df

In [None]:
def _load_model(fn, model_type):
    """
      Load Models
    """

    if model_type == 'Baseline':
        with open(f'{MODEL_PATH}/{fn}', 'rb') as f:
            clf = pickle.load(f)

    if not model_type in ('BERT', 'LSTM'):
        clf = load(f'{MODEL_PATH}/{fn}')

    else:
      clf = load_model(f'{MODEL_PATH}/{fn}')

    return clf

In [None]:
def _predict(clf, X, y, model_type):
    """
      Get predicted labels
    """

    if model_type == 'Baseline':
        y_pred = clf().predict(X, y)

    elif not model_type in ('BERT', 'LSTM'):
        y_pred = clf.predict(X)

    elif model_type == 'BERT':
        y_pred_prob = clf.predict(X)

        # Get the predicted labels based on the highest probability
        y_pred = y_pred_prob.argmax(axis=1)

    elif model_type == 'LSTM':
        y_pred_prob = clf.predict(X)

        # Convert probabilities to binary labels using a threshold of 0.5
        y_pred = (y_pred_prob > 0.5).astype(int)

    return y_pred

In [None]:
def _evaluate_model(fn, X, y, model_type):
    """
      Load, predict, and evaluate models' performance
    """

    # Load saved models
    clf = _load_model(fn, model_type)

    # Get predicted labels, and generate performance matrix
    y_pred = _predict(clf, X, y, model_type)
    _performance_matrix(y['ai-generated'], y_pred)

In [None]:
def _performance_matrix(y, y_pred, matrix=True):
    """
      Generate confusion matrix and classification report
    """

    # Visualize classification report
    print(classification_report(y, y_pred))

    # Calculate and generate confusion matrix
    if matrix:
        cm = confusion_matrix(y, y_pred)

        plt.figure(figsize=(8, 6))
        sns.heatmap(cm, annot=True, cmap="Blues", fmt="d", xticklabels=['Human', 'AI-Generated'], yticklabels=['Human', 'AI-Generated'])
        plt.xlabel('Predicted')
        plt.ylabel('Actual')
        plt.title('Confusion Matrix')
        plt.show()

In [None]:
def _history_plot(history, metric_name=['accuracy','val_accuracy', 'loss','val_loss']):
    """
      Plot leanring history (accuracy and loss) for training and validation set through epochs
    """

    # Create subplots
    fig, axs = plt.subplots(2, 1, figsize=(10, 8))

    # Convert history log to dataframe
    history_log_df = pd.DataFrame(history)

    # Plot accuracy
    sns.lineplot(data=history_log_df[[metric_name[0], metric_name[1]]], ax=axs[0])
    axs[0].set_title('Accuracy')

    # Plot loss
    sns.lineplot(data=history_log_df[[metric_name[2], metric_name[3]]], ax=axs[1])
    axs[1].set_title('Loss')

In [None]:
def _evaluate_each_type(y_test, y_pred_test):
    """
      Generate performance matrix for each subset of data from different sources
    """

    # Get unique source
    unique_types = y_test['type'].unique()

    # Iterate to each source
    for type_value in unique_types:
        print('--------------------------------------------------')
        print(f"Evaluating {type_value}")
        print('--------------------------------------------------')

        # Filter only currect source data
        y_test_fil = y_test[y_test['type'] == type_value]['ai-generated']
        y_pred_test = pd.DataFrame(y_pred_test)
        y_pred_test.index = y_test.index
        y_pred_fil = y_pred_test[y_test['type'] == type_value]

        # Evaluate performance
        _performance_matrix(y_test_fil, y_pred_fil)

In [None]:
def _get_wrong_prediction(y_pred, source=None):
    """
      Print out misprediction examples
    """

    # Get indices of misprediction
    wrong_idx = [i for i, (true_label, pred_label) in enumerate(zip(y_test['ai-generated'], y_pred)) if true_label != pred_label]

    # Get 50 examples of misprediction
    n = 0
    for i, (src, true_label, pred_label) in enumerate(zip(y_test['type'], y_test['ai-generated'], y_pred)):
        if i in wrong_idx:

              # Get examples from specific source
              if source is not None:
                  if src == source:
                      print("Raw Text:", X_test_raw.iloc[i])
                      print("Processed Text:", X_test.iloc[i])
                      print("True Label:", true_label)
                      print("Predicted Label:", pred_label)
                      n+=1

              # Get examples from all sources
              else:
                print("Raw Text:", X_test_raw.iloc[i])
                print("Processed Text:", X_test.iloc[i])
                print("True Label:", true_label)
                print("Predicted Label:", pred_label)
                n+=1

              if n == 50:
                break