# 2025 COMP90042 Project
*Make sure you change the file name with your group id.*

# Readme
*If there is something to be noted for the marker, please mention here.*

*If you are planning to implement a program with Object Oriented Programming style, please put those the bottom of this ipynb file*

# 1.DataSet Processing
(You can add as many code blocks and text blocks as you need. However, YOU SHOULD NOT MODIFY the section title)

In [None]:
# Install necessary libraries
!pip install -q sentence-transformers faiss-cpu

In [None]:
# Preprocessing
import json, re, random
from collections import Counter
import numpy as np
from pathlib import Path

# Model implementation
from sentence_transformers import SentenceTransformer
import faiss
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader

random.seed(42)

In [None]:
TRAIN_PATH = Path('/content/train-claims.json')
DEV_PATH = Path('/content/dev-claims.json')
EVIDENCE_PATH = Path('/content/evidence.json')

def load_json(path):
    """
    Load a JSON file from a Path and return the parsed object.
    """
    with open(path, 'r', encoding='utf-8') as f:
        return json.load(f)

def summarize_claims(claim_dict):
    """
    Summarize the claims in a given dictionary.
    """
    labels = [info["claim_label"] for info in claim_dict.values()]
    evid_counts = [len(info["evidences"]) for info in claim_dict.values()]
    print(f"→ #claims: {len(labels)}")
    print("→ class dist:", Counter(labels))
    print(f"→ avg. evidences/claim: {np.mean(evid_counts):.2f}")

# Load raw data
train_claims = load_json(TRAIN_PATH)
dev_claims   = load_json(DEV_PATH)
evidence     = load_json(EVIDENCE_PATH)

print("TRAIN set:")
summarize_claims(train_claims)
print("\nDEV set:")
summarize_claims(dev_claims)

# Evidence corpus stats
lengths = [len(txt.split()) for txt in evidence.values()]
print(f"\n#evidence passages: {len(lengths)}")
print(f"passage length (tokens) — avg: {np.mean(lengths):.1f}, max: {np.max(lengths)}, min: {np.min(lengths)}")

TRAIN set:
→ #claims: 1228
→ class dist: Counter({'SUPPORTS': 519, 'NOT_ENOUGH_INFO': 386, 'REFUTES': 199, 'DISPUTED': 124})
→ avg. evidences/claim: 3.36

DEV set:
→ #claims: 154
→ class dist: Counter({'SUPPORTS': 68, 'NOT_ENOUGH_INFO': 41, 'REFUTES': 27, 'DISPUTED': 18})
→ avg. evidences/claim: 3.19

#evidence passages: 1208827
passage length (tokens) — avg: 19.7, max: 479, min: 1


In [None]:
OUTPUT_TRAIN   = Path('/content/train_esim_pairs.json')
OUTPUT_DEV     = Path('/content/dev_esim_pairs.json')
LABELS         = {'SUPPORTS','REFUTES','NOT_ENOUGH_INFO','DISPUTED'}

MIN_EVID_LEN   = 10       # drop very short evidence passages
MAX_TOKENS     = 450      # truncate long claim+evidence pairs to this token count
NEG_RATE       = 1        # number of negative samples per positive sample


def clean_text(s):
    """
    Perform basic text normalization:
      - Trim whitespace
      - Collapse multiple spaces to one
      - Normalize quotes
    """
    s = s.strip()
    s = re.sub(r'\s+', ' ', s)
    s = re.sub(r'[“”]', '"', s)
    s = re.sub(r"[‘’]", "'", s)
    return s

def truncate(s, max_toks=MAX_TOKENS):
    """
    Truncate a text string to a maximum number of tokens.
    """
    toks = s.split()
    return ' '.join(toks[:max_toks]) if len(toks) > max_toks else s

def prepare(claims):
    """
    Convert raw claim entries into structured, cleaned format.
    For each claim:
      - Clean the claim text
      - Filter and clean associated evidence texts
    """
    pdata = {}
    for cid, info in claims.items():
        claim = clean_text(info['claim_text'])
        # Keep only evidence IDs present in the dict and above length threshold
        gold = [eid for eid in info['evidences'] 
                if eid in evidence and len(evidence[eid].strip()) >= MIN_EVID_LEN]
        texts = [clean_text(evidence[e]) for e in gold]
        pdata[cid] = {'claim': claim, 'gold_ids': gold, 'gold_texts': texts, 'label': info['claim_label']}
    return pdata

