In [None]:
from transformers import PegasusForConditionalGeneration, PegasusTokenizer
import tensorflow as tf
import numpy as np
import os
import re
from transformers import AutoTokenizer, TFBertModel, TFAutoModelForSeq2SeqLM
# Checkpoint shared by the tokenizer and the model.
# NOTE(review): confirm this repository id resolves on the Hugging Face Hub.
MODEL_NAME = 'huggingface/legal-pegasus'

tokenizer = PegasusTokenizer.from_pretrained(MODEL_NAME)

# The rest of this notebook tokenizes with return_tensors="tf", calls the model with
# training=True, and differentiates model.trainable_variables under tf.GradientTape.
# All of that requires the TensorFlow/Keras model class; the PyTorch
# PegasusForConditionalGeneration class does not provide that interface.
# NOTE(review): if the checkpoint only ships PyTorch weights, from_pt=True may be needed — confirm.
model = TFAutoModelForSeq2SeqLM.from_pretrained(MODEL_NAME)


In [None]:
def tokenization(cleaned_text, cleaned_summary_text, tokenizer):
    """Encode a document and its summary with the same tokenizer settings.

    Args:
        cleaned_text: Source text passed to ``tokenizer`` first.
        cleaned_summary_text: Summary text passed to ``tokenizer`` second.
        tokenizer: Callable accepting the text plus the keyword arguments
            ``return_tensors``, ``truncation`` and ``padding``.

    Returns:
        A ``(source_encoding, summary_encoding)`` tuple holding whatever the
        tokenizer returned for each text.
    """
    # Both texts are encoded as TensorFlow tensors with truncation and padding enabled.
    encode_kwargs = {"return_tensors": "tf", "truncation": True, "padding": True}
    source_encoding = tokenizer(cleaned_text, **encode_kwargs)
    summary_encoding = tokenizer(cleaned_summary_text, **encode_kwargs)
    return source_encoding, summary_encoding

def clean_text(text):
    """Normalise a judgment before tokenization.

    Steps, in order:
      1. Collapse whitespace so the single-space citation pattern can match.
      2. Replace ``AIR <year> SC <page>`` citations with ``[CITATION]``.
      3. Replace numeric dates (``d/m/y`` or ``d-m-y``) with ``[DATE]``.
      4. Replace remaining standalone 4-digit numbers with ``[YEAR]``.
      5. Drop every character that is not a letter or whitespace, except the
         three placeholders inserted above.
      6. Simplify a few legal terms and remove boilerplate phrases
         (case-sensitive substring replacement).
      7. Collapse whitespace again and strip the ends.

    Args:
        text: Raw judgment text.

    Returns:
        The cleaned text.
    """
    text = re.sub(r'\s+', ' ', text).strip()

    # Citations must be handled while their digits still exist and before the
    # year rule rewrites the 4-digit number inside them.
    text = re.sub(r'\bAIR\s\d{4}\sSC\s\d{3,4}\b', '[CITATION]', text)
    text = re.sub(r'\d{1,2}[/-]\d{1,2}[/-]\d{2,4}', '[DATE]', text)
    text = re.sub(r'\b\d{4}\b', '[YEAR]', text)

    # Remove non-letters but keep the bracketed placeholders intact: the first
    # alternative captures a whole placeholder, which is then written back.
    text = re.sub(
        r'(\[(?:CITATION|DATE|YEAR)\])|[^a-zA-Z\s]',
        lambda match: match.group(1) or '',
        text,
    )

    legal_dict = {
        'hereinabove': 'above',
        'hereinafter': 'below',
        'plaintiff': 'claimant',
        'defendant': 'respondent'
    }
    for term, replacement in legal_dict.items():
        text = text.replace(term, replacement)

    boilerplate_phrases = [
        'the learned counsel submitted that',
        'in light of the above discussion',
        'the facts of the case are as follows'
    ]
    for phrase in boilerplate_phrases:
        text = text.replace(phrase, '')

    # Removing characters and phrases can leave runs of spaces, so normalise last.
    return re.sub(r'\s+', ' ', text).strip()

def clean_summary_text(text):
    """Return ``text`` with each whitespace run collapsed to one space and the ends stripped."""
    single_spaced = re.sub(r'\s+', ' ', text)
    return single_spaced.strip()

def read(filepath):
    """Return the full contents of the UTF-8 text file at ``filepath``."""
    with open(filepath, encoding='utf-8') as handle:
        return handle.read()

