In [None]:
import os
import sys
import math
import json
import random
import glob
import time
import itertools
from dataclasses import dataclass
from typing import List, Tuple, Dict, Any, Optional

import numpy as np
import pandas as pd

import matplotlib.pyplot as plt
import seaborn as sns

# TensorFlow / Keras
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers

# Sklearn metrics
from sklearn.model_selection import train_test_split
from sklearn.metrics import (
    accuracy_score, precision_recall_fscore_support,
    roc_auc_score, average_precision_score, confusion_matrix
)

# Colab helpers (optional): the import below is attempted unconditionally and
# its success or failure is recorded in IN_COLAB, which later cells branch on.
try:
    from google.colab import drive, files
    IN_COLAB = True
except Exception:
    IN_COLAB = False

# Seed the NumPy, Python `random` and TensorFlow generators with the same value.
SEED = 42
np.random.seed(SEED)
random.seed(SEED)
tf.random.set_seed(SEED)

print(f"TensorFlow: {tf.__version__}")
print(f"Running in Colab: {IN_COLAB}")


KeyboardInterrupt: 

In [None]:
# Path setup: use the local `archive/` folder when it exists. In Colab, when it
# is missing, offer an upload of a ZIP with the same structure (alternatively
# mount Drive and point DATA_DIR at the folder by hand).

import zipfile

DATA_DIR_DEFAULT = "archive"
DATA_DIR = DATA_DIR_DEFAULT

if IN_COLAB and not os.path.isdir(DATA_DIR_DEFAULT):
    print("`archive/` not found. Options:\n- Upload a ZIP of the archive folder\n- Or mount Google Drive and set DATA_DIR accordingly.")
    from google.colab import files
    uploaded = files.upload()
    # If user uploads a zip (e.g., archive.zip), unzip it here.
    # Only the first ZIP among the uploads is extracted.
    for name in uploaded:
        if name.lower().endswith('.zip'):
            with zipfile.ZipFile(name, 'r') as zip_ref:
                zip_ref.extractall('./')
            break

# Report a missing data directory in every environment, not only in Colab:
# the loader globs `DATA_DIR/*.csv`, so a wrong path otherwise yields an empty
# dataset without any message at this point.
if not os.path.isdir(DATA_DIR):
    print(f"WARNING: data directory '{DATA_DIR}' not found. "
          "Please set DATA_DIR manually to the folder containing the CSV files.")

print("DATA_DIR:", os.path.abspath(DATA_DIR))


In [None]:
# Data loading utilities

@dataclass
class ClauseRecord:
    """One clause read from a CSV file."""
    text: str
    category: str  # derived from filename
    label: Optional[str] = None  # if present in CSV

