In [1]:
import os
import torch
import numpy as np

from trl import SFTTrainer
from datasets import Dataset
from peft import LoraConfig
from transformers import AutoTokenizer, AutoModelForSeq2SeqLM,TrainingArguments

# 1. Import config

In [2]:
import yaml

with open("config.yaml", 'r') as stream:
    try:
        config = yaml.safe_load(stream)
    except yaml.YAMLError as exc:
        print(exc)

# 2. Import data

In [3]:
from utils import import_data_from_json

device = torch.device('mps')

airbus_datapath = os.path.join("./data/", "airbus_helicopters_train_set.json")
train_dataset, val_dataset, test_dataset = import_data_from_json(airbus_datapath)

# model_name = "google-t5/t5-small"
# model_name = "facebook/bart-small"
# model_name = "HuggingFaceH4/zephyr-7b-beta"
model = AutoModelForSeq2SeqLM.from_pretrained(model_name).to(device)

tokenizer = AutoTokenizer.from_pretrained(model_name, trust_remote_code=True)

#setting padding instructions for tokenizer
tokenizer.pad_token = tokenizer.eos_token
tokenizer.padding_side = "right"

# 3. Fine tuning 

In [4]:
# Create the trainer
from utils import prompt_instruction_format

trainingArgs = TrainingArguments(**config['parameters_ft'])

peft_config = LoraConfig(**config['parameters_LoRA'])

trainer = SFTTrainer(
    model=model,
    train_dataset=train_dataset,
    eval_dataset = val_dataset,
    peft_config=peft_config,
    tokenizer=tokenizer,
    packing=True,
    formatting_func=prompt_instruction_format,
    args=trainingArgs,
    max_seq_length=512
)

trainer.train()