# Per-example loader used by the tf.data pipeline (wrapped in tf.py_function by the caller).
def load_and_preprocess_data(file_path):
    """Read, clean and tokenize one (judgment, summary) file pair.

    Args:
        file_path: Indexable pair whose two elements expose ``.numpy()``
            returning UTF-8 encoded path bytes: element 0 is the judgment
            file, element 1 the summary file.

    Returns:
        A 3-tuple of the first row of the judgment ``input_ids``, the first
        row of the judgment ``attention_mask``, and the first row of the
        summary ``input_ids``.
    """
    judgment_path, summary_path = (file_path[i].numpy().decode('utf-8') for i in range(2))

    raw_judgment = read(judgment_path)
    raw_summary = read(summary_path)

    judgment = clean_text(raw_judgment)
    summary = clean_summary_text(raw_summary)

    # Uses the module-level tokenizer for both texts.
    source_enc, summary_enc = tokenization(judgment, summary, tokenizer)

    return (
        source_enc['input_ids'][0],
        source_enc['attention_mask'][0],
        summary_enc['input_ids'][0],
    )

# Dataset directory paths.
# The root can be overridden with the IN_ABS_DATASET_DIR environment variable so the
# notebook is not tied to a single machine; the original location remains the default.
dataset_dir = os.environ.get(
    "IN_ABS_DATASET_DIR",
    "C:/Users/prasa/Downloads/7152317/dataset/dataset/IN-Abs",
)
train_judgement_dir = os.path.join(dataset_dir, 'train-data', 'judgement')
train_summary_dir = os.path.join(dataset_dir, 'train-data', 'summary')

# Pair every judgment with the summary file of the same name.
# os.listdir order is arbitrary, so sort to get a reproducible listing.
train_files = [
    (os.path.join(train_judgement_dir, file), os.path.join(train_summary_dir, file))
    for file in sorted(os.listdir(train_judgement_dir))
]

# Create a TensorFlow Dataset
train_dataset = tf.data.Dataset.from_tensor_slices(train_files)
# Shuffle the path pairs *before* mapping: with a buffer as large as the dataset,
# shuffling after the map would tokenize every file and keep all of the resulting
# tensors in memory before the first batch could be produced.
train_dataset = train_dataset.shuffle(buffer_size=len(train_files))
train_dataset = train_dataset.map(lambda x: tf.py_function(load_and_preprocess_data, [x], [tf.int32, tf.int32, tf.int32]))
# NOTE(review): padded_batch pads with 0 by default; that equals the tokenizer's
# pad id only if tokenizer.pad_token_id is 0 — confirm.
train_dataset = train_dataset.padded_batch(16, padded_shapes=([None], [None], [None]))
train_dataset = train_dataset.prefetch(buffer_size=tf.data.experimental.AUTOTUNE)

In [None]:
# Test split: same pipeline as the training split, reading from 'test-data'.
test_judgement_dir = os.path.join(dataset_dir, 'test-data', 'judgement')
test_summary_dir = os.path.join(dataset_dir, 'test-data', 'summary')

# Pair every judgment with the summary file of the same name (sorted for a
# reproducible listing, since os.listdir order is arbitrary).
test_files = [
    (os.path.join(test_judgement_dir, file), os.path.join(test_summary_dir, file))
    for file in sorted(os.listdir(test_judgement_dir))
]

test_dataset = tf.data.Dataset.from_tensor_slices(test_files)
# Shuffle the path pairs before mapping so the shuffle buffer holds short strings
# instead of every tokenized example.
test_dataset = test_dataset.shuffle(buffer_size=len(test_files))
test_dataset = test_dataset.map(lambda x: tf.py_function(load_and_preprocess_data, [x], [tf.int32, tf.int32, tf.int32]))
test_dataset = test_dataset.padded_batch(16, padded_shapes=([None], [None], [None]))
test_dataset = test_dataset.prefetch(buffer_size=tf.data.experimental.AUTOTUNE)

In [None]:
def custom_loss(labels, y_pred, pad_token_id=None):
    """Sparse categorical cross-entropy over logits, ignoring padded label positions.

    The batches in this notebook are padded to a common length, so averaging the
    loss over every position would also train the model on padding. Positions whose
    label equals ``pad_token_id`` are therefore excluded from the mean.

    Args:
        labels: Integer target token ids.
        y_pred: Unnormalised logits for each target position.
        pad_token_id: Label id to ignore. Defaults to ``tokenizer.pad_token_id``.

    Returns:
        Scalar tensor: mean cross-entropy over the non-padding positions
        (0 when every position is padding).
    """
    if pad_token_id is None:
        pad_token_id = tokenizer.pad_token_id

    # Per-position loss (no reduction) so padding can be masked out explicitly.
    per_token_loss = tf.keras.losses.sparse_categorical_crossentropy(labels, y_pred, from_logits=True)
    keep = tf.cast(tf.not_equal(labels, pad_token_id), per_token_loss.dtype)

    # Guard the denominator so an all-padding batch yields 0 instead of NaN.
    return tf.reduce_sum(per_token_loss * keep) / tf.maximum(tf.reduce_sum(keep), 1.0)