class ClauseDatasetLoader:
    """Load clause texts from every ``*.csv`` file directly inside ``data_dir``.

    The category of a record is the CSV file name without its extension. The
    text and label columns are picked by name from the candidate lists; when
    no text candidate matches, the column with the longest average string
    length is used.
    """

    def __init__(self, data_dir: str, text_column_candidates: Optional[List[str]] = None,
                 label_column_candidates: Optional[List[str]] = None):
        """Store the directory and the column-name candidates.

        Args:
            data_dir: Directory that is searched (non-recursively) for CSV files.
            text_column_candidates: Column names tried, in order, for the text.
            label_column_candidates: Column names tried, in order, for the label.
        """
        self.data_dir = data_dir
        self.text_column_candidates = text_column_candidates or [
            'text', 'clause', 'clause_text', 'content', 'body'
        ]
        self.label_column_candidates = label_column_candidates or [
            'label', 'type', 'clause_type', 'category'
        ]

    def _infer_text_column(self, df: pd.DataFrame) -> str:
        """Return the first matching candidate column, else the longest-text column."""
        for col in self.text_column_candidates:
            if col in df.columns:
                return col
        # fallback: choose the longest-text column
        lengths = {col: df[col].astype(str).str.len().mean() for col in df.columns}
        return max(lengths, key=lengths.get)

    def _infer_label_column(self, df: pd.DataFrame) -> Optional[str]:
        """Return the first matching label candidate column, or None."""
        for col in self.label_column_candidates:
            if col in df.columns:
                return col
        return None

    @staticmethod
    def _read_csv(path: str) -> Optional[pd.DataFrame]:
        """Read one CSV, retrying as latin-1; print a message and return None on failure."""
        try:
            return pd.read_csv(path)
        except Exception:
            try:
                return pd.read_csv(path, encoding='latin-1')
            except Exception as e:
                print(f"Skipping {path}: {e}")
                return None

    def load(self) -> List[ClauseRecord]:
        """Read all CSV files and return one ClauseRecord per non-empty text cell.

        Rows whose text cell is missing (NaN) or blank after stripping are
        skipped; a missing label cell becomes ``None``. Without this check
        ``str(NaN)`` would produce the literal text/label ``"nan"``.

        Returns:
            Records in sorted file order, rows in file order.
        """
        records: List[ClauseRecord] = []
        for path in sorted(glob.glob(os.path.join(self.data_dir, '*.csv'))):
            category = os.path.splitext(os.path.basename(path))[0]
            df = self._read_csv(path)
            if df is None or df.empty:
                continue
            text_col = self._infer_text_column(df)
            label_col = self._infer_label_column(df)
            # Iterate the two columns directly instead of df.iterrows(), which
            # builds a Series per row.
            texts = df[text_col].tolist()
            labels = df[label_col].tolist() if label_col is not None else [None] * len(texts)
            for raw_text, raw_label in zip(texts, labels):
                if pd.isna(raw_text):
                    continue
                text = str(raw_text).strip()
                if not text:
                    continue
                if label_col is None or pd.isna(raw_label):
                    label = None
                else:
                    label = str(raw_label)
                records.append(ClauseRecord(text=text, category=category, label=label))
        return records

# Load every clause from the CSV files under DATA_DIR.
loader = ClauseDatasetLoader(DATA_DIR)
clauses = loader.load()
print(f"Loaded clauses: {len(clauses)} across CSVs in {DATA_DIR}")

# Show a small sample: up to three random records, each text cut to 120
# characters with a trailing '...' when it was longer.
for rec in random.sample(clauses, min(3, len(clauses))):
    print(f"[{rec.category}] {rec.text[:120]}{'...' if len(rec.text)>120 else ''}")


In [None]:
# Pair generation: create positive pairs (same category) and negative pairs (different categories)

@dataclass
class PairRecord:
    """A labelled pair of clause texts with the category of each side."""
    text_a: str
    text_b: str
    label: int  # 1 for similar, 0 for not similar
    cat_a: str
    cat_b: str

