In [None]:
# 0) Install dependencies
# Run this once in Colab
!pip install -q transformers datasets sentencepiece spacy sklearn
!python -m spacy download en_core_web_sm

# 1) Imports & seeds
import inspect
import math
import os
import random
import re

import numpy as np
import pandas as pd
import spacy
import torch
from datasets import Dataset, load_dataset
from sklearn.metrics import accuracy_score, f1_score, precision_score, recall_score
from tqdm.auto import tqdm
from transformers import AutoTokenizer, AutoModelForSequenceClassification, TrainingArguments, Trainer

nlp = spacy.load("en_core_web_sm")

# Pin every random number generator the notebook touches so that sampling,
# shuffling and training repeat exactly on a fresh kernel.
SEED = 42
for _set_seed in (random.seed, np.random.seed, torch.manual_seed):
    _set_seed(SEED)
if torch.cuda.is_available():
    # seeds the generators of all visible GPUs
    torch.cuda.manual_seed_all(SEED)

_device_name = "cuda" if torch.cuda.is_available() else "cpu"
print("Torch device:", torch.device(_device_name))

In [None]:
# 2) Load a small corpus (AG News) and build sentence list
# We load a small slice for speed and split each article into sentences.
raw = load_dataset("ag_news", split="train[:2000]")  # 2000 articles -> many sentences

texts = []
# nlp.pipe streams the articles through the pipeline in batches, which is
# considerably faster than calling nlp() once per article and yields the same docs.
article_docs = nlp.pipe((item["text"] for item in raw), batch_size=64)
for doc in tqdm(article_docs, total=len(raw), desc="Splitting sentences"):
    for sent in doc.sents:
        s = sent.text.strip()
        # keep only reasonably long sentences to have meaningful structure
        if 20 < len(s) < 300:
            texts.append(s)

print(f"Extracted {len(texts)} sentences from AG News slice.")

# Inspect the first few sentences. Slicing (instead of indexing 0..4) keeps this
# cell from raising IndexError when fewer than five sentences were extracted.
for s in texts[:5]:
    print("-", s)

In [None]:
# 3) Heuristic detector for logical sentences
#   - causal: because, since, therefore, thus, hence, so that...
#   - conditional: if, unless, only if, provided that...
#   - contrast: although, however, despite, nevertheless, but
#   - quantifier: all, every, none, some, each
#   - inference markers: implies, suggests, indicates, ergo

# Connective vocabulary, grouped by the kind of reasoning each phrase signals.
# NOTE(review): "when ... then" is matched as a literal string (dots included)
# by contains_phrase, so it will practically never fire on real text.
LOGIC_WORDS = dict(
    causal=["because", "since", "therefore", "thus", "hence", "so that", "as a result", "consequently"],
    conditional=["if", "unless", "only if", "provided that", "when ... then", "in case"],
    contrast=["although", "however", "despite", "nevertheless", "but", "yet"],
    quantifier=["all", "every", "none", "some", "each", "no", "at least", "at most", "only"],
    inference=["implies", "suggests", "indicates", "ergo", "therefore"],
)

# Flatten all categories into one de-duplicated set for quick membership checks
# ("therefore" appears under both causal and inference).
flat_words = set().union(*LOGIC_WORDS.values())

# Whole-word / whole-phrase matcher used by the heuristics below
def contains_phrase(text, phrase):
    """Return True when `phrase` occurs in `text` delimited by word boundaries.

    The phrase is regex-escaped, so it is matched literally (spaces and
    punctuation included), and the comparison ignores case.
    """
    bounded = r"\b{}\b".format(re.escape(phrase))
    hit = re.search(bounded, text, flags=re.IGNORECASE)
    return hit is not None

