<a href="https://colab.research.google.com/github/sAndreotti/MedicalMeadow/blob/medllama/medllama.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

Import Libraries

In [1]:
!pip install datasets accelerate peft bitsandbytes transformers trl==0.12.0 plotly huggingface_hub
!pip install --upgrade smart_open
!pip install --upgrade gensim
!pip install ffmpeg-python
!pip install -U openai-whisper
!pip install scipy librosa unidecode inflect



In [2]:
from datasets import load_dataset
import matplotlib
import matplotlib.pyplot as plt
import numpy as np
from collections import Counter
from trl import SFTTrainer
import re
from gensim.models.word2vec import Word2Vec
import plotly.express as px
import random
from sklearn.manifold import TSNE
from torch.utils.data import Dataset
from torch.utils.data import random_split
from peft import prepare_model_for_kbit_training, LoraConfig
from huggingface_hub import login
import torch
from transformers import (
    AutoTokenizer,
    AutoModelForCausalLM,
    BitsAndBytesConfig,
    TrainingArguments
)
from datasets import Dataset as HFDataset
from peft import AutoPeftModelForCausalLM
import whisper
from IPython.display import Audio

Import Libraries for audio part

In [3]:
device = "cuda" if torch.cuda.is_available() else "cpu"
print(f"Using device: {device}")

Using device: cuda


## Investigate Dataset

In [4]:
ds = load_dataset("medalpaca/medical_meadow_medical_flashcards")
ds = ds['train']
ds

The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


Dataset({
    features: ['input', 'output', 'instruction'],
    num_rows: 33955
})

In [5]:
instructions = ds['instruction']

## Train and evaluate models

#### Create dataset

In [6]:
# class MedDataset(Dataset):
#   def __init__(self, instruction, input, output):
#     self.instruction = instruction
#     self.input = input
#     self.output = output

#   def __len__(self):
#     return len(self.instruction)

#   def __getitem__(self, idx):
#     sentence = "<s>[INST] "+self.instruction[idx]+". "+self.input[idx]+" [/INST] "+self.output[idx]+" </s>"
#     return sentence

In [7]:
class MedDataset(Dataset):
    def __init__(self, dataset, tokenizer):
        self.dataset = dataset
        self.tokenizer = tokenizer

    def __len__(self):
        return len(self.dataset)

    def __getitem__(self, idx):
        example = self.dataset[idx]
        messages = [
            {"role": "system", "content": example['instruction']},
            {"role": "user", "content": example['input']},
            {"role": "assistant", "content": example['output']}
        ]

        prompt = self.tokenizer.apply_chat_template(
            messages,
            tokenize=False,
            add_generation_prompt=True
        )

        tokens = self.tokenizer(
            prompt,
            padding="max_length",
            truncation=True,
            max_length=128,
            return_tensors="pt"
        )

        tokens['labels'] = tokens['input_ids'].clone()
        tokens['labels'][tokens['input_ids'] == self.tokenizer.pad_token_id] = -100

        return {
            "input_ids": tokens['input_ids'].squeeze(),
            "attention_mask": tokens['attention_mask'].squeeze(),
            "labels": tokens['labels'].squeeze()
        }

In [8]:
login(token="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")

base_model = "meta-llama/Llama-3.2-1B-Instruct"

tokenizer = AutoTokenizer.from_pretrained(base_model)
tokenizer.pad_token = tokenizer.eos_token

# tokenizer.padding_side = "right"

In [9]:
tokenized_dataset = MedDataset(ds, tokenizer)

In [10]:
train_dataset, val_dataset, test_dataset = random_split(tokenized_dataset, [0.8, 0.1, 0.1])
print(f"Train dataset dimension: {len(train_dataset)}")
print(f"Validation dataset dimension: {len(val_dataset)}")
print(f"Test dataset dimension: {len(test_dataset)}")

Train dataset dimension: 27165
Validation dataset dimension: 3395
Test dataset dimension: 3395


In [11]:
compute_dtype = getattr(torch, "float16")

quant_config = BitsAndBytesConfig(
    load_in_4bit=True,
    bnb_4bit_quant_type="nf4",
    bnb_4bit_compute_dtype=compute_dtype,
    bnb_4bit_use_double_quant=False,
    bnb_4bit_representation="nested"
)

model = AutoModelForCausalLM.from_pretrained(
    base_model,
    quantization_config=quant_config,
    device_map={"": 0},
    torch_dtype=torch.float32,
    trust_remote_code=True
)
model.config.use_cache = False
model.config.pretraining_tp = 1

model.gradient_checkpointing_enable()
model = prepare_model_for_kbit_training(model)

