# Legal Clause Similarity — Baseline experiments

This notebook implements two baseline models — (1) a siamese BiLSTM encoder and (2) a siamese BiLSTM + attention encoder — to predict semantic similarity between legal clauses loaded from the CSV files in `archive/`.

Assumptions: positive pairs are sampled from clauses in the same CSV (same category); negative pairs are sampled across different categories. No pre-trained transformers are used (per task constraint).


## How to run

Run the following (PowerShell) to install minimal dependencies and open the notebook:

```powershell
python -m pip install -r requirements.txt
jupyter notebook
```


In [None]:
# Basic imports
import os
import glob
import random
import math
from pathlib import Path

import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, f1_score, roc_auc_score, precision_recall_fscore_support

import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from tqdm import tqdm

# Run on the GPU when CUDA is usable, otherwise fall back to the CPU.
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
print('Device:', device)


In [None]:
# 1) Load CSV files from archive into a single DataFrame
# Every CSV under `archive/` is treated as one category: the file stem becomes the
# `category` label of each clause read from it.
root = Path('archive')
csv_files = sorted(root.glob('*.csv'))
rows = []
for f in csv_files:
    try:
        df = pd.read_csv(f)
    except Exception as e:
        # Loading is best-effort: skip unreadable files, but report them so that
        # a missing category does not go unnoticed.
        print(f'Skipping unreadable file {f}: {e}')
        continue
    # heuristics: look for columns that contain clause text - common names: 'text', 'clause', 'clause_text'
    text_col = next(
        (c for c in ('text', 'clause', 'clause_text', 'clauseText', 'clause_text_1', '0') if c in df.columns),
        None,
    )
    if text_col is None:
        # fallback: take first string-like column
        text_col = next((c for c in df.columns if df[c].dtype == object), None)
    if text_col is None:
        continue
    # Only the text column is needed, so iterate it directly instead of
    # materialising every row with iterrows().
    for value in df[text_col]:
        if pd.isnull(value):
            continue
        text = str(value).strip()
        if not text:
            continue
        rows.append({'text': text, 'category': f.stem})

# Explicit columns keep the schema stable even when nothing was loaded, so later
# cells can still access clauses_df['text'] / clauses_df['category'].
clauses_df = pd.DataFrame(rows, columns=['text', 'category'])
print('Loaded clauses:', len(clauses_df))
clauses_df.head()


In [None]:
# 2) Build pairs: positive = same category, negative = different category
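# Pairs are sampled rather than enumerated exhaustively. The positive/negative ratio depends on the number of categories and on the caps passed to build_pairs, so the result is not guaranteed to be balanced.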
def build_pairs(df, max_pos_per_cat=500, max_neg=20000, seed=42):
    """Build labelled clause pairs for similarity training.

    Positive pairs (label 1) are all 2-combinations of up to
    ``max_pos_per_cat`` clauses sampled from each category. Negative pairs
    (label 0) are ordered pairs of distinct texts, drawn at random, whose
    category sets do not overlap.

    Args:
        df: DataFrame with ``text`` and ``category`` columns.
        max_pos_per_cat: Maximum number of clauses sampled per category before
            pairing; ``None`` uses every clause of the category.
        max_neg: Target number of unique negative pairs. Sampling stops after
            ``max_neg * 10`` draws, so fewer pairs may be produced.
        seed: Seed applied to the global ``random`` module.

    Returns:
        A shuffled list of ``(text_a, text_b, label)`` tuples. The list is
        empty when ``df`` has no rows.
    """
    from itertools import combinations

    random.seed(seed)
    if df.empty:
        return []

    all_texts = df['text'].tolist()
    # One pass builds both lookups, replacing a full DataFrame scan per draw.
    cat_to_texts = {}
    text_to_cats = {}
    for text, cat in zip(all_texts, df['category'].tolist()):
        cat_to_texts.setdefault(cat, []).append(text)
        text_to_cats.setdefault(text, set()).add(cat)

    pairs = []
    # positive pairs
    for texts in cat_to_texts.values():
        if len(texts) < 2:
            continue
        if max_pos_per_cat is None:
            samples = texts
        else:
            samples = random.sample(texts, min(len(texts), max_pos_per_cat))
        pairs.extend((a, b, 1) for a, b in combinations(samples, 2))

    # negative pairs: sample pairs across categories
    # A dict serves as an insertion-ordered set: iterating a plain set of
    # strings depends on hash randomization, which would make the output
    # differ between runs even with a fixed seed.
    negatives = {}
    attempts = 0
    max_attempts = max_neg * 10
    while len(negatives) < max_neg and attempts < max_attempts:
        a = random.choice(all_texts)
        b = random.choice(all_texts)
        attempts += 1
        if a == b:
            continue
        # A text may occur in several categories; only accept the pair as a
        # negative when the two texts share no category at all.
        if not text_to_cats[a].isdisjoint(text_to_cats[b]):
            continue
        negatives[(a, b)] = None
    pairs.extend((a, b, 0) for a, b in negatives)

    random.shuffle(pairs)
    return pairs

# Sample at most 50 clauses per category before pairing (so at most
# 50*49/2 = 1225 positive pairs per category) and request up to 5000 negatives.
pairs = build_pairs(clauses_df, max_pos_per_cat=50, max_neg=5000)
print('Total pairs:', len(pairs))


In [None]:
# 3) Simple tokenizer/vocab builder (whitespace + basic cleanup)
from collections import Counter
import re

def simple_tokenize(text):
    """Lowercase *text*, replace every character other than a-z, 0-9 and
    whitespace with a space, and return the whitespace-separated tokens."""
    cleaned = re.sub(r'[^a-z0-9\s]', ' ', text.lower())
    return cleaned.split()

# build vocab
# Flatten both sides of every pair into one list of texts (reused by the sanity check below).
all_text = [side for left, right, _ in pairs for side in (left, right)]
counter = Counter(tok for doc in all_text for tok in simple_tokenize(doc))

# keep top-k words
# Two slots are reserved for the special tokens: id 0 = <pad>, id 1 = <unk>.
vocab_size = 20000
most_common = counter.most_common(vocab_size-2)
itos = ['<pad>','<unk>']
itos.extend(word for word, _ in most_common)
stoi = {word: idx for idx, word in enumerate(itos)}

def encode(text, max_len=128):
    """Convert *text* into exactly ``max_len`` vocabulary ids.

    Tokens beyond ``max_len`` are dropped, out-of-vocabulary tokens map to
    id 1, and shorter sequences are right-padded with id 0.
    """
    token_ids = [stoi.get(tok, 1) for tok in simple_tokenize(text)[:max_len]]
    padding = [0] * (max_len - len(token_ids))
    return token_ids + padding

# quick sanity: report the vocabulary size (special tokens included) and the
# first 16 ids of the first pair text. all_text[0] assumes `pairs` is non-empty.
print('Vocab size:', len(itos))
print('Example encode:', encode(all_text[0])[:16])


In [None]:
# 4) PyTorch Dataset for pairs
class ClausePairsDataset(Dataset):
    """Dataset over ``(text_a, text_b, label)`` tuples.

    Texts are encoded on access with the module-level ``encode``; each item is
    ``(ids_a, ids_b, label)`` where the ids are long tensors of length
    ``max_len`` and the label is a float scalar tensor.
    """
    def __init__(self, pairs, max_len=128):
        self.max_len = max_len
        self.pairs = pairs
    def __len__(self):
        return len(self.pairs)
    def __getitem__(self, idx):
        text_a, text_b, label = self.pairs[idx]
        ids_a, ids_b = (
            torch.tensor(encode(text, self.max_len), dtype=torch.long)
            for text in (text_a, text_b)
        )
        return ids_a, ids_b, torch.tensor(label, dtype=torch.float)

# split pairs into train/val/test
# 20% of the pairs are held out for test, then 10% of the remainder for
# validation (72% / 8% / 20% of all pairs); random_state is fixed for repeatability.
# NOTE(review): the split is over pairs, not clauses, so the same clause text can
# appear in pairs on both sides of a split — confirm this is acceptable for the
# reported metrics.
train_pairs, test_pairs = train_test_split(pairs, test_size=0.2, random_state=42)
train_pairs, val_pairs = train_test_split(train_pairs, test_size=0.1, random_state=42)

# Datasets use the default max_len of 128 token ids per clause.
train_ds = ClausePairsDataset(train_pairs)
val_ds = ClausePairsDataset(val_pairs)
test_ds = ClausePairsDataset(test_pairs)

batch_size = 64
# Only the training loader shuffles; validation and test keep a fixed order.
train_loader = DataLoader(train_ds, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(val_ds, batch_size=batch_size)
test_loader = DataLoader(test_ds, batch_size=batch_size)

# Number of pairs in train / val / test.
print(len(train_ds), len(val_ds), len(test_ds))


In [None]:
# 5) Models: Siamese BiLSTM encoder and BiLSTM+Attention encoder
class BiLSTMEncoder(nn.Module):
    """BiLSTM sentence encoder with padding-aware mean pooling.

    Args:
        vocab_size: Number of rows in the embedding table.
        emb_dim: Token embedding width.
        hid_dim: LSTM hidden size per direction (output width is ``2 * hid_dim``).
        n_layers: Number of stacked LSTM layers.
        dropout: Inter-layer LSTM dropout; only applied when ``n_layers > 1``.

    ``forward`` takes a ``(B, L)`` tensor of token ids that is right-padded
    with the embedding's ``padding_idx`` (0) and returns a ``(B, 2 * hid_dim)``
    tensor. Padded positions are excluded from both the recurrence and the
    mean, so the result does not depend on how much padding a batch carries.
    """
    def __init__(self, vocab_size, emb_dim=128, hid_dim=128, n_layers=1, dropout=0.1):
        super().__init__()
        self.emb = nn.Embedding(vocab_size, emb_dim, padding_idx=0)
        self.lstm = nn.LSTM(emb_dim, hid_dim, num_layers=n_layers, batch_first=True, bidirectional=True, dropout=dropout if n_layers>1 else 0.0)
    def forward(self, x):
        # x: (B, L)
        # Real length of each sequence; clamp so that an all-padding row still
        # has length 1 (packing rejects zero-length sequences).
        lengths = (x != self.emb.padding_idx).sum(dim=1).clamp(min=1)
        e = self.emb(x)
        # Packing stops the LSTM at each sequence's true end, so the backward
        # direction no longer starts from a run of padding tokens.
        packed = nn.utils.rnn.pack_padded_sequence(
            e, lengths.cpu(), batch_first=True, enforce_sorted=False
        )
        packed_out, _ = self.lstm(packed)
        # out: (B, L', 2*hid) with zeros at padded positions
        out, _ = nn.utils.rnn.pad_packed_sequence(packed_out, batch_first=True)
        # Mean over real tokens only: padded steps contribute zero to the sum.
        pooled = out.sum(dim=1) / lengths.unsqueeze(1).to(out.dtype)
        return pooled

class AttentionEncoder(nn.Module):
    """BiLSTM sentence encoder with padding-aware additive attention pooling.

    Args:
        vocab_size: Number of rows in the embedding table.
        emb_dim: Token embedding width.
        hid_dim: LSTM hidden size per direction (output width is ``2 * hid_dim``).
        n_layers: Number of stacked LSTM layers.
        dropout: Inter-layer LSTM dropout; only applied when ``n_layers > 1``.

    ``forward`` takes a ``(B, L)`` tensor of token ids that is right-padded
    with the embedding's ``padding_idx`` (0) and returns a ``(B, 2 * hid_dim)``
    tensor: the attention-weighted sum of the LSTM states of the real tokens.
    Padded positions receive zero attention weight.
    """
    def __init__(self, vocab_size, emb_dim=128, hid_dim=128, n_layers=1, dropout=0.1):
        super().__init__()
        self.emb = nn.Embedding(vocab_size, emb_dim, padding_idx=0)
        self.lstm = nn.LSTM(emb_dim, hid_dim, num_layers=n_layers, batch_first=True, bidirectional=True, dropout=dropout if n_layers>1 else 0.0)
        self.attn = nn.Linear(hid_dim*2, 1)
    def forward(self, x):
        # Real length of each sequence; clamp so that an all-padding row still
        # has length 1 (packing rejects zero-length sequences, and a fully
        # masked softmax row would be NaN).
        lengths = (x != self.emb.padding_idx).sum(dim=1).clamp(min=1)
        e = self.emb(x)
        # Packing stops the LSTM at each sequence's true end, so the backward
        # direction no longer starts from a run of padding tokens.
        packed = nn.utils.rnn.pack_padded_sequence(
            e, lengths.cpu(), batch_first=True, enforce_sorted=False
        )
        packed_out, _ = self.lstm(packed)
        # out: (B, L', 2*hid) where L' is the longest real length in the batch
        out, _ = nn.utils.rnn.pad_packed_sequence(packed_out, batch_first=True)
        scores = self.attn(out).squeeze(-1)
        # Exclude padded positions from the softmax so they get zero weight.
        positions = torch.arange(out.size(1), device=x.device)
        valid = positions.unsqueeze(0) < lengths.unsqueeze(1)
        scores = scores.masked_fill(~valid, float('-inf'))
        weights = torch.softmax(scores, dim=1).unsqueeze(-1)
        pooled = (out * weights).sum(dim=1)
        return pooled

class SiameseSimilarity(nn.Module):
    """Siamese head: one shared encoder applied to both inputs, followed by an MLP.

    Args:
        encoder: Module mapping an input batch to ``(B, emb_dim)`` embeddings.
        emb_dim: Width of the encoder output.

    ``forward(a, b)`` returns ``(logits, ea, eb)``: one raw similarity logit
    per pair (shape ``(B,)``) plus the two embeddings.
    """
    def __init__(self, encoder, emb_dim):
        super().__init__()
        self.encoder = encoder
        # The classifier input is [|ea - eb| ; ea * eb], hence 2 * emb_dim features.
        layers = [
            nn.Linear(emb_dim*2, 128),
            nn.ReLU(),
            nn.Dropout(0.1),
            nn.Linear(128,1),
        ]
        self.fc = nn.Sequential(*layers)
    def forward(self, a, b):
        ea, eb = self.encoder(a), self.encoder(b)
        abs_diff = (ea - eb).abs()
        product = ea * eb
        logits = self.fc(torch.cat((abs_diff, product), dim=1)).squeeze(-1)
        return logits, ea, eb


In [None]:
# 6) Training & evaluation utilities
def train_one_epoch(model, loader, optimizer, criterion):
    """Run one optimisation pass over *loader* and return the mean batch loss.

    Each batch is ``(a, b, labels)``; tensors are moved to the module-level
    ``device`` and the model's first output is scored against the labels
    with *criterion*.
    """
    model.train()
    batch_losses = []
    for left, right, labels in loader:
        left, right, labels = left.to(device), right.to(device), labels.to(device)
        optimizer.zero_grad()
        logits, _, _ = model(left, right)
        batch_loss = criterion(logits, labels)
        batch_loss.backward()
        optimizer.step()
        batch_losses.append(batch_loss.item())
    return np.mean(batch_losses)

def evaluate(model, loader):
    """Score *model* on *loader* and return accuracy, F1 and ROC-AUC.

    Probabilities are the sigmoid of the model's logits; hard predictions use
    a 0.5 threshold. ROC-AUC falls back to NaN when it cannot be computed.
    """
    model.eval()
    labels, scores = [], []
    with torch.no_grad():
        for left, right, target in loader:
            logits, _, _ = model(left.to(device), right.to(device))
            scores.extend(torch.sigmoid(logits).cpu().numpy().tolist())
            labels.extend(target.numpy().tolist())
    labels = np.array(labels)
    scores = np.array(scores)
    hard_preds = (scores >= 0.5).astype(int)
    metrics = {
        'accuracy': accuracy_score(labels, hard_preds),
        'f1': f1_score(labels, hard_preds),
    }
    try:
        metrics['roc_auc'] = roc_auc_score(labels, scores)
    except Exception:
        metrics['roc_auc'] = float('nan')
    return metrics


In [None]:
# 7) Small experiments: train both baselines for a few epochs
# vocab_n is the actual vocabulary size, special tokens included.
vocab_n = len(itos)
emb_dim = 128
hid_dim = 128
epochs = 3

# Baseline A: BiLSTM
# The encoders are bidirectional, so their output width is hid_dim*2; that width
# is what SiameseSimilarity receives as its emb_dim argument.
encoder_a = BiLSTMEncoder(vocab_n, emb_dim=emb_dim, hid_dim=hid_dim).to(device)
model_a = SiameseSimilarity(encoder_a, emb_dim=hid_dim*2).to(device)
opt_a = torch.optim.Adam(model_a.parameters(), lr=1e-3)
# The model returns raw logits (sigmoid is applied only at evaluation time),
# hence BCEWithLogitsLoss. The same criterion is reused for both baselines.
crit = nn.BCEWithLogitsLoss()

for ep in range(epochs):
    loss = train_one_epoch(model_a, train_loader, opt_a, crit)
    val_metrics = evaluate(model_a, val_loader)
    print(f'BiLSTM Ep {ep+1}/{epochs} loss={loss:.4f} val={val_metrics}')

# Baseline B: BiLSTM + Attention
# Same hyperparameters and data loaders as baseline A; only the encoder differs.
encoder_b = AttentionEncoder(vocab_n, emb_dim=emb_dim, hid_dim=hid_dim).to(device)
model_b = SiameseSimilarity(encoder_b, emb_dim=hid_dim*2).to(device)
opt_b = torch.optim.Adam(model_b.parameters(), lr=1e-3)
for ep in range(epochs):
    loss = train_one_epoch(model_b, train_loader, opt_b, crit)
    val_metrics = evaluate(model_b, val_loader)
    print(f'Attn Ep {ep+1}/{epochs} loss={loss:.4f} val={val_metrics}')

# Final evaluation on test set
print('Final BiLSTM test:', evaluate(model_a, test_loader))
print('Final Attn test:', evaluate(model_b, test_loader))


## Notes and Next Steps

- This notebook implements two lightweight baselines suitable for quick experiments.
- Limitations: negatives are sampled at random (no hard negatives), the vocabulary is small and the tokenization is a simple whitespace split; no pre-trained semantic embeddings are used (by constraint).
- Improvements: add class-balanced sampling, curriculum/hard-negative mining, better tokenization (subword), and more epochs/hyperparameter search.


In [None]:
# Additional evaluation metrics and plotting (Precision, Recall, PR-AUC)
from sklearn.metrics import precision_score, recall_score, average_precision_score, roc_curve, precision_recall_curve, auc, roc_auc_score
import matplotlib.pyplot as plt
import os

@torch.no_grad()
def evaluate_full(model, loader):
    """Score *model* on *loader* and return ``(metrics, ys, ps)``.

    ``metrics`` holds accuracy, precision, recall, F1 (0.5 threshold on the
    sigmoid of the logits), ROC-AUC and PR-AUC (average precision); the two
    AUC values fall back to NaN when they cannot be computed. ``ys`` and
    ``ps`` are the collected labels and probabilities as NumPy arrays.
    """
    def _score_or_nan(metric):
        # Threshold-free metrics may fail (e.g. a single class present).
        try:
            return metric(ys, ps)
        except Exception:
            return float('nan')

    model.eval()
    labels, scores = [], []
    for left, right, target in loader:
        logits, _, _ = model(left.to(device), right.to(device))
        scores.extend(torch.sigmoid(logits).cpu().numpy().tolist())
        labels.extend(target.numpy().tolist())
    ys = np.array(labels)
    ps = np.array(scores)
    preds = (ps >= 0.5).astype(int)
    metrics = {
        'accuracy': accuracy_score(ys, preds),
        'precision': precision_score(ys, preds, zero_division=0),
        'recall': recall_score(ys, preds, zero_division=0),
        'f1': f1_score(ys, preds, zero_division=0),
        'roc_auc': _score_or_nan(roc_auc_score),
        'pr_auc': _score_or_nan(average_precision_score),
    }
    return metrics, ys, ps


def plot_roc_pr(ys, ps, title='Model'):
    """Plot the ROC curve and the precision-recall curve side by side.

    Args:
        ys: Ground-truth binary labels.
        ps: Predicted scores for the positive class.
        title: Name appended to both subplot titles.

    A curve that cannot be computed is skipped (best-effort) rather than
    aborting the whole plot. The figure is shown with ``plt.show()`` and
    nothing is returned.
    """
    # ROC
    try:
        fpr, tpr, _ = roc_curve(ys, ps)
        roc_auc = auc(fpr, tpr)
    except Exception:
        fpr, tpr, roc_auc = None, None, float('nan')
    # PR
    try:
        prec, rec, _ = precision_recall_curve(ys, ps)
        pr_auc = auc(rec, prec)
    except Exception:
        prec, rec, pr_auc = None, None, float('nan')

    plt.figure(figsize=(12,5))
    plt.subplot(1,2,1)
    if fpr is not None:
        plt.plot(fpr, tpr, label=f'ROC AUC={roc_auc:.3f}')
    plt.plot([0,1],[0,1],'--',color='gray')
    plt.xlabel('FPR'); plt.ylabel('TPR'); plt.title(f'ROC - {title}')
    # Only draw a legend when a labelled curve exists; otherwise matplotlib
    # warns that there are no artists with labels.
    if fpr is not None:
        plt.legend()

    plt.subplot(1,2,2)
    if rec is not None:
        plt.plot(rec, prec, label=f'PR AUC={pr_auc:.3f}')
    plt.xlabel('Recall'); plt.ylabel('Precision'); plt.title(f'PR - {title}')
    if rec is not None:
        plt.legend()

    plt.tight_layout()
    # plt.show() returns None, so wrapping it in IPython's display() only
    # printed "None" in a notebook and raised NameError outside of one.
    plt.show()


In [None]:
# Training wrapper that logs per-epoch metrics and saves results
from pathlib import Path

def train_and_log(model, train_loader, val_loader, test_loader, epochs=5, lr=1e-3, name='model'):
    """Train *model*, log validation metrics per epoch, then evaluate on test.

    A new Adam optimizer and BCE-with-logits loss are created for the run.
    Per-epoch metrics are written to ``outputs/<name>_metrics_per_epoch.csv``,
    test labels and probabilities to ``outputs/<name>_test_ys.npy`` and
    ``outputs/<name>_test_ps.npy``, and the ROC/PR curves are plotted.

    Returns:
        ``(df, test_metrics)``: the per-epoch log as a DataFrame and the dict
        of test metrics.
    """
    model.to(device)
    optimizer = torch.optim.Adam(model.parameters(), lr=lr)
    loss_fn = nn.BCEWithLogitsLoss()
    history = []
    for ep in range(1, epochs+1):
        train_loss = train_one_epoch(model, train_loader, optimizer, loss_fn)
        val_metrics, _, _ = evaluate_full(model, val_loader)
        print(f"{name} Ep{ep}/{epochs} loss={train_loss:.4f} val={val_metrics}")
        # Validation metrics are stored under a 'val_' prefix.
        history.append({
            'epoch': ep,
            'train_loss': train_loss,
            **{f'val_{k}': v for k, v in val_metrics.items()},
        })
    # final test
    test_metrics, ys, ps = evaluate_full(model, test_loader)
    print(f"{name} Test: {test_metrics}")
    df = pd.DataFrame(history)
    out_dir = Path('outputs')
    out_dir.mkdir(exist_ok=True)
    df.to_csv(out_dir / f'{name}_metrics_per_epoch.csv', index=False)
    np.save(out_dir / f'{name}_test_ys.npy', ys)
    np.save(out_dir / f'{name}_test_ps.npy', ps)
    # plot ROC/PR
    plot_roc_pr(ys, ps, title=name)
    return df, test_metrics

# Run training for both models and save logs (set epochs reasonably; adjust as needed)
# NOTE(review): when the cells are run in order, model_a and model_b are the
# instances already trained in the "Small experiments" cell, and train_and_log
# creates a new Adam optimizer, so these runs continue from those weights rather
# than starting from scratch — confirm this is intended.
epochs_run = 5
print('Starting training for BiLSTM...')
df_a, test_a = train_and_log(model_a, train_loader, val_loader, test_loader, epochs=epochs_run, lr=1e-3, name='bilstm')
print('\nStarting training for Attention encoder...')
df_b, test_b = train_and_log(model_b, train_loader, val_loader, test_loader, epochs=epochs_run, lr=1e-3, name='attn')

# show a summary table
# One row per model with its test-set metrics; the bare expression on the last
# line is what the notebook renders as the cell output.
summary = pd.DataFrame([{'model':'bilstm', **test_a}, {'model':'attn', **test_b}])
summary


## Evaluation metric definitions (binary classification)

- Accuracy: Measures how often the model correctly classifies clause pairs as “similar” or “different.” Use only if dataset is roughly balanced.  
- Precision: Out of all predicted similar clause pairs, how many truly convey the same legal meaning? Important when false positives are costly.  
- Recall: Out of all truly similar clauses, how many did the model identify? Important when missing a truly similar clause is costly.  
- F1-Score: Harmonic mean of Precision and Recall — balances both. Standard metric for NLP classification tasks.  
- ROC-AUC / PR-AUC: Evaluate the classifier’s ranking ability across thresholds. PR-AUC (average precision) is especially informative when the positive class is rare.

The cells above compute these metrics, save per-epoch validation logs to `outputs/<model>_metrics_per_epoch.csv`, and save test set predictions to `outputs/<model>_test_ys.npy` and `outputs/<model>_test_ps.npy` for further analysis.
