In [16]:
from google.colab import drive
drive.mount('/content/drive')

!pip install datasets

%cd /content/drive/Othercomputers/My MacBook Air/Thesis/Coding/Synth

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
/content/drive/Othercomputers/My MacBook Air/Thesis/Coding/Synth


In [17]:
# General imports
import os
import re
import json
import random
import string
import numpy as np
from pathlib import Path
from typing import List
import pickle

# PyTorch imports
import torch
from torch.nn.utils import prune
from torch.utils.data import DataLoader
from torch.nn import CrossEntropyLoss

# Hugging Face Transformers
from transformers import (
    AutoTokenizer,
    AutoModelForTokenClassification,
    Trainer,
    TrainingArguments,
    DataCollatorForTokenClassification,
    PreTrainedTokenizerBase
)

# Hugging Face Datasets
from datasets import Dataset

# Scikit-learn
from sklearn.model_selection import train_test_split
from sklearn.utils import resample
from sklearn.metrics import classification_report

In [18]:
with open('final_data.pkl', 'rb') as file:
    final_data = pickle.load(file)

with open('raw_data.pkl', 'rb') as file:
    raw_data = pickle.load(file)

with open('aligned_tokenized_data.pkl', 'rb') as file:
    aligned_tokenized_data = pickle.load(file)

# Split the data into training and evaluation sets
train_data, eval_data = train_test_split(final_data, test_size=0.2, random_state=42)

# Initialize a set to collect unique words from the data
new_tokens_set = set()

# Simple space-based tokenizer function for vocabulary extraction
def simple_tokenizer(text: str):
    return text.split()

# Iterate through each entry in the raw_data
for entry in raw_data:
    # Extract the instructions and split into words
    instruction_tokens = simple_tokenizer(entry['instructions'])
    new_tokens_set.update(instruction_tokens)  # Add tokens to the set

    # Extract medication details
    medications = entry['medications']

    # Iterate through each medication in the entry
    for medication_name, fields in medications.items():
        # Add the medication name itself
        new_tokens_set.add(medication_name)

        # Iterate through each field in the medication dictionary
        for field_name, field_value in fields.items():
            # Split the field value into tokens and add them
            field_tokens = simple_tokenizer(field_value)
            new_tokens_set.update(field_tokens)

# Convert the set to a sorted list to maintain a consistent order
new_tokens = sorted(list(new_tokens_set))

# Define label list
label_list = ["O", "B-DRUG", "I-DRUG", "B-STRENGTH", "I-STRENGTH",
              "B-FORM", "I-FORM", "B-DOSAGE", "I-DOSAGE",
              "B-FREQUENCY", "I-FREQUENCY", "B-ROUTE", "I-ROUTE",
              "B-REASON", "I-REASON"]


model = AutoModelForTokenClassification.from_pretrained("./split_finetuned_tiny_clinicalbert_model", num_labels=len(label_list))
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model.to(device)

# Custom Tokenizer
class SimpleTokenizer:
    def __init__(self, vocab):
        self.vocab = vocab
        self.token_to_id = {token: idx + 1 for idx, token in enumerate(vocab)}  # ID starts at 1
        self.unknown_token_id = len(self.token_to_id) + 1  # ID for unknown tokens

    def tokenize(self, text: str) -> List[str]:
        # Use the same splitting approach as simple_tokenizer
        return text.split()

    def convert_tokens_to_ids(self, tokens: List[str]) -> List[int]:
        # Handle unknown tokens consistently
        return [self.token_to_id.get(token, self.unknown_token_id) for token in tokens]

    def __call__(self, text: str):
        tokens = self.tokenize(text)
        input_ids = self.convert_tokens_to_ids(tokens)
        return {
            "input_ids": input_ids,
            "attention_mask": [1] * len(input_ids)  # Default attention mask for each token
        }

    def save_pretrained(self, save_directory):
        """Save the tokenizer vocabulary to a directory."""
        if not os.path.exists(save_directory):
            os.makedirs(save_directory)

        vocab_file = os.path.join(save_directory, 'vocab.json')
        with open(vocab_file, 'w') as f:
            json.dump(self.token_to_id, f)

        print(f"Tokenizer vocabulary saved to {vocab_file}")

