# Godel Model Fine-Tuning + Chatbot

This notebook is written to run on Kaggle with a T4 GPU as the accelerator.

It contains the code both for fine-tuning GODEL and for the chatbot loop, so the fine-tuned model is already available when the chatbot runs.

In [None]:
import os
from transformers import AutoTokenizer, AutoModelForSeq2SeqLM, Seq2SeqTrainer, Seq2SeqTrainingArguments, DataCollatorForSeq2Seq
from datasets import load_dataset, DatasetDict, Dataset
import json

## Data Preprocessing

In [None]:
import os
from transformers import AutoTokenizer, AutoModelForSeq2SeqLM, Seq2SeqTrainer, Seq2SeqTrainingArguments, DataCollatorForSeq2Seq
from datasets import load_dataset, DatasetDict, Dataset
import json

# Load and preprocess data
def load_doctor_data(path="/kaggle/input/mts-dialog-qa-dataset/question_answer_dataset.jsonl"):
    """Read the doctor question/answer JSONL file and build a training dataset.

    Each non-blank line is parsed as one JSON object. The fields ``finding``,
    ``emote``, ``doctor_q`` and ``prev_patient_a`` are read from it and turned
    into one example:

    * ``input``:  ``"Symptom: <finding>; Previous Patient Response: <prev_patient_a>"``
    * ``output``: ``"<doctor_q> (<emote>)"``

    Lines that are not valid JSON are reported on stdout and skipped.

    Args:
        path: Location of the JSONL file.

    Returns:
        A ``datasets.Dataset`` with the string columns ``input`` and ``output``.

    Raises:
        KeyError: If a parsed record lacks one of the four expected fields.
    """
    data = []
    with open(path, "r", encoding="utf-8") as f:
        for line in f:
            # Blank lines (e.g. a trailing newline) are not records; skip them
            # quietly instead of reporting them as invalid JSON.
            if not line.strip():
                continue
            # Only the parse itself is guarded, and each line is parsed once.
            try:
                data_entry = json.loads(line)
            except json.JSONDecodeError:
                print(f"Skipping invalid JSON line: {line.strip()}")
                continue

            symptom = data_entry["finding"]
            emotion = data_entry["emote"]
            question = data_entry["doctor_q"]
            prev_answer = data_entry["prev_patient_a"]
            input_text = f"Symptom: {symptom}; Previous Patient Response: {prev_answer}"
            output_text = f"{question} ({emotion})"
            data.append({"input": str(input_text), "output": str(output_text)})
    return Dataset.from_list(data)


## Fine-Tune Model

In [None]:
def fine_tune_model(model_name="microsoft/GODEL-v1_1-base-seq2seq", output_dir="./godel_finetuned"):
    """Fine-tune a seq2seq model on the doctor question dataset and save it.

    The dataset comes from ``load_doctor_data()``. Inputs are truncated to 128
    tokens and targets to 64 tokens. After training, the model and tokenizer
    are written to ``output_dir``.

    Args:
        model_name: Hub id or local path of the pretrained model to start from.
        output_dir: Directory for checkpoints and the final model/tokenizer.
    """
    dataset = load_doctor_data()
    tokenizer = AutoTokenizer.from_pretrained(model_name)
    model = AutoModelForSeq2SeqLM.from_pretrained(model_name)

    def preprocess(batch):
        # No padding here on purpose. Padding the targets to a fixed length
        # would put real pad-token ids into ``labels`` and the loss would be
        # computed on them. DataCollatorForSeq2Seq pads each batch instead and
        # fills label padding with -100, which the loss ignores.
        model_inputs = tokenizer(batch["input"], truncation=True, max_length=128)
        targets = tokenizer(batch["output"], truncation=True, max_length=64)
        model_inputs["labels"] = targets["input_ids"]
        return model_inputs

    # Drop the raw text columns so the collator only ever sees token id lists.
    tokenized_dataset = dataset.map(
        preprocess,
        batched=True,
        remove_columns=dataset.column_names,
    )
    training_args = Seq2SeqTrainingArguments(
        output_dir=output_dir,
        per_device_train_batch_size=4,
        learning_rate=5e-5,
        num_train_epochs=3,
        save_steps=1000,
        save_total_limit=2,
        logging_dir='./logs',
        report_to="none"
    )

    trainer = Seq2SeqTrainer(
        model=model,
        args=training_args,
        train_dataset=tokenized_dataset,
        tokenizer=tokenizer,
        data_collator=DataCollatorForSeq2Seq(tokenizer, model=model)
    )

    trainer.train()
    model.save_pretrained(output_dir)
    tokenizer.save_pretrained(output_dir)


# Run fine-tuning with the default base model and output directory.
fine_tune_model()

## Knowledge Base

This is the same code as in MedicalKnowledgeBase.py, pasted here for easy use on Kaggle.

