<a href="https://colab.research.google.com/github/pankhauda/pbl_project/blob/main/pbl_project.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
!pip install transformers torch scikit-learn pandas numpy -q

import json
import re
import pandas as pd
import numpy as np
import torch

from sklearn.preprocessing import MultiLabelBinarizer
from sklearn.metrics import f1_score, precision_score, recall_score

from transformers import (
    AutoTokenizer,
    AutoModelForSequenceClassification,
    TrainingArguments,
    Trainer
)

# Prefer the GPU when CUDA is available; otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("Device:", device)

Device: cpu


In [None]:
def load_jsonl(path):
    """Read a JSON Lines file and return its records as a list.

    Each non-blank line is parsed with ``json.loads``. Blank or
    whitespace-only lines (for example a trailing empty line at the end of
    the file) are skipped instead of raising ``json.JSONDecodeError``.

    Args:
        path: Path of the UTF-8 encoded ``.jsonl`` file.

    Returns:
        A list with one parsed object per non-blank line, in file order.

    Raises:
        json.JSONDecodeError: If a non-blank line is not valid JSON.
        OSError: If the file cannot be opened.
    """
    with open(path, "r", encoding="utf-8") as f:
        return [json.loads(line) for line in f if line.strip()]

# Load the three splits from JSONL files in the current working directory.
train_data = load_jsonl("train.jsonl")
dev_data = load_jsonl("dev.jsonl")
test_data = load_jsonl("test.jsonl")

# Report how many records each split contains.
print("Train:", len(train_data))
print("Dev:", len(dev_data))
print("Test:", len(test_data))

In [None]:
def clean_ipc_labels(label_list):
    """Pull the bare section identifier out of each raw label string.

    A label contributes an entry only if it contains the word ``Section``
    followed by whitespace and a number (optionally suffixed with a single
    capital letter); labels without such a match are dropped.

    Args:
        label_list: Iterable of raw label strings.

    Returns:
        List of section identifiers (e.g. ``"302"``, ``"376A"``) in input order.
    """
    section_pattern = re.compile(r'Section\s+(\d+[A-Z]?)')
    hits = (section_pattern.search(raw_label) for raw_label in label_list)
    return [hit.group(1) for hit in hits if hit]


def normalize_text(text):
    """Coerce a record's text field to a single string.

    Lists are joined with single spaces; any other value is passed
    through ``str``.
    """
    return " ".join(text) if isinstance(text, list) else str(text)


def build_dataframe(data):
    """Convert raw records into a DataFrame with ``text`` and ``labels`` columns.

    For every record, ``item["text"]`` is passed through ``normalize_text``
    and ``item["labels"]`` through ``clean_ipc_labels``.

    Args:
        data: Iterable of dict-like records providing "text" and "labels" keys.

    Returns:
        A ``pandas.DataFrame`` with one row per input record.
    """
    records = [
        {
            "text": normalize_text(entry["text"]),
            "labels": clean_ipc_labels(entry["labels"]),
        }
        for entry in data
    ]
    return pd.DataFrame(records)

# Turn each raw split into a DataFrame with "text" and "labels" columns.
train_df = build_dataframe(train_data)
dev_df = build_dataframe(dev_data)
test_df = build_dataframe(test_data)

# Drop rows whose label list is empty after cleaning.
train_df = train_df[train_df["labels"].map(len) > 0]
dev_df = dev_df[dev_df["labels"].map(len) > 0]
test_df = test_df[test_df["labels"].map(len) > 0]

# NOTE(review): only the training size is printed here; dev/test sizes are not reported.
print("After cleaning:")
print("Train:", len(train_df))

In [None]:
# Multi-label encoder; its class vocabulary is learned from the training labels only.
mlb = MultiLabelBinarizer()

mlb.fit(train_df["labels"])

# Number of distinct section identifiers seen in the training split.
print("Total IPC sections:", len(mlb.classes_))

In [None]:
# Pretrained checkpoint used for both the tokenizer and the classifier below.
MODEL_NAME = "nlpaueb/legal-bert-base-uncased"
tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME)

