In [1]:
import json 
with open("/kaggle/input/dataset-contractnli/train.json") as file:
    data = json.load(file)

with open("/kaggle/input/validation-contract/dev.json") as file:
    val_data = json.load(file)

with open("/kaggle/input/dataset-contractnli/test.json") as file:
    test_data = json.load(file)



In [2]:
# print(data)
data["documents"] = data["documents"]
print(len(data["documents"]))

423


In [3]:
import random
import torch
from torch.utils.data import Dataset

class TrainDataset(Dataset):
    def __init__(self, document_data, tokenizer, max_length=128):
        self.tokenizer = tokenizer
        self.max_length = max_length
        self.data = self.process_data(document_data)

    def process_data(self, document_data):
        """Process the document data to create labeled pairs of text and hypothesis."""
        data = []
        for document in document_data["documents"]:
            # for annotation_set in document["annotation_sets"]:
            annotation_set = document["annotation_sets"][0]
            annotations = annotation_set["annotations"]
            for nda, annotation in annotations.items():
                if annotation["choice"] == "Contradiction":
                    spans_text, random_texts = self.get_spans_text(document, annotation)
                    hypothesis = document_data["labels"][nda]["hypothesis"].lower()
                    
                    data.extend([(text ,hypothesis, 1) for text in spans_text])
                    data.extend([(text ,hypothesis, 0) for text in random_texts])
                    # if annotation["choice"] == "Contradiction":
                    #     spans_text, random_texts = self.get_spans_text(document, annotation)
                    #     hypothesis = document_data["labels"][nda]["hypothesis"].lower()
                        
                    #     data.extend([(text, hypothesis, 0) for text in spans_text])
                        # # data.extend([(text, hypothesis, 0) for text in random_texts])
        return data

    def get_spans_text(self, document, annotation):
        """Retrieve both labeled and random spans for a given document."""
        spans_indices = annotation["spans"]
        document_spans = document["spans"]
        
        # Retrieve labeled spans
        spans = [document_spans[i] for i in spans_indices]
        spans_text = [document["text"][start:end].lower() for start, end in spans]
        
        # Retrieve random spans, ensuring no overlap with labeled spans
        random_indices = random.sample(range(len(document_spans)), 2)
        random_indices = [i for i in random_indices if i not in spans_indices]
        random_spans = [document_spans[i] for i in random_indices]
        random_texts = [document["text"][start:end].lower() for start, end in random_spans]

        return spans_text, random_texts

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        text, hypothesis, label = self.data[idx]
        encoding = self.encode_example(text, hypothesis)
        return {"input_ids": encoding["input_ids"].squeeze(), 
                "attention_mask": encoding["attention_mask"].squeeze(), 
                "label": label}

    def encode_example(self, text, hypothesis):
        """Encode a text-hypothesis pair using the tokenizer."""
        return self.tokenizer(
            text, hypothesis, max_length=self.max_length, 
            padding="max_length", truncation=True, 
            return_tensors="pt", return_attention_mask=True
        )

def collate_fn(batch):
    """Custom collate function to combine batch samples into a batch tensor format."""
    input_ids = torch.stack([item["input_ids"] for item in batch])
    attention_mask = torch.stack([item["attention_mask"] for item in batch])
    labels = torch.tensor([item["label"] for item in batch])

    return {
        "input_ids": input_ids,
        "attention_mask": attention_mask,
        "labels": labels
    }


In [4]:
import torch
from transformers import Trainer, TrainingArguments, EarlyStoppingCallback
from transformers import BertTokenizer, BertForSequenceClassification
from torch.utils.data import DataLoader
from transformers import DataCollatorWithPadding

# from transformers import AutoModelForSeq2SeqLM, AutoTokenizer
# model = AutoModelForSeq2SeqLM.from_pretrained("google/flan-t5-small")
# tokenizer = AutoTokenizer.from_pretrained("google/flan-t5-small")

# Set device
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Initialize tokenizer and model with dropout rate set in the configuration
tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')
model = BertForSequenceClassification.from_pretrained('bert-base-uncased', num_labels=2).to(device)

# Create training and validation datasets
train_dataset = TrainDataset(data, tokenizer, max_length=512)
val_dataset = TrainDataset(val_data, tokenizer, max_length=512)
data_collator = DataCollatorWithPadding(tokenizer=tokenizer)