def is_logical_sentence(text):
    """Heuristically decide whether `text` carries an explicit reasoning marker.

    Checks, cheapest first (the sentence is logical as soon as one fires):
      1. any phrase from `flat_words` occurs as a whole word/phrase;
      2. the sentence has a comma and the whole word "that" or "so";
      3. spaCy tags an "if"/"because"/"although"/"since" token as `mark` or `advcl`.

    Returns:
        bool: True when any heuristic fires, False otherwise.
    """
    text_low = text.lower()
    # 1) token-level and phrase-level presence
    if any(contains_phrase(text_low, phrase) for phrase in flat_words):
        return True
    # 2) multi-clause fallback. Whole-word matching is required here: a plain
    #    substring test for "so" also fires on "also", "season", "Microsoft", ...
    if "," in text and (contains_phrase(text_low, "that") or contains_phrase(text_low, "so")):
        return True
    # 3) dependency-level heuristic; run last because the full parse is by far
    #    the most expensive check and the order does not affect the result
    subordinators = {"if", "because", "although", "since"}
    doc = nlp(text)
    return any(
        token.dep_ in ("mark", "advcl") and token.text.lower() in subordinators
        for token in doc
    )

# Label every sentence with the heuristic detector (1 = logical, 0 = not)
labels = [int(is_logical_sentence(sentence)) for sentence in texts]
df = pd.DataFrame({"text": texts, "label": labels})
print(df["label"].value_counts())

# Print a seeded handful from each class so the heuristics can be sanity-checked
for heading, wanted_label in (
    ("\nSOME POSITIVE SAMPLES (label=1):", 1),
    ("\nSOME NEGATIVE SAMPLES (label=0):", 0),
):
    print(heading)
    class_rows = df[df["label"] == wanted_label]
    print(class_rows.sample(5, random_state=SEED)["text"].tolist())


In [None]:
# 4) Build adversarial/perturbed examples
# We create transformations:
#   A) flip_connective: 'because' -> 'although' or 'therefore' -> 'but' (makes reasoning suspect)
#   B) remove_connective: drop 'because', 'therefore', 'if' -> often destroys valid logic signal
#   C) simple_negation: insert 'not' after first auxiliary verb (is/are/was/will/can), or prepend "It is not the case that "
# Note: these are heuristic and intentionally simple.

# Auxiliary / modal verbs after which simple_negate may insert "not"
AUX_VERBS = [
    "is", "are", "was", "were",
    "has", "have", "had",
    "will", "can", "could", "should", "would",
    "do", "does", "did",
    "may", "might",
]

def flip_connective(text):
    """Swap the first matching logical connective for one with a different meaning.

    Connectives are tried in the order of the mapping below (not in the order
    they appear in the sentence); only the first occurrence of the first one
    found is replaced. The replacement copies the capitalisation of the word it
    replaces ("Because" -> "Although", "IF" -> "UNLESS").

    Args:
        text: sentence to perturb.

    Returns:
        The perturbed sentence, or `text` unchanged when no connective matches.
    """
    mapping = {
        "because": "although",
        "therefore": "but",
        "thus": "but",
        "hence": "but",
        "if": "unless",
        "although": "because",  # sometimes flips
        "since": "although",
    }

    def _like(matched, replacement):
        # mirror the casing of the matched connective onto its replacement
        if matched.isupper():
            return replacement.upper()
        if matched[0].isupper():
            return replacement[0].upper() + replacement[1:]
        return replacement

    for src, tgt in mapping.items():
        pattern = re.compile(r"\b" + re.escape(src) + r"\b", flags=re.IGNORECASE)
        if pattern.search(text):
            # a callable replacement also avoids backslash interpretation in tgt
            return pattern.sub(lambda m, tgt=tgt: _like(m.group(0), tgt), text, count=1)
    return text

