In [1]:
from nltk.tokenize import RegexpTokenizer
import tensorflow as tf
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.sequence import pad_sequences
import pandas as pd
import numpy as np
import re
from nltk.tokenize import  sent_tokenize
from transformers import Trainer, TrainingArguments
from transformers import AutoTokenizer, AutoModelForSequenceClassification, AutoModel, BertForTokenClassification
from torch.utils.data import DataLoader, Dataset
import transformers
import torch
from torch.nn import CrossEntropyLoss
import evaluate
from sklearn.model_selection import train_test_split
from sklearn.utils.class_weight import compute_class_weight
from collections import Counter




In [2]:
class preprocess:
    def __init__(self):
        self.tokenizer = RegexpTokenizer(r"[a-zA-Z0-9]+|\.(?![a-zA-Z0-9])")
        self.window_size = 128
        self.max_token = 510
        
    def split_to_chunks_with_windows_with_attention_mask(self, tokens, labels=None):
        """
        Court cases has long text while BERT only has 512 maximum tokens. 
        This function splits the long text into chunks with 512 tokens with sliding windows.
        Add necessary padding if not equal to 512 tokens.
        Add attention mask.

            Args:
                tokens: A list of tokens.

            Returns:
                chunks: A list of list of string with 512 tokens including BERT's special token ([CLS] [SEP])
                chunk_labels: A list of lists, each containing labels corresponding to the tokens in chunks.
                attention_masks: A list of list of 1s and 0s that portrays what places are masked (0)
        """
        chunks = []
        chunk_labels = []
        attention_masks = []

        # Iterate over the list of tokens and chunking them with window slides
        for i in range(0, len(tokens), self.max_token - self.window_size):
            # store the current window of tokens and labels
            chunk_tokens = tokens[i:i + self.max_token]
            if labels:
                chunk_label = labels[i:i + self.max_token]

            # add special tokens
            chunk_tokens.insert(0, "[CLS]")
            chunk_tokens.append("[SEP]")

            if labels:
                # Add corresponding labels for special tokens
                chunk_label.insert(0, -100)  # Label for [CLS]
                chunk_label.append(-100)     # Label for [SEP]

            # Add padding and mask if less than 512 tokens
            if len(chunk_tokens) < self.max_token + 2: # +2 for [CLS] and [SEP]
                chunk_tokens, mask = self.pad_and_mask(chunk_tokens, maxlength=self.max_token + 2)
                if labels:
                    chunk_label.extend([-100] * (self.max_token + 2 - len(chunk_label)))  # Padding labels with -100 as well
            else: # just add the mask
                mask = np.ones(512, dtype=int).tolist()

            # append
            chunks.append(chunk_tokens)
            attention_masks.append(mask)
            if labels:
                chunk_labels.append(chunk_label)

            # Break loop if we have covered the entire sequence
            if i + self.max_token >= len(tokens):
                break

        # If used for finetuning
        if labels:
            return chunks, chunk_labels, attention_masks
        else:
            return (chunks, attention_masks)

    def pad_and_mask(self, chunk, maxlength=512):
        """
        Add [PAD] tokens to the chunk to make its length equal to maxlength (512 tokens).
        Add mask attention.
        
        Args:
            chunk: The list of tokens.
            maxlength: The target length after padding (default is 512).
        
        Returns:
            chunk: list of tokens with paddings.
            attention_mask: list of 1s and 0s for masking the paddings.
        """
        attention_mask = []
        
        # Calculate how many [PAD] tokens are needed
        pads_to_add = maxlength - len(chunk)

        # Add the attention mask
        attention_mask = [1] * len(chunk) + [0] * pads_to_add
    
        # Extend the chunk with [PAD] tokens
        chunk.extend(["[PAD]"] * pads_to_add)

        return chunk, attention_mask

    def change_char(self, text):
        """
        Removes special characters from a list of strings using regular expressions.
        As well as change characters.
        
        
          Args:
            strings: A list of strings.
        
          Returns:
            A new list of strings without special characters.
          """
        text = re.sub(r'[(),:;\'"’”[]]', '', text)
        text = re.sub(r'rtc', 'regional trial court', text)
        text = re.sub(r"\w*\d+\w*", "", text)
        text = re.sub(r"“", "", text)
        text = re.sub(r",”", "", text)
        text = re.sub(r",", "", text)
        text = re.sub(r",,.", "", text)
        text = re.sub(r",,.,", "", text)
        text = re.sub(r"--,", "", text)
        text = re.sub(r"\bno.\b", "number ", text)
        text = re.sub(r"\bg\b", "number ", text)
        text = re.sub(r"\br\b", "number ", text)
        text = re.sub(r"\u2033", "", text)
        text = re.sub(r"\u2032", "", text)
        return text

