# **Summarization using SentenceBert & LegalBert**

In [None]:
# imports
!pip install transformers torch
!pip install ftfy
!pip install spacy
!pip install datasets
!pip install rouge-score
# Load spaCy's English model
!python -m spacy download en_core_web_sm
# Import the spacy module
import spacy # This line is added to import spacy into the current scope

nlp = spacy.load("en_core_web_sm")

import os
import re
import zipfile
import ftfy
import shutil
import chardet
from bs4 import BeautifulSoup
from transformers import BertTokenizer, BertForSequenceClassification, EncoderDecoderModel
import torch
from tqdm import tqdm
from rouge_score import rouge_scorer
from nltk.translate.bleu_score import sentence_bleu
from nltk.translate.meteor_score import meteor_score
import nltk
nltk.download('wordnet')

In [None]:
# Define the input zip files and output directories.
# Each archive's 'text/' folder is extracted into the matching output directory below.
zip_file_1 = "ToSDRData.zip"  # extracted into output_dir_1
zip_file_2 = "ReferenceSummaries.zip"  # extracted into output_dir_2
output_dir_1 = "DataSet"  # later read by the pre-processing cell as its input folder
output_dir_2 = "ReferenceSummaries"  # later read as the reference-summary folder

def recreate_folder(folder_path):
    """Leave ``folder_path`` as a freshly created, empty directory.

    If something already exists at that path, the whole tree is removed
    first; missing parent directories are created as needed.
    """
    path_is_taken = os.path.exists(folder_path)
    if path_is_taken:
        # Drop earlier contents so stale files never leak into a new run.
        shutil.rmtree(folder_path)
    os.makedirs(folder_path)

# Start every run from empty output folders
for _fresh_dir in (output_dir_1, output_dir_2):
    recreate_folder(_fresh_dir)

# Function to extract files from a zip
def extract_files(zip_file, output_dir, folder_name):
    """Extract the files under ``folder_name/`` of a zip archive into ``output_dir``.

    The ``folder_name/`` prefix is dropped, so ``text/a/b.txt`` is written to
    ``output_dir/a/b.txt``. Directory entries and entries outside
    ``folder_name/`` are ignored.

    Args:
        zip_file: Path of the zip archive to read.
        output_dir: Directory receiving the extracted files; it and any needed
            sub-directories are created on demand.
        folder_name: Top-level folder inside the archive to extract.

    Raises:
        ValueError: If an entry would be written outside ``output_dir`` (for
            example because its name contains ``..``). All entries are checked
            before anything is written, so nothing is extracted in that case.
    """
    output_root = os.path.abspath(output_dir)
    prefix = folder_name + '/'

    with zipfile.ZipFile(zip_file, 'r') as zip_ref:
        # First pass: resolve and validate every destination before writing.
        planned = []
        for member in zip_ref.namelist():
            if not member.startswith(prefix) or member.endswith('/'):
                continue
            target_path = os.path.join(output_dir, os.path.relpath(member, folder_name))
            resolved = os.path.abspath(target_path)
            # Guard against "zip slip": archive names are untrusted and may
            # contain '..' components that climb out of the output directory.
            if os.path.commonpath([output_root, resolved]) != output_root:
                raise ValueError(
                    f"Refusing to extract {member!r}: it would be written outside {output_dir!r}"
                )
            planned.append((member, target_path))

        # Second pass: stream each member to disk without loading it whole.
        for member, target_path in planned:
            os.makedirs(os.path.dirname(target_path), exist_ok=True)
            with zip_ref.open(member) as source, open(target_path, 'wb') as target:
                shutil.copyfileobj(source, target)

# Pull the 'text' folder of each archive into its destination folder:
# ToSDRData.zip -> DataSet, ReferenceSummaries.zip -> ReferenceSummaries
for _archive, _destination in ((zip_file_1, output_dir_1), (zip_file_2, output_dir_2)):
    extract_files(_archive, _destination, 'text')

# Report only after both archives have been extracted
print(f"All files from 'text' folder in {zip_file_1} have been extracted to {output_dir_1}.")
print(f"All files from 'text' folder in {zip_file_2} have been extracted to {output_dir_2}.")