def remove_connective(text):
    """Delete common logical connectives from `text`.

    When no connective is present the input is returned exactly as given, so
    callers can use `result != text` to tell whether anything was removed
    (previously whitespace normalisation alone could make the two differ).
    After a removal, whitespace runs are collapsed, spaces left in front of
    punctuation are dropped, and punctuation orphaned at the start of the
    sentence (e.g. the comma of "Therefore, ...") is stripped.

    Args:
        text: sentence to perturb.

    Returns:
        The sentence without connectives, or `text` itself if none were found.
    """
    # remove common connectives
    to_remove = ["because", "therefore", "thus", "hence", "if", "although", "since", "so that", "consequently"]
    pattern = re.compile(
        r"\b(?:" + "|".join(re.escape(c) for c in to_remove) + r")\b",
        flags=re.IGNORECASE,
    )
    stripped, n_removed = pattern.subn("", text)
    if n_removed == 0:
        return text
    # clean multiple spaces
    stripped = re.sub(r"\s+", " ", stripped)
    # "stayed , it" -> "stayed, it"
    stripped = re.sub(r"\s+([,.;:!?])", r"\1", stripped)
    return stripped.lstrip(" ,;:").strip()

def simple_negate(text):
    """Negate `text` by inserting "not" after its first auxiliary verb.

    Auxiliaries that are already followed by a negation token are skipped;
    inserting after them produced malformed output such as "is notn't" (from
    "isn't") or "is not not". If no usable auxiliary is found, the sentence is
    prefixed with "It is not the case that ".

    Args:
        text: sentence to negate.

    Returns:
        The negated sentence (always different from `text`).
    """
    negation_tokens = {"not", "n't", "n\u2019t"}
    # Only token text and character offsets are needed, so run the tokenizer
    # alone instead of the full tagging/parsing pipeline.
    doc = nlp.make_doc(text)
    for token in doc:
        if token.text.lower() not in AUX_VERBS:
            continue
        has_next = token.i + 1 < len(doc)
        if has_next and doc[token.i + 1].text.lower() in negation_tokens:
            continue
        # insert 'not' right after this token, located via its char offset
        insert_at = token.idx + len(token.text)
        return text[:insert_at] + " not" + text[insert_at:]
    # fallback: prepend "It is not the case that "
    return "It is not the case that " + text

# Try each perturbation on a few seeded positive sentences
examples = df[df.label==1].sample(6, random_state=SEED)["text"].tolist()
perturbations = (
    ("FLIP:", flip_connective),
    ("REMOVE:", remove_connective),
    ("NEGATE:", simple_negate),
)
for sentence in examples:
    print("\nORIG:", sentence)
    for tag, perturb in perturbations:
        print(tag, perturb(sentence))


In [None]:
# 5) Create balanced dataset for classification
# We'll create: original logical positives (label=1), random negatives (label=0)
pos_texts = df[df.label==1]["text"].tolist()
neg_texts = df[df.label==0]["text"].tolist()

# Sample sizes: cap for speed, and never ask for more than the smaller class
# can supply. Previously a shortage of negatives silently produced an
# unbalanced dataset.
N_pos = min(1200, len(pos_texts), len(neg_texts))
N_neg = N_pos
if N_pos == 0:
    raise ValueError(
        "Need at least one logical and one non-logical sentence to build the baseline dataset."
    )

# sample (always a fresh list, so pos_texts / neg_texts are never aliased)
pos_sample = random.sample(pos_texts, N_pos)
neg_sample = random.sample(neg_texts, N_neg)

print(f"\nUsing {len(pos_sample)} positive and {len(neg_sample)} negative samples for baseline.")

# Prepare baseline dataset (text, label)
baseline_texts = pos_sample + neg_sample
baseline_labels = [1]*len(pos_sample) + [0]*len(neg_sample)

# Shuffle texts and labels together
combined = list(zip(baseline_texts, baseline_labels))
random.shuffle(combined)
baseline_texts, baseline_labels = zip(*combined)

# Build HuggingFace Dataset and hold out 20% for evaluation
ds_baseline = Dataset.from_dict({"text": list(baseline_texts), "label": list(baseline_labels)})
ds_baseline = ds_baseline.train_test_split(test_size=0.2, seed=SEED)

print(ds_baseline)


In [None]:
# 6) Tokenizer & model (DistilBERT)
MODEL_NAME = "distilbert-base-uncased"
tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME)