Unused kwargs: ['bnb_4bit_representation']. These kwargs are not used in <class 'transformers.utils.quantization_config.BitsAndBytesConfig'>.


In [12]:
# def convert_to_hf_dataset(med_dataset):
#     # Create lists to store all formatted text
#     formatted_texts = []

#     # Iterate through all items in the original dataset
#     for idx in range(len(med_dataset.instruction)):
#         # Get the formatted text directly using the dataset's __getitem__
#         formatted_text = med_dataset[idx]
#         formatted_texts.append(formatted_text)

#     # Create a dictionary with the required format
#     dataset_dict = {
#         'text': formatted_texts
#     }

#     # Convert to HuggingFace Dataset
#     hf_dataset = HFDataset.from_dict(dataset_dict)

#     return hf_dataset

# hf_dataset = convert_to_hf_dataset(garnachoDataset)

In [13]:
peft_params = LoraConfig(
    lora_alpha=32,
    lora_dropout=0.1,
    r=16,
    bias="none",
    task_type="CAUSAL_LM",
)

In [14]:
model.train()
training_params = TrainingArguments(
    output_dir="./results",
    num_train_epochs=1,
    per_device_train_batch_size=8,
    gradient_accumulation_steps=4,
    optim="paged_adamw_32bit",
    eval_strategy="steps",
    logging_steps=90,
    eval_steps = 90,
    learning_rate=2e-4,
    weight_decay=0.001,
    fp16=False,
    bf16=False,
    max_grad_norm=0.3,
    max_steps=-1,
    warmup_ratio=0.03,
    group_by_length=True,
    lr_scheduler_type="constant",
    report_to="tensorboard",
    gradient_checkpointing=True
)

In [15]:
# trainer = SFTTrainer(
#     model=model,
#     train_dataset=train_dataset,
#     eval_dataset=val_dataset,
#     peft_config=peft_params,
#     max_seq_length=256,
#     tokenizer=tokenizer,
#     args=training_params,
#     packing=False,
# )

# # Train the model
# trainer.train()

# # Save the model and tokenizer
# trainer.save_model("./fine-tuned-model")
# tokenizer.save_pretrained("./fine-tuned-model")

In [16]:
# trainer.model.save_pretrained("model-chatbot-medical-mew")
# trainer.tokenizer.save_pretrained("model-chatbot-medical-mew")

## Test Trained Model

### Load pre-trained model

In [17]:
trained_model = "/content/model"
question = "What does low Mobility suggest?"

model = AutoPeftModelForCausalLM.from_pretrained(
    trained_model, # change with folder where u have the files
    quantization_config=quant_config,
    device_map={"": 0},
    torch_dtype=torch.float32,
    trust_remote_code=True
)
tokenizer = AutoTokenizer.from_pretrained(trained_model)

In [18]:
messages = [{"role": "system", "content": instructions[0]},
    {"role": "user", "content": question}]

prompt = tokenizer.apply_chat_template(messages, tokenize=False, add_generation_prompt=True)

model_inputs = tokenizer(prompt, return_tensors='pt', padding=True, truncation=True).to(device)

outputs = model.generate(**model_inputs, max_new_tokens=128)

text = tokenizer.decode(outputs[0], skip_special_tokens=True)

print(f"Question: {question}")
print(text.split("assistant")[1])

Setting `pad_token_id` to `eos_token_id`:128001 for open-end generation.


Question: What does low Mobility suggest?


Low mobility is a medical condition that is characterized by a lack of movement or ability to move.


### Compare with MedLlama

In [19]:
# tokenizerMED = AutoTokenizer.from_pretrained("OpenScienceReseach/Med-LLaMA-7b")
# modelMED = AutoModelForCausalLM.from_pretrained("OpenScienceReseach/Med-LLaMA-7b") funzia ma lento

tokenizerMED = AutoTokenizer.from_pretrained("medalpaca/medalpaca-7b")
modelMED = AutoModelForCausalLM.from_pretrained("medalpaca/medalpaca-7b")

You are using the default legacy behaviour of the <class 'transformers.models.llama.tokenization_llama.LlamaTokenizer'>. This is expected, and simply means that the `legacy` (previous) behavior will be used so nothing changes for you. If you want to use the new behaviour, set `legacy=False`. This should only be set if you understand what it means, and thoroughly read the reason why this was added as explained in https://github.com/huggingface/transformers/pull/24565 - if you loaded a llama tokenizer from a GGUF file you can ignore this message
You are using the default legacy behaviour of the <class 'transformers.models.llama.tokenization_llama_fast.LlamaTokenizerFast'>. This is expected, and simply means that the `legacy` (previous) behavior will be used so nothing changes for you. If you want to use the new behaviour, set `legacy=False`. This should only be set if you understand what it means, and thoroughly read the reason why this was added as explained in https://github.com/huggin