# Optimisation state shared by train_step: an Adam optimizer and a running mean of the loss.
LEARNING_RATE = 5e-5
optimizer = tf.keras.optimizers.Adam(learning_rate=LEARNING_RATE)
train_loss = tf.keras.metrics.Mean(name='train_loss')

@tf.function
def train_step(input_ids, attention_mask, decoder_input_ids, labels):
    """Run one optimisation step and record the loss in ``train_loss``.

    Args:
        input_ids: Encoder token ids for the batch.
        attention_mask: Encoder attention mask for the batch. If ``None``, a mask
            is derived by marking every non-pad token in ``input_ids``.
        decoder_input_ids: Decoder input token ids.
        labels: Target token ids aligned with the decoder outputs.

    Returns:
        None. The model weights are updated through the module-level ``optimizer``
        and the batch loss is accumulated into the module-level ``train_loss``.
    """
    # Honour the caller's mask instead of silently discarding it; only fall back
    # to the pad-based mask when none was supplied.
    if attention_mask is None:
        attention_mask = tf.cast(tf.not_equal(input_ids, tokenizer.pad_token_id), tf.int32)

    with tf.GradientTape() as tape:
        # No decoder_attention_mask is passed: the previous code computed one but
        # never used it, so behaviour here is unchanged.
        # NOTE(review): the Transformers docs describe a default decoder mask and
        # advise against setting it manually for most use cases — confirm.
        outputs = model(
            input_ids,
            attention_mask=attention_mask,
            decoder_input_ids=decoder_input_ids,
            training=True,
        ).logits
        loss = custom_loss(labels, outputs)

    gradients = tape.gradient(loss, model.trainable_variables)
    optimizer.apply_gradients(zip(gradients, model.trainable_variables))
    train_loss(loss)

# Training loop
epochs = 10

# Token the decoder is primed with; generate() starts decoding from the same id.
decoder_start_token_id = model.config.decoder_start_token_id

for epoch in range(epochs):
    # Start each epoch with a fresh metric so the printed value is that epoch's
    # mean loss rather than a running mean over all epochs so far.
    train_loss.reset_state()

    for batch in train_dataset:
        input_ids, attention_mask, summary_ids = batch

        # Teacher forcing: the targets are the full summary and the decoder input is
        # the summary shifted right behind the decoder start token. Slicing the
        # summary into [:-1] / [1:] instead would never train the first summary token
        # and would feed the decoder without its start token.
        labels = summary_ids
        start_tokens = tf.fill(
            [tf.shape(summary_ids)[0], 1],
            tf.cast(decoder_start_token_id, summary_ids.dtype),
        )
        decoder_input_ids = tf.concat([start_tokens, summary_ids[:, :-1]], axis=1)

        train_step(input_ids, attention_mask, decoder_input_ids, labels)
    print(f'Epoch {epoch + 1}, Loss: {train_loss.result()}')


In [None]:
def generate_summary(input_text):
    """Generate a summary for ``input_text`` with the module-level model and tokenizer.

    Args:
        input_text: Text to summarise.
            NOTE(review): training inputs pass through ``clean_text`` first; this
            function does not, so callers may want to clean the text themselves — confirm.

    Returns:
        The decoded summary string with special tokens removed. Decoding uses beam
        search with 2 beams, ``length_penalty=2.0`` and at most 150 tokens.
    """
    # Truncate the same way the training pipeline does so long judgments do not
    # exceed the length the model was trained with.
    encoded = tokenizer(input_text, return_tensors="tf", truncation=True)
    summary_ids = model.generate(
        encoded.input_ids,
        attention_mask=encoded.attention_mask,
        max_length=150,
        num_beams=2,
        length_penalty=2.0,
        early_stopping=True,
    )
    return tokenizer.decode(summary_ids[0], skip_special_tokens=True)


In [None]:
# metric = load_metric("rouge")

# def evaluate_model(model, test_dataset):
#     for batch in test_dataset:
#         input_ids, attention_mask, decoder_input_ids = batch
#         labels = decoder_input_ids[:, 1:]
#         outputs = model.generate(input_ids, attention_mask=attention_mask)
#         decoded_preds = tokenizer.batch_decode(outputs, skip_special_tokens=True)
#         decoded_labels = tokenizer.batch_decode(labels, skip_special_tokens=True)
#         metric.add_batch(predictions=decoded_preds, references=decoded_labels)

#     final_score = metric.compute()
#     return final_score

# bart_scores = evaluate_model(bart_model, test_dataset)
# pegasus_scores = evaluate_model(pegasus_model, test_dataset)

# print(f"BART Scores: {bart_scores}")
# print(f"Pegasus Scores: {pegasus_scores}")