def preprocess(batch):
    """Tokenize the "text" column of a batch.

    Every sequence is truncated and padded to exactly 128 tokens.
    """
    encoding_options = {"truncation": True, "padding": "max_length", "max_length": 128}
    return tokenizer(batch["text"], **encoding_options)

# Tokenize both baseline splits and drop the raw text column
tokenized_train = ds_baseline["train"].map(preprocess, batched=True).remove_columns(["text"])
tokenized_eval = ds_baseline["test"].map(preprocess, batched=True).remove_columns(["text"])

# set format for Trainer (set_format works in place on each dataset)
for tokenized_split in (tokenized_train, tokenized_eval):
    tokenized_split.set_format("torch")

# 7) Training function wrapper (so we can run baseline then adversarial)
def train_model(train_dataset, eval_dataset, output_dir, num_train_epochs=2, init_checkpoint=None):
    """Fine-tune a 2-class sequence classifier and evaluate it.

    Args:
        train_dataset: tokenized dataset used for training.
        eval_dataset: tokenized dataset evaluated after every epoch and once
            more after training.
        output_dir: directory handed to `TrainingArguments`.
        num_train_epochs: number of training epochs.
        init_checkpoint: model name or path to start from. Defaults to
            `MODEL_NAME`; pass a saved model directory to continue training
            from earlier weights (e.g. for the adversarial stage).

    Returns:
        (trainer, metrics): the `Trainer` and the dict returned by
        `trainer.evaluate()`.
    """
    model = AutoModelForSequenceClassification.from_pretrained(
        init_checkpoint or MODEL_NAME, num_labels=2
    )
    use_gpu = torch.cuda.is_available()
    training_args = TrainingArguments(
        output_dir=output_dir,
        num_train_epochs=num_train_epochs,
        per_device_train_batch_size=16 if use_gpu else 8,
        per_device_eval_batch_size=32 if use_gpu else 16,
        eval_strategy="epoch",
        save_strategy="no",
        learning_rate=2e-5,
        weight_decay=0.01,
        logging_steps=50,
        seed=SEED,
        report_to="none",
    )

    def compute_metrics(pred):
        # logits -> predicted class id, then standard binary metrics
        labels = pred.label_ids
        preds = np.argmax(pred.predictions, axis=1)
        return {
            "accuracy": float(accuracy_score(labels, preds)),
            "f1": float(f1_score(labels, preds)),
            "precision": float(precision_score(labels, preds)),
            "recall": float(recall_score(labels, preds))
        }

    # Newer transformers releases take the tokenizer as `processing_class` and
    # deprecate the `tokenizer` keyword; older ones only know `tokenizer`.
    # Pick whichever the installed Trainer accepts.
    trainer_params = inspect.signature(Trainer.__init__).parameters
    tokenizer_kwarg = "processing_class" if "processing_class" in trainer_params else "tokenizer"

    trainer = Trainer(
        model=model,
        args=training_args,
        train_dataset=train_dataset,
        eval_dataset=eval_dataset,
        compute_metrics=compute_metrics,
        **{tokenizer_kwarg: tokenizer},
    )
    trainer.train()
    metrics = trainer.evaluate()
    return trainer, metrics

In [None]:
# 8) Baseline training
print("\n=== TRAINING BASELINE MODEL (Logical vs Non-Logical) ===")
baseline_trainer, baseline_metrics = train_model(
    train_dataset=tokenized_train,
    eval_dataset=tokenized_eval,
    output_dir="./baseline_model",
    num_train_epochs=2,
)
print("Baseline eval metrics:", baseline_metrics)


In [None]:
# 9) Create adversarial dataset
# We'll take positive logical sentences and create adversarial variants (flip, remove, negate)
adv_texts = []
adv_labels = []

# Both models are later compared on the baseline held-out split, so no sentence
# from that split (nor a perturbation of one) may enter the adversarial data;
# otherwise the adversarial model would have trained on its own test examples.
held_out_texts = set(ds_baseline["test"]["text"])
pos_pool = [t for t in pos_sample if t not in held_out_texts]
neg_pool = [t for t in neg_texts if t not in held_out_texts]