In [None]:
class IPCDataset(torch.utils.data.Dataset):
    """Dataset that pairs raw texts with multi-hot label vectors.

    Texts are tokenized lazily, one sample at a time, in ``__getitem__``;
    labels are binarized once up front with the supplied encoder.
    """

    def __init__(self, df, tokenizer, mlb):
        """Store texts and pre-compute the float32 label matrix.

        Args:
            df: DataFrame with "text" and "labels" columns.
            tokenizer: Callable tokenizer applied to each text on access.
            mlb: Fitted encoder whose ``transform`` maps label lists to
                indicator rows.
        """
        self.tokenizer = tokenizer
        self.texts = df["text"].tolist()
        multi_hot = mlb.transform(df["labels"])
        self.labels = torch.tensor(multi_hot, dtype=torch.float32)

    def __len__(self):
        """Return the number of samples."""
        return len(self.texts)

    def __getitem__(self, idx):
        """Tokenize sample ``idx`` and attach its label vector.

        The text is truncated/padded to 256 tokens. The tokenizer returns
        tensors with a leading batch axis of size one, which is removed so
        that the default collate function can stack samples.
        """
        tokenized = self.tokenizer(
            self.texts[idx],
            truncation=True,
            padding="max_length",
            max_length=256,
            return_tensors="pt",
            return_token_type_ids=False,
        )

        sample = {}
        for field, tensor in tokenized.items():
            sample[field] = tensor.squeeze(0)
        sample["labels"] = self.labels[idx]
        return sample

In [None]:
# Wrap each split in an IPCDataset; all three share the same tokenizer and label encoder.
train_dataset = IPCDataset(train_df, tokenizer, mlb)
dev_dataset = IPCDataset(dev_df, tokenizer, mlb)
test_dataset = IPCDataset(test_df, tokenizer, mlb)

In [None]:
# Load the pretrained checkpoint with a classification head sized to the
# number of classes in the fitted encoder, configured for multi-label
# classification, and move it to the selected device.
model = AutoModelForSequenceClassification.from_pretrained(
    MODEL_NAME,
    num_labels=len(mlb.classes_),
    problem_type="multi_label_classification"
).to(device)

In [None]:
# Turn on gradient checkpointing for the model before training.
model.gradient_checkpointing_enable()

In [None]:
def compute_metrics(eval_pred):
    """Compute micro-averaged F1, precision and recall for the Trainer.

    Logits are passed through a sigmoid and binarized at a fixed 0.5
    cut-off before scoring.

    Args:
        eval_pred: A ``(logits, labels)`` pair.

    Returns:
        Dict with keys ``"f1_micro"``, ``"precision"`` and ``"recall"``.
    """
    logits, labels = eval_pred
    probabilities = torch.sigmoid(torch.tensor(logits))
    binary_preds = (probabilities > 0.5).int().numpy()

    scorers = {
        "f1_micro": f1_score,
        "precision": precision_score,
        "recall": recall_score,
    }
    return {
        metric_name: scorer(labels, binary_preds, average="micro", zero_division=0)
        for metric_name, scorer in scorers.items()
    }

In [None]:
# Mount Google Drive (Colab only) so checkpoints can be written under /content/drive.
from google.colab import drive
drive.mount('/content/drive')

In [None]:
import os

# Directory on the mounted Drive used for checkpoints and the final model.
SAVE_PATH = "/content/drive/MyDrive/ILSI_IPC_MODEL"

# Create the directory if it does not exist yet; no error if it already does.
os.makedirs(SAVE_PATH, exist_ok=True)

In [None]:
# Training configuration. With a per-device batch of 2 and 4 accumulation
# steps, each optimizer step covers 2 * 4 = 8 samples per device.
training_args = TrainingArguments(
    output_dir=SAVE_PATH,
    num_train_epochs=3,
    per_device_train_batch_size=2,
    per_device_eval_batch_size=4,
    gradient_accumulation_steps=4,
    learning_rate=2e-5,
    weight_decay=0.01,
    warmup_steps=500,
    # Evaluate and checkpoint once per epoch, keeping at most two checkpoints.
    eval_strategy="epoch",
    save_strategy="epoch",
    save_total_limit=2,
    load_best_model_at_end=True,
    # Mixed precision is requested only when a CUDA device is present.
    fp16=torch.cuda.is_available(),
    report_to="none"
)

In [None]:
# NOTE(review): this repeats a call made in an earlier cell; it looks redundant — confirm.
model.gradient_checkpointing_enable()

In [None]:
# Train on the training split, evaluating on the dev split with compute_metrics.
trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=train_dataset,
    eval_dataset=dev_dataset,
    compute_metrics=compute_metrics
)

trainer.train()

In [None]:
# Persist the trained model and the tokenizer side by side in SAVE_PATH.
trainer.save_model(SAVE_PATH)
tokenizer.save_pretrained(SAVE_PATH)

In [None]:
import pickle

# Store the fitted label encoder next to the model so that class indices can
# be mapped back to section identifiers when the model is reloaded.
with open(f"{SAVE_PATH}/mlb.pkl", "wb") as f:
    pickle.dump(mlb, f)