# Initialize the tokenizer with the new vocabulary
tokenizer = SimpleTokenizer(vocab=new_tokens)

In [19]:
# Custom padding function
def custom_pad_and_truncate(batch, max_length=48):
    """Pad and truncate the input IDs, attention masks, and labels."""
    padded_batch = {
        "input_ids": [],
        "attention_mask": [],
        "labels": []
    }

    for item in batch:
        # Pad or truncate input_ids
        input_ids = item['input_ids']
        input_ids = input_ids[:max_length] + [0] * max(0, max_length - len(input_ids))  # Pad with 0
        padded_batch["input_ids"].append(input_ids)

        # Pad or truncate attention_mask
        attention_mask = item['attention_mask']
        attention_mask = attention_mask[:max_length] + [0] * max(0, max_length - len(attention_mask))  # Pad with 0
        padded_batch["attention_mask"].append(attention_mask)

        # Pad or truncate labels
        labels = item['labels']
        labels = labels[:max_length] + [-100] * max(0, max_length - len(labels))  # Pad with -100 for labels
        padded_batch["labels"].append(labels)

    # Convert lists to PyTorch tensors
    padded_batch["input_ids"] = torch.tensor(padded_batch["input_ids"], dtype=torch.long)
    padded_batch["attention_mask"] = torch.tensor(padded_batch["attention_mask"], dtype=torch.long)
    padded_batch["labels"] = torch.tensor(padded_batch["labels"], dtype=torch.long)

    return padded_batch

# Custom data collator
def custom_data_collator(batch):
    """Custom data collator for batching and padding."""
    return custom_pad_and_truncate(batch, max_length=48)

# Metrics calculation function remains the same
def compute_metrics(p):
    predictions, labels = p
    predictions = np.argmax(predictions, axis=2)

    # Extract true predictions and labels, ignoring the padding (-100)
    true_predictions = [
        [label_list[p] for (p, l) in zip(prediction, label) if l != -100]
        for prediction, label in zip(predictions, labels)
    ]
    true_labels = [
        [label_list[l] for (p, l) in zip(prediction, label) if l != -100]
        for prediction, label in zip(predictions, labels)
    ]

    # Flatten the lists of true predictions and labels
    true_predictions_flat = [item for sublist in true_predictions for item in sublist]
    true_labels_flat = [item for sublist in true_labels for item in sublist]

    # Set zero_division to 0 to handle undefined precision/recall
    report = classification_report(true_labels_flat, true_predictions_flat, output_dict=True, zero_division=0)
    return {
        "precision": report["macro avg"]["precision"],
        "recall": report["macro avg"]["recall"],
        "f1": report["macro avg"]["f1-score"],
        "accuracy": report["accuracy"],
    }

class CustomTrainer(Trainer):
    def compute_loss(self, model, inputs, return_outputs=False):
        # Ensure all inputs are on the correct device
        inputs = {k: v.to(device) for k, v in inputs.items()}

        # Debug print for the first training step (ensure minimal operations)
        if self.state.global_step == 0:  # Print only during the first step
            print("\n[DEBUG] Trainer Input:")
            for key, value in inputs.items():
                print(f"{key}: {value.shape}, dtype: {value.dtype}")

                # Use `detach` and `cpu` to safely access the data for printing without affecting training
                detached_value = value.detach().cpu()
                for i in range(min(3, detached_value.size(0))):  # Limit to the first 3 inputs
                    print(f"Sample {i + 1} of {key}: {detached_value[i][:10].tolist()}")  # Convert to list for clearer print

        # Forward pass through the model
        outputs = model(**inputs)
        logits = outputs.logits
        labels = inputs['labels']

        # Compute loss
        loss = self.label_smoother(outputs, labels) if self.label_smoother else outputs.loss

        return (loss, outputs) if return_outputs else loss

