In [2]:
# Paths used by the fine-tuning cells below.
output_dir = "GPT-Neo-Disease-Symptoms"   # directory the trained model and tokenizer are saved to
output_file = "fine_tuning_diseases.txt"  # text corpus the training dataset is built from

In [4]:
from transformers import TextDataset, DataCollatorForLanguageModeling, GPT2Tokenizer, GPTNeoForCausalLM, Trainer, TrainingArguments

# Load the tokenizer from the same checkpoint as the model loaded below, so the
# tokenizer/model pair cannot drift apart if the checkpoint is ever changed.
tokenizer = GPT2Tokenizer.from_pretrained("EleutherAI/gpt-neo-125M")
# Reuse the end-of-sequence token as the padding token.
tokenizer.pad_token = tokenizer.eos_token

# Prepare the dataset
def load_dataset(file_path, tokenizer, block_size=128):
    """Build a ``TextDataset`` over the text file at ``file_path``.

    Args:
        file_path: Path of the text file handed to ``TextDataset``.
        tokenizer: Tokenizer handed to ``TextDataset``.
        block_size: Forwarded unchanged as ``TextDataset``'s ``block_size``
            (128 by default).

    Returns:
        The constructed ``TextDataset``.
    """
    dataset_kwargs = {
        "tokenizer": tokenizer,
        "file_path": file_path,
        "block_size": block_size,
    }
    return TextDataset(**dataset_kwargs)

# Prepare the data collator
def load_data_collator(tokenizer, mlm=False):
    """Create a ``DataCollatorForLanguageModeling`` for ``tokenizer``.

    Args:
        tokenizer: Tokenizer passed to the collator.
        mlm: Forwarded unchanged as the collator's ``mlm`` flag
            (``False`` by default).

    Returns:
        The configured ``DataCollatorForLanguageModeling``.
    """
    collator = DataCollatorForLanguageModeling(tokenizer=tokenizer, mlm=mlm)
    return collator

# Build the training inputs from the fine-tuning corpus
train_dataset = load_dataset(file_path=output_file, tokenizer=tokenizer)
data_collator = load_data_collator(tokenizer=tokenizer)

# Base GPT-Neo checkpoint that the Trainer below fine-tunes
model = GPTNeoForCausalLM.from_pretrained("EleutherAI/gpt-neo-125M")


  from .autonotebook import tqdm as notebook_tqdm


In [5]:
# Training configuration
training_args = TrainingArguments(
    # Where checkpoints and logs are written
    output_dir=output_dir,
    overwrite_output_dir=True,
    logging_dir=f'{output_dir}/logs',
    # Optimisation schedule
    learning_rate=5e-5,
    num_train_epochs=5,
    per_device_train_batch_size=4,
    # Logging and checkpoint cadence, counted in training steps
    logging_steps=100,
    save_steps=500,
    save_total_limit=2,
)

# Wire the model, the data and the configuration together
trainer = Trainer(
    args=training_args,
    model=model,
    train_dataset=train_dataset,
    data_collator=data_collator,
)

# Run fine-tuning
trainer.train()

# Persist the fine-tuned weights and the tokenizer into the same directory
trainer.save_model(output_dir)
tokenizer.save_pretrained(output_dir)


 14%|█▍        | 100/720 [01:13<07:26,  1.39it/s]

{'loss': 2.7697, 'grad_norm': 5.780911922454834, 'learning_rate': 4.305555555555556e-05, 'epoch': 0.69}


 28%|██▊       | 200/720 [02:27<06:23,  1.36it/s]

{'loss': 2.266, 'grad_norm': 5.402742862701416, 'learning_rate': 3.611111111111111e-05, 'epoch': 1.39}


 42%|████▏     | 300/720 [03:39<04:58,  1.41it/s]

{'loss': 1.97, 'grad_norm': 4.976509094238281, 'learning_rate': 2.916666666666667e-05, 'epoch': 2.08}


 56%|█████▌    | 400/720 [04:51<03:48,  1.40it/s]

{'loss': 1.6732, 'grad_norm': 5.329981803894043, 'learning_rate': 2.2222222222222223e-05, 'epoch': 2.78}


 69%|██████▉   | 500/720 [06:03<02:41,  1.37it/s]

{'loss': 1.5094, 'grad_norm': 4.8598127365112305, 'learning_rate': 1.527777777777778e-05, 'epoch': 3.47}


 83%|████████▎ | 600/720 [07:19<01:26,  1.38it/s]

