Addressing this task by employing the Normalized Compression Distance (NCD) method, utilizing gzip compression to measure text similarity

Reference:
- "Low-Resource" Text Classification: A Parameter-Free Classification Method with Compressors paper (https://aclanthology.org/2023.findings-acl.426/)
- https://github.com/rasbt/nn_plus_gzip/blob/main/1_2_nn_plus_gzip_fix-tie-breaking.ipynb

In [1]:
import numpy as np
import pandas as pd

In [2]:
train = pd.read_csv("/kaggle/input/daigt-v2-train-dataset/train_v2_drcat_02.csv")
external_train = pd.read_csv("/kaggle/input/llm-detect-ai-generated-text/train_essays.csv")
external_train.rename(columns={'generated': 'label'}, inplace=True)

In [3]:
# !pip install -q language-tool-python --no-index --find-links ../input/daigt-misc/
# !mkdir -p /root/.cache/language_tool_python/
# !cp -r /kaggle/input/daigt-misc/lang57/LanguageTool-5.7 /root/.cache/language_tool_python/LanguageTool-5.7

In [4]:
#https://www.kaggle.com/code/siddhvr/llm-detect-ai-gt-sub
not_persuade_df = train[train['source'] != 'persuade_corpus']
persuade_df = train[train['source'] == 'persuade_corpus']
sampled_persuade_df = persuade_df.sample(n=6000, random_state=42)

all_human = set(list(''.join(sampled_persuade_df.text.to_list())))
other = set(list(''.join(not_persuade_df.text.to_list())))
chars_to_remove = ''.join([x for x in other if x not in all_human])
print(chars_to_remove)

translation_table = str.maketrans('', '', chars_to_remove)
def remove_chars(s):
    return s.translate(translation_table)

🏕👨🐶💀□ü🚂📹☀ち🐢👮🥶🔭ê🏽🛬禁🐭🥖こ😍使🙀🏃📸💥�🌱🎭🏈🚭🏫🥳💃♂🏢🌠’🍞🐦🎤❄🏟🎣は🌮🏜🙌是🚔🤔😤🥕🤓取🎵护🍖🏙🏰🥑😊💁💇😎☹🧭🛫上🥦🎸🍎🏳时🔋🌎éí👂🍜😔💊😉💼🥁🐱🍔💭°🧠📰🛋😲🧬🤯驶…🌳完🔬🌧🌭🌨😡🌷中🍝ん😻🔧📝🐴📷驾📉─🚨🎧😕🍣😩🌊😋😵🙃所🎾🚀🤜🕺✨🤛🍿🇧🏯🎶🐰🍳¬🤢💸🥤法🎄🙄う😨💦🤟⏰🌄🚚🏔📊р😱🤪​🏛😒安ç🌸🧚🌲🧦🌞😅🏼将都👥🛣💚🛑合📄🙅🤖🧀╯🇪🛠ã用🇵🛍👍💰🎮–с🦸🇺🧽必保🐕👇о道力^💅📣🏏”📞🥛📺り🐻🔍🐆的🐠🤤👏🍽📦。🕒🤫🙋路🐧👻🇷せ👬🌏🗳💯司🔮🍰🕹🎅🚌🚴🐾🛸—和📧🍲🎊该有🏨🤞📱🤷🦐👫🙏🧐д🚫🤣🚪🚣😴🌟🥔👦👌一😓♀“唯🎈🧖🔑须👧🏠👋🍋🥟🍟🎉🚕🌈🦎🤘💧选🌽🦁⚽🧙🎨😖あ😜手🍓📖💨😳🎩😝🇸🏡ā🍁🌯😘🥭🇯


In [5]:
train=pd.concat([train,external_train])
train['text'] = train['text'].apply(remove_chars)
train['text'] = train['text'].str.replace('\n', '')

test = pd.read_csv('/kaggle/input/llm-detect-ai-generated-text/test_essays.csv')
test['text'] = test['text'].str.replace('\n', '')
test['text'] = test['text'].apply(remove_chars)
#correct_df(test)
#df = pd.concat([train['text'], test['text']], axis=0)

In [6]:
train.tail()

Unnamed: 0,text,label,prompt_name,source,RDizzl3_seven,id,prompt_id
1373,There has been a fuss about the Elector Colleg...,0,,,,fe6ff9a5,1.0
1374,Limiting car usage has many advantages. Such a...,0,,,,ff669174,0.0
1375,There's a new trend that has been developing f...,0,,,,ffa247e0,0.0
1376,As we all know cars are a big part of our soci...,0,,,,ffc237e9,0.0
1377,Cars have been around since the 1800's and hav...,0,,,,ffe1ca0d,0.0


In [7]:
import gzip
import numpy as np
import pandas as pd
from tqdm import tqdm
from sklearn.model_selection import KFold, train_test_split
from sklearn.metrics import roc_auc_score
from collections import Counter
import time

def compress_text(text):
    return len(gzip.compress(text.encode()))

# Pre-compute the compressed lengths for training data
train['compressed_length'] = train['text'].apply(compress_text)

def calculate_ncd(c_text1, c_text2, combined_text):
    c_combined = compress_text(combined_text)
    return (c_combined - min(c_text1, c_text2)) / max(c_text1, c_text2)

def classify_text(test_text, training_data, k=2):
    c_test_text = compress_text(test_text)
    distances = []

    for _, row in training_data.iterrows():
        c_train_text = row['compressed_length']
        combined_text = test_text + " " + row['text']
        ncd = calculate_ncd(c_test_text, c_train_text, combined_text)
        distances.append(ncd)

    sorted_idx = np.argsort(np.array(distances))
    top_k_class = np.array(training_data['label'])[sorted_idx[:k]]
    predicted_class = Counter(top_k_class).most_common(1)[0][0]
    return predicted_class

from sklearn.model_selection import train_test_split

def batch_classify_texts(test_texts, training_data, k=2, sample_size=None):
    if sample_size is not None and sample_size < len(training_data):
        # Perform stratified sampling
        _, stratified_sample = train_test_split(
            training_data, 
            test_size=sample_size, 
            stratify=training_data['label'], 
            random_state=42
        )
        training_data = stratified_sample

    probabilities = []
    for test_text in test_texts:
        c_test_text = compress_text(test_text)
        distances = []

        for _, row in training_data.iterrows():
            c_train_text = row['compressed_length']
            combined_text = test_text + " " + row['text']
            ncd = calculate_ncd(c_test_text, c_train_text, combined_text)
            distances.append(ncd)

        sorted_idx = np.argsort(np.array(distances))
        top_k_class = np.array(training_data['label'])[sorted_idx[:k]]
        predicted_class = Counter(top_k_class).most_common(1)[0][0]
        probabilities.append(predicted_class)
    return probabilities


def estimate_time_left(start_time, current_iter, total_iter):
    elapsed_time = time.time() - start_time
    avg_time_per_iter = elapsed_time / current_iter
    remaining_iters = total_iter - current_iter
    estimated_time_left = avg_time_per_iter * remaining_iters
    return estimated_time_left


In [8]:
use_k_fold = False
debug_mode = True
debug_size = 1000
batch_size = 100

if debug_mode:
    debug_train = train.sample(n=debug_size, random_state=42)
else:
    debug_train = train

if use_k_fold:
    n_splits = 5
    kf = KFold(n_splits=n_splits)
    auroc_scores = []

    for fold, (train_index, val_index) in enumerate(kf.split(debug_train)):
        start_time = time.time()
        train_fold, val_fold = debug_train.iloc[train_index], debug_train.iloc[val_index]
        val_texts = [text for text in val_fold['text']]
        total_batches = len(val_texts) // batch_size + (len(val_texts) % batch_size != 0)

        probabilities = []
        for i in tqdm(range(total_batches), desc=f"Processing fold {fold + 1}"):
            batch_texts = val_texts[i*batch_size:(i+1)*batch_size]
            batch_probabilities = batch_classify_texts(batch_texts, train_fold, k=2)
            probabilities.extend(batch_probabilities)

            time_left = estimate_time_left(start_time, i + 1, total_batches)
            print(f"Fold {fold + 1}, Batch {i + 1}/{total_batches}, Time left (est.): {time_left:.2f} seconds")

        fold_auroc_score = roc_auc_score(val_fold['label'].values, probabilities)
        auroc_scores.append(fold_auroc_score)

    average_auroc_score = sum(auroc_scores) / len(auroc_scores)
    print("Average AUROC Score:", average_auroc_score)
else:
    start_time = time.time()
    train_single, val_single = train_test_split(debug_train, test_size=0.2, random_state=42)
    val_texts = [text for text in val_single['text']]
    total_batches = len(val_texts) // batch_size + (len(val_texts) % batch_size != 0)

    probabilities = []
    for i in tqdm(range(total_batches), desc="Processing single evaluation"):
        batch_texts = val_texts[i*batch_size:(i+1)*batch_size]
        batch_probabilities = batch_classify_texts(batch_texts, train_single, k=2)
        probabilities.extend(batch_probabilities)

        time_left = estimate_time_left(start_time, i + 1, total_batches)
        print(f"Batch {i + 1}/{total_batches}, Time left (est.): {time_left:.2f} seconds")

    single_auroc_score = roc_auc_score(val_single['label'].values, probabilities)
    print("Single Evaluation AUROC Score:", single_auroc_score)

Processing single evaluation:  50%|█████     | 1/2 [00:25<00:25, 25.93s/it]

Batch 1/2, Time left (est.): 25.94 seconds


Processing single evaluation: 100%|██████████| 2/2 [00:52<00:00, 26.16s/it]

Batch 2/2, Time left (est.): 0.00 seconds
Single Evaluation AUROC Score: 0.9366697559468645





In [9]:
import tqdm

# Generating predictions for the test set
batch_size = 100
sample_train_size = 150  # Adjust as needed (larger = slower)

test_probabilities = []
for i in tqdm.tqdm(range(0, len(test), batch_size), desc="Generating test predictions"):
    batch_texts = test['text'][i:i+batch_size].tolist()
    batch_probabilities = batch_classify_texts(batch_texts, train, k=2, sample_size=sample_train_size)
    test_probabilities.extend(batch_probabilities)

Generating test predictions: 100%|██████████| 1/1 [00:00<00:00,  6.82it/s]


In [10]:
# Creating the submission file
test['generated'] = test_probabilities
submission = pd.DataFrame({
    'id': test['id'],
    'generated': test['generated']
})
submission.to_csv('submission.csv', index=False)