In [None]:
# Evaluate on the held-out test split; compute_metrics binarizes at the fixed 0.5 cut-off.
print("ILSI Test Results:")
print(trainer.evaluate(test_dataset))

In [None]:
# NOTE(review): this call sits after training and evaluation; it looks like a leftover — confirm and remove.
model.gradient_checkpointing_enable()

In [2]:
# Mount Google Drive (Colab only) to reach the saved model directory.
from google.colab import drive
drive.mount('/content/drive')

import torch
import pickle
from transformers import AutoModelForSequenceClassification, AutoTokenizer

# Directory on the mounted Drive holding the saved model, tokenizer and mlb.pkl.
MODEL_PATH = "/content/drive/MyDrive/ILSI_IPC_MODEL"

model = AutoModelForSequenceClassification.from_pretrained(MODEL_PATH)
tokenizer = AutoTokenizer.from_pretrained(MODEL_PATH)

# NOTE(security): pickle.load can execute arbitrary code from the file;
# only load an mlb.pkl that you created yourself.
with open(f"{MODEL_PATH}/mlb.pkl", "rb") as f:
    mlb = pickle.load(f)

# Move the model to the available device and switch it to evaluation mode.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)
model.eval()

print("Model loaded successfully.")

Mounted at /content/drive


Loading weights:   0%|          | 0/201 [00:00<?, ?it/s]

Model loaded successfully.


In [3]:
# Sanity check: list the dev split file on the mounted Drive (path is relative to the working directory).
!ls drive/MyDrive/ILSI_DATA/dev.jsonl

drive/MyDrive/ILSI_DATA/dev.jsonl


In [10]:
# Reload test.jsonl
import json
import re
import pandas as pd
import torch

def load_jsonl(path):
    """Read a JSON Lines file and return its records as a list.

    Each non-blank line is parsed with ``json.loads``. Blank or
    whitespace-only lines (for example a trailing empty line at the end of
    the file) are skipped instead of raising ``json.JSONDecodeError``.

    Args:
        path: Path of the UTF-8 encoded ``.jsonl`` file.

    Returns:
        A list with one parsed object per non-blank line, in file order.

    Raises:
        json.JSONDecodeError: If a non-blank line is not valid JSON.
        OSError: If the file cannot be opened.
    """
    with open(path, "r", encoding="utf-8") as f:
        return [json.loads(line) for line in f if line.strip()]

# Read the test split from the mounted Drive.
test_data = load_jsonl("/content/drive/MyDrive/ILSI_DATA/test.jsonl")

def clean_ipc_labels(label_list):
    """Reduce raw label strings to bare section identifiers.

    Only labels containing ``Section`` followed by whitespace and a number
    (optionally with one trailing capital letter) yield an entry; the rest
    are silently skipped.

    Args:
        label_list: Iterable of raw label strings.

    Returns:
        List of matched identifiers, in input order.
    """
    found = []
    for raw_label in label_list:
        hit = re.search(r'Section\s+(\d+[A-Z]?)', raw_label)
        if hit is None:
            continue
        found.append(hit.group(1))
    return found

def normalize_text(text):
    """Return ``text`` as one string: lists are space-joined, anything else goes through ``str``."""
    if not isinstance(text, list):
        return str(text)
    return " ".join(text)

def build_dataframe(data):
    """Build a two-column DataFrame (``text``, ``labels``) from raw records.

    ``normalize_text`` is applied to each record's "text" value and
    ``clean_ipc_labels`` to its "labels" value.

    Args:
        data: Iterable of dict-like records with "text" and "labels" keys.

    Returns:
        A ``pandas.DataFrame`` with one row per record.
    """
    def _to_row(entry):
        return {
            "text": normalize_text(entry["text"]),
            "labels": clean_ipc_labels(entry["labels"]),
        }

    return pd.DataFrame([_to_row(entry) for entry in data])

# Build the test frame and drop rows without any extracted section label.
# NOTE(review): unlike the dev frame in the next cell, labels here are not
# filtered against mlb.classes_ — confirm this difference is intended.
test_df = build_dataframe(test_data)
test_df = test_df[test_df["labels"].map(len) > 0]

print("Test samples:", len(test_df))

Test samples: 13039


In [11]:
import json
import pandas as pd
import re