# Define training arguments with regularization adjustments
training_args = TrainingArguments(
    output_dir='./results',
    evaluation_strategy="epoch",           # evaluate at the end of each epoch
    learning_rate=1e-5,                    # use a lower learning rate
    per_device_train_batch_size=32,
    per_device_eval_batch_size=32,
    num_train_epochs=4,
    weight_decay=0.01,                     # apply weight decay
    logging_dir='./logs',
    logging_steps=10,
    save_strategy="epoch",
    load_best_model_at_end=True,           # load the best model based on validation loss
)

# Add early stopping to the Trainer to stop if no improvement is seen
early_stopping = EarlyStoppingCallback(early_stopping_patience=2)

# Initialize the Trainer
trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=train_dataset,
    eval_dataset=val_dataset,
    data_collator=data_collator,
    callbacks=[early_stopping]            
)

# Train the model
trainer.train()


tokenizer_config.json:   0%|          | 0.00/48.0 [00:00<?, ?B/s]

vocab.txt:   0%|          | 0.00/232k [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/466k [00:00<?, ?B/s]

config.json:   0%|          | 0.00/570 [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/440M [00:00<?, ?B/s]

Some weights of BertForSequenceClassification were not initialized from the model checkpoint at bert-base-uncased and are newly initialized: ['classifier.bias', 'classifier.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.
[34m[1mwandb[0m: Using wandb-core as the SDK backend. Please refer to https://wandb.me/wandb-core for more information.
[34m[1mwandb[0m: Logging into wandb.ai. (Learn how to deploy a W&B server locally: https://wandb.me/wandb-server)
[34m[1mwandb[0m: You can find your API key in your browser here: https://wandb.ai/authorize
[34m[1mwandb[0m: Paste an API key from your profile and hit enter, or press ctrl+c to quit:

  ········


[34m[1mwandb[0m: Appending key for api.wandb.ai to your netrc file: /root/.netrc


VBox(children=(Label(value='Waiting for wandb.init()...\r'), FloatProgress(value=0.011113824444444415, max=1.0…

Epoch,Training Loss,Validation Loss
1,0.427,0.441098
2,0.3085,0.382412
3,0.2492,0.367412
4,0.2405,0.361799


TrainOutput(global_step=404, training_loss=0.32676095095011265, metrics={'train_runtime': 1959.4157, 'train_samples_per_second': 6.565, 'train_steps_per_second': 0.206, 'total_flos': 3384660616151040.0, 'train_loss': 0.32676095095011265, 'epoch': 4.0})

In [5]:
import os
import subprocess
from IPython.display import FileLink, display
torch.save(model.state_dict(), 'model_entailment.pth')

def download_file(path, download_file_name):
    os.chdir('/kaggle/working/')
    zip_name = f"/kaggle/working/{download_file_name}.zip"
    command = f"zip {zip_name} {path} -r"
    result = subprocess.run(command, shell=True, capture_output=True, text=True)
    if result.returncode != 0:
        print("Unable to run zip command!")
        print(result.stderr)
        return
    display(FileLink(f'{download_file_name}.zip'))

download_file('model_entailment.pth', 'out')

In [6]:
# from transformers import AutoTokenizer
# import torch
# # Path to the saved model file in the working directory
# model_path = '/kaggle/working/find_tuned_bert_on_contradiction/model.safetensors'  # Replace with the actual filename

# # Load the model
# model = torch.load(model_path)


In [7]:
import torch
from sklearn.metrics import average_precision_score
from transformers import Trainer
import numpy as np

test_dataset = TrainDataset(test_data, tokenizer, max_length=512)
test_results = trainer.predict(test_dataset)

# Retrieve the predicted probabilities (logits) and true labels
pred_logits = test_results.predictions
true_labels = test_results.label_ids


# Sigmoid to get probabilities since we have a binary classification for each span
pred_probs = torch.sigmoid(torch.tensor(pred_logits)).numpy()



In [8]:
import sklearn
from sklearn import metrics

def precision_at_recall(y_true, y_prob, recall: float):
    assert 0. <= recall <= 1.0
    if len(y_true) == 0 or np.sum(y_true) == 0:
        return np.nan
    threshs = np.sort(np.unique(y_prob))[::-1]
    print(threshs.shape)
    # (len(np.unique(y_prob)), len(y_prob)) where first axis show prediction at different thresh
    y_preds = y_prob[None, :] >= threshs[:, None]
    recalls = np.logical_and(y_true[None, :], y_preds).sum(axis=1) / np.sum(y_true)
    assert np.all(recalls == np.sort(recalls))

    thresh = threshs[np.where(recalls >= recall)[0][0]]
    y_pred = y_prob >= thresh
    return sklearn.metrics.precision_score(y_true, y_pred, zero_division=0.)

print(true_labels.shape, pred_probs.shape)
print(precision_at_recall(true_labels,pred_probs[:,1],0.8))

(852,) (852, 2)
(842,)
0.9018567639257294


In [9]:


print(sklearn.metrics.accuracy_score(true_labels, np.argmax(pred_probs,axis=1)))

0.8708920187793427


In [10]:

import random
import torch
from torch.utils.data import Dataset

class TestDataset(Dataset):
    def __init__(self, document_data, tokenizer, max_length=128):
        self.tokenizer = tokenizer
        self.max_length = max_length
        self.data = self.process_data(document_data)

    def process_data(self, document_data):
        """Process the document data to create labeled pairs of text and hypothesis."""
        data = []
        for document in document_data["documents"]:
            for annotation_set in document["annotation_sets"]:
                annotations = annotation_set["annotations"]
                for nda, annotation in annotations.items():
                    if annotation["choice"] == "Contradiction":
                        spans_text, random_texts = self.get_spans_text(document, annotation)
                        hypothesis = document_data["labels"][nda]["hypothesis"].lower()
                        
                        data.extend([(text, hypothesis, 1) for text in spans_text])
                        data.extend([(text, hypothesis, 0) for text in random_texts])
                   
        return data

    def get_spans_text(self, document, annotation):
        """Retrieve both labeled and random spans for a given document."""
        spans_indices = annotation["spans"]
        document_spans = document["spans"]
        
        # Retrieve labeled spans
        spans = [document_spans[i] for i in spans_indices]
        spans_text = [document["text"][start:end].lower() for start, end in spans]
        
        # Retrieve random spans, ensuring no overlap with labeled spans
        random_indices = random.sample(range(len(document_spans)), 2)
        random_indices = [i for i in random_indices if i not in spans_indices]
        random_spans = [document_spans[i] for i in random_indices]
        random_texts = [document["text"][start:end].lower() for start, end in random_spans]

        return spans_text, random_texts

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        text, hypothesis, label = self.data[idx]
        encoding = self.encode_example(text, hypothesis)
        return {"input_ids": encoding["input_ids"].squeeze(), 
                "attention_mask": encoding["attention_mask"].squeeze(), 
                "label": label}

    def encode_example(self, text, hypothesis):
        """Encode a text-hypothesis pair using the tokenizer."""
        return self.tokenizer(
            text, hypothesis, max_length=self.max_length, 
            padding="max_length", truncation=True, 
            return_tensors="pt", return_attention_mask=True
        )

def collate_fn(batch):
    """Custom collate function to combine batch samples into a batch tensor format."""
    input_ids = torch.stack([item["input_ids"] for item in batch])
    attention_mask = torch.stack([item["attention_mask"] for item in batch])
    labels = torch.tensor([item["label"] for item in batch])

    return {
        "input_ids": input_ids,
        "attention_mask": attention_mask,
        "labels": labels
    }




## Evaluation for Testing Dataset 

In [11]:
import torch
import torch.nn.functional as F

def predict_best_span(model, tokenizer, document, hypothesis, max_length=512, device="cuda"):
    """
    Predict the best evidence span for a given document and hypothesis.
    
    Args:
        model: The trained model (e.g., BERT or similar).
        tokenizer: The tokenizer for the model.
        document: The document containing spans.
        hypothesis: The hypothesis string.
        max_length: Maximum token length for inputs.
        device: Device to run the model on ("cuda" or "cpu").
    
    Returns:
        best_span: The span with the highest probability as evidence.
        best_prob: The probability of the best span.
    """
    model.eval()
    spans = document["spans"]  # List of spans (start, end) in the document
    text = document["text"]    # Full document text
    span_probs = []

    with torch.no_grad():
        for span in spans:
            start, end = span
            span_text = text[start:end].lower()

            # Encode the span and hypothesis
            inputs = tokenizer(
                span_text,
                hypothesis,
                max_length=max_length,
                padding="max_length",
                truncation=True,
                return_tensors="pt"
            ).to(device)

            # Predict probabilities
            outputs = model(**inputs)
            logits = outputs.logits  # Shape: [batch_size, num_labels]
            probs = F.softmax(logits, dim=-1)  # Convert logits to probabilities
            entailment_prob = probs[0][1].item()  # Probability of "Entailment" class
            span_probs.append(entailment_prob)

    # Select the span with the highest probability
    best_idx = torch.argmax(torch.tensor(span_probs)).item()
    best_span = spans[best_idx]
    best_prob = span_probs[best_idx]

    return best_span, best_prob


In [12]:
# Example inputs
document = {
    "text": "This is a sample contract document text.",
    "spans": [(0, 4), (5, 6)]  # Example spans
}
hypothesis = "This contract is valid."

# Predict the best span for the hypothesis
best_span, best_prob = predict_best_span(model, tokenizer, document, hypothesis)

print("Best Span:", best_span)
print("Probability of Best Span:", best_prob)


Best Span: (5, 6)
Probability of Best Span: 0.11060833185911179


In [2]:
import torch
import torch.nn.functional as F
from sklearn.metrics import average_precision_score

def calculate_map_p_at_r80(model, tokenizer, document_data, max_length=512, device="cuda"):
    """
    Calculate mAP and P@R80 for the test dataset.
    
    Args:
        model: Trained model for span predictions.
        tokenizer: Tokenizer for the model.
        document_data: Dataset containing documents, hypotheses, and ground truth spans.
        max_length: Max length for tokenization.
        device: Device for computation ("cuda" or "cpu").
    
    Returns:
        mAP: Mean Average Precision score.
        p_at_r80: Precision at Recall 80%.
    """
    model.eval()
    all_ap_scores = []
    precision_at_r80 = []

    with torch.no_grad():
        it=0
        for document in document_data["documents"]:
            print(it)
            it+=1
            annotation_set = document["annotation_sets"][0]
            annotations = annotation_set["annotations"]
            all_spans = document["spans"]
            text = document["text"]
            
            for nda, annotation in annotations.items():
                if annotation["choice"] == "Contradiction":
                    hypothesis = document_data["labels"][nda]["hypothesis"].lower()
                    true_spans = annotation["spans"]  # Ground truth span indices

                    # Predict probabilities for all spans
                    span_probs = []
                    for span in all_spans:
                        start, end = span
                        span_text = text[start:end].lower()
                        inputs = tokenizer(
                            span_text,
                            hypothesis,
                            max_length=max_length,
                            padding="max_length",
                            truncation=True,
                            return_tensors="pt"
                        ).to(device)
                        outputs = model(**inputs)
                        logits = outputs.logits
                        probs = F.softmax(logits, dim=-1)
                        entailment_prob = probs[0][1].item()  # Probability of "Entailment" class
                        span_probs.append(entailment_prob)

                    # Calculate Average Precision (AP) for this document-hypothesis pair
                    true_labels = [1 if i in true_spans else 0 for i in range(len(all_spans))]
                    ap_score = average_precision_score(true_labels, span_probs)
                    # print(true_labels, span_probs )
                    all_ap_scores.append(ap_score)

                    # Sort spans by probabilities
                    sorted_indices = sorted(range(len(span_probs)), key=lambda i: span_probs[i], reverse=True)
                    sorted_true_labels = [true_labels[i] for i in sorted_indices]

                    # Calculate Precision at Recall 80% (P@R80)
                    cumulative_true_positives = 0
                    recall_threshold = 0.8 * sum(true_labels)
                    for i, label in enumerate(sorted_true_labels):
                        cumulative_true_positives += label
                        if cumulative_true_positives >= recall_threshold:
                            precision = cumulative_true_positives / (i + 1)
                            precision_at_r80.append(precision)
                            break

    # Calculate final mAP and P@R80
    mAP = sum(all_ap_scores) / len(all_ap_scores) if all_ap_scores else 0
    p_at_r80 = sum(precision_at_r80) / len(precision_at_r80) if precision_at_r80 else 0

    return mAP, p_at_r80

# Example Usage
mAP, p_at_r80 = calculate_map_p_at_r80(model, tokenizer, test_data, max_length=512, device="cuda")
print(f"Mean Average Precision (mAP): {mAP}")
print(f"Precision at Recall 80% (P@R80): {p_at_r80}")


Mean Average Precision (mAP): 0.52398
Precision at Recall 80% (P@R80): 0.45712