In [None]:
# Pre-processing
# Build a blank English pipeline containing only a sentencizer, used for
# sentence segmentation. This rebinds `nlp`, replacing the en_core_web_sm
# pipeline loaded in the imports cell.
nlp = spacy.blank("en")
nlp.add_pipe("sentencizer")  # Add the sentencizer component

# Input and output folder paths
input_folder = "DataSet"  # files extracted from ToSDRData.zip
output_folder = "PreprocessedDataSet"  # receives one processed file per input .txt file

def recreate_folder(folder_path):
    """Ensure ``folder_path`` exists and is empty.

    A path that does not exist yet is simply created; an existing one is
    removed with all of its contents and then created again.
    """
    if not os.path.exists(folder_path):
        os.makedirs(folder_path)
        return

    # Existing tree: remove it entirely, then start over with an empty folder.
    shutil.rmtree(folder_path)
    os.makedirs(folder_path)

# Recreate the output folder so files written by earlier runs are discarded
recreate_folder(output_folder)

# Function to remove HTML tags but retain URLs
def remove_html_tags(text):
    """Return the plain text of an HTML string followed by its link targets.

    The markup is parsed with BeautifulSoup's ``html.parser``; text nodes are
    joined with single spaces. The ``href`` of every ``<a>`` element that has
    one is then appended to the END of the text, each preceded by a space, so
    URLs are kept even though the tags are dropped.
    """
    parsed = BeautifulSoup(text, "html.parser")

    # Collect link targets in document order before flattening to text.
    link_targets = [anchor['href'] for anchor in parsed.find_all('a', href=True)]
    visible_text = parsed.get_text(separator=" ")

    url_suffix = "".join(f" {target}" for target in link_targets)
    return visible_text + url_suffix

# Function to preprocess a single file
def preprocess_text(text):
    """Clean one raw document and return it as newline-separated sentences.

    Processing steps:
      1. Strip HTML tags (link URLs are kept, see ``remove_html_tags``).
      2. Replace runs of non-ASCII characters with a space and collapse
         whitespace.
      3. Split into sentences with the module-level ``nlp`` pipeline.
      4. Collect sentences that contain an enumeration marker such as
         ``1.1.``, ``a)`` or ``iv.``.

    Args:
        text: Raw file content, possibly containing HTML.

    Returns:
        The marked ("important") sentences followed by every sentence in
        document order, one per line. Marked sentences therefore appear twice.
    """
    # Step 1: Remove HTML tags but retain URLs
    text = remove_html_tags(text)

    # Step 2: Remove special characters and normalize white spaces
    text = re.sub(r"[^\x00-\x7F]+", " ", text)  # Remove non-ASCII characters
    text = re.sub(r"\s+", " ", text).strip()  # Normalize spaces and remove leading/trailing spaces

    # Step 3: Break into sentences for better readability
    doc = nlp(text)
    sentences = [sent.text.strip() for sent in doc.sents]

    # Step 4: Highlight sentences carrying an enumeration marker.
    # The marker must be a standalone token (start of sentence or after a
    # space) directly followed by '.' or ')'. Matching it as a plain substring
    # would flag nearly every sentence: a marker like "a" occurs inside almost
    # any English sentence.
    enumeration_marker = re.compile(
        r"(?:^|\s)(?:\d+\.\d+|[A-Za-z]|[ivxIVX]+)[.)](?:\s|$)"
    )
    important_points = [sent for sent in sentences if enumeration_marker.search(sent)]

    # Combine important points and the full text
    processed_text = "\n".join(important_points + sentences)
    return processed_text

# Preprocess every .txt file of the input folder into the output folder
for file_name in os.listdir(input_folder):
    input_path = os.path.join(input_folder, file_name)

    # Skip sub-directories and anything that is not a text file
    if not (os.path.isfile(input_path) and input_path.endswith(".txt")):
        continue

    with open(input_path, "r", encoding="utf-8") as source_file:
        content = source_file.read()

    preprocessed_content = preprocess_text(content)

    # The processed file keeps the name of its source file
    output_path = os.path.join(output_folder, file_name)
    with open(output_path, "w", encoding="utf-8") as target_file:
        target_file.write(preprocessed_content)