{'loss': 1.3903, 'grad_norm': 5.066586494445801, 'learning_rate': 8.333333333333334e-06, 'epoch': 4.17}


 97%|█████████▋| 700/720 [08:31<00:14,  1.40it/s]

{'loss': 1.2923, 'grad_norm': 5.063773155212402, 'learning_rate': 1.388888888888889e-06, 'epoch': 4.86}


100%|██████████| 720/720 [08:48<00:00,  1.36it/s]


{'train_runtime': 528.4542, 'train_samples_per_second': 5.421, 'train_steps_per_second': 1.362, 'train_loss': 1.8212705612182618, 'epoch': 5.0}


('GPT-Neo-Disease-Symptoms/tokenizer_config.json',
 'GPT-Neo-Disease-Symptoms/special_tokens_map.json',
 'GPT-Neo-Disease-Symptoms/vocab.json',
 'GPT-Neo-Disease-Symptoms/merges.txt',
 'GPT-Neo-Disease-Symptoms/added_tokens.json')

In [6]:
from langchain.vectorstores import FAISS
from langchain.embeddings import HuggingFaceEmbeddings
from langchain.docstore.document import Document
import pandas as pd


In [7]:
# Read the fine-tuning corpus as a one-entry-per-line table for the knowledge base.
# header=None stops pandas from consuming the first line as column names, which
# would silently drop that entry from the knowledge base.
# NOTE(review): assumes the corpus has no header line (the same file is fed to
# TextDataset as raw training text) — confirm.
df = pd.read_csv(output_file, sep="\t", header=None)

In [8]:
# Convert the data to LangChain Documents.
# The first column is selected explicitly by position: using an integer key on a
# label-indexed row (`row[0]`) relies on a deprecated pandas fallback and warns.
documents = [
    Document(page_content=text)
    for text in df.iloc[:, 0]
]

  Document(page_content=row[0])


In [9]:
# Sentence-embedding model used to vectorise the documents
embeddings = HuggingFaceEmbeddings(
    model_name="sentence-transformers/paraphrase-MiniLM-L6-v2",
)

# FAISS vector store built from the documents and the embedding model
knowledge_base = FAISS.from_documents(documents, embeddings)

  embeddings = HuggingFaceEmbeddings(model_name="sentence-transformers/paraphrase-MiniLM-L6-v2")


In [10]:
# Function to retrieve relevant context
def retrieve_context(symptoms, k=1):
    """Return the text of the ``k`` knowledge-base hits for ``symptoms``.

    The query sent to ``knowledge_base.similarity_search`` is ``symptoms``
    prefixed with ``"Symptoms: "``. The ``page_content`` of the returned
    documents is joined with single spaces.
    """
    hits = knowledge_base.similarity_search(f"Symptoms: {symptoms}", k=k)
    return " ".join(hit.page_content for hit in hits)


In [11]:
from transformers import GPTNeoForCausalLM, GPT2Tokenizer
# Directory holding the fine-tuned model and tokenizer files
model_path = "GPT-Neo-Disease-Symptoms"

# Reload the tokenizer, reusing EOS as the padding token
tokenizer = GPT2Tokenizer.from_pretrained(model_path)
tokenizer.pad_token = tokenizer.eos_token

# Reload the fine-tuned GPT-Neo weights
model = GPTNeoForCausalLM.from_pretrained(model_path)

In [12]:
def _extract_disease_name(response):
    """Return the disease name that follows the marker phrase in ``response``.

    Lines are scanned in order; the first line that contains
    ``"this could be the disease:"`` and yields a non-empty name wins.
    Returns ``None`` when no such line exists.
    """
    marker = "this could be the disease:"
    for line in response.strip().split("\n"):
        if marker not in line:
            continue
        # Drop the trailing period *before* the emptiness check so a line that
        # ends in a bare "." is skipped instead of yielding an empty name.
        disease_name = line.split(marker)[-1].strip().rstrip(".").strip()
        if disease_name:
            return disease_name
    return None