def load_jsonl(path):
    """Read a JSON Lines file and return its records as a list.

    Each non-blank line is parsed with ``json.loads``. Blank or
    whitespace-only lines (for example a trailing empty line at the end of
    the file) are skipped instead of raising ``json.JSONDecodeError``.

    Args:
        path: Path of the UTF-8 encoded ``.jsonl`` file.

    Returns:
        A list with one parsed object per non-blank line, in file order.

    Raises:
        json.JSONDecodeError: If a non-blank line is not valid JSON.
        OSError: If the file cannot be opened.
    """
    with open(path, "r", encoding="utf-8") as f:
        return [json.loads(line) for line in f if line.strip()]

# Read the dev split from the mounted Drive.
dev_data = load_jsonl("/content/drive/MyDrive/ILSI_DATA/dev.jsonl")

def clean_ipc_labels(label_list):
    """Map raw label strings to section identifiers, dropping non-matching labels.

    A label matches when it contains ``Section``, whitespace, then digits
    with an optional single capital-letter suffix; the digits-plus-suffix
    part is what gets returned.

    Args:
        label_list: Iterable of raw label strings.

    Returns:
        List of matched identifiers, preserving input order.
    """
    matches = map(lambda raw_label: re.search(r'Section\s+(\d+[A-Z]?)', raw_label), label_list)
    return [m.group(1) for m in matches if m]

def normalize_text(text):
    """Flatten a text field to a string (space-join for lists, ``str`` otherwise)."""
    is_sequence_of_parts = isinstance(text, list)
    return " ".join(text) if is_sequence_of_parts else str(text)

# Build the dev frame directly from the raw records ("text" and "labels" columns).
dev_df = pd.DataFrame({
    "text": [normalize_text(x["text"]) for x in dev_data],
    "labels": [clean_ipc_labels(x["labels"]) for x in dev_data]
})

# Drop rows with no extracted section labels.
dev_df = dev_df[dev_df["labels"].map(len) > 0]

# Keep only labels that the loaded label encoder knows about.
dev_df["labels"] = dev_df["labels"].apply(
    lambda x: [l for l in x if l in mlb.classes_]
)

# The previous step may have emptied some label lists; drop those rows as well.
dev_df = dev_df[dev_df["labels"].map(len) > 0]

print("Dev samples:", len(dev_df))

Dev samples: 10200


In [12]:
class IPCDataset(torch.utils.data.Dataset):
    """Map-style dataset yielding tokenized text plus a multi-hot label vector.

    Label vectors are computed once at construction; tokenization happens
    per sample on access.
    """

    def __init__(self, df, tokenizer, mlb):
        """Keep the texts and binarize the labels as a float32 tensor.

        Args:
            df: DataFrame with "text" and "labels" columns.
            tokenizer: Callable tokenizer used in ``__getitem__``.
            mlb: Fitted encoder providing ``transform``.
        """
        self.tokenizer = tokenizer
        self.texts = df["text"].tolist()
        indicator_rows = mlb.transform(df["labels"])
        self.labels = torch.tensor(indicator_rows, dtype=torch.float32)

    def __len__(self):
        """Number of samples in the dataset."""
        return len(self.texts)

    def __getitem__(self, idx):
        """Return the model inputs and label vector for sample ``idx``.

        Texts are truncated/padded to 256 tokens. The leading size-one
        batch axis of each returned tensor is dropped.
        """
        tokenizer_kwargs = dict(
            truncation=True,
            padding="max_length",
            max_length=256,
            return_tensors="pt",
            return_token_type_ids=False,
        )
        tokenized = self.tokenizer(self.texts[idx], **tokenizer_kwargs)

        sample = {field: tensor.squeeze(0) for field, tensor in tokenized.items()}
        sample["labels"] = self.labels[idx]
        return sample

In [16]:
# Wrap the test frame using the reloaded tokenizer and label encoder.
test_dataset = IPCDataset(test_df, tokenizer, mlb)

In [17]:
# Wrap the dev frame; this dataset is used for threshold selection below.
dev_dataset = IPCDataset(dev_df, tokenizer, mlb)

In [None]:
from sklearn.metrics import f1_score
import numpy as np