In [3]:
class preprocess_seqtoseq_data(preprocess):
    def __init__(self, file_path, bert_tokenizer):
        super().__init__()
        self.df = pd.read_csv(file_path)
        self.bert_tokenizer = bert_tokenizer
        self.court_cases = None
        self.rulings = None
        self.issues = None
        self.facts = None

        # Flag for new tokens found
        self.found_new_unknown_token = False

        # Load the unknown tokens
        try:
            with open('unknown_tokens.txt', 'r') as f:
                self.unknown_tokens = f.read().splitlines()
        except:
            self.unknown_tokens = []

        # use this variable for debugging
        self.debugging = True

        # drop null values in the comment
        self.df.dropna(inplace=True)
        
        # preprocess
        self.preprocess()

        # drop duplicates
        self.df = self.df.drop_duplicates()

    def preprocess(self):
        # lowercase the text and Remove unnecessary characters
        self.court_cases = [self.change_char(text.lower()) for text in self.df["whole_text"]]
        self.rulings = [self.change_char(text.lower()) for text in self.df["ruling"]]
        self.facts = [self.change_char(text.lower())for text in self.df["facts"]]
        self.issues = [self.change_char(text.lower()) for text in self.df["issues"]]

        # tokenize the text, storing words only
        self.court_cases = [self.tokenizer.tokenize(text) for text in self.court_cases]
        self.rulings = [self.tokenizer.tokenize(text) for text in self.rulings]
        self.facts = [self.tokenizer.tokenize(text) for text in self.facts]
        self.issues = [self.tokenizer.tokenize(text) for text in self.issues]
        
        # if longer than 512 tokens, chunk the tokens into 512 while adding windows and paddings & attention mask
        self.court_cases = [self.split_to_chunks_with_windows_with_attention_mask(tokens) for tokens in self.court_cases]
        self.rulings = [self.split_to_chunks_with_windows_with_attention_mask(tokens) for tokens in self.rulings]
        self.facts = [self.split_to_chunks_with_windows_with_attention_mask(tokens) for tokens in self.facts]
        self.issues = [self.split_to_chunks_with_windows_with_attention_mask(tokens) for tokens in self.issues]

    def prepare_input_output(self, chunks):
        """
        Prepare input-output pairs for each chunk. 
        Returns a list of tuples, where each tuple represents an (input, output) pair.
        """
        input_output_pairs = []
        for chunk in chunks:
            # Tokenize the chunk and convert to IDs
            input_ids = self.bert_tokenizer.convert_tokens_to_ids(chunk)
    
            # Verify that the chunk ends with the [SEP] token to avoid duplicates
            not_sep = input_ids[-1] != self.bert_tokenizer.convert_tokens_to_ids("[SEP]")
            not_pad = input_ids[-1] != self.bert_tokenizer.convert_tokens_to_ids("[PAD]")
            if not_sep and not_pad:
                input_ids.append(self.bert_tokenizer.convert_tokens_to_ids("[SEP]"))
    
            # Prepare the shifted output (excluding the initial [CLS] token)
            shifted_output = input_ids[1:]  # Shifted output starts from the second token
    
            # No need to append [SEP] here, it's already included in input_ids if required
            # Add input-output pair to list
            input_output_pairs.append((input_ids, shifted_output))

            # Check for unknown tokens and append them to the list that will be added
            unk_tokens = chunk
            for i in range(len(unk_tokens)):
                if input_ids[i] == 100 and unk_tokens[i] not in self.unknown_tokens:
                    print(unk_tokens[i]," : ",input_ids[i])
                    self.found_new_unknown_token = True
                    self.unknown_tokens.append(unk_tokens[i])

            '''if self.debugging == True:
                self.debugging = False
                print("output of the decoder: ",shifted_output)
                print("input to the decoder:",input_ids)'''
                        
            
        return input_output_pairs

    def prepare_court_case(self, chunks):
        """
        Convert court cases tokens into their respective IDs.

            Args:
                chunks: A list of tokens.
        
            Returns:
                "court_cases_ids", a list of of list of IDs (integers).
        """
        court_cases_ids = []
        for chunk in chunks:
            # Tokenize the chunk and convert to IDs
            input_ids = self.bert_tokenizer.convert_tokens_to_ids(chunk)
    
            # Verify that the chunk ends with the [SEP] token to avoid duplicates
            not_sep = input_ids[-1] != self.bert_tokenizer.convert_tokens_to_ids("[SEP]")
            not_pad = input_ids[-1] != self.bert_tokenizer.convert_tokens_to_ids("[PAD]")
            if not_sep and not_pad:
                input_ids.append(self.bert_tokenizer.convert_tokens_to_ids("[SEP]"))

            court_cases_ids.append(input_ids)
                
        return court_cases_ids

    def get_training_data(self):
        """
        Prepare training data for all segments, maintaining the structure per court case.
        """
        training_data = []
        self.found_new_unknown_token = False
        
        for i in range(len(self.court_cases)):
            # Prepare input-output pairs for each segment within a single court case
            court_case_data = self.prepare_court_case(self.court_cases[i][0])
            ruling_data = self.prepare_input_output(self.rulings[i][0])
            fact_data = self.prepare_input_output(self.facts[i][0])
            issue_data = self.prepare_input_output(self.issues[i][0])
            
            # Maintain structure by grouping segments within the same court case
            case_data = {
                "court_case": court_case_data,
                "rulings": ruling_data,
                "facts": fact_data,
                "issues": issue_data
            }

            '''print(type(ruling_data))
            print(type(ruling_data[0]))
            print(type(ruling_data[0][0]))'''
            
            training_data.append(case_data)

        # If unknown token/s found, Update file containing all unknown token & Raise an error message
        if self.unknown_tokens and self.found_new_unknown_token:
            with open('unknown_tokens.txt', 'w') as f:
                for token in self.unknown_tokens:
                    f.write(f"{token}\n")
            raise Exception("There are unknown token/s found. Update the tokenizer and finetune the model.")
        
        return training_data

