In [6]:
# Data handling
import pandas as pd
import numpy as np
import re
import string

# NLP libraries
import nltk
from nltk.tokenize import word_tokenize
from nltk.stem import WordNetLemmatizer
from nltk.corpus import wordnet, cmudict, stopwords
from collections import Counter
#nltk.download("punkt")
#nltk.download("averaged_perceptron_tagger")
#nltk.download("wordnet")
#nltk.download("stopwords")
#nltk.download("cmudict")

# Topic modelling
from gensim import corpora
from gensim.models import LsiModel
from gensim.models.coherencemodel import CoherenceModel

# Readability & sentiment analysis
import textstat
from textblob import TextBlob

# ML
from sklearn.pipeline import Pipeline, FeatureUnion
from sklearn.base import BaseEstimator, ClassifierMixin, TransformerMixin
from sklearn.preprocessing import StandardScaler
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
from sklearn.model_selection import train_test_split
from sklearn.metrics import (
    accuracy_score,
    confusion_matrix,
    log_loss,
    precision_score,
    recall_score,
    roc_auc_score,
)

# Functions

In [7]:
# Data Loading

def load_data(filepath):
    """Read a UTF-8 CSV and collapse the ``source`` column to two labels.

    Every ``source`` value other than the exact string "Human" (including
    missing values) is relabelled "AI".

    Parameters
    ----------
    filepath : str or path-like
        Location of the CSV file; it must contain a ``source`` column.

    Returns
    -------
    pandas.DataFrame
        A copy of the loaded data with the binarised ``source`` column.
    """
    def _binarize(label):
        # Keep "Human" untouched; everything else is treated as AI-written.
        return label if label == "Human" else "AI"

    frame = pd.read_csv(filepath, encoding="utf-8").copy()
    frame["source"] = frame["source"].map(_binarize)
    return frame

In [10]:
# Text Cleaning

class TextCleaner(TransformerMixin, BaseEstimator):
    """Scikit-learn transformer that normalises and lemmatises raw text.

    Each document is lower-cased, stripped of non-ASCII characters and of
    punctuation (except the symbols in ``symbols_to_keep``), tokenised with
    NLTK's ``word_tokenize`` and lemmatised with the WordNet lemmatizer using
    each token's POS tag. Non-string entries become the empty string.

    Mixins are listed before ``BaseEstimator``, as scikit-learn recommends.
    """

    def __init__(self):
        self.lemmatizer = WordNetLemmatizer()
        # First letter of an NLTK POS tag -> WordNet POS constant.
        self.tag_dict = {"J": wordnet.ADJ, "N": wordnet.NOUN, "V": wordnet.VERB, "R": wordnet.ADV}
        # Punctuation characters that survive cleaning.
        self.symbols_to_keep = {"$", "-", "%"}

    def _tag_to_wordnet(self, tag):
        """Map an NLTK POS tag string to a WordNet POS, defaulting to NOUN."""
        if not tag:
            return wordnet.NOUN
        return self.tag_dict.get(tag[:1].upper(), wordnet.NOUN)

    def get_wordnet_pos(self, text):
        """Convert the NLTK POS tag of a single word to a WordNet POS.

        The word is tagged in isolation; an empty/falsy input yields NOUN.
        """
        if not text:
            return wordnet.NOUN  # Default to NOUN
        return self._tag_to_wordnet(nltk.pos_tag([text])[0][1])

    def text_cleaning(self, text):
        """Clean one document while preserving numbers and selected symbols.

        Parameters
        ----------
        text : object
            The raw document. Anything that is not a ``str`` returns "".

        Returns
        -------
        str
            Space-joined lemmas of the cleaned document.
        """
        if not isinstance(text, str):  # Handle non-string input
            return ""

        text = text.lower().strip()  # Lowercase and trim surrounding whitespace
        text = re.sub(r'[^\x00-\x7F]+', '', text)  # Remove non-ASCII characters
        # Drop punctuation except the whitelisted symbols (the whitelist lives
        # on the instance, so it must be read through ``self``).
        text_clean = "".join(
            char for char in text
            if char not in string.punctuation or char in self.symbols_to_keep
        )

        # Tag the whole token sequence once and reuse those tags, instead of
        # re-tagging every token on its own.
        tagged_words = nltk.pos_tag(word_tokenize(text_clean))
        lemmatized = [
            self.lemmatizer.lemmatize(word, self._tag_to_wordnet(tag))
            for word, tag in tagged_words
        ]
        return " ".join(lemmatized)

    def fit(self, X, y=None):
        """No-op fit; the cleaner is stateless. Returns ``self``."""
        return self

    def transform(self, X):
        """Apply ``text_cleaning`` to every document.

        Objects exposing ``.apply`` (e.g. a pandas Series) are used directly;
        a bare string is treated as one document and any other iterable is
        wrapped in a pandas Series first.
        """
        if isinstance(X, str):
            X = [X]
        if not hasattr(X, "apply"):
            X = pd.Series(list(X))
        return X.apply(self.text_cleaning)