def identify_disease_with_rag(symptoms, k=1, max_new_tokens=50):
    """Predict a disease name for ``symptoms`` using retrieval plus generation.

    Args:
        symptoms: Free-text symptom description.
        k: Number of knowledge-base entries to retrieve as context.
        max_new_tokens: Maximum number of tokens generated after the prompt.

    Returns:
        The extracted disease name, or ``"I don't know the answer."`` when no
        line of the decoded text contains the marker phrase.
    """
    # Retrieve relevant context
    context = retrieve_context(symptoms, k=k)

    # Create input text with retrieved context
    input_text = (
        "You are a medical assistant trained to predict diseases based on symptoms.\n\n"
        f"Context: {context}\n"
        f"Symptoms: {symptoms}\n\n"
        "Based on the context and symptoms, provide the name of the disease. "
        "If the context does not contain the answer, respond with: 'I don't know the answer.'"
    )

    # Truncate by tokens (not by characters) and keep room for the tokens that
    # are about to be generated, so a long prompt can still produce output.
    prompt_budget = max(tokenizer.model_max_length - max_new_tokens, 1)
    inputs = tokenizer(
        input_text,
        return_tensors="pt",
        padding=True,
        truncation=True,
        max_length=prompt_budget,
    )

    # NOTE(review): sampling is not enabled here (no do_sample=True), so top_k
    # presumably has no effect on the output — confirm.
    outputs = model.generate(
        inputs['input_ids'],
        attention_mask=inputs['attention_mask'],
        max_new_tokens=max_new_tokens,
        top_k=50,
        pad_token_id=tokenizer.eos_token_id,
    )

    # Decode the generated output.
    # NOTE(review): the decoded text presumably starts with the prompt, so the
    # "Context:" line is scanned before anything the model produced. If the
    # retrieved entry itself contains the marker phrase, its disease name is
    # returned regardless of the generation — confirm this is intended.
    response = tokenizer.decode(outputs[0], skip_special_tokens=True)

    disease_name = _extract_disease_name(response)
    if disease_name is None:
        # Default response if no valid disease name is found
        return "I don't know the answer."
    return disease_name

In [13]:
# Example usage: a free-text symptom description is passed to the RAG pipeline
# and the returned disease name is printed.
user_symptoms = "I’m having trouble swallowing, and sometimes it feels like food is stuck in my throat. I’ve also lost some weight. What might be going on?"
diagnosis = identify_disease_with_rag(user_symptoms)
print(diagnosis)


Swallowing problems


In [14]:
# Sanity check with nonsense input.
# NOTE(review): the recorded output of this cell is a disease name rather than
# "I don't know the answer." — worth investigating where that name comes from.
user_symptoms = "fgerhb wih fiwehr iwuehr iewr"
diagnosis = identify_disease_with_rag(user_symptoms)
print(diagnosis)

Atrial fibrillation


In [15]:
import pandas as pd

# Load the test dataset
test_data_path = "symptom_diseases_test.csv"
test_df = pd.read_csv(test_data_path)

# Fail fast, with a clear message, if the columns read by the evaluation loop are missing.
missing_columns = {"symptoms", "disease"} - set(test_df.columns)
assert not missing_columns, f"{test_data_path} is missing columns: {sorted(missing_columns)}"

# Function to evaluate the model
def evaluate_model(test_df):
    """Score ``identify_disease_with_rag`` against a labelled test set.

    Args:
        test_df: DataFrame with a ``"symptoms"`` column (model input) and a
            ``"disease"`` column (expected answer).

    Returns:
        ``(accuracy, mismatched_cases)``. ``accuracy`` is the fraction of rows
        whose prediction equals the expected disease, compared
        case-insensitively after removing trailing periods from the label.
        ``mismatched_cases`` is a list of dicts with the keys ``"Symptoms"``,
        ``"Actual Disease"`` and ``"Predicted Disease"``. An empty
        ``test_df`` yields ``(0.0, [])``.
    """
    total_predictions = len(test_df)
    if total_predictions == 0:
        # Nothing to score; avoids a ZeroDivisionError in the final ratio.
        return 0.0, []

    correct_predictions = 0
    mismatched_cases = []

    # Iterate the two columns directly; no per-row Series needs to be built.
    for symptoms, label in zip(test_df["symptoms"], test_df["disease"]):
        actual_disease = label.rstrip(".")
        predicted_disease = identify_disease_with_rag(symptoms)

        if predicted_disease.lower() == actual_disease.lower():
            correct_predictions += 1
            continue

        mismatched_cases.append(
            {
                "Symptoms": symptoms,
                "Actual Disease": actual_disease,
                "Predicted Disease": predicted_disease,
            }
        )

    return correct_predictions / total_predictions, mismatched_cases

# Score the model on the test set
accuracy, mismatched_cases = evaluate_model(test_df)