print(f"Preprocessing completed. Files saved in '{output_folder}'.")


In [None]:
import os
import torch
import pandas as pd
from transformers import BertTokenizer, EncoderDecoderModel, BertForSequenceClassification
from torch.utils.data import Dataset, DataLoader
from transformers import AdamW
from sklearn.metrics.pairwise import cosine_similarity
import numpy as np
from tqdm import tqdm
from sentence_transformers import SentenceTransformer
import matplotlib.pyplot as plt

# Paths
preprocessed_folder = "PreprocessedDataSet"  # model inputs written by the pre-processing cell
reference_folder = "ReferenceSummaries"  # target summaries, looked up by the same file name as the input
annotations_file = "Annotations.csv"  # CSV file containing 'text' and 'summary'

# Hyperparameters
EPOCHS = 5  # full passes over the training dataloader
BATCH_SIZE = 10  # examples per optimizer step
LEARNING_RATE = 1e-5  # learning rate handed to AdamW
MAX_INPUT_LENGTH = 512  # input sequences are padded/truncated to this many token ids
MAX_TARGET_LENGTH = 128  # label sequences are padded/truncated to this many token ids
CHUNK_SIZE = 512  # default number of tokens per chunk in chunk_text
CHUNK_OVERLAP = 50  # default number of tokens shared by consecutive chunks

# Initialize Tokenizer and Model
tokenizer = BertTokenizer.from_pretrained("nlpaueb/legal-bert-base-uncased")
# Encoder and decoder are both initialised from the same Legal-BERT checkpoint
model = EncoderDecoderModel.from_encoder_decoder_pretrained(
    "nlpaueb/legal-bert-base-uncased", "nlpaueb/legal-bert-base-uncased"
)
# Map the seq2seq start / pad / end token ids onto the tokenizer's
# [CLS] / [PAD] / [SEP] ids
model.config.decoder_start_token_id = tokenizer.cls_token_id
model.config.pad_token_id = tokenizer.pad_token_id
model.config.eos_token_id = tokenizer.sep_token_id

# Initialize Sentence-BERT model (used by LegalDataset to embed the input chunk and the summary)
sentence_bert = SentenceTransformer('paraphrase-MiniLM-L6-v2')

# Function to chunk text
def chunk_text(text, chunk_size=CHUNK_SIZE, overlap=CHUNK_OVERLAP):
    """Split ``text`` into overlapping windows of at most ``chunk_size`` tokens.

    The text is tokenized with the module-level ``tokenizer``; consecutive
    windows start ``chunk_size - overlap`` tokens apart and each window is
    converted back to a string.

    Args:
        text: Text to split.
        chunk_size: Maximum number of tokens per chunk.
        overlap: Number of tokens shared by two consecutive chunks.

    Returns:
        List of chunk strings in document order; empty if ``text`` produces
        no tokens.

    Raises:
        ValueError: If ``overlap`` is not smaller than ``chunk_size``, since
            the window could then never advance.
    """
    step = chunk_size - overlap
    if step <= 0:
        raise ValueError(
            f"overlap ({overlap}) must be smaller than chunk_size ({chunk_size})"
        )

    tokens = tokenizer.tokenize(text)
    chunks = []
    for start in range(0, len(tokens), step):
        chunks.append(tokenizer.convert_tokens_to_string(tokens[start:start + chunk_size]))
        if start + chunk_size >= len(tokens):
            # This window already reaches the last token; another one would
            # contain nothing but tokens repeated from the overlap.
            break
    return chunks