def gen_pairs(pdata, all_ids):
    """
    Generate positive and negative (claim, evidence) pairs for ESIM.
    - Positive: each claim + each gold evidence, labeled with the claim's true label.
    - Negative: randomly sample non-gold evidence and label NOT_ENOUGH_INFO.
    """
    pairs = []
    for cid, d in pdata.items():
        clm = d['claim']
        glt = d['gold_texts']
        lbl = d['label']
        # Positive examples
        for ev in glt:
            text = truncate(f"{clm} {ev}")
            pairs.append({'claim_id': cid, 'claim': clm, 'evidence': text, 'label': lbl})
        # Negative examples
        neg_ids = random.sample(list(all_ids - set(d['gold_ids'])), k=max(1, len(glt)*NEG_RATE))
        for nid in neg_ids:
            ev = clean_text(evidence[nid])
            text = truncate(f"{clm} {ev}")
            pairs.append({'claim_id': cid, 'claim': clm, 'evidence': text, 'label': 'NOT_ENOUGH_INFO'})
    return pairs

# Prepare structured claim entries
train_p = prepare(train_claims)
dev_p   = prepare(dev_claims)
all_eids = set(evidence.keys())

# Generate ESIM pairs
train_pairs = gen_pairs(train_p, all_eids)
dev_pairs   = gen_pairs(dev_p, all_eids)

# Save to JSON
OUTPUT_TRAIN.write_text(json.dumps(train_pairs, ensure_ascii=False, indent=2), encoding='utf-8')
OUTPUT_DEV.write_text(json.dumps(dev_pairs, ensure_ascii=False, indent=2), encoding='utf-8')

print(f"Successfully prepared {len(train_pairs)} train pairs and {len(dev_pairs)} dev pairs.")

Successfully prepared 8244 train pairs and 982 dev pairs.


# 2. Model Implementation
(You can add as many code blocks and text blocks as you need. However, YOU SHOULD NOT MODIFY the section title)

## Baseline Model

In [None]:
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.linear_model import LogisticRegression
from sklearn.metrics.pairwise import cosine_similarity
import numpy as np

# 1. Fit TF-IDF on evidence
ev_texts = list(evidence.values())
ev_ids   = list(evidence.keys())
ev_vec   = TfidfVectorizer(max_features=50000).fit(ev_texts)
ev_tfidf = ev_vec.transform(ev_texts)

# 2. For each train claim, retrieve top-1 evidence
train_docs, train_labels = [], []
for cid, info in train_claims.items():
    ctf = ev_vec.transform([info["claim_text"]])
    sims = cosine_similarity(ctf, ev_tfidf)[0]
    top1 = np.argmax(sims)
    doc = info["claim_text"] + " " + ev_texts[top1]
    train_docs.append(doc)
    train_labels.append(info["claim_label"])

# 3. Vectorize claim+evidence concatenations
clf_vec = TfidfVectorizer(max_features=50000)
X_train = clf_vec.fit_transform(train_docs)

# 4. Train classifier
clf = LogisticRegression(max_iter=1000)
clf.fit(X_train, train_labels)

# 5. Evaluate on dev set similarly


## Information Retrieval

In [None]:
evidence_ids   = list(evidence.keys())
evidence_texts = [evidence[eid] for eid in evidence_ids]

# Initialize SBERT Encoder
encoder = SentenceTransformer('all-MiniLM-L6-v2')

# Compute embeddings for all passages
corpus_embeddings = encoder.encode(
    evidence_texts,
    batch_size=64,
    show_progress_bar=True,
    normalize_embeddings=True # normalized for cosine similarity
)

# Build FAISS Index
dim   = corpus_embeddings.shape[1] # embedding dimension
index = faiss.IndexFlatIP(dim)     # inner-product index acts as cosine-similarity
index.add(corpus_embeddings)       # add all embeddings to index
print(f"Indexed {index.ntotal} evidence passages (dim={dim}).")


def retrieve_evidence(claim_text, top_k=5):
    """
    Retrieve top_k semantically similar evidence passages for a claim.
    """
    # Encode the claim (normalized)
    q_emb = encoder.encode([claim_text], normalize_embeddings=True)
    # Search the FAISS index
    scores, idxs = index.search(q_emb, top_k)
    results = []
    for score, idx in zip(scores[0], idxs[0]):
        results.append({
            'evidence_id': evidence_ids[idx],
            'text': evidence_texts[idx],
            'score': float(score)
        })
    return results

