In [None]:
# !rm -rf /kaggle/working/*

In [None]:
# !python --version  
# Python 3.10.12

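Our solution is a two-layer stacked ensemble model. The first layer consists of three voting ensembles, each trained on a different feature set (TF-IDF of the cleaned text, TF-IDF of the part-of-speech tags, and hand-crafted numeric features). Their predicted probabilities are then fed to a gradient boosting model (CatBoost) in the second layer.  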

# Importing Libraries

In [None]:
from typing import Optional, Union
import time

import os
import pickle
import gc
import re

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

import nltk
from nltk.corpus import stopwords
from nltk.tokenize import word_tokenize

from collections import Counter

!pip show pyspellchecker || pip install /kaggle/input/pyspellchecker/pyspellchecker-0.8.0-py3-none-any.whl
from spellchecker import SpellChecker

import spacy

import scipy
from scipy.sparse import spmatrix

from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report, confusion_matrix, roc_auc_score, roc_curve, auc
from sklearn.feature_extraction.text import TfidfVectorizer

from sklearn.linear_model import LogisticRegression, SGDClassifier
from catboost import CatBoostClassifier
from lightgbm import LGBMClassifier
from sklearn.svm import SVC
from sklearn.ensemble import VotingClassifier
# from sklearn.preprocessing import MinMaxScaler

import seaborn as sns

# Globals

In [None]:
# Seed passed to DataFrame.sample, train_test_split and every classifier below.
SEED = 1337 # 45090448
# Share of the data held out of training; train_val_test_split halves it into two validation sets.
VALIDATION_SHARE = 0.4
SAMPLE = True # if True, the data is subsampled with df.sample(frac=SAMPLE_RATE)
SAMPLE_RATE = 0.002  # fraction of rows kept when SAMPLE is True
CSV_INPUT_PATH = '/kaggle/input/daigt-v2-train-dataset/train_v2_drcat_02.csv'  # training CSV read by get_df
INPUT_PATH = '/kaggle/input/'  # not referenced by the cells visible in this notebook section
OUTPUT_PATH = '/kaggle/working/'  # directory where predictions and pickled models are written
MAX_ITER = 1200  # max_iter handed to LogisticRegression in ensemble_info

In [None]:
def print_run_time(func):
    """Decorator that prints how long each call to ``func`` took.

    The wrapped function keeps its own ``__name__`` and docstring, and its
    return value is passed through unchanged. Nothing is printed when
    ``func`` raises.
    """
    import functools  # local import so this cell does not depend on the import cell

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        # perf_counter is monotonic, so the measured interval cannot be
        # distorted by system clock adjustments.
        start_time = time.perf_counter()
        result = func(*args, **kwargs)
        print(f'The function {func.__name__} took {time.perf_counter() - start_time:.3f} seconds')
        return result
    return wrapper

# Data Preprocessing

In [None]:
def correct_text_spelling(text: str, spell: Optional["SpellChecker"] = None) -> str:
    """Replace misspelled words in ``text`` with the spell checker's suggestion.

    Args:
        text: The text to correct.
        spell: Optional spell checker exposing ``unknown(words)`` and
            ``correction(word)``. A new ``SpellChecker`` is created when
            omitted; pass a shared instance to avoid rebuilding it per call.

    Returns:
        The text with every whole-word occurrence of a misspelled word
        replaced. Words without a suggestion are left untouched.
    """
    if spell is None:
        spell = SpellChecker()

    word_pattern = r'\b\w+\b'
    words = re.findall(word_pattern, text)

    # Look each correction up once; skip words the checker cannot correct.
    corrections = {}
    for word in spell.unknown(words):
        suggestion = spell.correction(word)
        if suggestion:
            corrections[word] = suggestion

    if not corrections:
        return text

    # Substitute whole words in a single pass, so a correction never rewrites
    # part of a longer word and one replacement cannot cascade into another.
    # Matching stays case-sensitive on the words returned by ``unknown``.
    return re.sub(word_pattern,
                  lambda match: corrections.get(match.group(0), match.group(0)),
                  text)

def correct_spelling(df: pd.DataFrame, text_col: str = 'text') -> pd.Series:
    """Spell-correct every entry of ``df[text_col]``.

    Args:
        df: Frame holding the text column; it is not modified.
        text_col: Name of the column to correct.

    Returns:
        A Series named ``'corrected'`` that shares ``df``'s index.
    """
    # Only the text column is needed, so the frame itself is never copied.
    return df[text_col].apply(correct_text_spelling).rename('corrected')

# Features

## Text Cleaning and Features

In [None]:
def clean_text(df: pd.DataFrame) -> pd.Series:
    """Build the normalised text used by the TF-IDF and POS features.

    Paragraph breaks (``'\\n\\n'``) and possessive ``'s`` are removed, the
    punctuation characters ``. , ? ! : ; ' \\ "`` are stripped, and the result
    is lower-cased.

    Args:
        df: Frame with a ``'text'`` column; it is not modified.

    Returns:
        A Series named ``'clean_text'`` that shares ``df``'s index.
    """
    # NOTE(review): paragraph breaks are deleted rather than replaced by a
    # space, so the last word of a paragraph is glued to the first word of the
    # next one. Kept as-is because fitted vectorizers depend on this output.
    cleaned = (
        df['text']
        # regex is spelled out because the default of str.replace differs
        # between pandas versions; these two patterns are plain literals.
        .str.replace('\n\n', '', regex=False)
        .str.replace('\'s', '', regex=False)
        .str.replace('[.,?!:;\'\\\\"]', '', regex=True)
        .str.lower()
    )
    return cleaned.rename('clean_text')

In [None]:
def get_corpus(df: pd.DataFrame, text_col: str = 'clean_text') -> list[str]:
    """Return the values of column ``text_col`` as a plain Python list."""
    column = df[text_col]
    return column.tolist()

In [None]:
def tfidf(corpus: list[str], vectorizer: Optional["TfidfVectorizer"] = None) -> Union[spmatrix, tuple[spmatrix, "TfidfVectorizer"]]:
    """Vectorise ``corpus`` with TF-IDF over word 3- to 5-grams.

    Args:
        corpus: Documents to vectorise.
        vectorizer: An already fitted vectorizer. When given, the corpus is
            only transformed; when omitted, a new vectorizer is fitted on
            ``corpus``.

    Returns:
        The TF-IDF matrix alone when ``vectorizer`` was supplied, otherwise a
        ``(matrix, fitted_vectorizer)`` tuple.
    """
    # Compare against None explicitly: whether a vectorizer was passed must
    # not depend on the object's truthiness.
    if vectorizer is not None:
        return vectorizer.transform(corpus)

    vectorizer = TfidfVectorizer(
        lowercase=False,
        sublinear_tf=True,
        stop_words='english',
        ngram_range=(3, 5),
    )
    M = vectorizer.fit_transform(corpus)

    return M, vectorizer

In [None]:
def pos_feature(df: pd.DataFrame, text_col: str = 'clean_text',
                nlp: Optional["spacy.language.Language"] = None) -> pd.DataFrame:
    """Add one ``<TAG>_count`` column per part-of-speech tag of interest.

    Args:
        df: Input frame; it is not modified.
        text_col: Column holding the text to tag.
        nlp: Optional pipeline exposing ``pipe(texts)``. The
            ``en_core_web_sm`` spaCy model is loaded when omitted; pass a
            loaded pipeline to avoid reloading it on every call.

    Returns:
        A copy of ``df`` with the integer columns ``NOUN_count``,
        ``VERB_count``, ``ADJ_count``, ``ADV_count``, ``ADP_count``,
        ``PRON_count``, ``PROPN_count``, ``PUNCT_count``, ``AUX_count``,
        ``NUM_count`` and ``X_count`` appended in that order.
    """
    df_ = df.copy()

    if nlp is None:
        nlp = spacy.load("en_core_web_sm")

    # Count whole tags per document. Substring counting on the joined tag
    # string would be wrong here: 'X' also occurs inside 'AUX'.
    tag_counts = [Counter(token.pos_ for token in doc)
                  for doc in nlp.pipe(df_[text_col])]

    counted_tags = ('NOUN', 'VERB', 'ADJ', 'ADV', 'ADP', 'PRON',
                    'PROPN', 'PUNCT', 'AUX', 'NUM', 'X')
    for tag in counted_tags:
        df_[f'{tag}_count'] = [counts[tag] for counts in tag_counts]

    return df_
    
def stopwords_feature(df: pd.DataFrame, text_col: str = 'clean_text') -> pd.DataFrame:
    """Count English stopwords per row and build a stopword-free text.

    Returns a copy of ``df`` with two extra columns: ``number_of_stopwords``
    and ``filtered_text`` (the remaining tokens joined by single spaces).
    """
    result = df.copy()
    english_stopwords = set(stopwords.words('english'))

    def _count_and_strip(row):
        # One pass over the tokens: tally stopwords, keep everything else.
        n_stopwords = 0
        kept_tokens = []
        for token in word_tokenize(row[text_col]):
            if token.lower() in english_stopwords:
                n_stopwords += 1
            else:
                kept_tokens.append(token)
        return n_stopwords, ' '.join(kept_tokens)

    expanded = result.apply(_count_and_strip, axis=1, result_type='expand')
    result[['number_of_stopwords', 'filtered_text']] = expanded

    return result

def caps_to_periods_ratio_feature(df: pd.DataFrame) -> pd.Series:
    """Ratio of capital letters (A-Z) to periods in each row's raw text.

    Args:
        df: Frame with a ``'text'`` column; it is not modified.

    Returns:
        A float Series named ``'caps_to_periods_ratio'`` sharing ``df``'s
        index. Rows without any period are divided by 1 instead of 0.
    """
    raw_text = df['text']
    caps_count = raw_text.str.count(r'[A-Z]')
    # The denominator is the number of periods, as the feature name states
    # (it was previously the parentheses count).
    periods_count = raw_text.str.count(r'\.')

    ratio = caps_count / periods_count.replace(0, 1)  # replace 0 by 1 to avoid dividing by 0
    return ratio.rename('caps_to_periods_ratio')

## Feature Extraction

In [None]:
def feature_extraction(df: pd.DataFrame, text_col: str = 'clean_text') -> pd.DataFrame:
    """Extract the numeric features used by the numeric ensemble.

    Word count, vocabulary size and the POS counts are computed on
    ``text_col``. Sentence and punctuation counts are computed on the raw
    ``'text'`` column, because the cleaned text no longer contains punctuation.

    Args:
        df: Frame with a ``'text'`` column and the ``text_col`` column; it is
            not modified.
        text_col: Column used for the word-level and POS features.

    Returns:
        A copy of ``df`` with the feature columns appended.
    """
    df_ = df.copy()
    raw_text = df_['text']

    # text length: number of spaces + 1, after dropping paragraph breaks
    df_['num_of_words'] = (df_[text_col].str.replace('\n\n', '', regex=False)
                                        .str.count(' ') + 1)
    # unique words per row
    df_['text_vocab_size'] = df_[text_col].apply(lambda x: len(set(x.split())))

    # Regex patterns counted on the raw text, in the column order the models
    # are trained on. Periods must be counted here and not on ``text_col``:
    # the cleaned text has them stripped, which made this feature constant 0.
    raw_text_patterns = {
        'num_of_sentences': r'\.',
        'parentheses_count': r'\(|\)',
        'semicolon_count': ';',
        'hypen_count': '-',
        'dash_count': '—',
        'comma_count': ',',
        'qm_count': r'\?',
        'en_count': '!',
        'apostrophe_count': '’',
        'paragraph_count': '\n\n',
    }
    for column, pattern in raw_text_patterns.items():
        df_[column] = raw_text.str.count(pattern)

    # extract POS features
    df_ = pos_feature(df_, text_col)

    # df_ = stopwords_feature(df_, text_col)

    # count typos
    # df_ = typos_count_feature(df_)

    # ratio between the number of capital letters and periods
    df_['caps_to_periods_ratio'] = caps_to_periods_ratio_feature(df_)

    return df_

# Evaluation

In [None]:
def plt_cm(y_true, y_pred, dataset_type: str, model_name: str):
    """Draw the confusion matrix of ``y_pred`` against ``y_true`` as a heatmap.

    ``dataset_type`` and ``model_name`` only appear in the figure title. The
    tick labels 'Human' and 'LLM' are applied in the class order returned by
    ``confusion_matrix`` (presumably 0 = human, 1 = generated — confirm).
    """
    tick_labels = ['Human', 'LLM']
    matrix = confusion_matrix(y_true, y_pred)

    figure, axes = plt.subplots(figsize=(3, 2))
    sns.heatmap(
        matrix,
        annot=True,
        fmt="d",
        cmap="RdPu",
        cbar=True,
        xticklabels=tick_labels,
        yticklabels=tick_labels,
        ax=axes,
    )
    axes.set(
        xlabel='Predicted Labels',
        ylabel='True Labels',
        title=f"{dataset_type}\nCM for {model_name}",
    )
    plt.show()
    
def plt_roc_auc(y_true, y_score, dataset_type: str, model_name: str):
    """Print the ROC AUC of ``y_score`` and plot the matching ROC curve.

    ``dataset_type`` and ``model_name`` are only used in the printed line and
    in the figure title.
    """
    score = roc_auc_score(y_true, y_score)
    print(f"\nROC AUC for {model_name}: {score}\n")

    false_pos_rate, true_pos_rate, _thresholds = roc_curve(y_true, y_score)
    curve_area = auc(false_pos_rate, true_pos_rate)

    figure, axes = plt.subplots(figsize=(4, 3))
    axes.plot(false_pos_rate, true_pos_rate, color='orchid', lw=2,
              label=f'ROC curve (area = {curve_area:.2f})')
    # dashed diagonal from (0, 0) to (1, 1) as a visual reference
    axes.plot([0, 1], [0, 1], color='navy', lw=2, linestyle='--')
    axes.legend(loc="best")
    axes.set(
        title=f'{dataset_type}\nROC Curve for {model_name}',
        xlabel='False Positive Rate',
        ylabel='True Positive Rate',
    )
    plt.show()

In [None]:
def validation_analysis(y_tr, y_val, y_tr_pred, y_val_pred, model_name: str):
    """Report classification metrics for a training and a validation split.

    For each split this prints the classification report, then shows the
    confusion matrix and the ROC curve.

    NOTE(review): the same predictions are passed to ``plt_roc_auc`` as
    scores; the callers in this notebook pass hard labels from ``predict``,
    so the ROC curve is built from labels rather than probabilities.
    """
    splits = (
        ("\nTRAIN SET\n", "Training Set", y_tr, y_tr_pred),
        ("\nVALIDATION SET\n", "Validation Set", y_val, y_val_pred),
    )

    for banner, dataset_type, y_true, y_pred in splits:
        print(banner)

        print("Classification report:")
        print(classification_report(y_true, y_pred))

        plt_cm(y_true, y_pred, dataset_type, model_name)
        plt_roc_auc(y_true, y_pred, dataset_type, model_name)

# First Ensemble Layer

## Data Preparation

In [None]:
def get_df() -> pd.DataFrame:
    """Load the training CSV and keep only the rows flagged ``RDizzl3_seven``.

    The ``label`` column is renamed to ``generated``.

    Returns:
        An independent frame with the columns ``text`` and ``generated``.
    """
    daigt = pd.read_csv(CSV_INPUT_PATH).rename(columns={'label': 'generated'})

    # Explicit comparison kept on purpose: it yields a clean boolean mask even
    # if the flag column contains missing values.
    is_selected = daigt['RDizzl3_seven'] == True  # only True ones
    cols = ['text', 'generated']
    # df_['text'] = correct_spelling(df_, 'text')

    # .copy() detaches the result from the loaded frame, so callers can add
    # columns without triggering SettingWithCopyWarning.
    return daigt.loc[is_selected, cols].copy()

In [None]:
def get_df_with_cleaned_text() -> pd.DataFrame:
    """ Returns a dataframe with an added ``clean_text`` column.
        Note: Sampling by SAMPLE_RATE is applied if SAMPLE (global variable) is True. """
    df = get_df()

    if SAMPLE:
        df = df.sample(frac=SAMPLE_RATE, random_state=SEED)

    # assign() returns a new frame, so nothing is written into a frame that
    # may be a slice of the loaded data.
    return df.assign(clean_text=clean_text(df))

In [None]:
def save_to_files(tr_pred, val_pred, val2_pred, y_tr, y_val, y_val2, ensemble: VotingClassifier,
                  vectorizer: TfidfVectorizer, series_name: str, tfidf_type: str):
    """Write a TF-IDF model's predictions and fitted objects to OUTPUT_PATH.

    Args:
        tr_pred, val_pred, val2_pred: Predictions for the three splits.
        y_tr, y_val, y_val2: Targets of the same splits; only their index is
            used, to label the saved predictions.
        ensemble: Fitted ensemble to pickle.
        vectorizer: Fitted vectorizer to pickle.
        series_name: Name given to each saved prediction Series.
        tfidf_type: Prefix of every output file name.

    Files written: ``{tfidf_type}_tfidf_{tr,val,val2}_pred.csv`` and
    ``{tfidf_type}_tfidf_{ensemble,vectorizer}.pickle``.
    """
    # Build all three Series first, so a length mismatch is raised before
    # anything is written.
    predictions = {
        'tr': pd.Series(tr_pred, index=y_tr.index, name=series_name),
        'val': pd.Series(val_pred, index=y_val.index, name=series_name),
        'val2': pd.Series(val2_pred, index=y_val2.index, name=series_name),
    }

    # saving processed data
    for split, prediction in predictions.items():
        filepath = os.path.join(OUTPUT_PATH, f'{tfidf_type}_tfidf_{split}_pred.csv')
        prediction.to_csv(filepath)

    # saving objects; the context manager closes each file even if pickling fails
    for label, obj in (('ensemble', ensemble), ('vectorizer', vectorizer)):
        filepath = os.path.join(OUTPUT_PATH, f'{tfidf_type}_tfidf_{label}.pickle')
        with open(filepath, 'wb') as pickle_file:
            pickle.dump(obj, pickle_file)

In [None]:
def train_val_test_split(X: pd.DataFrame, y: pd.Series) -> tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame, pd.Series, pd.Series, pd.Series]:
    """Split the data into a training set and two equally sized validation sets.

    VALIDATION_SHARE of the rows is held out first, then that hold-out is cut
    in half. Both splits are stratified on the target and seeded with SEED.

    Returns:
        ``X_tr, X_val, X_val2, y_tr, y_val, y_val2``.
    """
    X_tr, X_holdout, y_tr, y_holdout = train_test_split(
        X, y,
        test_size=VALIDATION_SHARE,
        stratify=y,
        random_state=SEED,
    )
    X_val, X_val2, y_val, y_val2 = train_test_split(
        X_holdout, y_holdout,
        test_size=0.5,
        stratify=y_holdout,
        random_state=SEED,
    )

    return X_tr, X_val, X_val2, y_tr, y_val, y_val2

## Training

In [None]:
# Registry of the first-layer base classifiers: name -> estimator class and
# the keyword arguments used to build it (see get_ensemble).
ensemble_info = {
    'lgbm_clf': {
        'class': LGBMClassifier,
        'params': {'random_state': SEED, 'verbose': -1},
    },
    'cat_clf': {
        'class': CatBoostClassifier,
        'params': {'random_seed': SEED, 'verbose': False},
    },
    'lr_clf': {
        'class': LogisticRegression,
        'params': {'random_state': SEED, 'max_iter': MAX_ITER},
    },
    'sgd_clf': {
        'class': SGDClassifier,
        'params': {'loss': 'log_loss', 'random_state': SEED, 'verbose': 1},
    },
}

In [None]:
def get_ensemble(*clf_names: str) -> VotingClassifier:
    """Build an unfitted soft-voting ensemble from entries of ``ensemble_info``.

    Each name is looked up in ``ensemble_info`` and instantiated with its
    registered parameters; an unknown name raises ``KeyError``.
    """
    named_estimators = [
        (name, ensemble_info[name]['class'](**ensemble_info[name]['params']))
        for name in clf_names
    ]

    return VotingClassifier(estimators=named_estimators, voting='soft', n_jobs=-1)

In [None]:
# for the second stage of stack ensemble
def get_preds(X_val, df_X_val2, ensemble: VotingClassifier, vectorizer: TfidfVectorizer, is_pos: bool) -> tuple[np.array, np.array]:
    """Positive-class probabilities of ``ensemble`` on both validation splits.

    Args:
        X_val: Already vectorised first validation split.
        df_X_val2: Frame of the second validation split; its ``pos_text``
            (when ``is_pos``) or ``clean_text`` column is vectorised here with
            ``vectorizer``.
        ensemble: Fitted ensemble.
        vectorizer: Fitted vectorizer.
        is_pos: Selects which text column of ``df_X_val2`` is used.

    Returns:
        ``(val_pred, val2_pred)``: column 1 of ``predict_proba`` per split.
    """
    text_col = "pos_text" if is_pos else "clean_text"

    # first validation split: features are already vectorised
    val_pred = ensemble.predict_proba(X_val)[:, 1]

    # second validation split: vectorise, then predict
    X_val2 = tfidf(get_corpus(df_X_val2, text_col), vectorizer)
    val2_pred = ensemble.predict_proba(X_val2)[:, 1]

    return val_pred, val2_pred

## Models by Feature Sets

In [None]:
@print_run_time
def text_tfidf_models() -> tuple[np.array, np.array]:
    """Train, evaluate and save the ensemble on TF-IDF of the cleaned text.

    Returns:
        The ensemble's positive-class probabilities on the first and second
        validation splits.
    """
    data = get_df_with_cleaned_text()

    # splitting the data
    features = data.drop(columns=['generated']).copy()
    target = data['generated'].copy()
    X_tr, X_val, X_val2, y_tr, y_val, y_val2 = train_val_test_split(features, target)

    # the vectorizer is fitted on the training corpus only
    tfidf_X_tr, vectorizer = tfidf(get_corpus(X_tr))

    ensemble = get_ensemble('lgbm_clf', 'cat_clf', 'lr_clf', 'sgd_clf')
    ensemble.fit(tfidf_X_tr, y_tr)

    # initial validation on hard labels
    tfidf_X_val = tfidf(get_corpus(X_val), vectorizer)
    y_tr_pred = ensemble.predict(tfidf_X_tr)
    y_val_pred = ensemble.predict(tfidf_X_val)
    validation_analysis(y_tr, y_val, y_tr_pred, y_val_pred, "TF-IDF Ensemble")

    # probabilities for the second layer
    tfidf_tr_pred = ensemble.predict_proba(tfidf_X_tr)[:, 1]
    tfidf_val_pred, tfidf_val2_pred = get_preds(tfidf_X_val, X_val2, ensemble, vectorizer, False)

    save_to_files(tfidf_tr_pred, tfidf_val_pred, tfidf_val2_pred, y_tr, y_val, y_val2,
                  ensemble, vectorizer, "text_tfidf_pred", "text")

    return tfidf_val_pred, tfidf_val2_pred

In [None]:
@print_run_time
def pos_tfidf_models() -> tuple[np.array, np.array]:
    """Train, evaluate and save the ensemble on TF-IDF of part-of-speech tags.

    Every cleaned text is turned into the space-joined sequence of its spaCy
    POS tags (``pos_text``), which is then vectorised like ordinary text.

    Returns:
        The ensemble's positive-class probabilities on the first and second
        validation splits.
    """
    data = get_df_with_cleaned_text()

    # extracting the pos-related feature
    nlp = spacy.load("en_core_web_sm")
    parsed_docs = [nlp(text) for text in data['clean_text']]
    data['pos_tags'] = [[token.pos_ for token in doc] for doc in parsed_docs]
    data['pos_text'] = data['pos_tags'].apply(lambda tags: ' '.join(tags))

    # splitting the data
    features = data.drop(columns=['generated']).copy()
    target = data['generated'].copy()
    X_tr, X_val, X_val2, y_tr, y_val, y_val2 = train_val_test_split(features, target)

    # the vectorizer is fitted on the training corpus only
    pos_tfidf_X_tr, vectorizer = tfidf(get_corpus(X_tr, "pos_text"))

    ensemble = get_ensemble('lgbm_clf', 'cat_clf', 'lr_clf', 'sgd_clf')
    ensemble.fit(pos_tfidf_X_tr, y_tr)

    # initial validation on hard labels
    pos_tfidf_X_val = tfidf(get_corpus(X_val, text_col='pos_text'), vectorizer)
    y_tr_pred = ensemble.predict(pos_tfidf_X_tr)
    y_val_pred = ensemble.predict(pos_tfidf_X_val)
    validation_analysis(y_tr, y_val, y_tr_pred, y_val_pred, "POS TF-IDF Ensemble")

    # probabilities for the second layer
    pos_tfidf_tr_pred = ensemble.predict_proba(pos_tfidf_X_tr)[:, 1]
    pos_tfidf_val_pred, pos_tfidf_val2_pred = get_preds(pos_tfidf_X_val, X_val2, ensemble, vectorizer, True)

    save_to_files(pos_tfidf_tr_pred, pos_tfidf_val_pred, pos_tfidf_val2_pred, y_tr, y_val, y_val2,
                  ensemble, vectorizer, "pos_tfidf_pred", "pos")

    return pos_tfidf_val_pred, pos_tfidf_val2_pred

In [None]:
@print_run_time
def numeric_features_models() -> tuple[np.array, np.array]:
    """Train, evaluate and save the ensemble on the numeric features.

    Predictions for both validation splits are written to OUTPUT_PATH as CSV
    files and the fitted ensemble is pickled there.

    Returns:
        Two Series named ``'num_pred'`` holding the positive-class
        probabilities on the first and second validation splits, indexed like
        the corresponding targets.
    """
    df = get_df_with_cleaned_text()

    # extracting numeric features
    df = feature_extraction(df, 'clean_text')

    # splitting the data: only the numeric feature columns remain in X
    cols = ['text', 'clean_text', 'generated']
    X = df.drop(columns=cols).copy()
    y = df['generated'].copy()

    X_tr, X_val, X_val2, y_tr, y_val, y_val2 = train_val_test_split(X, y)

    num_ensemble = get_ensemble('lgbm_clf', 'cat_clf', 'lr_clf', 'sgd_clf')
    num_ensemble.fit(X_tr, y_tr)

    y_tr_pred = num_ensemble.predict(X_tr)
    y_val_pred = num_ensemble.predict(X_val)

    # initial validation
    validation_analysis(y_tr, y_val, y_tr_pred, y_val_pred, "Numeric Ensemble")

    # probabilities for the second stage of the stack ensemble
    num_val_pred = num_ensemble.predict_proba(X_val)[:, 1]
    num_val2_pred = num_ensemble.predict_proba(X_val2)[:, 1]

    # transform to pd.Series so the predictions carry the row index
    num_val_pred = pd.Series(num_val_pred, index=y_val.index, name='num_pred')
    num_val2_pred = pd.Series(num_val2_pred, index=y_val2.index, name='num_pred')

    # saving processed data
    filepath = os.path.join(OUTPUT_PATH, 'num_val_pred.csv')
    num_val_pred.to_csv(filepath)

    filepath = os.path.join(OUTPUT_PATH, 'num_val2_pred.csv')
    num_val2_pred.to_csv(filepath)

    # saving objects; the context manager closes the file even if pickling fails
    filepath = os.path.join(OUTPUT_PATH, 'num_ensemble.pickle')
    with open(filepath, 'wb') as pickle_file:
        pickle.dump(num_ensemble, pickle_file)

    return num_val_pred, num_val2_pred

## Execution and Analysis

In [None]:
# Train the three first-layer ensembles. Each call returns its positive-class
# probabilities on the first and on the second validation split.
tfidf_val_pred, tfidf_val2_pred = text_tfidf_models()
pos_val_pred, pos_val2_pred = pos_tfidf_models()
num_val_pred, num_val2_pred = numeric_features_models()

# Second Ensemble Layer

In [None]:
# Combine the first-layer probabilities into one feature frame per validation
# split. The numeric predictions are Series, so their index becomes the frame
# index; the TF-IDF arrays are placed positionally in the same row order.
pred_val = pd.DataFrame({'tfidf_pred': tfidf_val_pred, 'pos_pred': pos_val_pred, 'num_pred': num_val_pred})
pred_val2 = pd.DataFrame({'tfidf_pred': tfidf_val2_pred, 'pos_pred': pos_val2_pred, 'num_pred': num_val2_pred})

# Rebuild the targets of both validation splits. The same SEED, sampling and
# split function as in the first layer are used, so the rows match.
df = get_df()

if SAMPLE:
    df = df.sample(frac=SAMPLE_RATE, random_state=SEED)

X = df.copy()
y = df['generated'].copy()

del df
gc.collect()

# splitting the data
X_tr, X_val, X_val2, y_tr, y_val, y_val2 = train_val_test_split(X, y)

# only the two validation targets are needed from here on
del X_tr, X_val, X_val2, y_tr
gc.collect()

# Second layer: fitted on the first validation split, evaluated on the second.
cat_clf_stack = CatBoostClassifier(random_state=SEED, verbose=False)

cat_clf_stack.fit(pred_val, y_val)

y_val_pred = cat_clf_stack.predict(pred_val)
y_val2_pred = cat_clf_stack.predict(pred_val2)

# the context manager closes the file even if pickling fails
filepath = os.path.join(OUTPUT_PATH, 'cat_clf_stack.pickle')
with open(filepath, 'wb') as stack_file:
    pickle.dump(cat_clf_stack, stack_file)

del filepath, cat_clf_stack
gc.collect()

# The "train" part of this report is the first validation split, the
# "validation" part is the second one.
validation_analysis(y_val, y_val2, y_val_pred, y_val2_pred ,"CatBoost Over Predictions")