In [4]:
class preprocess_finetuning_data(preprocess):
    def __init__(self, file_path, bert_tokenizer):
        super().__init__()
        self.df = pd.read_csv(file_path)
        self.bert_tokenizer = bert_tokenizer

        # Flag for new tokens found
        self.found_new_unknown_token = False

        # Load the unknown tokens
        try:
            with open('unknown_tokens.txt', 'r') as f:
                self.unknown_tokens = f.read().splitlines()
        except:
            self.unknown_tokens = []
    
        # For fine-tuning
        self.finetune_court = None
        self.finetune_ruling = None
        self.finetune_issues = None
        self.finetune_facts = None
        self.finetune_data = []

        # drop null values in the comment
        self.df.dropna(inplace=True)
        
        # preprocess and prepare proper format of data for finetuning
        self.preprocess()
        self.prepare_finetune_data()

        # drop duplicates
        self.df = self.df.drop_duplicates()
         
    def preprocess(self):
        # lowercase the text and Remove unnecessary characters
        self.finetune_court = [self.change_char(text.lower()) for text in self.df["whole_text"]]
        self.finetune_ruling = [self.change_char(text.lower()) for text in self.df["ruling"]]
        self.finetune_facts = [self.change_char(text.lower())for text in self.df["facts"]]
        self.finetune_issues = [self.change_char(text.lower()) for text in self.df["issues"]]

        # tokenize the text, accepting words, numbers, and dots only
        self.finetune_court = [self.tokenizer.tokenize(text) for text in self.finetune_court]
        self.finetune_ruling = [self.tokenizer.tokenize(text) for text in self.finetune_ruling]
        self.finetune_facts = [self.tokenizer.tokenize(text) for text in self.finetune_facts]
        self.finetune_issues = [self.tokenizer.tokenize(text) for text in self.finetune_issues]

        # convert to whole string
        self.finetune_court = [' '.join(token) for token in self.finetune_court]
        self.finetune_ruling = [' '.join(token) for token in self.finetune_ruling]
        self.finetune_facts = [' '.join(token) for token in self.finetune_facts]
        self.finetune_issues = [' '.join(token) for token in self.finetune_issues]

        # split into tokens (sentences)
        self.finetune_court = [sent_tokenize(text) for text in self.finetune_court]
        self.finetune_ruling = [sent_tokenize(text) for text in self.finetune_ruling]
        self.finetune_facts = [sent_tokenize(text) for text in self.finetune_facts]
        self.finetune_issues = [sent_tokenize(text) for text in self.finetune_issues]
            
    def prepare_finetune_data(self):
        """
        Prepare token classification data, labeling each token in the court case with
        its corresponding segment (rulings, facts, or issues).

        Variables:
            tokens_and_labels: list of tuples, wherein each tuples contains the list of 512 tokens and the list of labels of those 512 tokens.
            case_labels: list of integers, wherein each integer corresponds to a court case segment (i.e. rulings = 0).
            new_tokens: list of tokens, wherein each token is a sentence of a corresponding segment label.
            chunks:
            chunk_labels:
            attention_masks:
        """
        for i in range(len(self.finetune_court)):
            # Create context-based labels for the entire court case
            case_labels, new_tokens = self.prepare_contextual_labels(self.finetune_court[i], self.finetune_ruling[i], 
                                                         self.finetune_facts[i], self.finetune_issues[i])

            chunks, chunk_labels, attention_masks = self.split_to_chunks_with_windows_with_attention_mask(new_tokens, case_labels)

            for i in range(len(chunks)):
                self.finetune_data.append({
                    "input_ids": self.bert_tokenizer.convert_tokens_to_ids(chunks[i]),
                    "labels": chunk_labels[i],
                    "attention_mask": attention_masks[i]
                })

    def prepare_contextual_labels(self, case_tokens, ruling_tokens, fact_tokens, issue_tokens):
        """
        Given the tokenized court case, assign labels to each token based on context.

            Returns:
                labels: list of integers corresponding to their court casesegment label
                new_tokens: list of tokens, wherein each tokens is a sentence converted to word format.
        """
        labels = []
        new_tokens = []
        
        for text in case_tokens:
            token = self.tokenizer.tokenize(text)
            if text in fact_tokens:
                new_tokens.append(token)
                labels.extend([1] * len(token))  # Label 1 for facts
            elif text in ruling_tokens:
                new_tokens.append(token)
                labels.extend([0] * len(token)) # Label 0 for rulings
            elif text in issue_tokens:
                new_tokens.append(token)
                labels.extend([2] * len(token))  # Label 2 for issues

        # flatten list
        new_tokens = [token for sublist in new_tokens for token in sublist]
        return labels, new_tokens

    def get_finetune_data(self):
        return self.finetune_data