def find_best_threshold_model(model, dataset, batch_size=8, thresholds=None):
    """Pick the decision threshold that maximizes micro-F1 on ``dataset``.

    The model is run over the whole dataset once; the collected sigmoid
    probabilities are then binarized at every candidate threshold and
    scored against the labels.

    Args:
        model: Classifier whose forward pass returns an object with a
            ``logits`` attribute.
        dataset: Dataset yielding dicts of tensors with a "labels" entry.
        batch_size: Batch size used for inference (default 8).
        thresholds: Iterable of candidate thresholds. Defaults to 50 evenly
            spaced values between 0.05 and 0.9.

    Returns:
        The threshold with the highest micro-F1, or 0.5 if no candidate
        achieves an F1 above zero.

    Raises:
        ValueError: If ``dataset`` yields no batches.
    """
    if thresholds is None:
        thresholds = np.linspace(0.05, 0.9, 50)

    # Use the device the model actually lives on instead of relying on a
    # notebook-level `device` variable that may be stale or undefined.
    model_device = next(model.parameters()).device

    all_probs = []
    all_labels = []

    loader = torch.utils.data.DataLoader(dataset, batch_size=batch_size)

    model.eval()

    with torch.no_grad():
        for batch in loader:
            labels = batch["labels"].numpy()
            inputs = {k: v.to(model_device) for k, v in batch.items() if k != "labels"}

            outputs = model(**inputs)
            probs = torch.sigmoid(outputs.logits).cpu().numpy()

            all_probs.append(probs)
            all_labels.append(labels)

    if not all_probs:
        raise ValueError("dataset is empty; cannot search for a threshold")

    all_probs = np.vstack(all_probs)
    all_labels = np.vstack(all_labels)

    best_f1 = 0
    best_threshold = 0.5

    for t in thresholds:
        preds = (all_probs > t).astype(int)
        f1 = f1_score(all_labels, preds, average="micro", zero_division=0)

        # Strict comparison: on ties the earliest (lowest-index) candidate wins.
        if f1 > best_f1:
            best_f1 = f1
            best_threshold = t

    print("Best Threshold:", round(best_threshold,4))
    print("Best F1_micro:", round(best_f1,4))

    return best_threshold

# Tune the decision threshold on the dev split (not on the test split).
best_threshold = find_best_threshold_model(model, dev_dataset)

In [None]:
from sklearn.metrics import f1_score, precision_score, recall_score
import numpy as np

def evaluate_dataset(model, dataset, threshold, batch_size=8):
    """Score ``model`` on ``dataset`` at a fixed decision threshold.

    Sigmoid probabilities strictly greater than ``threshold`` count as
    positive predictions. Micro-averaged F1, precision and recall are
    printed (rounded to four decimals).

    Args:
        model: Classifier whose forward pass returns an object with a
            ``logits`` attribute.
        dataset: Dataset yielding dicts of tensors with a "labels" entry.
        threshold: Probability cut-off for a positive prediction.
        batch_size: Batch size used for inference (default 8).

    Returns:
        The micro-averaged F1 score.

    Raises:
        ValueError: If ``dataset`` yields no batches.
    """
    # Use the device the model actually lives on instead of relying on a
    # notebook-level `device` variable that may be stale or undefined.
    model_device = next(model.parameters()).device

    all_probs = []
    all_labels = []

    loader = torch.utils.data.DataLoader(dataset, batch_size=batch_size)

    model.eval()

    with torch.no_grad():
        for batch in loader:
            labels = batch["labels"].numpy()
            inputs = {k: v.to(model_device) for k, v in batch.items() if k != "labels"}

            outputs = model(**inputs)
            probs = torch.sigmoid(outputs.logits).cpu().numpy()

            all_probs.append(probs)
            all_labels.append(labels)

    if not all_probs:
        raise ValueError("dataset is empty; nothing to evaluate")

    all_probs = np.vstack(all_probs)
    all_labels = np.vstack(all_labels)

    preds = (all_probs > threshold).astype(int)

    f1 = f1_score(all_labels, preds, average="micro", zero_division=0)
    precision = precision_score(all_labels, preds, average="micro", zero_division=0)
    recall = recall_score(all_labels, preds, average="micro", zero_division=0)

    print("F1_micro:", round(f1,4))
    print("Precision:", round(precision,4))
    print("Recall:", round(recall,4))

    return f1


In [None]:
# NOTE(review): the threshold is a hard-coded literal rather than the
# `best_threshold` value returned by find_best_threshold_model; confirm the
# two agree, otherwise the reported numbers will not match a re-run.
print("ILSI TEST RESULTS (Optimized Threshold)")
evaluate_dataset(model, test_dataset, threshold=0.3102)