# Dataset Class
class LegalDataset(Dataset):
    """Pairs each preprocessed document with its reference summary.

    Every item is built from the FIRST chunk of the document only (see
    ``chunk_text``); the reference summary is read from ``reference_folder``
    under the same file name as the input.
    """

    def __init__(self, input_folder, reference_folder, file_list, tokenizer, sentence_bert, annotations_df, max_input_len, max_target_len):
        """Store the configuration; no file is read until an item is requested.

        Args:
            input_folder: Folder holding the preprocessed input files.
            reference_folder: Folder holding the reference summaries.
            file_list: File names present in both folders.
            tokenizer: Tokenizer used for inputs and labels.
            sentence_bert: Model exposing ``encode`` used for the similarity score.
            annotations_df: Stored as an attribute; not read by this class.
            max_input_len: Length the input ids are padded/truncated to.
            max_target_len: Length the labels are padded/truncated to.
        """
        self.input_folder = input_folder
        self.reference_folder = reference_folder
        self.file_list = file_list
        self.tokenizer = tokenizer
        self.sentence_bert = sentence_bert
        self.annotations_df = annotations_df
        self.max_input_len = max_input_len
        self.max_target_len = max_target_len

    def __len__(self):
        """Return the number of files in the dataset."""
        return len(self.file_list)

    def __getitem__(self, idx):
        """Build the training example for the file at position ``idx``.

        Returns:
            Dict with ``input_ids`` and ``attention_mask`` (length
            ``max_input_len``), ``labels`` (length ``max_target_len``, padded
            positions set to -100) and ``cosine_sim``, the Sentence-BERT cosine
            similarity between the input chunk and the reference summary.
        """
        file_name = self.file_list[idx]
        # Load input text
        with open(os.path.join(self.input_folder, file_name), "r", encoding="utf-8") as f:
            input_text = f.read().strip()
        # Chunk the input text; an empty file yields no chunks, so fall back
        # to an empty string instead of failing on input_chunks[0].
        input_chunks = chunk_text(input_text)
        first_chunk = input_chunks[0] if input_chunks else ""

        # Load reference summary
        with open(os.path.join(self.reference_folder, file_name), "r", encoding="utf-8") as f:
            target_text = f.read().strip()

        # Tokenize the first chunk (for simplicity, you can modify this to include multiple chunks)
        input_tokens = self.tokenizer(
            first_chunk, max_length=self.max_input_len, padding="max_length", truncation=True, return_tensors="pt"
        )
        target_tokens = self.tokenizer(
            target_text, max_length=self.max_target_len, padding="max_length", truncation=True, return_tensors="pt"
        )

        # Padded label positions must be -100 so the cross-entropy loss ignores
        # them; otherwise the model is mostly trained to predict [PAD].
        labels = target_tokens["input_ids"].squeeze(0).masked_fill(
            target_tokens["attention_mask"].squeeze(0) == 0, -100
        )

        # Embed the chunk and the summary with Sentence-BERT
        text_embedding = self.sentence_bert.encode(first_chunk)
        summary_embedding = self.sentence_bert.encode(target_text)

        # Calculate cosine similarity (optional, can be used for selecting relevant sentences)
        cosine_sim = cosine_similarity([text_embedding], [summary_embedding])[0][0]

        return {
            "input_ids": input_tokens["input_ids"].squeeze(0),
            "attention_mask": input_tokens["attention_mask"].squeeze(0),
            "labels": labels,
            "cosine_sim": cosine_sim,
        }

# Prepare Training Data
# Train on the first 15 entries of the preprocessed folder, taken in sorted name order
file_list = sorted(os.listdir(preprocessed_folder))[:15]
annotations_df = pd.read_csv(annotations_file)  # handed to the dataset, which only stores it
dataset = LegalDataset(preprocessed_folder, reference_folder, file_list, tokenizer, sentence_bert, annotations_df, MAX_INPUT_LENGTH, MAX_TARGET_LENGTH)
dataloader = DataLoader(dataset, batch_size=BATCH_SIZE, shuffle=True)  # shuffle=True: examples are drawn in random order

# Optimizer: AdamW over all model parameters (encoder and decoder)
optimizer = AdamW(model.parameters(), lr=LEARNING_RATE)

# Track learning rate and loss for graph (one entry appended per training step)
learning_rates = []
losses = []

# Training Loop
# Use the GPU when one is available, otherwise fall back to the CPU
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)