In [None]:
import json
import pandas as pd
import os
import random

class MedicalKnowledgeBase:
    """In-memory table linking diagnoses to findings.

    Rows live in ``self.df`` with the columns ``diagnosis``, ``finding``,
    ``evoking_strength`` and ``frequency``. ``self.diagnosis_list`` holds the
    distinct diagnoses currently in the table.
    """

    _COLUMNS = ["diagnosis", "finding", "evoking_strength", "frequency"]

    def __init__(self, kb_json_file=None):
        """Create a knowledge base, optionally filled from a JSON file.

        Args:
            kb_json_file: Optional path to a JSON object that maps each
                diagnosis to an object mapping finding -> frequency. Every
                loaded row gets an evoking strength of 1.
        """
        self.df = pd.DataFrame({column: [] for column in self._COLUMNS})
        self.diagnosis_list = []
        # ``__file__`` does not exist when this class is pasted into a
        # notebook cell; fall back to the working directory in that case.
        try:
            self.script_dir = os.path.dirname(os.path.abspath(__file__))
        except NameError:
            self.script_dir = os.getcwd()
        if kb_json_file:
            with open(kb_json_file, encoding="utf-8") as f:
                data = json.load(f)
            # Build the frame in one go; appending row by row re-allocates the
            # table and recomputes the diagnosis list for every entry.
            rows = [
                (diagnosis, finding, 1, frequency)
                for diagnosis, findings in data.items()
                for finding, frequency in findings.items()
            ]
            if rows:
                self.df = pd.DataFrame(rows, columns=self._COLUMNS)
                self.diagnosis_list = self.df["diagnosis"].unique()

    def get_kb(self):
        """Print the first rows of the table."""
        print(self.df.head())

    def add_entry(self, diagnosis, finding, evoking_strength, frequency):
        """Append one row and refresh the list of distinct diagnoses."""
        self.df.loc[len(self.df)] = [diagnosis, finding, evoking_strength, frequency]
        self.diagnosis_list = self.df['diagnosis'].unique()

    def get_diagnoses_for_findings(self, findings, neg_findings=None, *, match_req=0):
        """Return the diagnoses compatible with the observed findings.

        Args:
            findings: Findings that are present.
            neg_findings: Findings that are absent; any diagnosis listing one
                of them is excluded.
            match_req: Minimum number of table rows of a diagnosis whose
                finding is in ``findings``. 0 disables this requirement.

        Returns:
            ``self.diagnosis_list`` unchanged when there is nothing to filter
            on (no negative findings and ``match_req == 0``), otherwise a set
            of diagnosis names.
        """
        neg_findings = [] if neg_findings is None else list(neg_findings)
        # Nothing to exclude and no match requirement: every diagnosis is valid.
        if not neg_findings and match_req == 0:
            return self.diagnosis_list

        df = self.df
        invalid_diagnoses = df[df["finding"].isin(neg_findings)]['diagnosis'].tolist()

        if match_req == 0:
            possible_diagnoses = self.diagnosis_list
        else:
            # Count, per diagnosis, the rows whose finding was observed.
            match_counts = df[df["finding"].isin(findings)]['diagnosis'].value_counts()
            possible_diagnoses = match_counts[match_counts >= match_req].index.to_list()

        return set(possible_diagnoses).difference(invalid_diagnoses)

    def suggest_next_finding(self, current_findings, neg_findings=None, *, match_req=0):
        """Pick the not-yet-asked finding with the highest total score.

        Only rows of diagnoses returned by ``get_diagnoses_for_findings`` are
        considered. A finding's score is the sum of
        ``evoking_strength * frequency`` over those rows.

        Returns:
            The best-scoring finding, or ``None`` when no candidate is left.
        """
        neg_findings = [] if neg_findings is None else list(neg_findings)
        valid_diagnoses = self.get_diagnoses_for_findings(current_findings, neg_findings, match_req=match_req)
        df = self.df[self.df['diagnosis'].isin(valid_diagnoses)]

        # Accumulate scores for findings that have not been asked about yet.
        already_asked = set(current_findings) | set(neg_findings)
        candidate_scores = {}
        for finding, strength, frequency in zip(df['finding'], df['evoking_strength'], df['frequency']):
            if finding in already_asked:
                continue
            candidate_scores[finding] = candidate_scores.get(finding, 0) + strength * frequency

        if not candidate_scores:
            return None
        return max(candidate_scores, key=candidate_scores.get)

    def get_random_finding(self, current_findings=None, neg_findings=None):
        """Return the finding of a randomly sampled row that is not excluded.

        Rows are sampled uniformly among those whose finding is in neither
        ``current_findings`` nor ``neg_findings``.

        Returns:
            A finding, or ``None`` when the table is empty or every finding is
            excluded (retrying in a loop would never terminate in that case).
        """
        excluded = list(current_findings or []) + list(neg_findings or [])
        pool = self.df.loc[~self.df['finding'].isin(excluded), 'finding']
        if pool.empty:
            return None
        return pool.sample(n=1).values[0]

    def get_random_findings(self, num_findings):
        """Return ``num_findings`` distinct findings chosen at random.

        Raises:
            ValueError: If ``num_findings`` exceeds the number of distinct
                findings (raised by ``random.sample``).
        """
        return random.sample(sorted(self.df['finding'].unique()), num_findings)

    def save_kb_as_csv(self, filename="outputs/medical_kb.csv"):
        """Write the table to a CSV file located relative to ``script_dir``."""
        filename = os.path.join(self.script_dir, filename)
        # The target directory (``outputs/`` by default) may not exist yet.
        os.makedirs(os.path.dirname(filename), exist_ok=True)
        self.df.to_csv(filename, index=False)

    def load_kb(self, filename="outputs/medical_kb.csv"):
        """Replace the table with a CSV file located relative to ``script_dir``."""
        filename = os.path.join(self.script_dir, filename)
        self.df = pd.read_csv(filename)
        self.diagnosis_list = self.df['diagnosis'].unique()