## Classification

In [None]:
EMB_DIM = 300    # embedding size
HID_SIZE = 128    # LSTM hidden size
BATCH_SIZE = 32
NUM_EPOCHS = 5
LEARNING_RATE = 1e-3
LABEL2IDX = {
    'SUPPORTS': 0,
    'REFUTES': 1,
    'NOT_ENOUGH_INFO': 2,
    'DISPUTED': 3
}


def build_vocab(pairs, min_freq=2):
    """
    Build a word-to-index mapping from tokenized text in pairs.
    Returns:
        dict: word -> index (with PAD=0, UNK=1).
    """
    counter = Counter()
    for item in pairs:
        counter.update(item['claim'].split())
        counter.update(item['evidence'].split())
    # Reserve 0 for PAD, 1 for UNK
    vocab = {'<PAD>': 0, '<UNK>': 1}
    for word, freq in counter.items():
        if freq >= min_freq:
            vocab[word] = len(vocab)
    return vocab

class ESIMDataset(Dataset):
    """
    PyTorch Dataset for ESIM pairs.
    Expects pre-tokenized pairs loaded from JSON.
    """
    def __init__(self, json_path, vocab):
        self.data = json.load(open(json_path, 'r', encoding='utf-8'))
        self.vocab = vocab

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        item = self.data[idx]
        # Token to index, use UNK for OOV
        def toks2idx(text):
            return [self.vocab.get(w, 1) for w in text.split()]

        claim_ids = toks2idx(item['claim'])
        ev_ids = toks2idx(item['evidence'])
        label = LABEL2IDX[item['label']]
        return torch.tensor(claim_ids), torch.tensor(ev_ids), torch.tensor(label)

def pad_collate(batch):
    """
    Pad variable-length sequences in a batch.
    Returns padded tensors and lengths.
    """
    claims, evs, labels = zip(*batch)
    # lengths
    c_lens = torch.tensor([len(c) for c in claims])
    e_lens = torch.tensor([len(e) for e in evs])
    # pad
    claims_padded = nn.utils.rnn.pad_sequence(claims, batch_first=True, padding_value=0)
    evs_padded = nn.utils.rnn.pad_sequence(evs, batch_first=True, padding_value=0)
    labels = torch.tensor(labels)
    return claims_padded, c_lens, evs_padded, e_lens, labels

# ESIM Model
class ESIM(nn.Module):
    def __init__(self, vocab_size, emb_dim, hid_size, num_classes):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, emb_dim, padding_idx=0)
        self.encoder = nn.LSTM(emb_dim, hid_size, bidirectional=True, batch_first=True)
        # inference composition
        self.composition = nn.LSTM(8*hid_size, hid_size, bidirectional=True, batch_first=True)
        # classification
        self.fc = nn.Sequential(
            nn.Linear(8*hid_size, hid_size),
            nn.ReLU(),
            nn.Linear(hid_size, num_classes)
        )

    def forward(self, p, p_len, h, h_len):
        # Embed
        p_emb = self.embedding(p)
        h_emb = self.embedding(h)
        # Encode
        p_enc, _ = self.encoder(p_emb)
        h_enc, _ = self.encoder(h_emb)
        # Attention
        scores = torch.matmul(p_enc, h_enc.transpose(1,2))
        p_att  = torch.softmax(scores, dim=-1)
        h_att  = torch.softmax(scores.transpose(1,2), dim=-1)
        p_align = torch.matmul(p_att, h_enc)
        h_align = torch.matmul(h_att, p_enc)
        # Compose
        m_p = torch.cat([p_enc, p_align, p_enc - p_align, p_enc * p_align], dim=-1)
        m_h = torch.cat([h_enc, h_align, h_enc - h_align, h_enc * h_align], dim=-1)
        v_p, _ = self.composition(m_p)
        v_h, _ = self.composition(m_h)
        # Pooling
        v_p_max = torch.max(v_p, dim=1).values
        v_p_avg = torch.mean(v_p, dim=1)
        v_h_max = torch.max(v_h, dim=1).values
        v_h_avg = torch.mean(v_h, dim=1)
        v = torch.cat([v_p_max, v_p_avg, v_h_max, v_h_avg], dim=1)
        return self.fc(v)