In [20]:
# Load your fine-tuned model
model = AutoModelForTokenClassification.from_pretrained("./split_finetuned_tiny_clinicalbert_model", num_labels=len(label_list))

### Step 1: Pruning ###
# Prune the linear layers of the fine-tuned model
for name, module in model.named_modules():
    if isinstance(module, torch.nn.Linear):
        prune.l1_unstructured(module, name='weight', amount=0.3)  # Prune 30% of weights
        prune.remove(module, 'weight')  # Permanently remove the pruned weights

# Save the pruned model
model.save_pretrained("./pruned_finetuned_tiny_clinicalbert_model")

# Load the pruned model for further training
model = AutoModelForTokenClassification.from_pretrained("./pruned_finetuned_tiny_clinicalbert_model", num_labels=len(label_list))

# Create DataLoaders for training and evaluation using the custom data collator
train_dataloader = DataLoader(train_data, batch_size=16, collate_fn=custom_data_collator)
eval_dataloader = DataLoader(eval_data, batch_size=16, collate_fn=custom_data_collator)

In [None]:
### Step 2: Fine-tuning the Pruned Model ###
# Re-fine-tune to recover accuracy after pruning (optional)
# Setup your training dataset and arguments here
training_args = TrainingArguments(
    output_dir="./retrained_pruned_model",
    eval_strategy="epoch",
    save_strategy="epoch",
    learning_rate=3e-5,  
    per_device_train_batch_size=8,  
    per_device_eval_batch_size=8,
    num_train_epochs=5,
    weight_decay=0.01,
    logging_dir='./logs',
    report_to='none'
)

trainer = CustomTrainer(
    model=model,
    args=training_args,
    train_dataset=train_data,  
    eval_dataset=eval_data,
    compute_metrics=compute_metrics,
    data_collator=custom_data_collator,
    tokenizer=tokenizer,  
)

trainer.train()

# Save the re-fine-tuned pruned model
model.save_pretrained("./retrained_pruned_model")



[DEBUG] Trainer Input:
input_ids: torch.Size([8, 48]), dtype: torch.int64
Sample 1 of input_ids: [158, 1162, 88, 1635, 671, 1068, 897, 1, 667, 413]
Sample 2 of input_ids: [117, 730, 30, 531, 467, 1068, 897, 1, 466, 1575]
Sample 3 of input_ids: [131, 1162, 88, 1635, 671, 1068, 1198, 4, 1812, 667]
attention_mask: torch.Size([8, 48]), dtype: torch.int64
Sample 1 of attention_mask: [1, 1, 1, 1, 1, 1, 1, 1, 1, 1]
Sample 2 of attention_mask: [1, 1, 1, 1, 1, 1, 1, 1, 1, 1]
Sample 3 of attention_mask: [1, 1, 1, 1, 1, 1, 1, 1, 1, 1]
labels: torch.Size([8, 48]), dtype: torch.int64
Sample 1 of labels: [0, 1, 3, 4, 5, 0, 7, 7, 11, 9]
Sample 2 of labels: [0, 1, 7, 0, 0, 0, 7, 7, 4, 0]
Sample 3 of labels: [0, 1, 3, 4, 5, 0, 7, 7, 0, 11]


Epoch,Training Loss,Validation Loss,Precision,Recall,F1,Accuracy
1,0.0155,0.01373,0.994014,0.997255,0.995615,0.996241
2,0.0114,0.012291,0.99508,0.997584,0.996319,0.996651
3,0.0097,0.013046,0.994999,0.997498,0.996237,0.996458
4,0.0072,0.012233,0.995852,0.997445,0.996644,0.996723
5,0.0072,0.01199,0.995882,0.997472,0.996673,0.996868