'NoneType' object has no attribute 'cadam32bit_grad_fp32'


  warn("The installed version of bitsandbytes was compiled without GPU support. "


Generating train split: 0 examples [00:00, ? examples/s]

Generating train split: 0 examples [00:00, ? examples/s]

dataloader_config = DataLoaderConfiguration(dispatch_batches=None, split_batches=False, even_batches=True, use_seedable_sampler=True)


  0%|          | 0/250 [00:00<?, ?it/s]

Checkpoint destination directory output_ft/checkpoint-25 already exists and is non-empty. Saving will proceed but saved results may be invalid.
Checkpoint destination directory output_ft/checkpoint-50 already exists and is non-empty. Saving will proceed but saved results may be invalid.
Checkpoint destination directory output_ft/checkpoint-75 already exists and is non-empty. Saving will proceed but saved results may be invalid.
Checkpoint destination directory output_ft/checkpoint-100 already exists and is non-empty. Saving will proceed but saved results may be invalid.
Checkpoint destination directory output_ft/checkpoint-125 already exists and is non-empty. Saving will proceed but saved results may be invalid.
Checkpoint destination directory output_ft/checkpoint-150 already exists and is non-empty. Saving will proceed but saved results may be invalid.
Checkpoint destination directory output_ft/checkpoint-175 already exists and is non-empty. Saving will proceed but saved results may 

{'train_runtime': 357.5158, 'train_samples_per_second': 2.797, 'train_steps_per_second': 0.699, 'train_loss': 0.5465741577148437, 'epoch': 10.0}


TrainOutput(global_step=250, training_loss=0.5465741577148437, metrics={'train_runtime': 357.5158, 'train_samples_per_second': 2.797, 'train_steps_per_second': 0.699, 'train_loss': 0.5465741577148437, 'epoch': 10.0})

# 4. Test 

In [8]:
def predict_from_model(model, legal_text, device): 

    model.eval()
    model.to(device)
    
    # prompt = f"""
    # Summarize the following legal text.

    # {legal_text}

    # Summary:
    # """

    prompt = f"""
    {legal_text}
    """

    inputs = tokenizer(prompt, return_tensors='pt')
    inputs = {key: value.to(device) for key, value in inputs.items()}

    with torch.no_grad(): 
        output_ids = model.generate(**inputs)
        output = tokenizer.decode(output_ids[0], skip_special_tokens=True)
    
    return output, prompt

device = torch.device("mps" if torch.backends.mps.is_available() else "cpu")

index = 1

legal_text = test_dataset['original_text'][index]
summary = test_dataset['reference_summary'][index]

output, prompt = predict_from_model(model, legal_text, device)

dash_line = '-'.join('' for _ in range(100))
print(dash_line)
print(f'INPUT PROMPT:\n{prompt}')
print(dash_line)
print(f'BASELINE HUMAN SUMMARY:\n{summary}\n')
print(dash_line)
print(f'MODEL GENERATION - ZERO SHOT:\n{output}')

---------------------------------------------------------------------------------------------------
INPUT PROMPT:

    Risks of loss or damage to the Equipment shall pass: From the Purchaser to the Supplier upon delivery as per Article 5, after the signature of the Reception Document; From the Supplier to the Purchaser upon return as per Article 6, after the signature of the Return Document.
    
---------------------------------------------------------------------------------------------------
BASELINE HUMAN SUMMARY:
Risk of loss or damage to the Equipment shall pass: From the Purchaser to the Supplier upon delivery, From the Supplier to the Purchaser upon return

---------------------------------------------------------------------------------------------------
MODEL GENERATION - ZERO SHOT:
Risks of loss or damage to the Equipment shall pass: From the Purchaser to the Supplier


# 5. Evaluation

In [None]:
from rouge_score import rouge_scorer
from sentence_transformers import SentenceTransformer, util

device = torch.device('mps')
semantic_model = SentenceTransformer('all-MiniLM-L6-v2')

def compute_rouge(test_dataset : Dataset, model) -> dict[str, dict[str, str]]:
    scores = []
    metrics = ['rouge1', 'rouge2', 'rougeL']
    scorer = rouge_scorer.RougeScorer(
        metrics,
        use_stemmer=True
    )
        
    for i in range(len(test_dataset)):
        example = test_dataset[i]  

        original_text = example['original_text']
        reference_summary = example['reference_summary']
        generated_summary, _ = predict_from_model(model, original_text, device)

        scores.append(
              scorer.score(
                    generated_summary,
                    reference_summary
              )
        )

    final_scores = {
        metric: {
            "precision": str(np.mean(
                [score.get(metric).precision for score in scores]
            )),
            "recall": str(np.mean(
                [score.get(metric).recall for score in scores]
            )),
            "fmeasure": str(np.mean(
                [score.get(metric).fmeasure for score in scores]
            )),
            }
        for metric in metrics
    }
    return final_scores

def compute_similarity_scores(test_dataset : Dataset, model) -> dict[str, dict[str, str]]:
    scores_with_reference = []
    scores_with_original_text = []

    for i in range(len(test_dataset)):
        example = test_dataset[i]  

        original_text = example['original_text']
        reference_summary = example['reference_summary']
        generated_summary, _ = predict_from_model(model, original_text, device)

        reference_embeddings = semantic_model.encode(
            reference_summary,
            convert_to_tensor=True
        )
        original_text_embeddings = semantic_model.encode(
            original_text,
            convert_to_tensor=True
        )
        generated_embeddings = semantic_model.encode(
            generated_summary,
            convert_to_tensor=True
        )

        scores_with_reference.append(
            util.cos_sim(
                reference_embeddings,
                generated_embeddings).cpu()
        )
        scores_with_original_text.append(
            util.cos_sim(
                original_text_embeddings,
                generated_embeddings).cpu()
        )
            
    final_scores = {
        "similarity_with_reference_summary": {
            "mean":   str(np.mean(scores_with_reference)),
            "median": str(np.median(scores_with_reference)),
            "std":    str(np.std(scores_with_reference))
        },
        "similarity_with_original_text": {
            "mean":   str(np.mean(scores_with_original_text)),
            "median": str(np.median(scores_with_original_text)),
            "std":    str(np.std(scores_with_original_text))
        }
    }
    return final_scores

def evaluate(test_dataset : Dataset, model):
    model.eval()
    return {
        "Rouge": compute_rouge(test_dataset, model),
        "Similarity": compute_similarity_scores(test_dataset, model)
    }

performance_metrics = evaluate(test_dataset, model)
print(performance_metrics)



{'Rouge': {'rouge1': {'precision': '0.33373947998609704', 'recall': '0.5656671798478522', 'fmeasure': '0.37287924055321575'}, 'rouge2': {'precision': '0.19779041111828605', 'recall': '0.32981260880420543', 'fmeasure': '0.2171983821513787'}, 'rougeL': {'precision': '0.28986354281565446', 'recall': '0.4957283496149042', 'fmeasure': '0.32417701327042575'}}, 'Similarity': {'similarity_with_reference_summary': {'mean': '0.62909263', 'median': '0.69479036', 'std': '0.25737685'}, 'similarity_with_original_text': {'mean': '0.6864568', 'median': '0.7865524', 'std': '0.24224243'}}}


In [None]:
def final_score(dict_performance : dict):
    score = 0

    performance_rouge = dict_performance['Rouge']
    performance_similarity = dict_performance['Similarity']

    for metric in performance_rouge.values():
        score += float(metric['precision'])

    for metric in performance_similarity.values(): 
        score += float(metric['mean'])

    return score / (len(performance_rouge)+len(performance_similarity))

score = final_score(performance_metrics)
print(score)

0.42738857278400744


# 6. Custom class 

In [None]:
from utils import compute_similarity_scores_text

class TextSummarizer(): 
    def __init__(self, config_file_path):
        with open(config_file_path, 'r') as file:
            try:    
                self.config = yaml.safe_load(file)
            except yaml.YAMLError as exc:
                print(exc)

        # With cuda
        # self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        # With MPS (Mac Silicon)
        self.device = torch.device("mps" if torch.backends.mps.is_available() else "cpu")
        
        ### Partie LLM Summarization
        self.n_brut_force = config['parameters_textsum_archi']['n_brut_force']
        models_params = self.config['models_params']
        self.models_no_RAG = self.create_models_out_of_RAG(models_params, self.device)

        ### Partie LLM + RAG
        datapath = self.config['dbRAG']['datapath']
        # Load RAG with LangChain

    @staticmethod
    def create_models_out_of_RAG(models_params : list[tuple], device): 
        list_models = []

        for tokenizer_name, model_path in models_params:

            model = AutoModelForSeq2SeqLM.from_pretrained(model_path).to(device)
            model.eval()

            tokenizer = AutoTokenizer.from_pretrained(tokenizer_name, trust_remote_code=True)
            tokenizer.pad_token = tokenizer.eos_token
            tokenizer.padding_side = "right"

            list_models.append((tokenizer, model))
        
        return list_models
    
    def predict_no_RAG(self, text_to_summarize):
        list_output = []

        for tokenizer, model in self.models_no_RAG:
            model.eval()
            model.to(self.device)

            prompt = f"""
            Summarize the following legal text.

            {text_to_summarize}

            Summary:
            """

            inputs = tokenizer(prompt, return_tensors='pt')
            inputs = {key: value.to(device) for key, value in inputs.items()}

            list_output_model = [] 
            for _ in range(self.n_brut_force): 
                with torch.no_grad(): 
                    output_ids = model.generate(**inputs)
                    output = tokenizer.decode(output_ids[0], skip_special_tokens=True)

                sim_score_output = compute_similarity_scores_text(text_to_summarize, output)
                
                list_output_model.append((sim_score_output, output))

            list_output.append(max(list_output_model, key=lambda x: x[0]))

        return max(list_output, key=lambda x: x[0])
    
    def predict_with_RAG(self, text_to_summarize): 
        sim_score_output, output = self.predict_no_RAG(text_to_summarize)

        if sim_score_output > 0.85: 
            return output
        
        else: 
            ### Partie LLM + RAG : Enrichissement du résumé
            pass