Loading checkpoint shards:   0%|          | 0/3 [00:00<?, ?it/s]

In [20]:
# DEVICE = torch.device('cuda' if torch.cuda.is_available() else 'mps'
#         if torch.backends.mps.is_available() else 'cpu')

# modelMED = modelMED.to(DEVICE)
# tokenizerMED = tokenizerMED.to(DEVICE)

In [21]:
!pip install nltk
import nltk
nltk.download('punkt')

from nltk.translate.bleu_score import sentence_bleu

def calculate_bleu(reference, candidate):
    """
    Comput BLUE score between the candiate (generate answer) and the reference answer
    """
    try:
      score = sentence_bleu(reference, candidate)
      return score
    except Exception as e:
      print(f"Error during BLUE computing: {e}")
      return None



[nltk_data] Downloading package punkt to /root/nltk_data...
[nltk_data]   Package punkt is already up-to-date!


In [22]:
def get_medical_answer(question, model, tokenizer, max_length=512):
    """
    Get answer from MedAlpaca for a medical question

    Args:
        question (str): Medical question to ask
        model: The loaded MedAlpaca model
        tokenizer: The loaded tokenizer
        max_length (int): Maximum length of the generated response

    Returns:
        str: Model's response
    """
    # Format the prompt according to MedAlpaca's expected format
    prompt = f"Below is a medical question that needs to be answered.\n\nQuestion: {question}\n\nAnswer:"

    # Tokenize input
    print(model.device)
    inputs = tokenizer(prompt, return_tensors="pt").to(model.device)

    # Generate response
    with torch.no_grad():
        outputs = model.generate(
            inputs["input_ids"],
            max_length=max_length,
            num_return_sequences=1,
            temperature=0.7,
            top_p=0.9,
            pad_token_id=tokenizer.eos_token_id
        )

    # Decode and return the response
    response = tokenizer.decode(outputs[0], skip_special_tokens=True)

    # Extract just the answer part (after "Answer:")
    answer = response.split("Answer:")[-1].strip()
    return answer

In [23]:
questions = ["What does low REM sleep latency and experiencing hallucinations/sleep paralysis suggest?",
             "What are some possible causes of low PTH and high calcium levels?",
             "What is the term used to describe a condition of low sodium levels and very high proteins or lipids?"]

dataset_answer = ["Low REM sleep latency and experiencing hallucinations/sleep paralysis suggests narcolepsy.",
                  "PTH-independent hypercalcemia, which can be caused by cancer, granulomatous disease, or vitamin D intoxication.",
                  "The term used to describe a condition of low sodium levels and very high proteins or lipids is pseudohyponatremia."]

llama_bleu = []
our_bleu = []

from transformers import pipeline
import time

for i, question in enumerate(questions):
    print(f"Question: {question}")
    start = time.time()
    # llama
    response = get_medical_answer(question, modelMED, tokenizerMED)
    end = time.time() - start
    print("time for generate response: ", end)
    print("Response MedLlama:", response)

    candidate = response.split()
    bleu_score = calculate_bleu(dataset_answer[i], candidate)

    if bleu_score is not None:
        print(f"BLEU score for Llama answer {i}: {bleu_score}")
        llama_bleu.append(bleu_score)


for i, question in enumerate(questions):
    # our
    messages = [{"role": "system", "content": instructions[0]},
    {"role": "user", "content": question}]
    prompt = tokenizer.apply_chat_template(messages, tokenize=False, add_generation_prompt=True)
    model_inputs = tokenizer(prompt, return_tensors='pt', padding=True, truncation=True).to("cuda")
    outputs = model.generate(**model_inputs, max_new_tokens=128)
    text = tokenizer.decode(outputs[0], skip_special_tokens=True)
    our = text.split("assistant")[1]
    print("Our Response: ", our)

    candidate = our.split()
    bleu_score = calculate_bleu(dataset_answer[i], candidate)

    if bleu_score is not None:
        print(f"BLEU score for our answer {i}: {bleu_score}")
        our_bleu.append(bleu_score)

import numpy
print(f"Average BLEU score for our model: {np.median(our_bleu)}")
print(f"Average BLEU score for Llama model: {np.median(llama_bleu)}")



Question: What does low REM sleep latency and experiencing hallucinations/sleep paralysis suggest?
cpu


KeyboardInterrupt: 