class PairBuilder:
    """Build similar / not-similar clause pairs from categorised clauses.

    Positive pairs (label 1) join two clauses of the same category; negative
    pairs (label 0) join clauses of two different categories. Sampling uses the
    module-level ``random`` generator, so seed it for reproducible output.
    """

    # Enumerate all in-category combinations only while their count is at most
    # this multiple of the requested limit; above that, sample index pairs
    # instead of materialising an O(n^2) list.
    _ENUMERATE_FACTOR = 4

    def __init__(self, clauses: List["ClauseRecord"], max_pos_pairs_per_cat: int = 5000,
                 max_neg_pairs: int = 200000):
        """
        Args:
            clauses: Records exposing ``.text`` and ``.category``.
            max_pos_pairs_per_cat: Upper bound on positive pairs per category.
            max_neg_pairs: Upper bound on negative pairs overall.
        """
        self.clauses = clauses
        self.max_pos_pairs_per_cat = max_pos_pairs_per_cat
        self.max_neg_pairs = max_neg_pairs

    @classmethod
    def _sample_index_pairs(cls, n: int, limit: int) -> List[Tuple[int, int]]:
        """Return up to ``limit`` distinct pairs ``(i, j)`` with ``i < j < n``."""
        total = n * (n - 1) // 2
        if limit <= 0 or total <= 0:
            return []
        if total <= cls._ENUMERATE_FACTOR * limit:
            combos = list(itertools.combinations(range(n), 2))
            random.shuffle(combos)
            return combos[:limit]
        # Many more combinations than needed: draw random pairs and keep the
        # distinct ones. At most 1/_ENUMERATE_FACTOR of the space is requested,
        # so collisions are rare.
        chosen = set()
        while len(chosen) < limit:
            i, j = random.sample(range(n), 2)
            chosen.add((i, j) if i < j else (j, i))
        return list(chosen)

    def _positive_pairs(self, by_cat: Dict[str, list]) -> List[PairRecord]:
        """Sample same-category pairs, at most ``max_pos_pairs_per_cat`` per category."""
        positives: List[PairRecord] = []
        for cat, recs in by_cat.items():
            for i, j in self._sample_index_pairs(len(recs), self.max_pos_pairs_per_cat):
                positives.append(PairRecord(recs[i].text, recs[j].text, 1, cat, cat))
        return positives

    def _negative_pairs(self, by_cat: Dict[str, list]) -> List[PairRecord]:
        """Sample cross-category pairs, spreading the cap over category pairs.

        Every category pair receives an equal share of the remaining budget.
        Pairs with the fewest possible combinations are served first so that
        whatever they cannot use is redistributed to the larger ones. The cap
        is checked before anything is added, so ``max_neg_pairs <= 0`` yields
        no negatives.
        """
        remaining = self.max_neg_pairs
        cat_pairs = list(itertools.combinations(by_cat, 2))
        if remaining <= 0 or not cat_pairs:
            return []
        random.shuffle(cat_pairs)
        # Each used category pair contributes at least one negative, so more
        # category pairs than the cap are never needed.
        cat_pairs = cat_pairs[:remaining]
        # Stable sort: ties keep their shuffled order.
        cat_pairs.sort(key=lambda cp: len(by_cat[cp[0]]) * len(by_cat[cp[1]]))

        negatives: List[PairRecord] = []
        for pos, (c1, c2) in enumerate(cat_pairs):
            if remaining <= 0:
                break
            recs1, recs2 = by_cat[c1], by_cat[c2]
            pool_size = len(recs1) * len(recs2)
            quota = -(-remaining // (len(cat_pairs) - pos))  # ceil division
            take = min(quota, pool_size)
            # Sample flat indices into the recs1 x recs2 product without
            # building the product itself.
            for flat in random.sample(range(pool_size), take):
                i, j = divmod(flat, len(recs2))
                r1, r2 = recs1[i], recs2[j]
                negatives.append(PairRecord(r1.text, r2.text, 0, r1.category, r2.category))
            remaining -= take
        return negatives

    def build(self) -> List[PairRecord]:
        """Return the shuffled list of positive and negative pairs."""
        by_cat: Dict[str, list] = {}
        for rec in self.clauses:
            by_cat.setdefault(rec.category, []).append(rec)

        pairs = self._positive_pairs(by_cat) + self._negative_pairs(by_cat)
        random.shuffle(pairs)
        return pairs

# Build all pairs from the full clause list, then report the label balance.
pair_builder = PairBuilder(clauses)
pairs = pair_builder.build()
print(f"Total pairs: {len(pairs)} (pos/neg split approx unknown until counting)")
print(pd.Series([p.label for p in pairs]).value_counts())

# Train/Val/Test split on pairs, stratified by label
# 70% train, then the remaining 30% is halved into validation and test.
# NOTE(review): the split is over pair indices, and the pairs were built from
# all clauses beforehand, so the same clause text can occur in pairs of more
# than one split. Confirm whether that overlap is acceptable for the reported
# test metrics.
labels = np.array([p.label for p in pairs])
train_idx, temp_idx = train_test_split(np.arange(len(pairs)), test_size=0.3, random_state=SEED, stratify=labels)
labels_temp = labels[temp_idx]
val_idx, test_idx = train_test_split(temp_idx, test_size=0.5, random_state=SEED, stratify=labels_temp)

print(f"Split sizes: train={len(train_idx)}, val={len(val_idx)}, test={len(test_idx)}")


In [None]:
# Preprocessing and vectorization

from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.sequence import pad_sequences

class TextVectorizer:
    """Wrap a Keras ``Tokenizer`` and turn texts into fixed-length id matrices."""

    def __init__(self, num_words: int = 50000, oov_token: str = "<OOV>", max_len: int = 256):
        """Remember the tokenizer settings; the tokenizer itself is created by ``fit``."""
        self.num_words, self.oov_token, self.max_len = num_words, oov_token, max_len
        self.tokenizer: Optional[Tokenizer] = None

    def fit(self, texts: List[str]):
        """Create a new tokenizer and fit its vocabulary on ``texts``."""
        settings = dict(num_words=self.num_words, oov_token=self.oov_token)
        self.tokenizer = Tokenizer(**settings)
        self.tokenizer.fit_on_texts(texts)

    def transform(self, texts: List[str]) -> np.ndarray:
        """Convert ``texts`` to id sequences, post-padded/truncated to ``max_len``.

        Fails the assertion when called before ``fit``.
        """
        assert self.tokenizer is not None
        id_sequences = self.tokenizer.texts_to_sequences(texts)
        padded = pad_sequences(
            id_sequences,
            maxlen=self.max_len,
            padding='post',
            truncating='post',
        )
        return padded

    def fit_transform(self, texts: List[str]) -> np.ndarray:
        """Fit on ``texts`` and return their transformed form."""
        self.fit(texts)
        return self.transform(texts)

# Build raw text arrays for train/val/test
train_pairs = [pairs[i] for i in train_idx]
val_pairs = [pairs[i] for i in val_idx]
test_pairs = [pairs[i] for i in test_idx]

# Both sides of every pair, concatenated. Only train_texts is used to fit the
# vocabulary below; val_texts and test_texts are not referenced again in this cell.
train_texts = [p.text_a for p in train_pairs] + [p.text_b for p in train_pairs]
val_texts = [p.text_a for p in val_pairs] + [p.text_b for p in val_pairs]
test_texts = [p.text_a for p in test_pairs] + [p.text_b for p in test_pairs]

vectorizer = TextVectorizer(num_words=50000, max_len=256)
vectorizer.fit(train_texts)

# Vectorize pairs
# Each split yields two id matrices (side A and side B) and an int32 label vector.
X_train_a = vectorizer.transform([p.text_a for p in train_pairs])
X_train_b = vectorizer.transform([p.text_b for p in train_pairs])
y_train = np.array([p.label for p in train_pairs], dtype=np.int32)

X_val_a = vectorizer.transform([p.text_a for p in val_pairs])
X_val_b = vectorizer.transform([p.text_b for p in val_pairs])
y_val = np.array([p.label for p in val_pairs], dtype=np.int32)

X_test_a = vectorizer.transform([p.text_a for p in test_pairs])
X_test_b = vectorizer.transform([p.text_b for p in test_pairs])
y_test = np.array([p.label for p in test_pairs], dtype=np.int32)

# The conditional expression binds more loosely than `+`, so the second argument
# of min() is `1 + len(word_index)` when a tokenizer exists and `num_words`
# otherwise; the result is capped at `num_words`.
vocab_size = min(vectorizer.num_words, 1 + len(vectorizer.tokenizer.word_index) if vectorizer.tokenizer else vectorizer.num_words)
max_len = vectorizer.max_len
print(f"Vocab size: {vocab_size}, Max len: {max_len}")


In [None]:
# Model factory: Siamese encoders and classifiers

class AdditiveAttention(layers.Layer):
    """Attention pooling over the time axis with a tanh-Dense scoring network.

    Each timestep is scored by ``v(tanh(W x))``; the scores are softmax-normalised
    along the sequence axis and used as weights for a sum over timesteps.
    When an input mask is supplied, masked timesteps are excluded from the
    softmax so that padding does not receive attention weight.
    """

    def __init__(self, units: int, **kwargs):
        """
        Args:
            units: Width of the hidden scoring layer ``W``.
            **kwargs: Forwarded to ``keras.layers.Layer``.
        """
        super().__init__(**kwargs)
        self.units = units
        # Declare mask support so the incoming mask is consumed here rather than
        # silently dropped.
        self.supports_masking = True
        self.W = layers.Dense(units, activation='tanh')
        self.v = layers.Dense(1)

    def call(self, inputs, mask=None):
        """Return the attention-weighted sum of ``inputs`` over axis 1.

        Args:
            inputs: Tensor of shape (batch, seq_len, hidden).
            mask: Optional (batch, seq_len) mask; truthy entries are kept.

        Returns:
            Tensor of shape (batch, hidden).
        """
        # inputs: (batch, seq_len, hidden)
        score = self.v(self.W(inputs))  # (batch, seq_len, 1)
        if mask is not None:
            keep = tf.expand_dims(tf.cast(mask, tf.bool), axis=-1)  # (batch, seq_len, 1)
            # A finite large negative (not -inf) keeps the softmax well defined
            # even when every timestep of a row is masked.
            large_negative = -3e4 if score.dtype == tf.float16 else -1e9
            score = tf.where(keep, score, tf.cast(large_negative, score.dtype))
        weights = tf.nn.softmax(score, axis=1)  # (batch, seq_len, 1)
        context = tf.reduce_sum(weights * inputs, axis=1)  # (batch, hidden)
        return context

    def compute_mask(self, inputs, mask=None):
        """The time axis is reduced away, so no mask is propagated downstream."""
        return None

    def get_config(self):
        """Return the layer config including ``units`` for serialization."""
        cfg = super().get_config()
        cfg.update({"units": self.units})
        return cfg

class SiameseEncoderFactory:
    """Static constructors for the sentence encoders shared by both Siamese branches.

    Every encoder maps an id sequence of length ``max_len`` to a 128-dimensional
    vector produced by a linear layer named ``sentence_embedding``.
    """

    @staticmethod
    def create_bilstm_attention(vocab_size: int, embed_dim: int = 128, lstm_units: int = 128,
                                max_len: int = 256, dropout_rate: float = 0.2) -> keras.Model:
        """Embedding (zero-masked) -> BiLSTM -> attention pooling -> dense head."""
        token_ids = layers.Input(shape=(max_len,), name='input_ids')
        embedded = layers.Embedding(vocab_size, embed_dim, mask_zero=True, name='embedding')(token_ids)
        states = layers.Bidirectional(layers.LSTM(lstm_units, return_sequences=True))(embedded)
        states = layers.Dropout(dropout_rate)(states)
        pooled = AdditiveAttention(units=lstm_units)(states)
        pooled = layers.LayerNormalization()(pooled)
        hidden = layers.Dense(128, activation='relu')(pooled)
        hidden = layers.Dropout(dropout_rate)(hidden)
        sentence_vec = layers.Dense(128, activation=None, name='sentence_embedding')(hidden)
        return keras.Model(token_ids, sentence_vec, name='BiLSTM_Attn_Encoder')

    @staticmethod
    def create_cnn(vocab_size: int, embed_dim: int = 128, num_filters: int = 128,
                   kernel_sizes: Tuple[int, ...] = (3, 4, 5), max_len: int = 256,
                   dropout_rate: float = 0.2) -> keras.Model:
        """Embedding -> one Conv1D + global max-pool branch per kernel size -> dense head."""
        token_ids = layers.Input(shape=(max_len,), name='input_ids')
        embedded = layers.Embedding(vocab_size, embed_dim, mask_zero=False, name='embedding')(token_ids)

        # One branch per kernel width, built in the order given by `kernel_sizes`.
        pooled_branches = []
        for width in kernel_sizes:
            feature_map = layers.Conv1D(num_filters, width, activation='relu', padding='valid')(embedded)
            pooled_branches.append(layers.GlobalMaxPooling1D()(feature_map))

        # A single branch needs no concatenation.
        if len(pooled_branches) > 1:
            features = layers.Concatenate()(pooled_branches)
        else:
            features = pooled_branches[0]

        features = layers.Dropout(dropout_rate)(features)
        features = layers.Dense(256, activation='relu')(features)
        features = layers.Dropout(dropout_rate)(features)
        sentence_vec = layers.Dense(128, activation=None, name='sentence_embedding')(features)
        return keras.Model(token_ids, sentence_vec, name='CNN_Encoder')

class SiameseClassifier:
    """Builds the pair classifier that sits on top of a shared sentence encoder."""

    @staticmethod
    def build_from_encoder(encoder: keras.Model, max_len: int = 256, dropout_rate: float = 0.2) -> keras.Model:
        """Return a two-input model that scores whether two sequences are similar.

        Both inputs go through the same ``encoder`` (shared weights). The two
        embeddings are combined as ``[ea, eb, |ea - eb|, ea * eb]`` and fed to a
        small dense head ending in a sigmoid unit.

        Args:
            encoder: Model mapping a (max_len,) id sequence to an embedding.
            max_len: Length of each input sequence.
            dropout_rate: Dropout rate used in the dense head.

        Returns:
            Uncompiled model with inputs ``input_a`` and ``input_b``.
        """
        a = layers.Input(shape=(max_len,), name='input_a')
        b = layers.Input(shape=(max_len,), name='input_b')
        ea = encoder(a)
        eb = encoder(b)
        # Combine with absolute difference and elementwise product.
        # Merge layers are used instead of raw `tf.*` ops on the symbolic
        # tensors so the graph does not depend on a Keras version that accepts
        # TensorFlow functions on functional-API tensors.
        # |ea - eb| is computed as max(ea - eb, eb - ea).
        diff = layers.Maximum(name='abs_diff')([
            layers.Subtract()([ea, eb]),
            layers.Subtract()([eb, ea]),
        ])
        prod = layers.Multiply(name='elementwise_product')([ea, eb])
        merged = layers.Concatenate()([ea, eb, diff, prod])
        x = layers.LayerNormalization()(merged)
        x = layers.Dense(256, activation='relu')(x)
        x = layers.Dropout(dropout_rate)(x)
        x = layers.Dense(128, activation='relu')(x)
        x = layers.Dropout(dropout_rate)(x)
        out = layers.Dense(1, activation='sigmoid')(x)
        model = keras.Model(inputs=[a, b], outputs=out)
        return model

# Instantiate models
# Each classifier gets its own encoder; within one classifier that encoder is
# applied to both inputs.
encoder_bilstm = SiameseEncoderFactory.create_bilstm_attention(vocab_size=vocab_size, max_len=max_len)
model_bilstm = SiameseClassifier.build_from_encoder(encoder_bilstm, max_len=max_len)

encoder_cnn = SiameseEncoderFactory.create_cnn(vocab_size=vocab_size, max_len=max_len)
model_cnn = SiameseClassifier.build_from_encoder(encoder_cnn, max_len=max_len)

# Print the layer overview of both classifiers.
model_bilstm.summary()
model_cnn.summary()


In [None]:
# Training setup

BATCH_SIZE = 256
EPOCHS = 10
LR = 1e-3


def make_callbacks() -> list:
    """Return fresh callback instances for one training run.

    Early stopping follows validation ROC-AUC (restoring the best weights);
    the learning rate is halved when validation loss stalls.
    """
    return [
        keras.callbacks.EarlyStopping(monitor='val_auc', mode='max', patience=3, restore_best_weights=True),
        keras.callbacks.ReduceLROnPlateau(monitor='val_loss', factor=0.5, patience=2)
    ]


def make_metrics() -> list:
    """Return fresh metric instances for one model.

    The explicit names fix the history keys (`auc`, `val_auc`, `auc_pr`, ...)
    that the callbacks and the plotting cell look up.
    """
    return [
        keras.metrics.AUC(curve='ROC', name='auc'),
        keras.metrics.AUC(curve='PR', name='auc_pr'),
        keras.metrics.Precision(name='precision'),
        keras.metrics.Recall(name='recall')
    ]


def compile_and_fit(model: keras.Model) -> keras.callbacks.History:
    """Compile `model` and train it on the train split, validating on the val split.

    Metric and callback objects carry state, so every model gets its own
    instances instead of sharing one list across models.
    """
    model.compile(optimizer=keras.optimizers.Adam(LR), loss='binary_crossentropy', metrics=make_metrics())
    return model.fit(
        [X_train_a, X_train_b], y_train,
        validation_data=([X_val_a, X_val_b], y_val),
        epochs=EPOCHS,
        batch_size=BATCH_SIZE,
        callbacks=make_callbacks(),
        verbose=1
    )


history_bilstm = compile_and_fit(model_bilstm)
history_cnn = compile_and_fit(model_cnn)


In [None]:
# Evaluation utilities

def evaluate_model(model: keras.Model, name: str,
                   Xa: np.ndarray, Xb: np.ndarray, y_true: np.ndarray) -> Dict[str, Any]:
    """Score a pair classifier on one split, print a summary line and return the metrics.

    Probabilities are thresholded at 0.5 to obtain hard predictions. ROC-AUC
    and PR-AUC fall back to NaN when their computation raises.

    Returns:
        Dict with the model name, scalar metrics, the confusion matrix and the
        per-example probabilities and predictions (the last three as lists).
    """
    probs = model.predict([Xa, Xb], batch_size=1024, verbose=0).reshape(-1)
    preds = (probs >= 0.5).astype(int)

    def _ranking_score(scorer) -> float:
        # Best effort: any failure of the scorer is reported as NaN.
        try:
            return scorer(y_true, probs)
        except Exception:
            return float('nan')

    acc = accuracy_score(y_true, preds)
    precision, recall, f1, _ = precision_recall_fscore_support(
        y_true, preds, average='binary', zero_division=0
    )
    roc = _ranking_score(roc_auc_score)
    pr_auc = _ranking_score(average_precision_score)
    cm = confusion_matrix(y_true, preds)

    print(f"[{name}] Acc={acc:.4f} P={precision:.4f} R={recall:.4f} F1={f1:.4f} ROC-AUC={roc:.4f} PR-AUC={pr_auc:.4f}")

    report: Dict[str, Any] = {}
    report['name'] = name
    report['accuracy'] = acc
    report['precision'] = precision
    report['recall'] = recall
    report['f1'] = f1
    report['roc_auc'] = roc
    report['pr_auc'] = pr_auc
    report['confusion_matrix'] = cm.tolist()
    report['probs'] = probs.tolist()
    report['preds'] = preds.tolist()
    return report

# Evaluate both models on the test split; order matters because later cells
# index `results` by position (0 = BiLSTM+Attention, 1 = CNN).
results = []
results.append(evaluate_model(model_bilstm, 'BiLSTM+Attention', X_test_a, X_test_b, y_test))
results.append(evaluate_model(model_cnn, 'CNN', X_test_a, X_test_b, y_test))

# The JSON dump contains everything evaluate_model returned, including the
# per-example `probs` and `preds` lists.
with open('results.json', 'w') as f:
    json.dump(results, f, indent=2)

# Displayed table: scalar metrics only.
pd.DataFrame([{k:v for k,v in r.items() if k not in ('confusion_matrix','probs','preds')} for r in results])


In [None]:
# Plots: training curves and confusion matrices

def plot_history(history: keras.callbacks.History, title: str, save_path: Optional[str] = None):
    """Draw loss, ROC-AUC and PR-AUC training curves side by side.

    A metric panel is switched off when its train or validation series is
    missing from the history. The figure is saved to ``save_path`` when one is
    given, then shown.
    """
    curves = history.history
    fig, (loss_ax, roc_ax, pr_ax) = plt.subplots(1, 3, figsize=(18, 5))

    loss_ax.plot(curves['loss'], label='train')
    loss_ax.plot(curves['val_loss'], label='val')
    loss_ax.set_title(f'{title} - Loss')
    loss_ax.legend()

    # The two AUC panels differ only in the history key and the caption.
    for ax, metric_key, caption in ((roc_ax, 'auc', 'ROC AUC'), (pr_ax, 'auc_pr', 'PR AUC')):
        val_key = 'val_' + metric_key
        if metric_key not in curves or val_key not in curves:
            ax.axis('off')
            continue
        ax.plot(curves[metric_key], label='train')
        ax.plot(curves[val_key], label='val')
        ax.set_title(f'{title} - {caption}')
        ax.legend()

    plt.tight_layout()
    if save_path:
        plt.savefig(save_path, dpi=150)
    plt.show()

# Training curves for both models, also written to PNG files.
plot_history(history_bilstm, 'BiLSTM+Attention', save_path='history_bilstm.png')
plot_history(history_cnn, 'CNN', save_path='history_cnn.png')

# Confusion matrices
# One heatmap per evaluated model; the file name is derived from the model
# name with '+' and spaces replaced by underscores.
for r in results:
    cm = np.array(r['confusion_matrix'])
    fig = plt.figure(figsize=(5,4))
    sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', cbar=False)
    plt.title(f"Confusion Matrix - {r['name']}")
    plt.xlabel('Predicted')
    plt.ylabel('True')
    out = f"cm_{r['name'].replace('+','_').replace(' ','_')}.png"
    plt.tight_layout()
    plt.savefig(out, dpi=150)
    plt.show()


In [None]:
# Qualitative examples: show some correct and incorrect predictions

def sample_qualitative(pairs: List[PairRecord], probs: np.ndarray, preds: np.ndarray, y_true: np.ndarray,
                       k: int = 5) -> Dict[str, List[Dict[str, Any]]]:
    """Pick up to ``k`` random correctly and ``k`` random incorrectly predicted pairs.

    Returns:
        ``{'correct': [...], 'incorrect': [...]}`` where every entry holds both
        texts, the true label, the predicted probability and label, and both
        categories.
    """
    positions = np.arange(len(pairs))
    correct = positions[preds == y_true]
    incorrect = positions[preds != y_true]
    # Shuffle in this order (correct first) with the module-level RNG.
    random.shuffle(correct)
    random.shuffle(incorrect)

    def _describe(i) -> Dict[str, Any]:
        pair = pairs[i]
        return {
            'text_a': pair.text_a,
            'text_b': pair.text_b,
            'true_label': int(y_true[i]),
            'pred_prob': float(probs[i]),
            'pred_label': int(preds[i]),
            'cat_a': pair.cat_a,
            'cat_b': pair.cat_b
        }

    return {
        'correct': [_describe(i) for i in correct[:k]],
        'incorrect': [_describe(i) for i in incorrect[:k]]
    }

# Build predictions for both models (results[0] is BiLSTM+Attention, results[1] is CNN)
probs_bilstm, preds_bilstm = (np.array(results[0][key]) for key in ('probs', 'preds'))
probs_cnn, preds_cnn = (np.array(results[1][key]) for key in ('probs', 'preds'))

qual_bilstm = sample_qualitative(test_pairs, probs_bilstm, preds_bilstm, y_test, k=6)
qual_cnn = sample_qualitative(test_pairs, probs_cnn, preds_cnn, y_test, k=6)

# Persist the sampled examples, one JSON file per model.
for path, payload in (('qualitative_bilstm.json', qual_bilstm), ('qualitative_cnn.json', qual_cnn)):
    with open(path, 'w') as f:
        json.dump(payload, f, indent=2)


def _print_examples(header: str, qual: Dict[str, List[Dict[str, Any]]], limit: int = 3) -> None:
    """Print `header`, then up to `limit` correct and `limit` incorrect examples.

    Texts are cut to 200 characters; the probability is shown with three decimals.
    """
    print(header)
    for group, tag in (('correct', '\n[CORRECT]'), ('incorrect', '\n[INCORRECT]')):
        for ex in qual[group][:limit]:
            print(tag)
            print('A:', ex['text_a'][:200])
            print('B:', ex['text_b'][:200])
            print('True:', ex['true_label'], 'Pred:', ex['pred_label'], 'Prob:', f"{ex['pred_prob']:.3f}")


_print_examples('Examples (BiLSTM+Attention):', qual_bilstm)
_print_examples('\nExamples (CNN):', qual_cnn)


In [None]:
# Export artifacts for report (metrics table, figures, model timings)

# Simple timing comparison captured from history objects (approximate per-epoch time)
def estimate_epoch_times(history: keras.callbacks.History) -> Optional[float]:
    """Placeholder for an approximate per-epoch training time.

    No timing is computed: ``history`` is not inspected and the function
    always returns ``None``. Per the original note, epoch durations are not
    directly available from the History object, so a real implementation
    would have to record wall-clock time around (or during) ``fit``.

    Args:
        history: History object returned by ``model.fit`` (currently unused).

    Returns:
        Always ``None``.
    """
    # TODO: implement once per-epoch timing is actually recorded during training.
    return None

# Save a compact CSV of key metrics
# (report column name, key in the evaluation result), in output column order.
REPORT_COLUMNS = (
    ('Model', 'name'),
    ('Accuracy', 'accuracy'),
    ('Precision', 'precision'),
    ('Recall', 'recall'),
    ('F1', 'f1'),
    ('ROC_AUC', 'roc_auc'),
    ('PR_AUC', 'pr_auc'),
)
summary_rows = [
    {column: result[key] for column, key in REPORT_COLUMNS}
    for result in results
]
metrics_df = pd.DataFrame(summary_rows)
metrics_df.to_csv('metrics_summary.csv', index=False)
metrics_df
