<a href="https://colab.research.google.com/github/rayhu/Awesome-Prompt-Engineering/blob/main/Ray_score_prediction.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

### Extracting the individual clauses and labels

In [None]:
# Install the third-party libraries used below; only `accelerate` carries a minimum version.
!pip install transformers
!pip install datasets
!pip install torch
!pip install 'accelerate>=0.26.0'

In [None]:
import json
import os

import numpy as np
import pandas as pd
import torch
from datasets import Dataset
from sklearn.model_selection import train_test_split
from transformers import (
    AutoModelForSequenceClassification,
    AutoTokenizer,
    BertForSequenceClassification,
    BertTokenizer,
    Trainer,
    TrainingArguments,
)



In [None]:
# Data Loading Function
def load_clauses_data(data_dir):
    """
    Load clauses data from the specified directory.

    Every immediate sub-directory of ``data_dir`` is treated as one service.
    A service contributes clauses only if it contains a ``clauses.json`` file
    whose top level is an object with a non-empty ``clauses`` list. Clauses
    that are not objects, or that lack any of ``clause_text``, ``description``
    or ``rating``, are skipped with a printed warning. Files that cannot be
    read or parsed are reported and skipped; they never abort the whole load.

    Parameters:
        data_dir (str): Path to the directory containing service folders
    Returns:
        list: List of dictionaries with the keys 'service', 'clause_text',
            'description' and 'rating', ordered by service folder name and
            then by position inside each file.
    """
    required_fields = ('clause_text', 'description', 'rating')
    all_clauses = []

    # os.listdir() returns entries in arbitrary order. Sorting makes the row
    # order -- and therefore any seeded train/test split built on top of it --
    # reproducible across machines and filesystems.
    for service_folder in sorted(os.listdir(data_dir)):
        service_path = os.path.join(data_dir, service_folder)

        if not os.path.isdir(service_path):
            continue

        clauses_file = os.path.join(service_path, 'clauses.json')
        if not os.path.exists(clauses_file):
            continue

        try:
            with open(clauses_file, 'r', encoding='utf-8') as f:
                data = json.load(f)
        except (OSError, ValueError) as e:
            # ValueError covers both malformed JSON and undecodable bytes.
            print(f"Error processing {service_folder}: {str(e)}")
            continue

        # Guard against files whose top level (or 'clauses' entry) has an
        # unexpected type instead of letting a TypeError abandon the file.
        clauses = data.get('clauses') if isinstance(data, dict) else None
        if not isinstance(clauses, list) or not clauses:
            print(f"⚠️ WARNING: 'clauses' list is empty in '{service_folder}/clauses.json'")
            continue

        for clause in clauses:
            if not isinstance(clause, dict) or not all(key in clause for key in required_fields):
                print(f"⚠️ WARNING: Skipping a clause in '{service_folder}' due to missing required fields")
                continue

            all_clauses.append({
                'service': service_folder,
                'clause_text': clause['clause_text'],
                'description': clause['description'],
                'rating': clause['rating']
            })

    return all_clauses

In [None]:
# Integer class id for each rating label, numbered from 'good' (0) to 'blocker' (3).
rating_map = {
    label: class_id
    for class_id, label in enumerate(('good', 'neutral', 'bad', 'blocker'))
}

In [None]:
# Path of the clause dataset inside a local checkout of the CS224-TC repository.
DATA_DIR = "CS224-TC/data_all_202503120623106"
if not os.path.exists(DATA_DIR):
    print(f"Directory '{DATA_DIR}' not found. Cloning repository...")
    # Remove any existing (possibly partial) checkout so the clone starts clean
    !rm -rf CS224-TC
    # Clone the repository (shallow, blob-filtered)
    !git clone --depth 1 --filter=blob:none https://github.com/AI-knows-your-rights/CS224-TC.git
else:
    print(f"Directory '{DATA_DIR}' already exists. Skipping cloning.")

