# 2025 CITS4012 Individual Assignment
*Make sure you change the file name with your student id.*

# Readme
*If there is something to be noted for the marker, please mention here.*

*If you are planning to implement your program in an object-oriented programming style, please put that code at the bottom of this ipynb file.*

# 1.Dataset Processing
(You can add as many code blocks and text blocks as you need. However, YOU SHOULD NOT MODIFY the section title)

In [3]:
# =========================================================
# 0) 基础安装 & 导入
# =========================================================
!pip -q install pydrive2 gensim

import os, re, json, unicodedata
import numpy as np
import pandas as pd
from collections import Counter
from typing import List, Tuple

import tensorflow as tf
from tensorflow.keras import layers, Model
from tensorflow.keras.preprocessing.sequence import pad_sequences
from sklearn.metrics import accuracy_score, classification_report

# Report the TensorFlow version in use.
print("TF:", tf.__version__)

# =========================================================
# 1) Download the data (replace these with your own Google Drive file IDs)
# =========================================================
from pydrive2.auth import GoogleAuth
from pydrive2.drive import GoogleDrive
from google.colab import auth as colab_auth
from oauth2client.client import GoogleCredentials

# Google Drive file IDs of the train / validation / test splits.
TRAIN_ID = "1YEOo5vd8DXXUCf1FXCR1D3PxWR9XxQKv"
VAL_ID   = "15FEgtzzTVDMQcNVMgwIwqoAJeF9vmtrX"
TEST_ID  = "179nwaOvdkZ3ogsBaTSJvpZEIjq20uiG-"

# Order matters: authenticate the Colab user first, then give the resulting
# application-default credentials to PyDrive2 and build the Drive client that
# gdrive_download() uses.
colab_auth.authenticate_user()
_gauth = GoogleAuth()
_gauth.credentials = GoogleCredentials.get_application_default()
_drive = GoogleDrive(_gauth)

def gdrive_download(file_id: str, dest_path: str) -> str:
    """Download a Google Drive file to ``dest_path`` and return that path.

    Args:
        file_id: Drive ID of the file to fetch.
        dest_path: Local path the file content is written to. Missing parent
            directories are created first.

    Returns:
        ``dest_path``, unchanged.
    """
    parent_dir = os.path.dirname(dest_path)
    # A bare file name has an empty directory part, and os.makedirs("") raises,
    # so only create the parent when there is one.
    if parent_dir:
        os.makedirs(parent_dir, exist_ok=True)
    f = _drive.CreateFile({'id': file_id})
    f.GetContentFile(dest_path)
    print("Downloaded:", dest_path)
    return dest_path

# Local directory that receives the downloaded dataset splits.
DATA_DIR = "./data"

TRAIN_JSON = gdrive_download(
    file_id=TRAIN_ID,
    dest_path=os.path.join(DATA_DIR, "train.json"),
)
VAL_JSON = gdrive_download(
    file_id=VAL_ID,
    dest_path=os.path.join(DATA_DIR, "validation.json"),
)
TEST_JSON = gdrive_download(
    file_id=TEST_ID,
    dest_path=os.path.join(DATA_DIR, "test.json"),
)

# =========================================================
# 2) Load & light cleaning
# =========================================================
def load_json_as_df(path: str) -> pd.DataFrame:
    """Parse the UTF-8 JSON file at ``path`` and wrap its content in a DataFrame."""
    with open(path, "r", encoding="utf-8") as handle:
        records = json.loads(handle.read())
    frame = pd.DataFrame(records)
    return frame

def normalize(s: str, to_lower=True):
    """Clean one sentence so it can be tokenized on single spaces.

    The text is NFKC-normalized, every whitespace run becomes one space, and
    every character other than ASCII letters, digits, spaces and common
    punctuation is dropped. Spaces are collapsed again afterwards, because
    dropping a character can leave two spaces side by side or a space at
    either end.

    Args:
        s: Raw text. Any non-``str`` value (e.g. ``None`` or NaN) yields ``""``.
        to_lower: Lower-case the result when true (the default).

    Returns:
        The cleaned string, without leading, trailing or repeated spaces.
    """
    if not isinstance(s, str):
        return ""
    s = unicodedata.normalize("NFKC", s)
    # Turn tabs/newlines into plain spaces first so the filter below keeps the
    # word boundaries they mark.
    s = re.sub(r"\s+", " ", s)
    # Drop everything except letters, digits, spaces and common punctuation.
    s = re.sub(r"[^a-zA-Z0-9.,!?;:'\"()\- ]", "", s)
    # Removing characters may have produced adjacent or edge spaces.
    s = re.sub(r" +", " ", s).strip()
    return s.lower() if to_lower else s

