In [None]:
## Inference and testing of the Legal-BERT model

In [1]:

!pip install transformers pandas datasets torch scikit-learn
!pip install safetensors
# !pip install --upgrade transformers
!pip install evaluate
!pip install openai

Collecting datasets
  Downloading datasets-3.2.0-py3-none-any.whl.metadata (20 kB)
Collecting dill<0.3.9,>=0.3.0 (from datasets)
  Downloading dill-0.3.8-py3-none-any.whl.metadata (10 kB)
Collecting xxhash (from datasets)
  Downloading xxhash-3.5.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (12 kB)
Collecting multiprocess<0.70.17 (from datasets)
  Downloading multiprocess-0.70.16-py311-none-any.whl.metadata (7.2 kB)
Collecting fsspec<=2024.9.0,>=2023.1.0 (from fsspec[http]<=2024.9.0,>=2023.1.0->datasets)
  Downloading fsspec-2024.9.0-py3-none-any.whl.metadata (11 kB)
Downloading datasets-3.2.0-py3-none-any.whl (480 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m480.6/480.6 kB[0m [31m15.6 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading dill-0.3.8-py3-none-any.whl (116 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m116.3/116.3 kB[0m [31m12.3 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading fsspec-2024.9.0-py3-none-any.whl 

In [None]:
!pip install --upgrade safetensors transformers


Collecting safetensors
  Downloading safetensors-0.5.2-cp38-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (3.8 kB)
Collecting transformers
  Downloading transformers-4.48.0-py3-none-any.whl.metadata (44 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m44.4/44.4 kB[0m [31m1.8 MB/s[0m eta [36m0:00:00[0m
Downloading safetensors-0.5.2-cp38-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (461 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m462.0/462.0 kB[0m [31m8.7 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading transformers-4.48.0-py3-none-any.whl (9.7 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m9.7/9.7 MB[0m [31m64.6 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: safetensors, transformers
  Attempting uninstall: safetensors
    Found existing installation: safetensors 0.4.5
    Uninstalling safetensors-0.4.5:
      Successfully uninstalled safetensors-0.4.5
  Attempting uninstall: tr

In [None]:
import pandas as pd
import os
import re
import torch
from transformers import BertForSequenceClassification, Trainer, TrainingArguments, AutoTokenizer
from datasets import Dataset
from transformers import pipeline, AutoModelForSequenceClassification, AutoTokenizer
from safetensors import SafetensorError  # Import SafetensorError
from openai import OpenAI


In [None]:
# Directory holding the saved model and tokenizer files; it is passed to
# initialize_legal_bert_model() further down in the notebook.
model_version_path = '/content/sample_data/model_files'
# Labels kept by text_preprocessing(); rows carrying any other label are filtered out.
TEXT_CLASSES = ['Approval','Fill Form','Notify'] # Filter Test Labels


In [None]:
def primary_process_agent_format_prompt(agent_prompt, query, supporting_documents):
  """Fill the three positional placeholders of ``agent_prompt``.

  Args:
    agent_prompt: Format string passed to ``str.format`` with three positional
      arguments.
    query: User query; only its whitespace-separated word count is used.
    supporting_documents: Value substituted for the first placeholder.

  Returns:
    The formatted prompt. The second placeholder receives ``'few'`` when the
    query has fewer than four words and ``'at least five'`` otherwise; the
    third placeholder always receives ``'words'``.
  """
  word_count = len(query.split())
  length_hint = 'few' if word_count < 4 else 'at least five'
  return agent_prompt.format(supporting_documents, length_hint, 'words')


In [None]:
def combine_multiple_files(files_dir_path):
  """Read every .tsv, .csv and .xlsx file of a directory into one DataFrame.

  Files with any other extension are ignored. The frames are concatenated
  with their original row indexes (no re-indexing), in sorted file-name order
  so that the result is reproducible across runs.

  Args:
    files_dir_path: Directory to scan (not recursive).

  Returns:
    The concatenated DataFrame, or an empty DataFrame when the directory holds
    no supported file.
  """
  # One reader per supported extension replaces three copy-pasted branches.
  readers = {
      '.tsv': lambda path: pd.read_csv(path, sep='\t'),
      '.csv': lambda path: pd.read_csv(path, sep=','),
      '.xlsx': pd.read_excel,
  }

  frames = []
  # os.listdir() returns names in arbitrary order; sorting fixes the row order.
  for file_name in sorted(os.listdir(files_dir_path)):
    for extension, reader in readers.items():
      if file_name.endswith(extension):
        frames.append(reader(os.path.join(files_dir_path, file_name)))
        break

  if not frames:
    return pd.DataFrame()
  # A single concat avoids re-copying the accumulated frame on every file.
  return pd.concat(frames)


In [None]:
def merge_multiple_dataframe(dataframe_list,merge_by=0):
  """Stack the given DataFrames on top of each other under a fresh 0..n-1 index.

  Args:
    dataframe_list: Sequence of DataFrames handed to ``pd.concat``.
    merge_by: Accepted for call compatibility; the implementation does not
      read it.

  Returns:
    One DataFrame containing the rows of every input frame, in input order.
  """
  stacked = pd.concat(objs=dataframe_list, ignore_index=True)
  return stacked

In [None]:

def remove_varicon_pattern(text):
    """Strip a trailing '(variation <number>)' marker from ``text``.

    The marker is removed only when it sits at the very end of the string;
    whitespace directly before and after it is removed as well. Text without
    such a trailing marker is returned unchanged.
    """
    trailing_marker = re.compile(r'\s*\(variation \d+\)\s*$')
    return trailing_marker.sub('', text)

In [None]:
def clean_text(text):
    """Normalise the whitespace of ``text``.

    Steps, in order:
      1. trim leading/trailing whitespace;
      2. rewrite every period, together with the whitespace around it, as
         a period followed by one space (this applies to every '.', so a
         decimal such as '3.14' becomes '3. 14');
      3. collapse each remaining whitespace run into a single space;
      4. trim again, which drops the space added after a final period.
    """
    spaced_periods = re.sub(r'\s*\.\s*', '. ', text.strip())
    single_spaced = re.sub(r'\s+', ' ', spaced_periods)
    return single_spaced.strip()

In [None]:

def text_preprocessing(texts_data_, col_map_dict=None, replace_col_values_dict=None, text_k='Text', label_k='labels',func_apply=None):
  """Clean a labelled text dataset, or pass a single string through.

  For a DataFrame the steps are: drop rows containing any null value, drop
  exact duplicate rows, optionally rename columns, optionally rewrite label
  values, keep only the text and label columns, keep only rows whose label is
  listed in the module-level ``TEXT_CLASSES``, apply ``func_apply`` (if given)
  and then ``clean_text`` to the text column, and finally drop the duplicates
  created by the cleaning. Progress figures are printed along the way.

  Args:
    texts_data_: A pandas DataFrame or a plain string. The input DataFrame is
      not modified.
    col_map_dict: Optional ``{old_name: new_name}`` mapping for column renaming.
    replace_col_values_dict: Optional mapping handed to ``Series.replace`` with
      ``regex=True`` for the label column.
    text_k: Name of the text column after renaming.
    label_k: Name of the label column after renaming.
    func_apply: Optional callable applied to each text before ``clean_text``.

  Returns:
    The cleaned two-column DataFrame, or the input unchanged when it is a string.

  Raises:
    ValueError: No rows are left after dropping null entries.
    KeyError: The text or label column is missing after renaming.
    TypeError: ``texts_data_`` is neither a DataFrame nor a string.
  """
  if isinstance(texts_data_, str):
    # The original tested an unbound local here, so strings never got through.
    # texts_data = remove_varicon_pattern(texts_data)
    return texts_data_

  if not isinstance(texts_data_, pd.DataFrame):
    raise TypeError(f"Expected a pandas DataFrame or str, got {type(texts_data_).__name__}")

  # Count rows BEFORE dropping nulls, otherwise the null count is always zero.
  n_data_input = texts_data_.shape[0]
  print('Number of Unprocessed datas: ',n_data_input)

  # dropna() returns a new frame, so the caller's DataFrame stays untouched.
  texts_data = texts_data_.dropna().reset_index(drop=True)
  print('Null datas: ', n_data_input - texts_data.shape[0])

  n_dup_data = texts_data.duplicated().sum()
  print('Number of Duplicate datas: ',n_dup_data)

  if texts_data.shape[0] < 1:
    raise ValueError('No Data Found')

  if n_dup_data > 0:
    texts_data = texts_data.drop_duplicates().reset_index(drop=True)
    print('After Removing Duplicates: ',texts_data.shape[0])

  if col_map_dict is not None:
    texts_data = texts_data.rename(columns=col_map_dict)

  # Check that both required columns exist after renaming
  if label_k not in texts_data.columns:
    raise KeyError(f"Column '{label_k}' not found in DataFrame after renaming. Available columns: {texts_data.columns.tolist()}")
  if text_k not in texts_data.columns:
    raise KeyError(f"Column '{text_k}' not found in DataFrame after renaming. Available columns: {texts_data.columns.tolist()}")

  if replace_col_values_dict is not None:
    texts_data[label_k] = texts_data[label_k].replace(replace_col_values_dict, regex=True)

  # isin() also works for label column names that are not valid identifiers,
  # and the explicit copy makes the assignments below write to an own frame.
  keep_rows = texts_data[label_k].isin(TEXT_CLASSES)
  texts_data = texts_data.loc[keep_rows, [text_k, label_k]].copy()

  if func_apply is not None:
    texts_data[text_k] = texts_data[text_k].apply(func_apply)

  texts_data[text_k] = texts_data[text_k].apply(clean_text)

  # Cleaning can make previously distinct rows identical, so deduplicate again.
  texts_data = texts_data.drop_duplicates().reset_index(drop=True)
  print(texts_data.head(3))
  value_counts = texts_data[label_k].value_counts()
  print("\nFrequency of each unique value in Task Type: ")
  print(value_counts)
  return texts_data


In [None]:
def initialize_legal_bert_model(_model_path):
  """Load the saved sequence classifier and wrap it in a text-classification pipeline.

  The model and tokenizer are read from ``_model_path`` with
  ``local_files_only=True``. When that load raises ``SafetensorError`` the
  weights are read from ``pytorch_model.bin`` in the same directory and the
  model is rebuilt from the saved config plus that state dict.

  Args:
    _model_path: Directory containing the model and tokenizer files.

  Returns:
    A ``transformers`` text-classification pipeline on CUDA when available,
    otherwise on CPU.

  Raises:
    OSError: The directory or one of the required files cannot be read
      (re-raised after printing a hint).
  """
  # Function-scope import: AutoConfig is only needed by the manual fallback.
  from transformers import AutoConfig

  computing_device = 'cuda' if torch.cuda.is_available() else 'cpu'
  try:
    # Plain-Python listing; `!ls` is IPython syntax and fails outside a notebook.
    files_ = sorted(os.listdir(_model_path))
    print('Files of Model: ',files_)
    model = AutoModelForSequenceClassification.from_pretrained(_model_path, local_files_only=True)
    tokenizer = AutoTokenizer.from_pretrained(_model_path, local_files_only=True)
  except SafetensorError:
    print('SafetensorError,  Initializing Manually ...')

    # Load weights manually if safetensor fails to load model. Everything is
    # read from the _model_path argument, not from a notebook-level global.
    weights_path = os.path.join(_model_path, 'pytorch_model.bin')
    state_dict = torch.load(weights_path, weights_only=True, map_location=torch.device(computing_device))
    # With no path given, from_pretrained needs the config passed explicitly
    # alongside the state dict.
    config = AutoConfig.from_pretrained(_model_path, local_files_only=True)
    model = AutoModelForSequenceClassification.from_pretrained(None, config=config, state_dict=state_dict)
    tokenizer = AutoTokenizer.from_pretrained(_model_path, local_files_only=True)

  except (OSError): # Catch potential file access errors or other Transformer-related errors
    print('Error loading model. Check file paths and integrity.')
    # If there's still an issue, raise the exception for debugging
    raise

  model = model.to(computing_device)
  print('Model Running on Device: ',computing_device)
  # create model classifier; passing the device keeps the pipeline inputs on
  # the same device as the model instead of relying on the library default.
  classifier = pipeline('text-classification',model=model,tokenizer=tokenizer,device=model.device)
  return classifier

In [None]:

def run_custom_bert_model(text, classifier, class_mapper=None):
  """Classify ``text`` and report the scores under readable label names.

  Args:
    text: Input handed to ``classifier``.
    classifier: Callable invoked as ``classifier(text, top_k=None)`` that
      returns an iterable of ``{'label': ..., 'score': ...}`` dicts.
    class_mapper: Optional ``{model_label: display_name}`` mapping. When
      omitted, LABEL_0/LABEL_1/LABEL_2 map to Fill Form/Approval/Notify.

  Returns:
    A pair ``(scores, summary)``: ``scores`` maps each display name to its
    score, and ``summary`` is ``(text, best_name, best_score)`` for the
    highest-scoring name.

  Raises:
    KeyError: The classifier returned a label missing from the mapping.
  """
  default_label_names = {
      'LABEL_2': 'Notify',
      'LABEL_0': 'Fill Form',
      'LABEL_1': 'Approval'
  }
  label_names = default_label_names if class_mapper is None else class_mapper

  # Ask for the score of every class label rather than only the best one.
  raw_scores = classifier(text, top_k=None)

  scores_by_name = {}
  for entry in raw_scores:
    scores_by_name[label_names[entry['label']]] = entry['score']

  best_name = max(scores_by_name, key=scores_by_name.get)
  return scores_by_name, (text, best_name, scores_by_name[best_name])


In [None]:
def infer_model_test_sample(texts,text_key=None,text_label=None):
  """Run the notebook-level ``classifier`` on a string, a sequence or a DataFrame.

  The global ``classifier`` must have been created before this is called.

  Args:
    texts: One of
      * ``str`` - a single text;
      * ``list`` / ``tuple`` - several texts;
      * ``pd.DataFrame`` - labelled texts; rows containing any null value
        are dropped before inference.
    text_key: Text column of the DataFrame (defaults to ``'Text'``).
    text_label: Label column of the DataFrame (defaults to ``'labels'``).

  Returns:
    * ``str`` input: the ``(text, predicted_label, score)`` tuple.
    * ``list`` / ``tuple`` input: a list of ``{'labels', 'prob', 'text'}`` dicts.
    * DataFrame input: a DataFrame with the columns ``Text``,
      ``Original_Label``, ``Predicted_Label`` and ``Probability_Score``.

  Raises:
    TypeError: ``texts`` is none of the supported types.
  """
  if isinstance(texts, str):
    _, results = run_custom_bert_model(texts, classifier, class_mapper=None)
    return results

  # A tuple of types works on every Python 3; `list | tuple` needs 3.10+.
  if isinstance(texts, (list, tuple)):
    result = []
    for text in texts:
      _, (_, predicted, score) = run_custom_bert_model(text, classifier, class_mapper=None)
      result.append({'labels': predicted, 'prob': score, 'text': text})
    return result

  if isinstance(texts, pd.DataFrame):
    if text_key is None:
      text_key = 'Text'
    if text_label is None:
      text_label = 'labels'

    texts = texts.dropna().reset_index(drop=True)
    predicted_label = []
    original_label = []
    pred_prob = []
    org_text = []

    # Iterating the two columns directly avoids building a Series per row.
    for text, label in zip(texts[text_key], texts[text_label]):
      original_label.append(label)
      org_text.append(text)
      _, response = run_custom_bert_model(text, classifier, class_mapper=None)
      predicted_label.append(response[1])
      pred_prob.append(response[2])
    return pd.DataFrame({'Text':org_text,'Original_Label':original_label,'Predicted_Label':predicted_label,'Probability_Score':pred_prob})

  # Previously an unsupported input fell through and silently returned None.
  raise TypeError(f"Expected str, list, tuple or pandas DataFrame, got {type(texts).__name__}")



In [None]:
def prediction_analysis(report_, save_report_csv=False):
  """Split a prediction report into correct and incorrect rows and print the accuracy.

  Args:
    report_: DataFrame with ``Original_Label`` and ``Predicted_Label`` columns.
    save_report_csv: When true, write the two result frames to
      ``correct_predicted.csv`` and ``incorrect_predicted.csv`` in the
      current working directory.

  Returns:
    ``(correct_predictions, incorrect_predictions)``, each re-indexed from 0.

  Raises:
    ValueError: ``report_`` is not a DataFrame.
  """
  if not isinstance(report_, pd.DataFrame):
    # Only DataFrames are handled, so the message no longer advertises more.
    raise ValueError("Invalid input type. Expected a pandas DataFrame.")

  total_test_cases = len(report_)
  is_correct = report_['Original_Label'] == report_['Predicted_Label']
  correct_predictions = report_[is_correct].reset_index(drop=True)
  incorrect_predictions = report_[~is_correct].reset_index(drop=True)
  correct_predictions_count = len(correct_predictions)
  incorrect_predictions_count = len(incorrect_predictions)

  # An empty report would otherwise divide by zero; report 0% instead.
  if total_test_cases:
    accuracy = (correct_predictions_count / total_test_cases) * 100
  else:
    accuracy = 0.0

  print(f"Total Test Cases: {total_test_cases}")
  print(f"Correct Predictions: {correct_predictions_count}")
  print(f"Incorrect Predictions: {incorrect_predictions_count}")
  print(f"Accuracy: {accuracy:.2f}%")
  if save_report_csv:
    correct_predictions.to_csv('correct_predicted.csv')
    incorrect_predictions.to_csv('incorrect_predicted.csv')
  return correct_predictions, incorrect_predictions


In [None]:
def read_text_report(df):
  """Print text, original label and predicted label for every row of ``df``.

  Each row is framed by a line of twenty asterisks above and below. Input
  that is not a DataFrame is ignored and nothing is printed. The function
  returns ``None``.
  """
  if not isinstance(df, pd.DataFrame):
    return
  divider = '*'*20
  for _, record in df.iterrows():
    print(divider)
    print(f"Text: {record['Text']}")
    print(f"Original Label: {record['Original_Label']}")
    print(f"Predicted Label: {record['Predicted_Label']}")
    print(divider)

In [None]:
# Hand-written sample texts for a quick smoke test of the classifier.
# Every comma-separated string below is a separate tuple element and is
# therefore classified on its own by infer_model_test_sample().
text_sample = (
    'start the project scoping process by filling out the required form with all the necessary details. Also need to capture the necessary details about the project in a form, ensuring everything aligns with the intended objectives and regulatory requirements. In the next step I need to  get final signoff from Project Manager with comments on all potential concerns are addressed. Eventually, all stakholders within this process will be sent letter with final updates.',
    'They need to get final signoff from Project Manager with remarks on all potential concerns addressed.',
    # NOTE(review): the next five strings read like one paragraph cut into
    # pieces; because of the trailing commas they are five independent samples,
    # some of them sentence fragments. Confirm whether a single concatenated
    # string was intended.
    "Start the project scoping process by filling out the required form with all the necessary details. ",
    "Also need to capture the necessary details about the project in a form, ensuring everything aligns ",
    "with the intended objectives and regulatory requirements. In the next step I need to get final signoff ",
    "from the Project Manager with comments on all potential concerns being addressed. Eventually, all stakeholders ",
    "within this process will be sent a letter with final updates.",
    'Send Information',
    'Require customer profile data',
    'Inquiry user documents',
    'As part of the compliance process, I will submit a draft of the project scope to the project manager for review and approval, to confirm that all critical items align with the EU AI Act requirements.',
    "If approved, an email is sent to all participants.",
    "Manager to assess my document for further process",
    "Risk Manager approves  document."
)

In [None]:
# INITIALIZE MODEL
# Builds the text-classification pipeline from the files under model_version_path.
# infer_model_test_sample() reads the resulting `classifier` as a global, so this
# cell has to run before any inference cell.

classifier = initialize_legal_bert_model(model_version_path)

Files of Model:  ['config.json\t   special_tokens_map.json  tokenizer.json', 'pytorch_model.bin  tokenizer_config.json    vocab.txt']


Device set to use cpu


Model Running on Device:  cpu


In [None]:
# Classify every entry of text_sample; a tuple input yields a list of
# {'labels', 'prob', 'text'} dicts, one per entry.
infer_model_test_sample(text_sample)

[{'labels': 'Fill Form',
  'prob': 0.9999583959579468,
  'text': 'start the project scoping process by filling out the required form with all the necessary details. Also need to capture the necessary details about the project in a form, ensuring everything aligns with the intended objectives and regulatory requirements. In the next step I need to  get final signoff from Project Manager with comments on all potential concerns are addressed. Eventually, all stakholders within this process will be sent letter with final updates.'},
 {'labels': 'Approval',
  'prob': 0.9998867511749268,
  'text': 'They need to get final signoff from Project Manager with remarks on all potential concerns addressed.'},
 {'labels': 'Fill Form',
  'prob': 0.9999651908874512,
  'text': 'Start the project scoping process by filling out the required form with all the necessary details. '},
 {'labels': 'Fill Form',
  'prob': 0.9998960494995117,
  'text': 'Also need to capture the necessary details about the project

## TEST CASE

In [None]:
# A single string input returns a (text, predicted label, score) tuple.
query = 'need peer review information informed to all the coworkers'
infer_model_test_sample(query)

'''
('need peer review information informed to all the coworkers',
 'Approval',
 0.9682497382164001)

'''

('need peer review information informed to all the coworkers',
 'Approval',
 0.9682497382164001)