print("Starting Training...")
for epoch in range(EPOCHS):
    model.train()
    epoch_loss = 0
    progress_bar = tqdm(dataloader, desc=f"Epoch {epoch+1}/{EPOCHS}")
    for batch in progress_bar:
        # Move the tensors the model consumes onto the training device
        model_inputs = {
            key: batch[key].to(device)
            for key in ("input_ids", "attention_mask", "labels")
        }

        # Forward pass; passing labels makes the model return the loss
        loss = model(**model_inputs).loss
        step_loss = loss.item()
        epoch_loss += step_loss

        # Record learning rate and loss of this step for the graphs
        learning_rates.append(optimizer.param_groups[0]['lr'])
        losses.append(step_loss)

        # Backward pass and parameter update
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        progress_bar.set_postfix({"Loss": step_loss})

    # Mean loss per batch over the epoch
    print(f"Epoch {epoch+1} Loss: {epoch_loss / len(dataloader)}")

# Save the fine-tuned model
# Model and tokenizer are written to the same directory; the evaluation cell
# reloads both from "fine_tuned_legalbert_with_sbert".
model.save_pretrained("fine_tuned_legalbert_with_sbert")
tokenizer.save_pretrained("fine_tuned_legalbert_with_sbert")
print("Fine-tuning completed and model saved!")

# Plot learning rate (left) and loss (right) per training step, side by side
plt.figure(figsize=(12, 6))
_panels = (
    (1, learning_rates, "Learning Rate over Training Steps", "Learning Rate"),
    (2, losses, "Loss over Training Steps", "Loss"),
)
for _position, _series, _title, _y_label in _panels:
    plt.subplot(1, 2, _position)
    plt.plot(_series)
    plt.title(_title)
    plt.xlabel("Training Step")
    plt.ylabel(_y_label)

# Show the graphs
plt.tight_layout()
plt.show()


In [None]:
import os
import shutil
import torch
from transformers import BertTokenizer, EncoderDecoderModel
from tqdm import tqdm
from rouge_score import rouge_scorer

# Paths
preprocessed_folder = "PreprocessedDataSet"  # inputs to summarize
generated_folder = "GeneratedSummaries"  # one generated summary is written here per input file
reference_folder = "ReferenceSummaries"  # reference summaries the generated ones are scored against
fine_tuned_model_path = "fine_tuned_legalbert_with_sbert"  # Path to the fine-tuned model and tokenizer

# Initialize Tokenizer and Model from the fine-tuned model
tokenizer = BertTokenizer.from_pretrained(fine_tuned_model_path)
model = EncoderDecoderModel.from_pretrained(fine_tuned_model_path)
# Run on the GPU when one is available, otherwise on the CPU
model.to("cuda" if torch.cuda.is_available() else "cpu")

# Ensure the GeneratedSummaries folder exists, and clean it if needed
if os.path.exists(generated_folder):
    shutil.rmtree(generated_folder)  # Remove existing folder if it exists
os.makedirs(generated_folder)

# Initialize ROUGE scorer for ROUGE-1, ROUGE-2 and ROUGE-L, with stemming enabled
scorer = rouge_scorer.RougeScorer(['rouge1', 'rouge2', 'rougeL'], use_stemmer=True)

# Function to chunk the text to fit the model's input size
def chunk_text(text, chunk_size=512, overlap=50):
    """Split ``text`` into overlapping windows of at most ``chunk_size`` tokens.

    The text is tokenized with the module-level ``tokenizer``; consecutive
    windows start ``chunk_size - overlap`` tokens apart and each window is
    converted back to a string.

    Args:
        text: Text to split.
        chunk_size: Maximum number of tokens per chunk.
        overlap: Number of tokens shared by two consecutive chunks.

    Returns:
        List of chunk strings in document order; empty if ``text`` produces
        no tokens.

    Raises:
        ValueError: If ``overlap`` is not smaller than ``chunk_size``, since
            the window could then never advance.
    """
    step = chunk_size - overlap
    if step <= 0:
        raise ValueError(
            f"overlap ({overlap}) must be smaller than chunk_size ({chunk_size})"
        )

    tokens = tokenizer.tokenize(text)
    chunks = []
    for start in range(0, len(tokens), step):
        chunks.append(tokenizer.convert_tokens_to_string(tokens[start:start + chunk_size]))
        if start + chunk_size >= len(tokens):
            # This window already reaches the last token; another one would
            # only re-summarize tokens repeated from the overlap.
            break
    return chunks