# Disabled alternative, kept for reference: sparse checkout of the data folder only.
# !git clone --depth 1 --filter=blob:none --no-checkout https://github.com/AI-knows-your-rights/CS224-TC.git
# Enable sparse-checkout
#!git sparse-checkout init --cone
# Specify the folder you want to checkout (e.g., "your_folder")
#!git sparse-checkout set data_all_202503120623106


In [None]:
# Load and prepare the dataset
clauses_data = load_clauses_data(DATA_DIR)
df = pd.DataFrame(clauses_data)
if df.empty:
    # Fail with a clear message instead of a KeyError on the missing 'rating' column.
    raise ValueError(f"No clauses were loaded from '{DATA_DIR}'; check that the dataset is present.")

# Convert ratings to numerical values
df['rating'] = df['rating'].map(rating_map)

# Series.map() turns every label that is missing from rating_map into NaN,
# which also forces the whole column to float. The classifier trained below
# needs integer class ids, so drop those rows (reporting how many) and cast.
unmapped = df['rating'].isna()
if unmapped.any():
    print(f"⚠️ WARNING: Dropping {int(unmapped.sum())} clause(s) whose rating is not in rating_map")
    df = df[~unmapped]
df = df.astype({'rating': 'int64'}).reset_index(drop=True)

print("\nRating distribution after conversion:")
print(df['rating'].value_counts().sort_index())

# Split into training and testing sets (80% / 20%, fixed seed for reproducibility)

training_size = round(df.shape[0] * 0.8)

train_df, test_df = train_test_split(df, train_size=training_size, random_state=42)

print(f"Training set size: {len(train_df)}")
print(f"Testing set size: {len(test_df)}")
print("\nRating distribution in training set:")
print(train_df['rating'].value_counts().sort_index())

### BERT

In [None]:
# Load the pretrained checkpoint's tokenizer and a sequence-classification model
# with one output per rating class.
model_name = "nlpaueb/legal-bert-base-uncased"
tokenizer = AutoTokenizer.from_pretrained(model_name)
model = AutoModelForSequenceClassification.from_pretrained(
    model_name,
    # Derived from rating_map so the head size cannot drift from the label set.
    num_labels=len(rating_map),
)


In [None]:
# Pick the compute device once: CUDA when PyTorch can see a GPU, otherwise the CPU.
cuda_available = torch.cuda.is_available()
print("GPU Available:", cuda_available)

device = torch.device("cuda" if cuda_available else "cpu")
if cuda_available:
    print("Using GPU:", torch.cuda.get_device_name(0))
else:
    print("Using CPU")


In [None]:
# Place the model on the selected device (GPU when one was detected, otherwise CPU)
model = model.to(device=device)

In [None]:
# Define tokenization function

def tokenize_function(examples):
    """Tokenize the ``clause_text`` column of a batch with the notebook-level tokenizer.

    Every value is coerced to ``str`` first; each sequence is then padded or
    truncated to exactly 512 tokens.
    """
    clause_strings = list(map(str, examples["clause_text"]))

    tokenizer_options = {
        "padding": "max_length",
        "truncation": True,
        "max_length": 512,
        # Keep None so the tokenizer returns plain lists for batched processing
        "return_tensors": None,
    }
    return tokenizer(clause_strings, **tokenizer_options)


In [None]:
def prepare_dataset(df):
    """Convert a pandas DataFrame into a Hugging Face ``Dataset``.

    Parameters:
        df (pandas.DataFrame): Frame whose columns become the dataset features.
    Returns:
        datasets.Dataset: Dataset with one feature per DataFrame column.
    """
    # The frames passed in come out of train_test_split and carry a shuffled,
    # non-default index. Without preserve_index=False, from_pandas stores that
    # index as an extra '__index_level_0__' column.
    return Dataset.from_pandas(df, preserve_index=False)