In [11]:
# Feature Extraction

class FeatureExtractor(TransformerMixin, BaseEstimator):
    """Scikit-learn transformer producing nine scaled stylometric features.

    For each document the columns are, in order: whole-string word stress,
    sentence stress, consonant density, redundancy, sentiment polarity,
    AI-associated word count, LSA coherence, Flesch reading ease and Gunning
    fog index. ``fit`` fits a ``StandardScaler`` on these features and
    ``transform`` returns the scaled matrix.

    Mixins are listed before ``BaseEstimator``, as scikit-learn recommends.
    """

    # Number of columns produced by ``extract_features``.
    _N_FEATURES = 9

    def __init__(self):
        self.cmu_dict = cmudict.dict()
        self.lemmatizer = WordNetLemmatizer()
        self.stop_words = set(stopwords.words("english"))
        self.common_ai_words = set([
            "commendable", "transhumanist", "meticulous", "elevate", "hello", "tapestry", "leverage",
            "journey", "headache", "resonate", "testament", "explore", "binary", "delve",
            "enrich", "seamless", "multifaceted", "sorry", "foster", "convey", "beacon",
            "interplay", "oh", "navigate", "form", "adhere", "cannot", "landscape", "remember",
            "paramount", "comprehensive", "placeholder", "grammar", "real", "summary", "symphony",
            "furthermore", "relationship", "ultimately", "profound", "art", "supercharge", "evolve",
            "beyond", "reimagine", "vibrant", "robust", "pivotal", "certainly", "quinoa", "orchestrate", "align",
            "diverse", "recommend", "annals", "note", "employ", "bustling", "indeed", "digital", "enigma", "outfit",
            "indelible", "refrain", "culture", "treat", "emerge", "esteemed", "weight", "whimsical", "bespoke",
            "highlight", "antagonist", "unlock", "key", "breakdown", "tailor", "misinformation", "treasure",
            "paradigm", "captivate", "song", "underscore", "calculate", "especially", "climate", "hedging",
            "inclusive", "exercise", "ai", "embrace", "level", "nuance", "career", "dynamic", "accent",
            "ethos", "cheap", "firstly", "online", "goodbye"])
        self.scaler = StandardScaler()

    def get_word_stress(self, word):
        """Return the stress score of one word from the CMU dictionary.

        The score is the sum of the stress digits in the word's first listed
        pronunciation. Unknown words and non-string input score 0.
        """
        if not isinstance(word, str):
            return 0
        # The lookup key is lower-cased before consulting the dictionary.
        pronunciations = self.cmu_dict.get(word.lower())
        if not pronunciations:
            return 0
        return sum(
            int(char)
            for phoneme in pronunciations[0]
            for char in phoneme
            if char.isdigit()
        )

    def get_sentence_stress(self, sentence):
        """Return the summed word stress over whitespace-separated words."""
        if not isinstance(sentence, str) or not sentence.strip():
            return 0
        return sum(self.get_word_stress(word) for word in sentence.split())

    def cons_density(self, text):
        """Return consonants / letters, rounded to 3 decimals.

        Returns 0.0 for empty input and for text containing no letters
        (which would otherwise divide by zero).
        """
        if not isinstance(text, str) or not text.strip():
            return 0.0
        letters = [char for char in text if char.isalpha()]
        if not letters:
            return 0.0
        consonants = sum(1 for char in letters if char not in "aeiouAEIOU")
        return round(consonants / len(letters), 3)

    def redundance(self, text):
        """Count lemmas occurring more than three times the mean frequency.

        Stop words are removed and the remaining tokens are lemmatised first
        as verbs, then as nouns, before counting.
        """
        if not isinstance(text, str) or not text.strip():
            return 0

        tokens = word_tokenize(text)
        clean_tokens = [w for w in tokens if w not in self.stop_words]

        final_lemmas = [
            self.lemmatizer.lemmatize(self.lemmatizer.lemmatize(word, 'v'), 'n')
            for word in clean_tokens
        ]

        word_counts = Counter(final_lemmas)
        if len(word_counts) == 0:
            return 0  # Prevent division by zero if no tokens remain

        mean_freq = sum(word_counts.values()) / len(word_counts)
        return sum(1 for count in word_counts.values() if count > 3 * mean_freq)

    def sentiment_polarity(self, text):
        """Return the absolute TextBlob polarity, floored and capped.

        Magnitudes below 0.1 become 0.0; magnitudes above 0.8 are capped at
        0.8. Empty or non-string input returns 0.0.
        """
        if not isinstance(text, str) or not text.strip():
            return 0.0  # Handle empty or None inputs safely

        sent_pol = TextBlob(text).sentiment.polarity
        abs_pol = abs(round(sent_pol, 3))
        return 0.0 if abs_pol < 0.1 else min(abs_pol, 0.8)

    def word_choice(self, text):
        """Count whitespace-separated tokens found in ``common_ai_words``.

        Empty or non-string input returns 0, matching the other extractors.
        """
        if not isinstance(text, str) or not text.strip():
            return 0
        return sum(1 for word in text.split() if word in self.common_ai_words)

    def coherence(self, text):
        """Return a 'c_v' coherence score from a single-document LSI model.

        A gensim dictionary and corpus are built from this document alone, an
        ``LsiModel`` with ``num_topics=5`` is fitted, and ``CoherenceModel``
        scores it. Empty input, or a non-finite score, yields 0.0.
        """
        if not isinstance(text, str) or not text.strip():
            return 0.0

        tokens = word_tokenize(text)
        if not tokens:
            return 0.0

        dictionary = corpora.Dictionary([tokens])
        corpus_gensim = [dictionary.doc2bow(tokens)]
        lsa_model = LsiModel(corpus_gensim, id2word=dictionary, num_topics=5)

        coherence_model = CoherenceModel(
            model=lsa_model,
            texts=[tokens],
            dictionary=dictionary,
            coherence='c_v')

        coherence_score = coherence_model.get_coherence()
        # A NaN/inf here would flow through the scaler into the classifier.
        if not np.isfinite(coherence_score):
            return 0.0
        return coherence_score

    @staticmethod
    def reading_ease(text):
        """Returns Flesch Reading Ease score (higher = easier to read).

        Declared static so it works both as ``self.reading_ease(text)`` and
        as ``FeatureExtractor.reading_ease(text)``.
        """
        if not isinstance(text, str) or not text.strip():
            return 0.0  # Handle empty or None input safely
        return textstat.flesch_reading_ease(text)

    @staticmethod
    def gunning_fog(text):
        """Returns Gunning Fog Index (higher = more difficult to read).

        Declared static so it works both as ``self.gunning_fog(text)`` and
        as ``FeatureExtractor.gunning_fog(text)``.
        """
        if not isinstance(text, str) or not text.strip():
            return 0.0  # Handle empty or None input safely
        return textstat.gunning_fog(text)

    def _document_features(self, doc):
        """Return the nine raw feature values for one document."""
        return [
            # NOTE(review): this looks the whole document up as a single
            # dictionary word, so it is non-zero only for one-word documents.
            # Kept to preserve the nine-column layout - confirm intent.
            self.get_word_stress(doc),
            self.get_sentence_stress(doc),
            self.cons_density(doc),
            self.redundance(doc),
            self.sentiment_polarity(doc),
            self.word_choice(doc),
            self.coherence(doc),
            self.reading_ease(doc),
            self.gunning_fog(doc),
        ]

    def extract_features(self, text):
        """Build the raw feature matrix for a batch of documents.

        Parameters
        ----------
        text : iterable of str (a bare string is treated as one document)

        Returns
        -------
        numpy.ndarray of shape (n_documents, 9)
        """
        if isinstance(text, str):
            text = [text]
        # Each row is computed from its own document, not from the batch.
        rows = [self._document_features(doc) for doc in text]
        return np.array(rows, dtype=float).reshape(-1, self._N_FEATURES)

    def fit(self, X, y=None):
        """Fit the scaler on features extracted from the training data."""
        features = self.extract_features(X)
        self.scaler.fit(features)
        return self

    def transform(self, X):
        """Extract features and return them scaled by the fitted scaler."""
        features = self.extract_features(X)
        return self.scaler.transform(features)