In [None]:
def explain_with_gradients(text, target_section):
    """Return the five tokens with the largest gradient magnitude for a class.

    The (possibly expanded) text is embedded, the model is run on the
    embeddings, and the sigmoid score of ``target_section`` is
    back-propagated to the embeddings. Each token's importance is the sum
    of absolute gradients over its embedding dimensions.

    Args:
        text: Input description; short queries are expanded with
            ``expand_short_query`` first.
        target_section: Class label to explain; must be present in
            ``mlb.classes_``.

    Returns:
        List of up to five token strings, most important first. Special
        and word-piece tokens are returned as produced by the tokenizer.

    Raises:
        ValueError: If ``target_section`` is not one of ``mlb.classes_``.
    """
    model.eval()

    expanded = expand_short_query(text)
    encoded = tokenizer(
        expanded,
        return_tensors="pt",
        truncation=True,
        padding=True,
        max_length=256,
    )
    encoded = {name: tensor.to(device) for name, tensor in encoded.items()}
    token_ids = encoded["input_ids"]

    # Look the ids up manually so gradients can be read off the embeddings.
    embedding_layer = model.get_input_embeddings()
    embeddings = embedding_layer(token_ids)
    embeddings.retain_grad()
    embeddings.requires_grad_(True)

    logits = model(
        inputs_embeds=embeddings,
        attention_mask=encoded["attention_mask"],
    ).logits

    class_index = list(mlb.classes_).index(target_section)
    score = torch.sigmoid(logits)[0, class_index]

    # Clear stale parameter gradients before back-propagating this score.
    model.zero_grad()
    score.backward()

    saliency = embeddings.grad.abs().sum(dim=2).squeeze().detach().cpu().numpy()
    tokens = tokenizer.convert_ids_to_tokens(token_ids[0])

    ranked = sorted(zip(tokens, saliency), key=lambda pair: pair[1], reverse=True)
    return [token for token, _ in ranked[:5]]

In [None]:
def expand_short_query(text):
    """Lower-case and strip ``text``; turn very short queries into a full sentence.

    Inputs of more than three whitespace-separated words are returned as
    is (after lower-casing and stripping). Shorter inputs are replaced by
    a canned sentence when they exactly match a known keyword, otherwise
    by a generic sentence that embeds the query.

    Args:
        text: Raw user query.

    Returns:
        The normalized or expanded query string.
    """
    normalized = text.lower().strip()

    if len(normalized.split()) > 3:
        return normalized

    canned_sentences = {
        "rape": "The accused forcibly committed rape against the victim.",
        "murder": "The accused intentionally killed the victim with a weapon.",
        "kidnap": "The accused kidnapped the victim unlawfully and confined them.",
        "theft": "The accused dishonestly stole property from the complainant.",
        "robbery": "The accused committed robbery using force and threat.",
    }
    if normalized in canned_sentences:
        return canned_sentences[normalized]

    return f"The accused committed the offence of {normalized} against the victim."

def predict_ipc(text, threshold=0.3102, top_k=5):
    """Predict section labels for a free-text description.

    Selection proceeds in three stages:

    1. keep every class whose probability exceeds ``threshold``;
    2. if fewer than two classes qualify, use the relaxed cut-off
       ``threshold * 0.7`` instead;
    3. if still nothing qualifies, fall back to the ``top_k`` most
       probable classes regardless of probability.

    The result is sorted by probability (highest first) and truncated to
    ``top_k`` entries, so it may contain fewer than ``top_k`` items.

    Args:
        text: Input description; short queries are expanded with
            ``expand_short_query`` first.
        threshold: Primary probability cut-off.
        top_k: Maximum number of predictions to return.

    Returns:
        List of ``(section, probability)`` tuples.
    """
    text = expand_short_query(text)

    inputs = tokenizer(
        text,
        return_tensors="pt",
        truncation=True,
        padding=True,
        max_length=256
    )

    inputs = {k: v.to(device) for k, v in inputs.items()}

    with torch.no_grad():
        outputs = model(**inputs)
        probs = torch.sigmoid(outputs.logits).cpu().numpy()[0]

    def _above(cutoff):
        # Classes whose probability strictly exceeds `cutoff`, in class order.
        return [(mlb.classes_[i], float(p)) for i, p in enumerate(probs) if p > cutoff]

    predictions = _above(threshold)

    # Too few hits: re-select with the relaxed cut-off. Building the list
    # afresh (instead of appending and de-duplicating through a set) keeps
    # the order of equal-probability entries deterministic.
    if len(predictions) < 2:
        predictions = _above(threshold * 0.7)

    # Stable sort: ties keep class order.
    predictions.sort(key=lambda x: x[1], reverse=True)

    # Nothing passed either cut-off: fall back to the top_k most probable classes.
    if len(predictions) == 0:
        top_indices = probs.argsort()[-top_k:][::-1]
        predictions = [(mlb.classes_[i], float(probs[i])) for i in top_indices]

    return predictions[:top_k]