Tokenizer vocabulary saved to ./retrained_pruned_model/checkpoint-1503/vocab.json
Tokenizer vocabulary saved to ./retrained_pruned_model/checkpoint-3006/vocab.json
Tokenizer vocabulary saved to ./retrained_pruned_model/checkpoint-4509/vocab.json
Tokenizer vocabulary saved to ./retrained_pruned_model/checkpoint-6012/vocab.json
Tokenizer vocabulary saved to ./retrained_pruned_model/checkpoint-7515/vocab.json
Tokenizer vocabulary saved to ./retrained_pruned_model/checkpoint-7515/vocab.json


In [None]:
### Step 3: Quantization ###
# Load the re-fine-tuned pruned model
pruned_model = AutoModelForTokenClassification.from_pretrained("./retrained_pruned_model", num_labels=len(label_list))

# Apply dynamic quantization to the pruned model
quantized_model = torch.quantization.quantize_dynamic(
    pruned_model, 
    {torch.nn.Linear},  # Only quantizing linear layers
    dtype=torch.qint8  # Uses 8-bit integer quantization
)

# Save the quantized model's weights using `state_dict`
torch.save(quantized_model, "./quantized_pruned_model.pth")

quantized_model = torch.load("./quantized_pruned_model.pth")

# Move the model to the appropriate device and set it to evaluation mode
device = torch.device('cpu')
quantized_model.to(device)
quantized_model.eval()  # Set the model to evaluation mode for inference


  quantized_model = torch.load("./quantized_pruned_model.pth")


