In [None]:
# Install necessary libraries
!pip install transformers pdfplumber torch tqdm --quiet

from transformers import AutoTokenizer, AutoModelForMaskedLM, DataCollatorForLanguageModeling
from transformers import Trainer, TrainingArguments
from torch.utils.data import Dataset, DataLoader
import torch
import pdfplumber
import re
from tqdm import tqdm
from google.colab import drive
drive.mount('/content/drive')

[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m42.0/42.0 kB[0m [31m2.4 MB/s[0m eta [36m0:00:00[0m
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m48.5/48.5 kB[0m [31m2.9 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m59.2/59.2 kB[0m [31m5.1 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m5.6/5.6 MB[0m [31m46.5 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m2.8/2.8 MB[0m [31m58.5 MB/s[0m eta [36m0:00:00[0m
[?25hMounted at /content/drive


In [None]:
# Path of the BNS source PDF. Note that it lives directly under /content,
# i.e. outside the Drive folder mounted at /content/drive.
pdf_path = "/content/bns.pdf"

def extract_text_from_pdf(pdf_path):
    """
    Extracts text from a PDF file and returns it as a single string.

    Args:
        pdf_path: Path of the PDF file to open with pdfplumber.

    Returns:
        The text of every page that yielded text, joined by single spaces.
        Pages whose extraction result is falsy (None or "") are skipped.
    """
    page_texts = []
    with pdfplumber.open(pdf_path) as pdf:
        for page in pdf.pages:
            # Text extraction is the expensive part, so run it exactly once
            # per page and reuse the result for both the filter and the value.
            page_text = page.extract_text()
            if page_text:
                page_texts.append(page_text)
    return " ".join(page_texts)

# Extract the whole document once and keep it in `bns_text` for parsing.
bns_text = extract_text_from_pdf(pdf_path)
print("Text extracted from the PDF.")

Text extracted from the PDF.


In [None]:
def preprocess_bns_text(text):
    """
    Preprocesses the BNS text to create a mapping of sections to their descriptions.

    A section starts on a line that begins with ``<digits>.`` followed by
    whitespace and runs until the next such line (or the end of the text).

    Args:
        text: Raw document text; may be empty or None.

    Returns:
        dict mapping the section label exactly as it appears in the text
        (including the trailing dot, e.g. ``"12."``) to the stripped section
        body. When a label occurs more than once, the last occurrence wins.
    """
    if not text:
        return {}

    # The section start and the look-ahead that terminates the previous
    # section use the same rule: line start + digits + dot + whitespace.
    # If the two disagree, a line beginning with e.g. "3.5 ..." ends a section
    # without starting a new one, and a number in the middle of a sentence
    # ("... in 2023. The") can be picked up as a bogus section label.
    section_pattern = re.compile(
        r"^(\d+\.\s+.+?)(?=\n\d+\.\s|\Z)", re.MULTILINE | re.DOTALL
    )

    sections = {}
    for match in section_pattern.finditer(text):
        # First whitespace-delimited token is the label, the rest is the body.
        parts = match.group(1).split(maxsplit=1)
        if len(parts) > 1:
            section_number, description = parts
            sections[section_number.strip()] = description.strip()
    return sections

# Parse the raw text into a {section label: section body} dict used by all later cells.
bns_sections = preprocess_bns_text(bns_text)
print(f"Extracted {len(bns_sections)} sections from the BNS text.")

Extracted 358 sections from the BNS text.


In [None]:
class BNSDataset(Dataset):
    """Map-style dataset that tokenizes one section body per item.

    Args:
        sections: Mapping whose values are the section texts; keys are ignored.
        tokenizer: Callable invoked with the text plus ``truncation``,
            ``max_length``, ``padding`` and ``return_tensors`` keyword arguments.
        max_length: Length every item is truncated / padded to.
    """

    def __init__(self, sections, tokenizer, max_length=512):
        self.texts = list(sections.values())
        self.tokenizer = tokenizer
        self.max_length = max_length

    def __len__(self):
        """Number of section texts."""
        return len(self.texts)

    def __getitem__(self, idx):
        """Return ``input_ids`` and ``attention_mask`` for the text at ``idx``."""
        encoding = self.tokenizer(
            self.texts[idx],
            truncation=True,
            max_length=self.max_length,
            padding='max_length',
            return_tensors='pt'
        )

        # The tokenizer is called on a single text, so its tensors presumably
        # carry a leading batch dimension of size 1. Drop only that dimension:
        # a bare squeeze() would also collapse the sequence dimension whenever
        # it happens to have length 1.
        return {
            'input_ids': encoding['input_ids'].squeeze(0),
            'attention_mask': encoding['attention_mask'].squeeze(0)
        }

In [None]:
import os
import time
# Output locations on the mounted Google Drive
BASE_PATH = "/content/drive/MyDrive/bns_legal_bert"
# The timestamp gives every execution of this cell its own checkpoint folder.
CHECKPOINT_DIR = f"{BASE_PATH}/checkpoints_new_{int(time.time())}"
FINAL_MODEL_DIR = f"{BASE_PATH}/final_model"

# exist_ok=True keeps the cell re-runnable and avoids the check-then-create
# race of os.path.exists() followed by os.makedirs().
for dir_path in [BASE_PATH, CHECKPOINT_DIR, FINAL_MODEL_DIR]:
    os.makedirs(dir_path, exist_ok=True)

# Load the pretrained checkpoint named by `model_name` together with its
# tokenizer; `model` carries a masked-language-modelling head.
model_name = "law-ai/InLegalBERT"
tokenizer = AutoTokenizer.from_pretrained(model_name)
model = AutoModelForMaskedLM.from_pretrained(model_name)

# Wrap the parsed sections in a torch Dataset (one item per section body,
# using BNSDataset's default max_length).
dataset = BNSDataset(bns_sections, tokenizer)
print(f"Created dataset with {len(dataset)} sections")

The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


tokenizer_config.json:   0%|          | 0.00/516 [00:00<?, ?B/s]

vocab.txt:   0%|          | 0.00/222k [00:00<?, ?B/s]

special_tokens_map.json:   0%|          | 0.00/112 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/671 [00:00<?, ?B/s]

pytorch_model.bin:   0%|          | 0.00/534M [00:00<?, ?B/s]

BertForMaskedLM has generative capabilities, as `prepare_inputs_for_generation` is explicitly overwritten. However, it doesn't directly inherit from `GenerationMixin`. From 👉v4.50👈 onwards, `PreTrainedModel` will NOT inherit from `GenerationMixin`, and this model will lose the ability to call `generate` and other related functions.
  - If you are the owner of the model architecture code, please modify your model class such that it inherits from `GenerationMixin` (after `PreTrainedModel`, otherwise you'll get an exception).
  - If you are not the owner of the model architecture class, please contact the model code owner to update it.


Created dataset with 358 sections


In [None]:
from transformers import TrainerCallback
class DriveCheckpointCallback(TrainerCallback):
    """Trainer callback that periodically saves the model and tokenizer.

    Snapshots are written to ``<checkpoint_root>/checkpoint-<global_step>``.

    Args:
        tokenizer: Tokenizer saved next to every model snapshot.
        save_every: Save whenever ``state.global_step`` is a multiple of this
            value. Must be a positive integer.
        checkpoint_root: Folder that receives the ``checkpoint-<step>``
            sub-folders. Defaults to the notebook-level ``CHECKPOINT_DIR``,
            resolved at save time.

    Raises:
        ValueError: If ``save_every`` is not positive.
    """

    def __init__(self, tokenizer, save_every=100, checkpoint_root=None):
        if save_every <= 0:
            raise ValueError("save_every must be a positive integer")
        self.tokenizer = tokenizer
        self.save_every = save_every
        self.checkpoint_root = checkpoint_root

    def on_step_end(self, args, state, control, **kwargs):
        """Save a snapshot when the current global step hits the interval."""
        if state.global_step % self.save_every != 0:
            return

        model = kwargs.get('model')
        if model is None:
            # Nothing to save; skip instead of failing with a KeyError.
            return

        root = CHECKPOINT_DIR if self.checkpoint_root is None else self.checkpoint_root
        # NOTE(review): with output_dir=CHECKPOINT_DIR and save_steps=100 the
        # Trainer's own checkpoints appear to use the same folder names —
        # confirm that sharing these folders is intended.
        checkpoint_dir = f"{root}/checkpoint-{state.global_step}"
        os.makedirs(checkpoint_dir, exist_ok=True)
        model.save_pretrained(checkpoint_dir)
        self.tokenizer.save_pretrained(checkpoint_dir)

# Training arguments.
# Checkpoints go to CHECKPOINT_DIR every 100 steps, with at most 3 kept
# (save_total_limit). With a per-device batch of 8 and 2 accumulation steps,
# each optimizer step covers 16 samples per device.
training_args = TrainingArguments(
    output_dir=CHECKPOINT_DIR,
    overwrite_output_dir=True,
    num_train_epochs=100,
    per_device_train_batch_size=8,
    save_strategy="steps",
    save_steps=100,
    save_total_limit=3,
    prediction_loss_only=True,
    learning_rate=2e-5,                    # Reduced from 3e-5
    weight_decay=0.1,                      # Increased from 0.01
    logging_dir=f"{BASE_PATH}/logs",
    logging_steps=50,
    report_to="none",                      # Disable wandb
    warmup_ratio=0.1,                      # Added warmup
    gradient_accumulation_steps=2,         # Added gradient accumulation
)

# Override the dropout probability on the already-instantiated model by
# patching every matching sub-module in place. model.config is not touched
# here, so only this in-memory model object is affected.
# NOTE(review): the nn.MultiheadAttention branch presumably never matches for
# this BERT-style model — confirm whether it is needed at all.
for module in model.modules():
    if isinstance(module, torch.nn.Dropout):
        module.p = 0.2  # Set dropout probability to 0.2
    elif isinstance(module, torch.nn.MultiheadAttention):
        module.dropout = 0.2  # Set attention dropout probability to 0.2


# Create the masked-language-modelling collator (mlm=True) with a masking
# probability of 0.15.
data_collator = DataCollatorForLanguageModeling(
    tokenizer=tokenizer,
    mlm=True,
    mlm_probability=0.15
)

# Initialize the trainer: the model, training arguments, MLM collator and
# dataset defined above, plus the Drive checkpoint callback.
trainer = Trainer(
    model=model,
    args=training_args,
    data_collator=data_collator,
    train_dataset=dataset,
    callbacks=[DriveCheckpointCallback(tokenizer=tokenizer)]
)

In [None]:
import time
from datetime import datetime

start_time = time.time()
print(f"Starting training at {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")

try:
    trainer.train()

    # Save final model
    model.save_pretrained(FINAL_MODEL_DIR)
    tokenizer.save_pretrained(FINAL_MODEL_DIR)
    print("Training completed successfully!")

except (Exception, KeyboardInterrupt) as e:
    # KeyboardInterrupt (manually stopping the cell) is not an Exception
    # subclass, so it is listed explicitly; otherwise the emergency save
    # would be skipped on a manual stop.
    # The exception type is printed because str(KeyboardInterrupt()) is empty.
    print(f"Training interrupted: {type(e).__name__}: {e}")
    # Save model on interruption
    emergency_save_dir = f"{BASE_PATH}/emergency_save_{int(time.time())}"
    model.save_pretrained(emergency_save_dir)
    tokenizer.save_pretrained(emergency_save_dir)
    print(f"Model saved to {emergency_save_dir}")
    # Re-raise so the failure stays visible (with its traceback) and a
    # "Run All" stops here instead of continuing with a partially trained
    # model and a missing or stale FINAL_MODEL_DIR.
    raise

finally:
    end_time = time.time()
    training_time = (end_time - start_time) / 60
    print(f"Training time: {training_time:.2f} minutes")

In [None]:
def load_latest_checkpoint(checkpoint_dir=None):
    """Load the model and tokenizer from the highest-numbered checkpoint folder.

    Only sub-directories named ``checkpoint-<digits>`` are considered; files
    and folders with a non-numeric suffix are ignored.

    Args:
        checkpoint_dir: Folder to search. Defaults to the notebook-level
            ``CHECKPOINT_DIR``.

    Returns:
        ``(model, tokenizer)`` loaded from the latest checkpoint, or ``None``
        when the folder does not exist or contains no usable checkpoint.
    """
    root = CHECKPOINT_DIR if checkpoint_dir is None else checkpoint_dir

    step_by_name = {}
    if os.path.isdir(root):
        for entry in os.listdir(root):
            prefix, _, suffix = entry.partition('-')
            # Skip stray files and names such as "checkpoint-tmp" that would
            # make int() raise.
            if (
                prefix == 'checkpoint'
                and suffix.isdecimal()
                and os.path.isdir(os.path.join(root, entry))
            ):
                step_by_name[entry] = int(suffix)

    if not step_by_name:
        print("No checkpoints found")
        return None

    latest_checkpoint = max(step_by_name, key=step_by_name.get)
    checkpoint_path = os.path.join(root, latest_checkpoint)

    print(f"Loading checkpoint: {checkpoint_path}")
    model = AutoModelForMaskedLM.from_pretrained(checkpoint_path)
    tokenizer = AutoTokenizer.from_pretrained(checkpoint_path)

    return model, tokenizer

In [None]:
# Sanity check: let the model fill in a single [MASK] token.
test_text = "In cases of [MASK] force, the accused shall be liable under Section 129."
inputs = tokenizer(test_text, return_tensors="pt", truncation=True, max_length=512)

# Move inputs to the device the model's weights actually live on. Using
# "cuda if available" instead fails with a device mismatch whenever the model
# is still on the CPU.
device = next(model.parameters()).device
inputs = {k: v.to(device) for k, v in inputs.items()}

# Put the model in evaluation mode so dropout is disabled and the prediction
# is deterministic.
model.eval()
with torch.no_grad():
    outputs = model(**inputs).logits

# Column index of the [MASK] token, then the highest-scoring vocabulary id there.
mask_token_index = torch.where(inputs["input_ids"] == tokenizer.mask_token_id)[1]
predicted_token_id = outputs[0, mask_token_index].argmax(axis=-1)
predicted_text = tokenizer.decode(predicted_token_id)

print(f"Test sentence: {test_text}")
print(f"Model prediction: {predicted_text}")

Test sentence: In cases of [MASK] force, the accused shall be liable under Section 129.
Model prediction: private


In [None]:
# First load the fine-tuned model.
# BASE_PATH and FINAL_MODEL_DIR are assigned again here with the same values
# as in the directory-setup cell.
BASE_PATH = "/content/drive/MyDrive/bns_legal_bert"
FINAL_MODEL_DIR = f"{BASE_PATH}/final_model"

# Note: this rebinds the notebook-level `tokenizer` to the one saved with the
# final model.
fine_tuned_model = AutoModelForMaskedLM.from_pretrained(FINAL_MODEL_DIR)
tokenizer = AutoTokenizer.from_pretrained(FINAL_MODEL_DIR)

def precompute_section_embeddings(bns_sections, tokenizer, model):
    """
    Precomputes and stores embeddings for all BNS sections using the given model.

    Args:
        bns_sections: Mapping of section label to section text.
        tokenizer: Tokenizer called with ``return_tensors="pt"``,
            ``truncation=True`` and ``max_length=512``.
        model: Model called with the tokenizer output and
            ``output_hidden_states=True``; its result must expose
            ``hidden_states``.

    Returns:
        dict mapping each section label to the mean over dimension 1 of the
        last hidden state (presumably the token axis, giving one vector per
        section with the leading batch dimension kept).
    """
    section_embeddings = {}
    for section_number, description in bns_sections.items():
        encoded_section = tokenizer(description, return_tensors="pt", truncation=True, max_length=512)
        with torch.no_grad():
            # Use the model passed in by the caller rather than a notebook
            # global, so the function works with any model.
            section_output = model(**encoded_section, output_hidden_states=True)
        section_embeddings[section_number] = section_output.hidden_states[-1].mean(dim=1)
    return section_embeddings

def get_relevant_sections(incident_report, section_embeddings, tokenizer, model):
    """
    Ranks BNS sections by similarity to an incident report using the given model.

    Args:
        incident_report: Free-text description of the incident.
        section_embeddings: Mapping of section label to a precomputed
            embedding tensor comparable with the report embedding.
        tokenizer: Tokenizer called with ``return_tensors="pt"``,
            ``truncation=True`` and ``max_length=512``.
        model: Model called with the tokenizer output and
            ``output_hidden_states=True``; its result must expose
            ``hidden_states``.

    Returns:
        List of ``(section label, cosine similarity)`` tuples for every
        section, sorted by similarity in descending order.
    """
    # Encode and embed the incident report with the caller-supplied model
    # (not a notebook global).
    encoded_input = tokenizer(incident_report, return_tensors="pt", truncation=True, max_length=512)
    with torch.no_grad():
        output = model(**encoded_input, output_hidden_states=True)
    report_embedding = output.hidden_states[-1].mean(dim=1)

    # Compare with section embeddings
    relevant_sections = []
    for section_number, section_embedding in section_embeddings.items():
        similarity = torch.nn.functional.cosine_similarity(report_embedding, section_embedding)
        relevant_sections.append((section_number, similarity.item()))

    # Sort by similarity, best match first
    return sorted(relevant_sections, key=lambda x: x[1], reverse=True)

# Precompute one embedding per parsed section with the fine-tuned model, so
# each incident report only needs a single forward pass at query time.
section_embeddings = precompute_section_embeddings(bns_sections, tokenizer, fine_tuned_model)
print("Pre-computed embeddings for all sections using fine-tuned model")

Pre-computed embeddings for all sections using fine-tuned model


In [None]:
# Function to format and display results
def analyze_incident_report(incident_report, top_k=5):
    """Print the best-matching BNS sections for an incident report.

    Relies on the notebook-level ``section_embeddings``, ``tokenizer``,
    ``fine_tuned_model`` and ``bns_sections``.

    Args:
        incident_report: Free-text description of the incident.
        top_k: Number of highest-ranked sections to print (default 5).

    Returns:
        None; the results are written to stdout.
    """
    relevant_sections = get_relevant_sections(incident_report, section_embeddings, tokenizer, fine_tuned_model)

    print(f"\nTop {top_k} Relevant BNS Sections:\n")
    for section_number, score in relevant_sections[:top_k]:
        section_text = bns_sections[section_number]
        # The title is whatever precedes the first em dash; without a dash
        # the whole section text is used as the title.
        title = section_text.partition("—")[0]

        print(f"Section {section_number}: {title.strip()}")
        print(f"Similarity Score: {score:.3f}")

# Interactive input: keep analysing reports until the user types 'quit'.
while True:
    print("\nEnter your incident report (or 'quit' to exit):")
    try:
        # Strip surrounding whitespace so that e.g. "quit " still exits.
        incident_report = input().strip()
    except (EOFError, KeyboardInterrupt):
        # stdin was closed or the cell was stopped: leave the loop cleanly.
        break

    if incident_report.lower() == 'quit':
        break

    if not incident_report:
        # Nothing to analyse; prompt again instead of embedding an empty string.
        continue

    analyze_incident_report(incident_report)