# Load the three dataset splits.
train_df, val_df, test_df = (
    load_json_as_df(json_path) for json_path in (TRAIN_JSON, VAL_JSON, TEST_JSON)
)

# Add a cleaned copy of each raw text column to every split.
for df in (train_df, val_df, test_df):
    for raw_col in ("premise", "hypothesis"):
        df[f"{raw_col}_clean"] = df[raw_col].apply(normalize)

print(train_df.head())

# =========================================================
# 3) Build the vocabulary & encode
# =========================================================
# Special tokens: padding and out-of-vocabulary.
PAD, UNK = "<PAD>", "<UNK>"


def tokenize(s):
    """Split ``s`` on runs of whitespace and return the resulting token list."""
    tokens = s.split()
    return tokens

# Token frequencies over both cleaned text columns of the training split.
counter = Counter(
    tok
    for col in ("premise_clean", "hypothesis_clean")
    for text in train_df[col]
    for tok in tokenize(text)
)

# PAD and UNK take ids 0 and 1; only tokens seen at least twice get their own id.
frequent_tokens = [tok for tok, freq in counter.items() if freq >= 2]
vocab = [PAD, UNK] + frequent_tokens
word2id = dict(zip(vocab, range(len(vocab))))
pad_id = word2id[PAD]
unk_id = word2id[UNK]
vocab_size = len(vocab)

def encode(s, max_len):
    """Map the tokens of ``s`` to vocabulary ids, keeping at most ``max_len`` of them.

    Tokens that are not in ``word2id`` are replaced by ``unk_id``.
    """
    kept_tokens = tokenize(s)[:max_len]
    return [word2id.get(tok, unk_id) for tok in kept_tokens]

# Token counts of every training premise, followed by every training hypothesis.
lens = [
    len(tokenize(text))
    for col in ("premise_clean", "hypothesis_clean")
    for text in train_df[col]
]
# Sequence length: the 90th-percentile token count, capped at 64.
p90_len = int(np.percentile(lens, 90))
max_len = min(64, p90_len)
print("Vocab:", vocab_size, "Max len:", max_len)

# Class name -> integer label.
label2id = {"entails": 0, "neutral": 1}


def build_inputs(df):
    """Turn a split into padded id matrices and a label vector.

    Reads the ``premise_clean``, ``hypothesis_clean`` and ``label`` columns of
    ``df``. Each sentence is encoded with ``encode`` and post-padded with
    ``pad_id`` to ``max_len``.

    Returns:
        ``(premise_ids, hypothesis_ids, labels)`` where ``labels`` is a NumPy
        array of ``label2id`` values. A label missing from ``label2id`` raises
        ``KeyError``.
    """
    premise_seqs = [encode(text, max_len) for text in df["premise_clean"]]
    hypothesis_seqs = [encode(text, max_len) for text in df["hypothesis_clean"]]
    labels = [label2id[name] for name in df["label"]]

    pad_kwargs = dict(maxlen=max_len, padding="post", value=pad_id)
    premise_ids = pad_sequences(premise_seqs, **pad_kwargs)
    hypothesis_ids = pad_sequences(hypothesis_seqs, **pad_kwargs)
    return premise_ids, hypothesis_ids, np.array(labels)

# Encode every split into (premise ids, hypothesis ids, labels).
(Xtr_p, Xtr_h, y_tr), (Xv_p, Xv_h, y_v), (Xt_p, Xt_h, y_te) = (
    build_inputs(split_df) for split_df in (train_df, val_df, test_df)
)

# =========================================================
# 4) Train Word2Vec and build the embedding matrix
# =========================================================
from gensim.models import Word2Vec

W2V_DIM = 200

# Training corpus: all cleaned premises, then all cleaned hypotheses, each as a
# token list; sentences that tokenize to nothing are skipped.
corpus_texts = (
    train_df["premise_clean"].tolist() + train_df["hypothesis_clean"].tolist()
)
sentences = [toks for toks in map(tokenize, corpus_texts) if toks]

print(f"Training Word2Vec on {len(sentences)} sentences ...")
w2v_params = {
    "vector_size": W2V_DIM,
    "window": 5,
    "min_count": 2,
    "workers": 4,
    "sg": 1,  # skip-gram
    "epochs": 10,
}
w2v_model = Word2Vec(sentences=sentences, **w2v_params)
wv = w2v_model.wv