BertForTokenClassification(
  (bert): BertModel(
    (embeddings): BertEmbeddings(
      (word_embeddings): Embedding(28996, 312, padding_idx=0)
      (position_embeddings): Embedding(512, 312)
      (token_type_embeddings): Embedding(2, 312)
      (LayerNorm): LayerNorm((312,), eps=1e-12, elementwise_affine=True)
      (dropout): Dropout(p=0.1, inplace=False)
    )
    (encoder): BertEncoder(
      (layer): ModuleList(
        (0-3): 4 x BertLayer(
          (attention): BertAttention(
            (self): BertSdpaSelfAttention(
              (query): DynamicQuantizedLinear(in_features=312, out_features=312, dtype=torch.qint8, qscheme=torch.per_tensor_affine)
              (key): DynamicQuantizedLinear(in_features=312, out_features=312, dtype=torch.qint8, qscheme=torch.per_tensor_affine)
              (value): DynamicQuantizedLinear(in_features=312, out_features=312, dtype=torch.qint8, qscheme=torch.per_tensor_affine)
              (dropout): Dropout(p=0.1, inplace=False)
            )

In [32]:
# Function to predict and compare labels
def predict_and_evaluate_for_all(tokens, ground_truth_labels, model, tokenizer, label_list, max_length=48):
    # Tokenize input using your SimpleTokenizer
    inputs = tokenizer(' '.join(tokens))  # Join tokens into a single string for tokenization

    # Pad or truncate input IDs and attention masks
    input_ids = inputs['input_ids'][:max_length] + [0] * max(0, max_length - len(inputs['input_ids']))
    attention_mask = inputs['attention_mask'][:max_length] + [0] * max(0, max_length - len(inputs['attention_mask']))

    # Convert to tensors and move to device
    input_ids = torch.tensor(input_ids).unsqueeze(0).to(device)  # Add batch dimension and move to device
    attention_mask = torch.tensor(attention_mask).unsqueeze(0).to(device)

    # Get model predictions
    model.eval()  # Put the model in evaluation mode
    with torch.no_grad():
        outputs = model(input_ids=input_ids, attention_mask=attention_mask)

    # Get the predicted labels
    predictions = torch.argmax(outputs.logits, dim=2).cpu().numpy()

    # Convert predicted label ids to actual label names
    predicted_labels = [label_list[p] for p in predictions[0]]

    return tokens, predicted_labels  # Return tokens and predicted labels

# Initialize accuracy counters
correct_preds = {label: 0 for label in label_list}
total_preds = {label: 0 for label in label_list}

# Read the JSON files and evaluate the predictions
json_folder = "test/labeled_records"
all_records = [f for f in os.listdir(json_folder) if f.endswith('.json')]

# Flag to control sample entry printing
printed_sample = False
record_count = 0

for record_file in all_records:
    # Print statement for every 20 records processed
    if record_count % 20 == 0:
        print(f"Processed 20 records")  # Print statement to show record processing

    with open(os.path.join(json_folder, record_file), 'r') as json_file:
        data = json.load(json_file)
        entries = data['entries']

        for entry in entries:
            tokens = entry['tokens']
            ground_truth_labels = entry['labels']

            # Get model predictions
            predicted_tokens, predicted_labels = predict_and_evaluate_for_all(
                tokens, ground_truth_labels, quantized_model, tokenizer, label_list
            )

            # Update accuracy counts
            for pred_label, true_label in zip(predicted_labels, ground_truth_labels):
                if true_label in label_list:  # Ensure it's a valid label to evaluate
                    total_preds[true_label] += 1
                    if pred_label == true_label:
                        correct_preds[true_label] += 1

            # Show sample entry only for the first record
            if not printed_sample:
                print(f"\n--- Sample Entry from Record 1 ---")
                print(f"Tokens: {tokens}")
                print(f"Ground Truth Labels: {ground_truth_labels}")
                print(f"Predicted Labels  : {predicted_labels}")
                printed_sample = True  # Set flag to True after printing the first sample

    record_count += 1  # Increment the record count after processing each file

# Calculate overall accuracy for each label
overall_accuracies = {label: (correct_preds[label] / total_preds[label]) if total_preds[label] > 0 else 0 for label in label_list}

# Display the overall accuracy for each field
print("\nOverall Accuracy per Field:")
for label, accuracy in overall_accuracies.items():
    print(f"{label:15}: {accuracy:.2f}")


Processed 20 records

--- Sample Entry from Record 1 ---
Tokens: ['1.', 'Fluticasone-Salmeterol', '250-50', 'mcg/Dose', 'Disk', 'with', 'Device', 'Sig:', 'One', '(1)', 'Disk', 'with', 'Device', 'Inhalation', '[**Hospital1', '**]', '(2', 'times', 'a', 'day).', 'Disp:*60', 'Disk', 'with', 'Device(s)*', 'Refills:*2*']
Ground Truth Labels: ['O', 'B-DRUG', 'B-STRENGTH', 'I-STRENGTH', 'B-FORM', 'O', 'B-FORM', 'O', 'B-DOSAGE', 'I-DOSAGE', 'B-FORM', 'O', 'I-FORM', 'B-ROUTE', 'O', 'O', 'B-DOSAGE', 'I-DOSAGE', 'I-DOSAGE', 'B-FREQUENCY', 'O', 'B-FORM', 'O', 'B-FORM', 'O']
Predicted Labels  : ['O', 'O', 'B-STRENGTH', 'I-STRENGTH', 'I-STRENGTH', 'O', 'O', 'O', 'B-DOSAGE', 'B-DOSAGE', 'O', 'O', 'O', 'B-ROUTE', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'B-DOSAGE', 'B-DOSAGE', 'O', 'O']
Processed 20 records
Processed 20 records
Processed 20 records
Processed 20 records

Overall Accuracy per Fiel

In [33]:
import os

# Replace './quantized_pruned_model.pth' with the actual path to your quantized model file
quantized_model_path = "./quantized_pruned_model.pth"

# Check if the file exists
if os.path.exists(quantized_model_path):
    # Get the size of the model file
    model_size = os.path.getsize(quantized_model_path)

    # Convert size to megabytes for easier readability
    model_size_mb = model_size / (1024 * 1024)

    print(f"Size of the model file: {model_size_mb:.2f} MB")
else:
    print(f"File not found: {quantized_model_path}")


Size of the model file: 39.59 MB