In [23]:
# TF-IDF Vectorization

class TFIDFVectorizer(TransformerMixin, BaseEstimator):
    """Thin scikit-learn wrapper around ``TfidfVectorizer`` with input checks.

    The constructor arguments are stored as attributes of the same name so
    that ``get_params``/``set_params``/``clone`` and the estimator ``repr``
    (all provided by ``BaseEstimator``) work. Mixins are listed before
    ``BaseEstimator``, as scikit-learn recommends.
    """

    def __init__(self, max_features=10000, ngram_range=(1, 2), stop_words="english"):
        """
        Custom TF-IDF vectorizer with optimizations.
        - max_features: Limits vocabulary size to the most important words.
        - ngram_range: (1,1) for unigrams, (1,2) for unigrams + bigrams, etc.
        - stop_words: Passed through to ``TfidfVectorizer``.
        """
        self.max_features = max_features
        self.ngram_range = ngram_range
        self.stop_words = stop_words
        self.vectorizer = TfidfVectorizer(
            max_features=max_features,
            ngram_range=ngram_range,
            stop_words=stop_words)

    @staticmethod
    def _check_input(X):
        """Raise ValueError unless X is a non-empty list or pandas Series."""
        if not isinstance(X, (list, pd.Series)) or len(X) == 0:
            raise ValueError("Input must be a non-empty list or pandas Series of text.")

    def fit(self, X, y=None):
        """Learn the TF-IDF vocabulary and IDF weights from ``X``.

        Raises
        ------
        ValueError
            If ``X`` is not a non-empty list or pandas Series.
        """
        self._check_input(X)
        # Re-sync the wrapped vectorizer so values changed through
        # ``set_params`` after construction take effect.
        self.vectorizer.set_params(
            max_features=self.max_features,
            ngram_range=self.ngram_range,
            stop_words=self.stop_words)
        self.vectorizer.fit(X)
        return self

    def transform(self, X):
        """Transforms text data into TF-IDF feature vectors.

        Raises
        ------
        ValueError
            If ``X`` is not a non-empty list or pandas Series.
        """
        self._check_input(X)
        return self.vectorizer.transform(X)