In [None]:
def interactive():
    """Run a console loop that predicts sections and explains each prediction.

    Reads descriptions from standard input until the user types ``quit``
    (case-insensitive, surrounding whitespace ignored). Blank input is
    rejected with a hint instead of being sent to the model. For every
    predicted section the most influential tokens are printed, with
    word-piece markers removed and special tokens filtered out.
    """
    print("="*60)
    print("IPC SECTION RECOMMENDER (With Explanation)")
    print("="*60)
    print("Optimized Threshold:", 0.3102)
    print("Type 'quit' to exit.\n")

    special_tokens = ["[CLS]", "[SEP]", "[PAD]"]

    while True:
        # Strip so that stray whitespace neither blocks 'quit' nor reaches the model.
        text = input("Enter crime description: ").strip()

        if text.lower() == "quit":
            print("Exiting...")
            break

        if not text:
            print("Please enter a description (or 'quit' to exit).\n")
            continue

        preds = predict_ipc(text)

        if not preds:
            print("No IPC sections predicted.\n")
        else:
            print("\nPredicted IPC Sections:")
            print("-"*50)

            for sec, prob in preds:
                print(f"Section {sec} → {prob*100:.2f}%")

                tokens = explain_with_gradients(text, sec)

                # Drop the word-piece continuation marker and skip special tokens.
                clean_tokens = []
                for tok in tokens:
                    if tok.startswith("##"):
                        tok = tok[2:]
                    if tok not in special_tokens:
                        clean_tokens.append(tok)

                print("Model focused on:", ", ".join(clean_tokens))
                print("-"*50)

        print()

# Start the interactive prompt loop; it runs until the user types 'quit'.
interactive()

In [6]:
import torch
import pickle
import numpy as np
import pandas as pd
import re

from datasets import load_dataset
from sklearn.metrics import f1_score, precision_score, recall_score
from transformers import AutoModelForSequenceClassification, AutoTokenizer

# Prefer the GPU when CUDA is available; otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Directory on the mounted Drive holding the saved model, tokenizer and mlb.pkl.
MODEL_PATH = "/content/drive/MyDrive/ILSI_IPC_MODEL"

model = AutoModelForSequenceClassification.from_pretrained(MODEL_PATH)
tokenizer = AutoTokenizer.from_pretrained(MODEL_PATH)

# NOTE(security): pickle.load can execute arbitrary code from the file;
# only load an mlb.pkl that you created yourself.
with open(f"{MODEL_PATH}/mlb.pkl", "rb") as f:
    mlb = pickle.load(f)

# Move the model to the selected device and switch it to evaluation mode.
model.to(device)
model.eval()

print("Model + tokenizer + mlb loaded successfully.")

Loading weights:   0%|          | 0/201 [00:00<?, ?it/s]

Model + tokenizer + mlb loaded successfully.


In [7]:
# Download the "train" split of the dataset from the Hugging Face Hub and
# convert it to a pandas DataFrame for the filtering steps below.
nyaya = load_dataset("L-NLProc/NyayaAnumana-Transformers-Results", split="train")
nyaya_df = nyaya.to_pandas()

print("Total raw samples:", len(nyaya_df))

The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


README.md:   0%|          | 0.00/31.0 [00:00<?, ?B/s]

Transformer Based/InCaseLawBERT_predicti(…):   0%|          | 0.00/34.5M [00:00<?, ?B/s]



Transformer Based/InLegalBert_prediction(…):   0%|          | 0.00/34.5M [00:00<?, ?B/s]

Transformer Based/Xlnet_on_ILDC_to_Nyaya(…):   0%|          | 0.00/34.5M [00:00<?, ?B/s]

Transformer Based/Xlnet_prediction.csv:   0%|          | 0.00/34.5M [00:00<?, ?B/s]

Generating train split:   0%|          | 0/6068 [00:00<?, ? examples/s]

Total raw samples: 6068


In [8]:
def extract_ipc(text):
    """Collect the distinct section numbers cited in ``text``.

    A citation is one of the cue words in the pattern (matched
    case-sensitively), followed by whitespace and one to three digits with
    an optional capital-letter suffix.

    Args:
        text: Document text to scan.

    Returns:
        List of unique section identifiers; the order is not defined.
    """
    citation = re.compile(r'(?:Section|Sections|section|u/s|under section)\s+(\d{1,3}[A-Z]?)')
    unique_sections = {hit.group(1) for hit in citation.finditer(text)}
    return list(unique_sections)

# Derive labels by scanning each document's text for section citations,
# then drop documents in which no citation was found.
nyaya_df["labels"] = nyaya_df["text"].apply(extract_ipc)
nyaya_df = nyaya_df[nyaya_df["labels"].map(len) > 0]