In [5]:
class CourtCaseDataset(Dataset):
    """
    Custom Dataset class for the court case data. Expects data in the form of input_ids, attention_mask, and labels.
    """
    def __init__(self, input_ids, attention_masks, labels):
        self.input_ids = input_ids
        self.attention_masks = attention_masks
        self.labels = labels

    def __len__(self):
        return len(self.input_ids)

    def __getitem__(self, idx):
        return {
            'input_ids': self.input_ids[idx],
            'attention_mask': self.attention_masks[idx],
            'labels': self.labels[idx]
        }

In [6]:
class CustomTrainer(Trainer):
    def __init__(self, class_weights=None, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.class_weights = class_weights

    def compute_loss(self, model, inputs):
        labels = inputs.get("labels")
        outputs = model(**inputs)
        loss_fct = CrossEntropyLoss(weight=self.class_weights)
        loss = loss_fct(outputs.logits.view(-1, model.config.num_labels), labels.view(-1))
        return loss

In [7]:
class prep_model:
    def __init__(self, bert_tokenizer, bert_model):
        self.tokenizer = bert_tokenizer
        self.model = bert_model
        self.unknown_tokens = []
        self.metric = evaluate.load("accuracy")
        self.training_args = TrainingArguments(output_dir="test_trainer", 
                                               eval_strategy="epoch", 
                                               fp16=True, 
                                               learning_rate=3e-5,
                                              per_device_train_batch_size=4)
        self.trainer = None

        # Load the unknown tokens
        try:
            with open('unknown_tokens.txt', 'r') as f:
                self.unknown_tokens = f.read().splitlines()
        except FileNotFoundError:
            print("No 'unknown_tokens.txt' file found. Proceeding without new tokens.")

        # Update tokenizer with loaded tokens
        if self.unknown_tokens:
            self.update_tokenizer()

    def update_tokenizer(self):
        # Add the new tokens to the tokenizer
        self.tokenizer.add_tokens(self.unknown_tokens)

        # Resize the model's token embeddings to match the new tokenizer length
        self.model.resize_token_embeddings(len(self.tokenizer))

    def prepare_datasets(self, finetune_data):
        """
        Prepare training and evaluation datasets using train_test_split from sklearn.
        """
        input_ids = [entry["input_ids"] for entry in finetune_data]
        attention_masks = [entry["attention_mask"] for entry in finetune_data]
        labels = [entry["labels"] for entry in finetune_data]

        # Split the data into training and evaluation sets (80% train, 20% eval)
        train_input_ids, eval_input_ids, train_attention_masks, eval_attention_masks, train_labels, eval_labels = train_test_split(
            input_ids, attention_masks, labels, test_size=0.2, random_state=42
        )

        # Create custom PyTorch datasets
        train_dataset = CourtCaseDataset(train_input_ids, train_attention_masks, train_labels)
        eval_dataset = CourtCaseDataset(eval_input_ids, eval_attention_masks, eval_labels)

        return train_dataset, eval_dataset

    def finetune_model(self, finetune_data, flat_labels, device):
        # Compute class weights
        self.class_weights = compute_class_weight('balanced', classes=np.array([0, 1, 2]), y=flat_labels)
        self.class_weights = torch.tensor(self.class_weights, dtype=torch.float).to(device)  # Move to GPU if using CUDA
    
        # Prepare datasets
        train_dataset, eval_dataset = self.prepare_datasets(finetune_data)
    
        # Define a custom trainer object
        self.trainer = CustomTrainer(
            model=self.model,
            args=self.training_args,
            train_dataset=train_dataset,  
            eval_dataset=eval_dataset,    
            compute_metrics=self.compute_metrics,
            class_weights=self.class_weights  # Pass the class weights
        )
        
        # Train Model
        self.trainer.train()
    
        # Save the fine-tuned model and tokenizer
        self.model.save_pretrained("fine-tuned-legal-bert-model")
        self.tokenizer.save_pretrained("fine-tuned-legal-bert-tokenizer")

    def compute_metrics(self, eval_pred):
        logits, labels = eval_pred
        predictions = np.argmax(logits, axis=-1)
        
        # Flatten predictions and labels
        predictions = predictions.flatten()
        labels = labels.flatten()
        
        # Filter out the ignored index (-100)
        valid_indices = labels != -100
        valid_predictions = predictions[valid_indices]
        valid_labels = labels[valid_indices]

        # Compute accuracy using the valid predictions and labels
        return self.metric.compute(predictions=valid_predictions, references=valid_labels)

In [8]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
device

device(type='cuda')

In [9]:
tokenizer = AutoTokenizer.from_pretrained("nlpaueb/legal-bert-base-uncased")
model = BertForTokenClassification.from_pretrained("nlpaueb/legal-bert-base-uncased", num_labels=3)

  return self.fget.__get__(instance, owner)()
Some weights of BertForTokenClassification were not initialized from the model checkpoint at nlpaueb/legal-bert-base-uncased and are newly initialized: ['classifier.bias', 'classifier.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


In [10]:
seqtoseq_preprocessor = preprocess_seqtoseq_data("new_court_cases.csv", tokenizer) #tokenizer still in discussion sa utak ko
seqtoseq_data = seqtoseq_preprocessor.get_training_data()

In [11]:
y = prep_model(tokenizer, model)

In [12]:
finetune_preprocessor = preprocess_finetuning_data("new_court_cases.csv", y.tokenizer)
finetune_data = finetune_preprocessor.get_finetune_data()

In [13]:
finetune_data = finetune_data[0:900]
len(finetune_data)

900

In [14]:
# check class distribution
labels = [entry['labels'] for entry in finetune_data]
flattened_labels = [label for sublist in labels for label in sublist if label != -100]
print(Counter(flattened_labels))

Counter({0: 338679, 1: 88435, 2: 20841})


In [15]:
y.finetune_model(finetune_data, flattened_labels, device)

Epoch,Training Loss,Validation Loss


TypeError: CustomTrainer.compute_loss() got an unexpected keyword argument 'return_outputs'

# inference

In [None]:
model = BertForTokenClassification.from_pretrained("fine-tuned-legal-bert-model")
tokenizer = AutoTokenizer.from_pretrained("fine-tuned-legal-bert-tokenizer")

model.eval()

input_data = finetune_data[1]
input_ids = torch.tensor([input_data['input_ids']])  # Wrap in list to make it a batch of size 1
attention_mask = torch.tensor([input_data['attention_mask']])

# Move data to the appropriate device (GPU/CPU)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = model.to(device)
input_ids = input_ids.to(device)
attention_mask = attention_mask.to(device)

# Perform inference
with torch.no_grad():
    outputs = model(input_ids=input_ids, attention_mask=attention_mask)

# Get the logits (predictions before applying softmax)
logits = outputs.logits

# Convert logits to predicted class labels
predictions = torch.argmax(logits, dim=-1).cpu().numpy()

# Get tokens corresponding to input IDs
tokens = tokenizer.convert_ids_to_tokens(input_ids[0].cpu().numpy())

# Display tokens and their corresponding labels
for token, label in zip(tokens, predictions[0]):  # predictions[0] since we have a batch of size 1
    print(f"{token}: {label}")