# Initialize a standalone TF-IDF vectorizer (5000 features, unigrams + bigrams).
# NOTE(review): this instance is not referenced by the pipeline cell visible
# further down, which builds its own TFIDFVectorizer(max_features=10000) —
# confirm which setting is intended.
tfidf_transformer = TFIDFVectorizer(max_features=5000, ngram_range=(1, 2), stop_words="english")

In [None]:
# Model Training & Evaluation

# Data Splitting
def split_data(X, y, test_size=0.2, random_state=42, stratify=None):
    """Split data into training and test sets.

    Parameters
    ----------
    X, y : array-likes of equal length
        Features and labels.
    test_size : float, default 0.2
        Passed to ``train_test_split``.
    random_state : int, default 42
        Seed for a reproducible shuffle.
    stratify : array-like or None, default None
        Forwarded to ``train_test_split``; pass ``y`` to keep the class
        proportions equal in both splits. ``None`` keeps the previous
        (unstratified) behaviour.

    Returns
    -------
    X_train, X_test, y_train, y_test
    """
    return train_test_split(
        X,
        y,
        test_size=test_size,
        random_state=random_state,
        stratify=stratify,
    )


# Training
class GradientBoostingClassifierWrapper(ClassifierMixin, BaseEstimator):
    """Scikit-learn compatible wrapper around ``GradientBoostingClassifier``.

    The estimator is a classifier, so it derives from ``ClassifierMixin``
    (listed before ``BaseEstimator``, as scikit-learn recommends). After
    ``fit`` the underlying model is available as ``model`` and its class
    labels as ``classes_``.
    """

    def __init__(self, n_estimators=200, learning_rate=0.1, max_depth=3, early_stopping_rounds=10):
        """
        Custom wrapper for Gradient Boosting Classifier.
        - n_estimators: Number of boosting stages.
        - learning_rate: Shrinks contribution of each tree.
        - max_depth: Maximum depth of individual estimators.
        - early_stopping_rounds: Stops training when validation loss doesn't
          improve; forwarded as ``n_iter_no_change`` (``None`` disables it).
        """
        self.n_estimators = n_estimators
        self.learning_rate = learning_rate
        self.max_depth = max_depth
        self.early_stopping_rounds = early_stopping_rounds
        self.model = None

    @staticmethod
    def _n_samples(data):
        """Return the number of rows; uses ``shape[0]`` when available.

        ``len()`` raises on SciPy sparse matrices, which is what a
        ``FeatureUnion`` containing TF-IDF output hands to this estimator.
        """
        shape = getattr(data, "shape", None)
        return shape[0] if shape else len(data)

    def fit(self, X, y):
        """Train the Gradient Boosting model.

        Raises
        ------
        ValueError
            If ``X`` or ``y`` is ``None`` or has zero samples.
        """
        if X is None or y is None or self._n_samples(X) == 0 or self._n_samples(y) == 0:
            raise ValueError("Training data (X, y) must not be empty.")

        self.model = GradientBoostingClassifier(
            n_estimators=self.n_estimators,
            learning_rate=self.learning_rate,
            max_depth=self.max_depth,
            # scikit-learn's name for the early-stopping patience.
            n_iter_no_change=self.early_stopping_rounds)

        self.model.fit(X, y)
        # Expose the label order so callers can interpret predict_proba columns.
        self.classes_ = self.model.classes_
        return self

    def predict(self, X):
        """Predict class labels with the trained model.

        Raises
        ------
        ValueError
            If ``fit`` has not been called.
        """
        if self.model is None:
            raise ValueError("Model has not been trained. Call `fit()` first.")
        return self.model.predict(X)

    def predict_proba(self, X):
        """Predict class probabilities, one column per entry of ``classes_``.

        Raises
        ------
        ValueError
            If ``fit`` has not been called.
        """
        if self.model is None:
            raise ValueError("Model has not been trained. Call `fit()` first.")
        return self.model.predict_proba(X)