train_raw_data = prepare_dataset(train_df)
test_raw_data = prepare_dataset(test_df)

In [None]:
# Inspect the raw training split before tokenizing it
print(train_raw_data)

print("Dataset features:", train_raw_data.features)
print("Sample row:", train_raw_data[0])


# Raw columns that are dropped once the text has been tokenized
columns_to_remove = ['service', 'clause_text', 'description']


def _to_model_inputs(raw_split):
    """Tokenize one split, drop its raw columns, rename 'rating' to 'labels' and expose torch tensors."""
    encoded_split = (
        raw_split
        .map(tokenize_function, batched=True)
        .remove_columns(columns_to_remove)
        .rename_column('rating', 'labels')
    )
    encoded_split.set_format('torch', columns=['input_ids', 'attention_mask', 'labels'])
    return encoded_split


train_dataset = _to_model_inputs(train_raw_data)
test_dataset = _to_model_inputs(test_raw_data)


In [None]:
# Define metrics for evaluation
def compute_metrics(eval_pred):
    """Compute evaluation metrics from a ``(predictions, labels)`` pair.

    The model in this notebook is built with several output classes, so the
    predictions are per-class logits (one row per example). Those are reduced
    to a predicted class id with ``argmax`` before being compared with the
    labels; subtracting the labels from the raw logit matrix, as a regression
    metric would, fails to broadcast. Single-output (regression style)
    predictions are still supported and are compared to the labels directly.

    Parameters:
        eval_pred: Pair ``(predictions, labels)``. ``predictions`` may also be
            a tuple whose first element holds the logits.
    Returns:
        dict: ``mse`` and ``rmse`` between predicted and true rating values,
            plus ``accuracy`` when the predictions are class logits.
    """
    predictions, labels = eval_pred
    if isinstance(predictions, tuple):
        # Some models return extra outputs after the logits; keep the logits only.
        predictions = predictions[0]
    predictions = np.asarray(predictions)
    labels = np.asarray(labels).reshape(-1)

    is_classification = predictions.ndim > 1 and predictions.shape[-1] > 1
    if is_classification:
        predicted_ratings = predictions.argmax(axis=-1)
    else:
        predicted_ratings = predictions.reshape(-1)

    mse = float(np.mean((predicted_ratings - labels) ** 2))
    metrics = {
        "mse": mse,
        "rmse": float(np.sqrt(mse)),
    }
    if is_classification:
        metrics["accuracy"] = float(np.mean(predicted_ratings == labels))
    return metrics

### BertForSequenceClassification

In [None]:
# Define training arguments
training_args = TrainingArguments(
    output_dir="./results",
    learning_rate=2e-5,
    per_device_train_batch_size=8,
    per_device_eval_batch_size=8,
    num_train_epochs=3,
    weight_decay=0.01,
    # `eval_strategy` is the current name of the deprecated `evaluation_strategy`
    # argument; it is also the spelling used by the later TrainingArguments cell.
    eval_strategy="epoch",
    save_strategy="epoch",  # must match the evaluation schedule for load_best_model_at_end
    load_best_model_at_end=True,
    push_to_hub=False,
    # The deprecated `no_cuda=False` was only restating the default (use the GPU
    # when one exists), so it is omitted.
    # fp16 mixed precision needs a CUDA device; enabling it unconditionally makes
    # TrainingArguments fail on the CPU fallback this notebook otherwise supports.
    fp16=torch.cuda.is_available(),
)

In [None]:
# To track the training
# NOTE(review): only `weave` is installed here although the next command uses the
# `wandb` CLI -- presumably `wandb` is already available in the runtime; confirm.
!pip install weave
!wandb login


In [None]:
# Initialize trainer
# The tokenized test split is used as the evaluation set during training.
trainer = Trainer(
    args=training_args,
    model=model,
    compute_metrics=compute_metrics,
    train_dataset=train_dataset,
    eval_dataset=test_dataset,
    # run_name="ray-legal-bert-regression",
)