# Function to generate summaries for each file and calculate ROUGE score
def generate_summaries_and_evaluate():
    """Summarize every preprocessed ``.txt`` file and score it with ROUGE.

    For each file in ``preprocessed_folder`` the text is split with
    ``chunk_text``, a summary is generated per chunk (beam search, 4 beams,
    at most 150 tokens) and the chunk summaries are joined with spaces. The
    result is written to ``generated_folder`` under the same file name and
    compared with the file of that name in ``reference_folder``.

    Per-file and average ROUGE-1 / ROUGE-2 / ROUGE-L F-measures are printed.
    Returns nothing.

    Raises:
        FileNotFoundError: If an input file has no reference summary of the
            same name.
    """
    # Resolve the device once instead of once per chunk
    device = "cuda" if torch.cuda.is_available() else "cpu"
    metric_names = ('rouge1', 'rouge2', 'rougeL')
    all_scores = {name: [] for name in metric_names}

    for file_name in tqdm(os.listdir(preprocessed_folder), desc="Generating Summaries and Evaluating"):
        if not file_name.endswith(".txt"):
            continue

        # Load the preprocessed text file
        with open(os.path.join(preprocessed_folder, file_name), "r", encoding="utf-8") as f:
            input_text = f.read().strip()

        # Generate a summary for each chunk, then join them
        chunk_summaries = []
        for chunk in chunk_text(input_text):
            inputs = tokenizer(chunk, return_tensors="pt", max_length=512, padding=True, truncation=True).to(device)
            summary_ids = model.generate(input_ids=inputs['input_ids'], attention_mask=inputs['attention_mask'], num_beams=4, max_length=150, early_stopping=True)
            chunk_summaries.append(tokenizer.decode(summary_ids[0], skip_special_tokens=True))
        generated_summary = " ".join(chunk_summaries).strip()

        # Save the generated summary into the GeneratedSummaries folder
        with open(os.path.join(generated_folder, file_name), "w", encoding="utf-8") as f:
            f.write(generated_summary)

        # Load the reference summary
        with open(os.path.join(reference_folder, file_name), "r", encoding="utf-8") as f:
            reference_summary = f.read().strip()

        # Calculate ROUGE score between generated summary and reference summary
        scores = scorer.score(reference_summary, generated_summary)
        for name in metric_names:
            all_scores[name].append(scores[name].fmeasure)

        # Display the ROUGE scores for each file
        print(f"File: {file_name}")
        print(f"ROUGE-1: {scores['rouge1'].fmeasure:.4f}")
        print(f"ROUGE-2: {scores['rouge2'].fmeasure:.4f}")
        print(f"ROUGE-L: {scores['rougeL'].fmeasure:.4f}")
        print("-" * 50)

    # Without any scored file the averages are undefined; report it instead of
    # dividing by zero.
    if not all_scores['rouge1']:
        print("No .txt files were found to summarize; no average scores to report.")
        return

    # Display the overall average scores
    avg_rouge1 = sum(all_scores['rouge1']) / len(all_scores['rouge1'])
    avg_rouge2 = sum(all_scores['rouge2']) / len(all_scores['rouge2'])
    avg_rougeL = sum(all_scores['rougeL']) / len(all_scores['rougeL'])

    print("Overall Average Scores:")
    print(f"ROUGE-1: {avg_rouge1:.4f}")
    print(f"ROUGE-2: {avg_rouge2:.4f}")
    print(f"ROUGE-L: {avg_rougeL:.4f}")

# Generate the summaries and evaluate them
# (writes one file per input into generated_folder and prints the ROUGE scores)
generate_summaries_and_evaluate()
print("Summaries generated and evaluated.")