# Model evaluation
class EvaluationPipeline:
    """Compute and print binary-classification metrics for a fitted model.

    The wrapped model must provide ``predict`` and ``predict_proba``.
    """

    def __init__(self, model):
        """Initialize with a trained model."""
        self.model = model

    def fit(self, X, y=None):
        """Fit method (not used, but required for Scikit-learn compatibility)."""
        return self

    def _infer_pos_label(self, y_test, y_pred):
        """Pick the positive label for precision/recall.

        Uses the model's ``classes_`` when present, otherwise the sorted
        labels seen in ``y_test`` and ``y_pred``. With exactly two labels the
        last one is returned, which is the class behind column 1 of
        ``predict_proba``; in every other case scikit-learn's default of 1
        is kept.
        """
        classes = getattr(self.model, "classes_", None)
        if classes is None:
            classes = np.unique(
                np.concatenate([np.asarray(y_test).ravel(), np.asarray(y_pred).ravel()])
            )
        return classes[-1] if len(classes) == 2 else 1

    def evaluate(self, X_test, y_test, pos_label=None):
        """Evaluate the trained model on test data.

        Parameters
        ----------
        X_test, y_test
            Held-out features and true labels.
        pos_label : label, optional
            Positive class for precision and recall. When omitted it is
            inferred (see ``_infer_pos_label``) so that string labels such as
            "AI"/"Human" work; 0/1 labels still use 1.

        Returns
        -------
        dict
            Keys: loss, accuracy, precision, recall, roc_auc,
            confusion_matrix. The same values are printed.
        """
        y_pred = self.model.predict(X_test)
        # Column 1 holds the probability of the second class label.
        y_pred_proba = self.model.predict_proba(X_test)[:, 1]

        if pos_label is None:
            pos_label = self._infer_pos_label(y_test, y_pred)

        results = {
            "loss": log_loss(y_test, y_pred_proba),
            "accuracy": accuracy_score(y_test, y_pred),
            "precision": precision_score(y_test, y_pred, pos_label=pos_label),
            "recall": recall_score(y_test, y_pred, pos_label=pos_label),
            "roc_auc": roc_auc_score(y_test, y_pred_proba),
            "confusion_matrix": confusion_matrix(y_test, y_pred)
        }

        print("\n📊 **Model Performance:**")
        print(f"❌ Loss: {results['loss']:.4f}")
        print(f"✅ Accuracy: {results['accuracy']:.4f}")
        print(f"Precision: {results['precision']:.4f}")
        print(f"Recall: {results['recall']:.4f}")
        print(f"ROC-AUC: {results['roc_auc']:.4f}")
        print(f"Confusion Matrix:\n{results['confusion_matrix']}\n")

        return results