In [None]:
# Print the tokenized training split as a last check before training
print(train_dataset)

In [None]:
# Start training
# Runs with the settings in `training_args`; as the last expression of the cell,
# the value returned by train() is displayed as the cell output.
trainer.train()

Epoch,Training Loss,Validation Loss


In [None]:
# Evaluate the model on test set
# (no dataset is passed, so the Trainer falls back to the eval_dataset it was built with)
test_results = trainer.evaluate()
print("\nTest Results:", test_results, sep="\n")

# Function to predict ratings for new clauses
def predict_rating(clause_text):
    """Predict the rating of a single clause with the notebook-level model.

    The clause is tokenized exactly like the training data (padded/truncated
    to 512 tokens) and run through the model without gradient tracking.

    Parameters:
        clause_text (str): Text of one clause.
    Returns:
        int: Predicted class id (argmax over the logits) when the model has
            several outputs, as the classifier in this notebook does.
        float: The raw scalar output when the model has a single output.
    """
    inputs = tokenizer(
        clause_text,
        padding="max_length",
        truncation=True,
        max_length=512,
        return_tensors="pt"
    )

    # The model lives on `device`, so its inputs must be moved there as well.
    inputs = {key: value.to(device) for key, value in inputs.items()}

    # Make sure dropout is disabled even if the model was left in training mode.
    model.eval()
    with torch.no_grad():
        logits = model(**inputs).logits

    if logits.shape[-1] == 1:
        # Single-output head: the logit itself is the prediction.
        return logits.squeeze().item()

    # Multi-class head: calling .item() on the squeezed logits would raise
    # because they hold one value per class; pick the most likely class instead.
    return int(logits.argmax(dim=-1).item())

# Test prediction with a sample clause: the first row of the test split
sample_index = 0
sample_clause = test_df['clause_text'].iloc[sample_index]
actual_rating = test_df['rating'].iloc[sample_index]
predicted_rating = predict_rating(sample_clause)

print(
    "\nSample Prediction:",
    f"Predicted Rating: {predicted_rating:.2f}",
    f"Actual Rating: {actual_rating}",
    sep="\n",
)

In [None]:
# Split the filtered (clause, rating) pairs into two parallel sequences
# NOTE(review): `filtered_clause_pairs` is not defined in the visible part of this
# notebook -- confirm where it is supposed to come from.
clauses, ratings = zip(*filtered_clause_pairs)

# Map rating labels to integer class ids (raises KeyError on an unknown label)
rating_dict = {"very bad": 0, "bad": 1, "neutral": 2, "good": 3}  # Modify if you have different ratings
ratings_int = list(map(rating_dict.__getitem__, ratings))

# Step 3.2: 80% train, then the remaining 20% halved into dev and test (10% each)
X_train, X_temp, y_train, y_temp = train_test_split(
    clauses, ratings_int, test_size=0.2, random_state=42
)
X_dev, X_test, y_dev, y_test = train_test_split(
    X_temp, y_temp, test_size=0.5, random_state=42
)

# Step 3.3: Wrap every split in a Hugging Face Dataset with "text" and "label" columns
train_data, dev_data, test_data = (
    Dataset.from_dict({"text": split_texts, "label": split_labels})
    for split_texts, split_labels in (
        (X_train, y_train),
        (X_dev, y_dev),
        (X_test, y_test),
    )
)


In [None]:
# Step 4.1: Load the BERT tokenizer
# NOTE: this rebinds the notebook-level `tokenizer`, and Step 4.2 rebinds
# `tokenize_function`; both names are also defined earlier in the notebook.
tokenizer = BertTokenizer.from_pretrained("bert-base-uncased")

# Step 4.2: Define a function to tokenize the input texts
def tokenize_function(examples):
    """Tokenize the "text" column of a batch, padded/truncated to 512 tokens.

    The column is read with ``examples["text"]`` so that a missing column fails
    loudly instead of silently tokenizing an empty string.
    """
    return tokenizer(examples["text"], padding='max_length', truncation=True, max_length=512)