# Keep only IPCs seen in ILSI
nyaya_df["labels"] = nyaya_df["labels"].apply(
    lambda x: [label for label in x if label in mlb.classes_]
)

# The previous step may have emptied some label lists; drop those rows as well.
nyaya_df = nyaya_df[nyaya_df["labels"].map(len) > 0]

print("Samples after IPC filtering:", len(nyaya_df))

Samples after IPC filtering: 3244


In [9]:
def extract_action_core(text):
    """Keep only the paragraphs that describe the alleged act itself.

    The text is split on newlines. A paragraph is kept when it contains at
    least one action keyword, contains none of the procedural keywords
    (both checked as case-insensitive substrings), and has more than 12
    whitespace-separated words. At most the first three kept paragraphs
    are returned, joined by single spaces.

    Args:
        text: Full document text; non-string values yield ``""``.

    Returns:
        The joined paragraphs, or ``""`` when nothing qualifies.
    """
    if not isinstance(text, str):
        return ""

    strong_actions = (
        "stabbed", "killed", "murdered",
        "assaulted", "injured", "raped",
        "abducted", "kidnapped",
        "robbed", "stole", "cheated",
        "forged", "threatened", "shot",
        "burnt", "strangled",
    )
    procedural_noise = (
        "judge", "court", "appeal",
        "conviction", "sentence",
        "trial", "evidence",
        "witness", "magistrate",
        "learned counsel",
    )

    def _mentions_any(haystack, needles):
        return any(needle in haystack for needle in needles)

    kept = []
    for paragraph in text.split("\n"):
        lowered = paragraph.lower()
        if not _mentions_any(lowered, strong_actions):
            continue
        # Paragraphs that discuss the proceedings rather than the act are skipped.
        if _mentions_any(lowered, procedural_noise):
            continue
        if len(paragraph.split()) <= 12:
            continue
        kept.append(paragraph.strip())

    return " ".join(kept[:3])

# Replace each document with its extracted action paragraphs and drop
# documents whose extracted text is 30 characters or shorter.
nyaya_df["text"] = nyaya_df["text"].apply(extract_action_core)
nyaya_df = nyaya_df[nyaya_df["text"].str.len() > 30]

# Encode labels after the text filter so rows line up with nyaya_df["text"].
nyaya_encoded = mlb.transform(nyaya_df["labels"])

In [15]:
# Average number of labels per document, rounded to the nearest integer;
# used below to cap how many labels are predicted per document.
mean_labels = int(round(nyaya_df["labels"].apply(len).mean()))
TEMPERATURE = 1.3  # logits are divided by this before the sigmoid
MIN_PROB = 0.10    # a class must exceed this probability to be predicted
MAX_EXTRA = 1      # predictions allowed beyond mean_labels

all_preds = []

for text in nyaya_df["text"]:

    # NOTE(review): max_length is 384 here, while IPCDataset and predict_ipc
    # in this file use 256 — confirm the difference is intended.
    inputs = tokenizer(
        text,
        return_tensors="pt",
        truncation=True,
        padding=True,
        max_length=384
    )

    inputs = {k: v.to(device) for k, v in inputs.items()}

    with torch.no_grad():
        outputs = model(**inputs)
        logits = outputs.logits / TEMPERATURE
        probs = torch.sigmoid(logits).cpu().numpy()[0]

    # Class indices ordered from most to least probable.
    sorted_indices = probs.argsort()[::-1]

    pred_vector = np.zeros_like(probs)

    count = 0

    # Walk classes in descending probability, marking those above MIN_PROB
    # until mean_labels + MAX_EXTRA classes have been marked.
    for idx in sorted_indices:

        if probs[idx] > MIN_PROB:
            pred_vector[idx] = 1
            count += 1

        if count >= mean_labels + MAX_EXTRA:
            break

    all_preds.append(pred_vector)

In [16]:
# Micro-averaged scores of the capped predictions against the extracted labels.
f1 = f1_score(nyaya_encoded, all_preds, average="micro", zero_division=0)
precision = precision_score(nyaya_encoded, all_preds, average="micro", zero_division=0)
recall = recall_score(nyaya_encoded, all_preds, average="micro", zero_division=0)

print("\nNYAYA (UNSEEN) RESULTS")

print("F1_micro:", round(f1, 4))
print("Precision:", round(precision, 4))
print("Recall:", round(recall, 4))


NYAYA (UNSEEN) RESULTS
F1_micro: 0.2071
Precision: 0.1939
Recall: 0.2222