# Use a subset to generate adversarials to avoid explosion
N_adv_gen = min(800, len(pos_pool))
pos_for_adv = random.sample(pos_pool, N_adv_gen)

for t in pos_for_adv:
    # original positive kept as positive label 1
    adv_texts.append(t); adv_labels.append(1)
    # flipped connective -> likely breaks the logical flow --> treat as negative (0)
    flipped = flip_connective(t)
    if flipped != t:
        adv_texts.append(flipped); adv_labels.append(0)
    # removed connective -> often removes the explicit inference marker -> negative.
    # Compare with whitespace normalised on both sides so a sentence that merely
    # had its spacing collapsed is not added as a "negative" copy of itself.
    removed = remove_connective(t)
    if " ".join(removed.split()) != " ".join(t.split()) and len(removed) > 15:
        adv_texts.append(removed); adv_labels.append(0)
    # negation -> negative
    neg = simple_negate(t)
    if neg != t:
        adv_texts.append(neg); adv_labels.append(0)

# For extra negatives, add random negative samples
extra_negs = random.sample(neg_pool, min(800, len(neg_pool)))
for t in extra_negs:
    adv_texts.append(t); adv_labels.append(0)

# Combine adversarial data with some original neutral data to balance
print(f"Generated {len(adv_texts)} adversarial / augmented examples.")

# Build the adversarial dataset and hold out 20% of it
ds_adv = Dataset.from_dict({"text": adv_texts, "label": adv_labels}).train_test_split(
    test_size=0.2, seed=SEED
)

# Tokenize, drop the raw text column, and expose torch tensors
tokenized_adv_train = ds_adv["train"].map(preprocess, batched=True).remove_columns(["text"])
tokenized_adv_eval = ds_adv["test"].map(preprocess, batched=True).remove_columns(["text"])
for tokenized_adv_split in (tokenized_adv_train, tokenized_adv_eval):
    tokenized_adv_split.set_format("torch")


In [None]:
# 10) Adversarial fine-tuning:
# Continue training from the baseline weights on the augmented adversarial set.
print("\n=== ADVERSARIAL FINE-TUNING ===")
# Save the baseline model, then reload it: training starts from the baseline
# weights while adv_model is a separate object from baseline_trainer's model.
baseline_trainer.save_model("./baseline_saved")
adv_model = AutoModelForSequenceClassification.from_pretrained("./baseline_saved", num_labels=2)

on_gpu = torch.cuda.is_available()
adv_training_config = dict(
    output_dir="./adv_model",
    num_train_epochs=2,
    per_device_train_batch_size=16 if on_gpu else 8,
    per_device_eval_batch_size=32 if on_gpu else 16,
    eval_strategy="epoch",
    save_strategy="no",
    learning_rate=2e-5,
    weight_decay=0.01,
    logging_steps=50,
    seed=SEED,
    report_to="none",
)
training_args_adv = TrainingArguments(**adv_training_config)

def compute_metrics_adv(pred):
    """Turn a prediction object into accuracy / F1 / precision / recall floats.

    Reads `pred.label_ids` as the reference labels and takes the argmax over
    axis 1 of `pred.predictions` as the predicted class ids.
    """
    gold = pred.label_ids
    predicted = np.argmax(pred.predictions, axis=1)
    scorers = {
        "accuracy": accuracy_score,
        "f1": f1_score,
        "precision": precision_score,
        "recall": recall_score,
    }
    return {name: float(scorer(gold, predicted)) for name, scorer in scorers.items()}

# Newer transformers releases take the tokenizer as `processing_class` and
# deprecate the `tokenizer` keyword; older ones only know `tokenizer`.
# Pick whichever the installed Trainer accepts.
tokenizer_kwarg = (
    "processing_class"
    if "processing_class" in inspect.signature(Trainer.__init__).parameters
    else "tokenizer"
)