def build_embedding_matrix(word2id, wv, dim):
    """Create a ``(len(word2id), dim)`` float32 embedding table.

    Every row starts as Gaussian noise (std 0.02) and the ``pad_id`` row is
    zeroed. Rows of tokens found in ``wv`` are then overwritten with their
    vector; the ``PAD`` and ``UNK`` entries are never looked up. The number of
    overwritten rows is printed as a coverage ratio over the whole vocabulary.

    Args:
        word2id: Mapping from token to row index.
        wv: Keyed vectors supporting ``token in wv`` and ``wv[token]``.
        dim: Width of each embedding row.

    Returns:
        The embedding matrix as a NumPy array.
    """
    vocab_size = len(word2id)
    matrix = np.random.normal(scale=0.02, size=(vocab_size, dim)).astype(np.float32)
    matrix[pad_id] = 0.0

    special_tokens = (PAD, UNK)
    pretrained = [
        (idx, tok)
        for tok, idx in word2id.items()
        if tok not in special_tokens and tok in wv
    ]
    for idx, tok in pretrained:
        matrix[idx] = wv[tok]

    hit = len(pretrained)
    print(f"W2V coverage: {hit}/{vocab_size} = {hit/vocab_size:.2%}")
    return matrix

# Embedding table (one row per vocabulary id, W2V_DIM columns) built from the
# Word2Vec vectors trained above.
embedding_matrix = build_embedding_matrix(word2id, wv, W2V_DIM)

# =========================================================
# 5) Define Model A: BiLSTM + Cross-Attention + Pooling + MLP
# =========================================================
class BiAffineCrossAttention(layers.Layer):
    """Bilinear cross-attention between two sequences.

    For inputs ``Hp`` of shape (batch, Lp, hidden_dim) and ``Hh`` of shape
    (batch, Lh, hidden_dim), every position pair is scored with
    ``S = Hp @ W @ Hh^T``. Each side then receives an attention-weighted
    summary of the other side. No mask is applied: every position, including
    any padding, takes part in the softmax.

    Args:
        hidden_dim: Size of the last axis of both inputs; the trainable
            matrix ``W`` has shape ``(hidden_dim, hidden_dim)``.
    """

    def __init__(self, hidden_dim, **kwargs):
        super().__init__(**kwargs)
        self.hidden_dim = hidden_dim

    def build(self, input_shape):
        """Create the bilinear weight matrix ``W``."""
        self.W = self.add_weight(
            name="bilinear_W",
            shape=(self.hidden_dim, self.hidden_dim),
            initializer="glorot_uniform",
            trainable=True
        )

    def call(self, inputs):
        """Return ``(Cp, Ch)``: the context of ``Hh`` for each ``Hp`` step and vice versa.

        Args:
            inputs: A pair ``[Hp, Hh]`` as described on the class.

        Returns:
            ``Cp`` with shape (batch, Lp, hidden_dim) and ``Ch`` with shape
            (batch, Lh, hidden_dim).
        """
        Hp, Hh = inputs
        HpW = tf.einsum("bij,jk->bik", Hp, self.W)
        # S[b, i, j] scores premise position i against hypothesis position j.
        S = tf.einsum("bid,bjd->bij", HpW, Hh)
        # Premise attends over hypothesis positions (rows of S) ...
        Ap = tf.nn.softmax(S, axis=-1)
        # ... and hypothesis attends over premise positions (rows of S^T).
        Ah = tf.nn.softmax(tf.transpose(S, (0, 2, 1)), axis=-1)
        Cp = tf.einsum("bij,bjd->bid", Ap, Hh)
        Ch = tf.einsum("bij,bjd->bid", Ah, Hp)
        return Cp, Ch

    def get_config(self):
        """Return the layer config including ``hidden_dim``.

        Without this the constructor argument is missing from the serialized
        config and the layer cannot be re-created from it.
        """
        config = super().get_config()
        config.update({"hidden_dim": self.hidden_dim})
        return config