# Report the accuracy, then every case the model got wrong
print(f"Accuracy: {accuracy * 100:.2f}%")
if mismatched_cases:
    print("\nMismatched cases:")
    separator = "-" * 50
    for case in mismatched_cases:
        print(
            f"Symptoms: {case['Symptoms']}",
            f"Actual Disease: {case['Actual Disease']}",
            f"Predicted Disease: {case['Predicted Disease']}",
            separator,
            sep="\n",
        )


Accuracy: 78.26%

Mismatched cases:
Symptoms: I have deep, constant pain in my belly and back, and I feel a pulse near my bellybutton. What could this be?
Actual Disease: Abdominal aortic aneurysm
Predicted Disease: AAA
--------------------------------------------------
Symptoms: I’m having trouble swallowing, and sometimes it feels like food is stuck in my throat. I’ve also lost some weight. What might be going on?
Actual Disease: Achalasia
Predicted Disease: Swallowing problems
--------------------------------------------------
Symptoms: I feel severe pain in my upper right belly that spreads to my shoulder, and I’ve been feeling nauseous and feverish. What could this mean?
Actual Disease: Acute cholecystitis
Predicted Disease: Pancreatitis (acute)
--------------------------------------------------
Symptoms: I’ve been losing hearing on one side, and there’s ringing in my ear. Sometimes I feel dizzy and off balance. What might this be?
Actual Disease: Acoustic neuroma (vestibular schw

In [16]:
import pandas as pd
import numpy as np
from sklearn.model_selection import GridSearchCV
from transformers import GPTNeoForCausalLM, GPT2Tokenizer
from langchain.vectorstores import FAISS
from langchain.embeddings import HuggingFaceEmbeddings
from langchain.docstore.document import Document

# Hyperparameter Tuning Class
class HyperparameterTuner:
    """Grid-search generation settings for the RAG disease-prediction prompt.

    Every combination of ``top_k``, ``temperature`` and ``max_length_offset``
    is scored by accuracy on a labelled DataFrame, and the first combination
    with the highest accuracy is reported.

    NOTE(review): the notebook passes its test set to this tuner, so the
    reported best accuracy is optimistic — consider a separate validation
    split.

    NOTE(review): the decoded text presumably starts with the prompt, so the
    "Context:" line is scanned before anything the model generated. If the
    retrieved entry itself contains the marker phrase, every grid point will
    score identically — confirm.
    """

    # Phrase that precedes the disease name in a response line.
    _MARKER = "this could be the disease:"

    # Search space used when no grid is passed to tune_generation_parameters.
    DEFAULT_PARAM_GRID = {
        'top_k': [10, 30, 50, 70],
        'temperature': [0.7, 0.9, 1.1],
        'max_length_offset': [20, 50, 100],
    }

    def __init__(self, model, tokenizer, seed=42):
        """Store the model, the tokenizer and the RNG seed used for sampling.

        Args:
            model: Causal language model exposing ``generate``.
            tokenizer: Tokenizer matching ``model``.
            seed: Seed applied before scoring each parameter combination.
        """
        self.model = model
        self.tokenizer = tokenizer
        self.seed = seed

    def tune_generation_parameters(self, test_df, param_grid=None):
        """Evaluate every grid point and return the best parameter dict.

        Args:
            test_df: DataFrame with ``"symptoms"`` and ``"disease"`` columns.
            param_grid: Optional dict with the keys ``'top_k'``,
                ``'temperature'`` and ``'max_length_offset'``, each mapping to
                a list of candidate values. Defaults to ``DEFAULT_PARAM_GRID``.

        Returns:
            Dict with the keys ``'top_k'``, ``'temperature'`` and
            ``'max_length_offset'`` of the first combination that reached the
            highest accuracy.

        Raises:
            ValueError: If the grid contains no combination at all.
        """
        from itertools import product

        grid = self.DEFAULT_PARAM_GRID if param_grid is None else param_grid

        best_accuracy = 0
        # None (rather than {}) so that a run in which no combination scores
        # above zero still reports the first combination instead of failing
        # with a KeyError in the summary below.
        best_params = None

        # product() iterates in the same order as the equivalent nested loops:
        # top_k outermost, max_length_offset innermost.
        for top_k, temperature, max_length_offset in product(
            grid['top_k'], grid['temperature'], grid['max_length_offset']
        ):
            accuracy = self._evaluate_params(
                test_df,
                top_k=top_k,
                temperature=temperature,
                max_length_offset=max_length_offset,
            )

            if best_params is None or accuracy > best_accuracy:
                best_accuracy = accuracy
                best_params = {
                    'top_k': top_k,
                    'temperature': temperature,
                    'max_length_offset': max_length_offset,
                }

        if best_params is None:
            raise ValueError("param_grid must contain at least one value for each parameter")

        print("Best Hyperparameters:")
        print(f"Top K: {best_params['top_k']}")
        print(f"Temperature: {best_params['temperature']}")
        print(f"Max Length Offset: {best_params['max_length_offset']}")
        print(f"Best Accuracy: {best_accuracy * 100:.2f}%")

        return best_params

    def _evaluate_params(self, test_df, top_k, temperature, max_length_offset):
        """Return the accuracy on ``test_df`` for one parameter combination.

        An empty ``test_df`` scores ``0.0``.
        """
        total_predictions = len(test_df)
        if total_predictions == 0:
            # Nothing to score; avoids a ZeroDivisionError in the final ratio.
            return 0.0

        # Imported locally so the class does not depend on which names earlier
        # cells happened to import.
        from transformers import set_seed

        # Re-seed before every grid point so that score differences come from
        # the parameters rather than from the sampling RNG.
        set_seed(self.seed)

        correct_predictions = 0
        for symptoms, label in zip(test_df["symptoms"], test_df["disease"]):
            actual_disease = label.rstrip('.')  # Remove trailing period
            predicted_disease = self._predict(
                symptoms, top_k, temperature, max_length_offset
            )
            if predicted_disease and predicted_disease.lower() == actual_disease.lower():
                correct_predictions += 1

        return correct_predictions / total_predictions

    @staticmethod
    def _build_prompt(context, symptoms):
        """Assemble the instruction prompt from retrieved context and symptoms."""
        return (
            "You are a medical assistant trained to predict diseases based on symptoms.\n\n"
            f"Context: {context}\n"
            f"Symptoms: {symptoms}\n\n"
            "Based on the context and symptoms, provide the name of the disease. "
            "If the context does not contain the answer, respond with: 'I don't know the answer.'"
        )

    def _predict(self, symptoms, top_k, temperature, max_length_offset):
        """Generate a response for ``symptoms`` and return the parsed disease name or ``None``."""
        context = retrieve_context(symptoms, k=1)
        input_text = self._build_prompt(context, symptoms)

        # Truncate by tokens (not by characters) and keep room for the tokens
        # that are about to be generated.
        prompt_budget = max(self.tokenizer.model_max_length - max_length_offset, 1)
        inputs = self.tokenizer(
            input_text,
            return_tensors="pt",
            padding=True,
            truncation=True,
            max_length=prompt_budget,
        )

        # do_sample=True is required for top_k and temperature to influence
        # the output; without it these settings are ignored and every grid
        # point produces the same text.
        outputs = self.model.generate(
            inputs['input_ids'],
            attention_mask=inputs['attention_mask'],
            max_new_tokens=max_length_offset,
            do_sample=True,
            top_k=top_k,
            temperature=temperature,
            pad_token_id=self.tokenizer.eos_token_id,
        )

        response = self.tokenizer.decode(outputs[0], skip_special_tokens=True)
        return self._extract_disease_name(response)

    @classmethod
    def _extract_disease_name(cls, response):
        """Return the first non-empty disease name following the marker phrase, or ``None``."""
        for line in response.strip().split("\n"):
            if cls._MARKER not in line:
                continue
            disease_name = line.split(cls._MARKER)[-1].strip().rstrip('.').strip()
            if disease_name:
                return disease_name
        return None

# Labelled data used to score each parameter combination
test_data_path = "symptom_diseases_test.csv"
test_df = pd.read_csv(test_data_path)

# Placeholder token; not used by the code in this cell
hf_token = ""

# Fine-tuned checkpoint and its tokenizer (EOS doubles as the padding token)
model_path = "GPT-Neo-Disease-Symptoms"
model = GPTNeoForCausalLM.from_pretrained(model_path)
tokenizer = GPT2Tokenizer.from_pretrained(model_path)
tokenizer.pad_token = tokenizer.eos_token

# Run the grid search
tuner = HyperparameterTuner(model=model, tokenizer=tokenizer)
best_hyperparams = tuner.tune_generation_parameters(test_df=test_df)



Best Hyperparameters:
Top K: 10
Temperature: 0.7
Max Length Offset: 20
Best Accuracy: 78.26%