# Build vocab on train set
# train_pairs = json.load(open('/content/train_esim_pairs.json'))
vocab = build_vocab(train_pairs)

# Create datasets & loaders
train_ds = ESIMDataset('/content/train_esim_pairs.json', vocab)
dev_ds = ESIMDataset('/content/dev_esim_pairs.json', vocab)
train_loader = DataLoader(train_ds, batch_size=BATCH_SIZE, shuffle=True, collate_fn=pad_collate)
dev_loader = DataLoader(dev_ds, batch_size=BATCH_SIZE, shuffle=False, collate_fn=pad_collate)

# Instantiate Model
vocab_size = len(vocab)
model = ESIM(vocab_size, EMB_DIM, HID_SIZE, num_classes=len(LABEL2IDX))
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=LEARNING_RATE)

# Training Loop
for epoch in range(1, NUM_EPOCHS+1):
    model.train()
    total_loss = 0
    for claims, c_lens, evs, e_lens, labels in train_loader:
        optimizer.zero_grad()
        logits = model(claims, c_lens, evs, e_lens)
        loss = criterion(logits, labels)
        loss.backward()
        optimizer.step()
        total_loss += loss.item()
    avg_loss = total_loss / len(train_loader)
    print(f"Epoch {epoch}/{NUM_EPOCHS}, Loss: {avg_loss:.4f}")

# 3.Testing and Evaluation
(You can add as many code blocks and text blocks as you need. However, YOU SHOULD NOT MODIFY the section title)

In [None]:
OUTPUT_PRED_PATH = Path('/content/dev-claims-predictions.json')
TOP_K = 6
# Label mappings
IDX2LABEL = {0:'SUPPORTS', 1:'REFUTES', 2:'NOT_ENOUGH_INFO', 3:'DISPUTED'}

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model.to(device)
model.eval()


def tokenize_seq(text, vocab, max_toks=MAX_TOKENS):
    """
    Function to preprocess & tokenize a text sequence.
    """
    toks = text.split()[:max_toks]
    idxs = [vocab.get(w, 1) for w in toks]
    return idxs, len(idxs)

predictions = {}
for cid, entry in dev_claims.items():
    claim_text = entry['claim_text']
    # Retrieve top-K evidence
    retrieved = retrieve_evidence(claim_text, top_k=TOP_K)
    ev_ids = [r['evidence_id'] for r in retrieved]
    ev_texts = [r['text'] for r in retrieved]

    # Classify each retrieved pair
    all_probs = []
    for ev_text in ev_texts:
        # tokenize separately
        c_idxs, c_len = tokenize_seq(claim_text, vocab)
        e_idxs, e_len = tokenize_seq(ev_text, vocab)
        # create tensors
        c_t = torch.tensor([c_idxs], dtype=torch.long, device=device)
        e_t = torch.tensor([e_idxs], dtype=torch.long, device=device)
        c_l = torch.tensor([c_len], dtype=torch.long, device=device)
        e_l = torch.tensor([e_len], dtype=torch.long, device=device)
        # forward
        with torch.no_grad():
            logits = model(c_t, c_l, e_t, e_l)
            probs  = torch.softmax(logits, dim=1).cpu().numpy().flatten()
        all_probs.append(probs)
    # Average probabilities across retrieved evidences
    avg_probs = np.mean(all_probs, axis=0)
    pred_idx  = int(np.argmax(avg_probs))
    pred_label = IDX2LABEL[pred_idx]

    # Save to predictions dict
    predictions[cid] = {
        'claim_text':  claim_text,
        'claim_label': pred_label,
        'evidences':   ev_ids
    }

# Write JSON
with open(OUTPUT_PRED_PATH, 'w', encoding='utf-8') as f:
    json.dump(predictions, f, ensure_ascii=False, indent=2)
print(f"Saved predictions for {len(predictions)} dev claims to {OUTPUT_PRED_PATH}.")

In [3]:
! python eval.py --predictions data/dev-claims-predictions.json --groundtruth data/dev-claims.json

Evidence Retrieval F-score (F)    = 0.14346104833117818
Claim Classification Accuracy (A) = 0.35064935064935066
Harmonic Mean of F and A          = 0.20361653405632965


## Object Oriented Programming codes here

*You can use multiple code snippets. Just add more if needed*