# Step 4.3: Tokenize each split exactly once, dropping the raw text in the same pass.
# (Tokenizing a second time after set_format is both redundant and fragile: the
# "text" column is no longer exposed to the mapped function at that point.)
train_data = train_data.map(tokenize_function, batched=True, remove_columns=["text"])
dev_data = dev_data.map(tokenize_function, batched=True, remove_columns=["text"])
test_data = test_data.map(tokenize_function, batched=True, remove_columns=["text"])

# Step 4.4: Set the format for PyTorch
# The labels are Python ints taken from rating_dict; the torch format is expected
# to return them as int64 (long) tensors, so no separate per-row cast is needed.
torch_columns = ['input_ids', 'attention_mask', 'token_type_ids', 'label']
for tokenized_split in (train_data, dev_data, test_data):
    tokenized_split.set_format(type='torch', columns=torch_columns)

In [None]:
# Step 5.1: Take a smaller, seeded random sample: 10% of the training data and 20% of the dev data
train_sample_size = int(0.1 * len(train_data))
dev_sample_size = int(0.2 * len(dev_data))

train_sample = train_data.shuffle(seed=42).select(range(train_sample_size))
dev_sample = dev_data.shuffle(seed=42).select(range(dev_sample_size))

In [None]:
# Training configuration for the bert-base-uncased run (rebinds `training_args`)
training_args = TrainingArguments(
    # Where checkpoints and logs are written
    output_dir='./results',
    logging_dir='./logs',
    logging_steps=10,
    # Evaluate and checkpoint once per epoch, keeping only the last 2 checkpoints
    eval_strategy="epoch",
    save_strategy="epoch",
    save_total_limit=2,
    # Optimisation settings
    learning_rate=5e-5,
    weight_decay=0.01,
    num_train_epochs=10,
    per_device_train_batch_size=2,
    per_device_eval_batch_size=2,
)

In [None]:
def compute_metrics(eval_pred):
    """Compute classification accuracy from a ``(logits, labels)`` pair.

    Accuracy is computed with numpy (fraction of rows whose argmax equals the
    label), so the function does not depend on a name that the notebook never
    imports.

    Parameters:
        eval_pred: Pair ``(logits, labels)``; ``logits`` holds one row of
            per-class scores per example and may be wrapped in a tuple whose
            first element is the logits.
    Returns:
        dict: ``{"accuracy": float}`` with a value between 0 and 1.
    """
    logits, labels = eval_pred
    if isinstance(logits, tuple):
        # Keep only the logits when the model returns additional outputs.
        logits = logits[0]
    preds = np.argmax(logits, axis=1)  # Convert logits to predicted labels
    labels = np.asarray(labels)
    return {"accuracy": float(np.mean(preds == labels))}

In [None]:
# Fresh bert-base-uncased classifier (rebinds the notebook-level `model`).
# The head size is derived from rating_dict so it cannot drift from the label set.
model = BertForSequenceClassification.from_pretrained("bert-base-uncased", num_labels=len(rating_dict))

In [None]:
# Trainer for the bert-base-uncased run, fitted on the down-sampled train/dev splits
trainer = Trainer(
    args=training_args,
    model=model,
    compute_metrics=compute_metrics,  # Corrected function
    train_dataset=train_sample,
    eval_dataset=dev_sample,
)

# Train the model; as the last expression, the value returned by train() is displayed
trainer.train()

In [None]:
# Evaluate on the full test split
test_results = trainer.evaluate(test_data)

# Print loss and accuracy (the accuracy entry comes from compute_metrics)
for metric_title, metric_key in (("Loss", "eval_loss"), ("Accuracy", "eval_accuracy")):
    print(f"Test {metric_title}: {test_results[metric_key]:.4f}")