def build_model_A_with_w2v(embedding_matrix, max_len, lstm_units=128, dropout=0.3, lr=2e-3, emb_trainable=True):
    """Build and compile the BiLSTM + cross-attention sentence-pair classifier.

    Pipeline: shared embedding -> one BiLSTM per sentence -> bilinear
    cross-attention -> [H, C, |H - C|, H * C] interaction features -> max and
    average pooling over time -> Dense(256) -> dropout -> 2-way softmax.

    Args:
        embedding_matrix: Array of shape (vocab_size, emb_dim) used to
            initialise the embedding table.
        max_len: Length of both id sequences.
        lstm_units: Units per LSTM direction.
        dropout: Dropout rate applied before the output layer.
        lr: Adam learning rate.
        emb_trainable: Whether the embedding table is updated during training.

    Returns:
        A compiled Keras model taking ``[premise_ids, hypothesis_ids]`` and
        producing class probabilities of shape (batch, 2).
    """
    vocab_size, emb_dim = embedding_matrix.shape
    inp_p = layers.Input(shape=(max_len,), name="premise_ids")
    inp_h = layers.Input(shape=(max_len,), name="hypothesis_ids")

    # Seed the table through an initializer: the legacy `weights=` constructor
    # argument is not accepted by every Keras release.
    emb = layers.Embedding(
        input_dim=vocab_size,
        output_dim=emb_dim,
        embeddings_initializer=tf.keras.initializers.Constant(embedding_matrix),
        trainable=emb_trainable,
        mask_zero=False,
        name="tok_emb_w2v"
    )
    # The same embedding layer is applied to both sentences.
    Ep, Eh = emb(inp_p), emb(inp_h)

    Hp = layers.Bidirectional(layers.LSTM(lstm_units, return_sequences=True), name="bilstm_p")(Ep)
    Hh = layers.Bidirectional(layers.LSTM(lstm_units, return_sequences=True), name="bilstm_h")(Eh)
    # Width of the BiLSTM output, needed to size the attention weight matrix.
    d = int(Hp.shape[-1])

    Cp, Ch = BiAffineCrossAttention(d, name="cross_attn")([Hp, Hh])

    # Interaction features per premise step: state, context, |difference|, product.
    diff_p = layers.Lambda(lambda x: tf.abs(x))(layers.Subtract()([Hp, Cp]))
    prod_p = layers.Multiply()([Hp, Cp])
    Ip = layers.Concatenate(name="inter_p")([Hp, Cp, diff_p, prod_p])

    # Same features for the hypothesis side.
    diff_h = layers.Lambda(lambda x: tf.abs(x))(layers.Subtract()([Hh, Ch]))
    prod_h = layers.Multiply()([Hh, Ch])
    Ih = layers.Concatenate(name="inter_h")([Hh, Ch, diff_h, prod_h])

    # Collapse the time axis with both max and average pooling.
    vp = layers.Concatenate()([layers.GlobalMaxPooling1D()(Ip), layers.GlobalAveragePooling1D()(Ip)])
    vh = layers.Concatenate()([layers.GlobalMaxPooling1D()(Ih), layers.GlobalAveragePooling1D()(Ih)])
    v  = layers.Concatenate(name="pair_repr")([vp, vh])

    v  = layers.Dense(256, activation="relu")(v)
    v  = layers.Dropout(dropout)(v)
    # NOTE: despite its name, this layer applies softmax and outputs probabilities.
    out = layers.Dense(2, activation="softmax", name="logits")(v)

    model = Model([inp_p, inp_h], out, name="BiLSTM_CrossAttn_W2V")
    model.compile(optimizer=tf.keras.optimizers.Adam(lr),
                  loss="sparse_categorical_crossentropy",
                  metrics=["accuracy"])
    return model

# Hyper-parameters for Model A.
model_a_hparams = {
    "lstm_units": 128,
    "dropout": 0.3,
    "lr": 2e-3,
    "emb_trainable": True,
}
modelA = build_model_A_with_w2v(embedding_matrix, max_len, **model_a_hparams)
modelA.summary()

# =========================================================
# 6) Train & evaluate
# =========================================================
train_inputs = [Xtr_p, Xtr_h]
val_inputs = [Xv_p, Xv_h]
test_inputs = [Xt_p, Xt_h]

history = modelA.fit(
    train_inputs,
    y_tr,
    validation_data=(val_inputs, y_v),
    epochs=5,
    batch_size=64,
)

# Predicted class = index of the highest probability.
val_probs = modelA.predict(val_inputs)
val_pred = np.argmax(val_probs, axis=1)
print("Val acc:", accuracy_score(y_v, val_pred))

test_probs = modelA.predict(test_inputs)
test_pred = np.argmax(test_probs, axis=1)
print("Test acc:", accuracy_score(y_te, test_pred))

class_names = ["entails", "neutral"]
test_report = classification_report(y_te, test_pred, target_names=class_names)
print("\nTest report:\n", test_report)

zsh:1: command not found: pip


ModuleNotFoundError: No module named 'tensorflow'

# 2.Word Embedding Construction
(You can add as many code blocks and text blocks as you need. However, YOU SHOULD NOT MODIFY the section title)

# 3.Visualization
(You can add as many code blocks and text blocks as you need. However, YOU SHOULD NOT MODIFY the section title)

# 4.RNN-based Model Implementation
(You can add as many code blocks and text blocks as you need. However, YOU SHOULD NOT MODIFY the section title)

# 5.Performance Evaluation
(You can add as many code blocks and text blocks as you need. However, YOU SHOULD NOT MODIFY the section title)

# 6.Interactive Inference Colab Form
(You can add as many code blocks and text blocks as you need. However, YOU SHOULD NOT MODIFY the section title)