# End-to-End Model Training & Evaluation
def train_and_evaluate(X, y, n_estimators=200, learning_rate=0.1, max_depth=3, early_stopping_rounds=10):
    """Split the data, fit a gradient-boosting model and score it.

    The four keyword arguments are forwarded unchanged to
    ``GradientBoostingClassifierWrapper``; the split uses ``split_data``
    with its defaults.

    Returns
    -------
    tuple
        ``(model, results)`` - the fitted wrapper and the metrics dict
        returned by ``EvaluationPipeline.evaluate``.
    """
    X_train, X_test, y_train, y_test = split_data(X, y)

    # Fit on the training portion only.
    model = GradientBoostingClassifierWrapper(
        n_estimators=n_estimators,
        learning_rate=learning_rate,
        max_depth=max_depth,
        early_stopping_rounds=early_stopping_rounds,
    )
    model.fit(X_train, y_train)

    # Score on the held-out portion.
    results = EvaluationPipeline(model=model).evaluate(X_test, y_test)

    return model, results

# Pipelines

In [None]:
# Stage 1: clean the raw documents.
text_cleaning_pipeline = Pipeline(steps=[("clean_text", TextCleaner())])

# Stage 2a: hand-crafted features, scaled inside FeatureExtractor.
feature_extraction_pipeline = Pipeline(
    steps=[("feature_extraction", FeatureExtractor())]
)

# Stage 2b: TF-IDF representation of the same cleaned text.
tfidf_pipeline = Pipeline(
    steps=[("tfidf", TFIDFVectorizer(max_features=10000))]
)

# Stage 2: both feature blocks side by side, TF-IDF columns first.
combined_features = FeatureUnion(
    transformer_list=[
        ("tfidf", tfidf_pipeline),
        ("scaled_features", feature_extraction_pipeline),
    ]
)

# Full chain: cleaning -> combined features -> gradient-boosting classifier.
final_pipeline = Pipeline(
    steps=[
        ("text_cleaning", text_cleaning_pipeline),
        ("combined_features", combined_features),
        (
            "classifier",
            GradientBoostingClassifierWrapper(n_estimators=200, learning_rate=0.1, max_depth=3),
        ),
    ]
)

In [None]:
# Example Usage
#
# NOTE(review): X_train, y_train and X_test are not defined in any cell visible
# above; presumably they come from load_data(...) followed by split_data(...)
# on the text and `source` columns — confirm and add that cell so the notebook
# survives Restart Kernel -> Run All.

# Train the model (fits the cleaner, both feature branches and the classifier)
final_pipeline.fit(X_train, y_train)

# Predict on test data
predictions = final_pipeline.predict(X_test)