# Build the module-level knowledge base from the JSON file; chatbot_loop reads
# this `kb` to choose which finding to ask about.
kb = MedicalKnowledgeBase("mimic_4_kb_w_freq.json")

# Chatbot

In [None]:
# Load the tokenizer and model from the "godel_finetuned" directory, which is
# the default output_dir that fine_tune_model saves to. generate_question uses
# these module-level objects.
tokenizer = AutoTokenizer.from_pretrained("godel_finetuned")
model = AutoModelForSeq2SeqLM.from_pretrained("godel_finetuned")

In [None]:
# Chatbot loop
def generate_question(symptom, emotion="neutral", prev_answer=""):
    """Generate the doctor's next question about *symptom*.

    The prompt follows the input template the model is fine-tuned on in
    ``load_doctor_data`` (``"Symptom: ...; Previous Patient Response: ..."``),
    so inference sees the same format as training.

    Args:
        symptom: Finding to ask the patient about.
        emotion: Accepted for backward compatibility. It is not put into the
            prompt, because the fine-tuning inputs contain no emotion field;
            the emotion only appears as a trailing ``(emotion)`` tag in the
            training targets.
        prev_answer: The patient's previous reply, if any.

    Returns:
        The generated question with the trailing ``(emotion)`` tag removed.
    """
    prompt = f"Symptom: {symptom}; Previous Patient Response: {prev_answer}"
    # Truncate like the training inputs and run on whatever device the model is on.
    inputs = tokenizer(prompt, return_tensors="pt", truncation=True, max_length=128).to(model.device)
    outputs = model.generate(**inputs, max_new_tokens=50)
    decoded = tokenizer.decode(outputs[0], skip_special_tokens=True)
    # The emotion tag is appended at the end of the target, so cut at the last
    # "(" rather than the first; a parenthesis inside the question survives.
    return decoded.rsplit("(", 1)[0].strip()

def _is_affirmative(reply):
    """Return True when *reply* contains an affirmative word or phrase."""
    # Reduce the reply to lowercase words separated by single spaces, then
    # match whole words only. A plain substring test would treat any reply
    # containing the letter "y" (e.g. "not really") as a yes.
    words = "".join(ch if ch.isalnum() else " " for ch in reply.lower()).split()
    padded = f" {' '.join(words)} "
    return any(f" {phrase} " in padded for phrase in ("yes", "yeah", "y", "i have", "sure"))


def chatbot_loop(max_questions=10):
    """Run the interactive symptom interview on stdin/stdout.

    The first finding is drawn at random from ``kb``; each following one comes
    from ``kb.suggest_next_finding``. A reply recognised as affirmative records
    the finding as present, any other reply records it as absent.

    Args:
        max_questions: Maximum number of findings to ask about.
    """
    print("Chatbot: Hello, I'm going to ask you some questions about your symptoms.")
    current_findings = []
    neg_findings = []
    next_finding = kb.get_random_finding()
    # Stop early when the knowledge base has nothing left to ask about, instead
    # of asking the model about a symptom of ``None``.
    while next_finding is not None and len(current_findings) + len(neg_findings) < max_questions:
        question = generate_question(next_finding)

        print(f"Chatbot: {question}")
        user_input = input("You: ")
        if _is_affirmative(user_input):
            current_findings.append(next_finding)
        else:
            neg_findings.append(next_finding)

        # Suggest next finding
        next_finding = kb.suggest_next_finding(current_findings, neg_findings)

    print("Chatbot: Thank you for your time.")

# Start the interactive session; this blocks waiting for input() from the user.
chatbot_loop()