trainer_adv = Trainer(
    model=adv_model,
    args=training_args_adv,
    train_dataset=tokenized_adv_train,
    eval_dataset=tokenized_adv_eval,
    compute_metrics=compute_metrics_adv,
    **{tokenizer_kwarg: tokenizer},
)

trainer_adv.train()
adv_metrics = trainer_adv.evaluate()
print("Adversarial-finetune eval metrics:", adv_metrics)


In [None]:

# 11) Evaluate both models on a held-out test set (we'll use baseline's test for consistent comparison)
# Get predictions from baseline model on baseline test
def get_preds(trainer, tokenized_dataset):
    """Run `trainer.predict` on a dataset and unpack the result.

    Returns:
        (preds, labels, metrics): argmax over axis 1 of the predictions, the
        `label_ids` of the prediction output, and its `metrics` attribute.
    """
    output = trainer.predict(tokenized_dataset)
    predicted_classes = np.asarray(output.predictions).argmax(axis=1)
    return predicted_classes, output.label_ids, output.metrics

# Score both models on the same baseline held-out split.
# NOTE: this rebinds `baseline_labels` and `adv_labels` (previously the dataset
# label lists) to the label ids returned for the eval set.
baseline_preds, baseline_labels, baseline_eval_metrics = get_preds(baseline_trainer, tokenized_eval)
adv_preds, adv_labels, adv_eval_metrics = get_preds(trainer_adv, tokenized_eval)

print("\nEVALUATION ON BASELINE EVAL SET (same held-out split):")
print("Baseline model metrics (reported earlier):", baseline_eval_metrics)
print("Adv-finetuned model evaluated on same set (to test generalization):", adv_eval_metrics)

# Accuracy / F1 for each model against the same reference labels
print("\nDirect comparison on baseline eval set:")
for tag, model_preds in (("Baseline accuracy:", baseline_preds), ("Adv model accuracy:", adv_preds)):
    print(tag, accuracy_score(baseline_labels, model_preds), "F1:", f1_score(baseline_labels, model_preds))


In [None]:
# 12) Qualitative inspection: show sample predictions side-by-side
def show_samples(texts, true_labels, base_preds, adv_preds, n=12, random_state=None):
    """Display a random sample of predictions from both models side by side.

    Args:
        texts: input sentences.
        true_labels: reference labels, aligned with `texts`.
        base_preds: baseline model predictions, aligned with `texts`.
        adv_preds: adversarially fine-tuned model predictions, aligned with `texts`.
        n: maximum number of rows to show; capped at the number of rows
            available so small inputs no longer raise ValueError.
        random_state: optional seed forwarded to `DataFrame.sample` for a
            reproducible selection (default None keeps the previous behaviour).
    """
    comparison = pd.DataFrame({
        "text": list(texts),
        "true": true_labels,
        "base_pred": base_preds,
        "adv_pred": adv_preds
    })
    n_shown = min(n, len(comparison))
    display(comparison.sample(n_shown, random_state=random_state))

# The tokenized eval set no longer has its 'text' column, so take the sentences
# from the untokenized baseline test split, which is still available.
test_texts = ds_baseline["test"]["text"]
show_samples(
    texts=test_texts,
    true_labels=baseline_labels,
    base_preds=baseline_preds,
    adv_preds=adv_preds,
    n=10,
)

In [None]:
# 13) Save final adversarial model for submission
final_model_dir = "./final_adv_model"
trainer_adv.save_model(final_model_dir)
print(f"Saved final adversarial model to {final_model_dir}")


In [None]:
# 14) Quick conclusions to print
closing_notes = (
    "\nCONCLUSIONS / NEXT STEPS:",
    "- Baseline vs adversarial-fine-tuned metrics shown above.",
    "- Qualitatively inspect misclassifications to refine adversarial generation rules.",
    "- To emulate LogiGAN more, you could build masked-generation tasks (harder) or use multi-step chain examples.",
)
for note in closing_notes:
    print(note)