In [None]:
%pip install chromadb
%pip install langchain-community 

In [None]:
from transformers import AutoModelForCausalLM, AutoTokenizer
import transformers
import pandas as pd
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.embeddings import HuggingFaceEmbeddings
from langchain.vectorstores import Chroma
from langchain.schema import Document
import numpy as np
import torch
from langchain.llms import HuggingFacePipeline
from langchain.prompts import PromptTemplate
from langchain.chains import RetrievalQA
from typing import List, Dict, Any, Optional
from langchain.schema import BaseRetriever
import re


In [5]:
class MedicalDataProcessor:
    def __init__(self, csv_path):
        self.df = pd.read_csv(csv_path)
        self.text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=500,
            chunk_overlap=50,
            length_function=len,
            separators=["\n\n", "\n", ". ", " ", ""]
        )
        self.embeddings = HuggingFaceEmbeddings(
            model_name="dmis-lab/biobert-base-cased-v1.2",
            model_kwargs={'device': 'cuda' if torch.cuda.is_available() else 'cpu'}
        )
    
    def prepare_documents(self):
        documents = []
        for _, row in self.df.iterrows():
            content = f"Question: {row['Questions']}\nAnswer: {row['Answers']}"
            
            # Update metadata to match your CSV columns
            doc = Document(
                page_content=content,
                metadata={
                    'focus': row['Focus'],
                    'question': row['Questions'],
                    'source': 'MedQuAD'
                }
            )
            documents.append(doc)
        return documents
     
    def create_chunks(self, documents):
        """Split documents into chunks."""
        return self.text_splitter.split_documents(documents)
    
    def create_vectorstore(self, chunks, persist_directory="medical_vectorstore"):
        """Create and persist vector store."""
        vectorstore = Chroma.from_documents(
            documents=chunks,
            embedding=self.embeddings,
            persist_directory=persist_directory
        )
        vectorstore.persist()
        return vectorstore

def main():
    # Initialize processor
    processor = MedicalDataProcessor("/kaggle/input/medquad-processed/processed_medquad.csv")
    # Create documents with metadata
    print("Preparing documents...")
    documents = processor.prepare_documents()
    
    # Split into chunks
    print("Creating chunks...")
    chunks = processor.create_chunks(documents)
    
    # Create vector store
    print("Creating vector store...")
    vectorstore = processor.create_vectorstore(chunks)
    
    print(f"Processing complete. Total chunks created: {len(chunks)}")
    return vectorstore

if __name__ == "__main__":
    main()

  self.embeddings = HuggingFaceEmbeddings(


config.json:   0%|          | 0.00/1.11k [00:00<?, ?B/s]

pytorch_model.bin:   0%|          | 0.00/436M [00:00<?, ?B/s]

vocab.txt:   0%|          | 0.00/213k [00:00<?, ?B/s]

Preparing documents...
Creating chunks...
Creating vector store...
Processing complete. Total chunks created: 68377


  vectorstore.persist()


In [4]:
import os
import subprocess
from IPython.display import FileLink, display

def download_file(path, download_file_name):
    os.chdir('/kaggle/working/')
    zip_name = f"/kaggle/working/{download_file_name}.zip"
    command = f"zip {zip_name} {path} -r"
    result = subprocess.run(command, shell=True, capture_output=True, text=True)
    if result.returncode != 0:
        print("Unable to run zip command!")
        print(result.stderr)
        return
    display(FileLink(f'{download_file_name}.zip'))

In [6]:
download_file('/kaggle/working/medical_vectostore', 'out')

Unable to run zip command!



In [4]:
from pydantic import Field
from langchain.schema import BaseRetriever
from typing import List, Any

class SimilarQuestionRetriever(BaseRetriever):
    vectorstore: Any = Field(default=None, description="Vector store for document retrieval")
    
    def _get_relevant_documents(self, query: str) -> List[Document]:
        # Get the most similar document
        docs_and_scores = self.vectorstore.similarity_search_with_score(query, k=1)
        
        if not docs_and_scores:
            return []
        
        most_similar_doc, _ = docs_and_scores[0]
        original_question = most_similar_doc.metadata.get('question', '')
        
        # Get documents with the same question and their answers
        similar_docs = self.vectorstore.similarity_search(
            original_question,
            k=2,
            filter={"question": original_question}
        )
        
        return similar_docs

    async def _aget_relevant_documents(self, query: str) -> List[Document]:
        return self._get_relevant_documents(query)

In [5]:
class MedicalRAGSystem:
    def __init__(self, model_path="BioMistral/BioMistral-7B", persist_directory="medical_vectorstore"):
        self.setup_model(model_path)
        self.load_vectorstore(persist_directory)
        self.setup_prompt()
        self.setup_qa_chain()

    def setup_model(self, model_path):
        """Initialize BioMistral model and tokenizer."""
        print("Loading BioMistral model and tokenizer...")
        tokenizer = AutoTokenizer.from_pretrained(model_path)
        model = AutoModelForCausalLM.from_pretrained(
            model_path,
            torch_dtype=torch.float16,
            device_map="auto"
        )
        
        pipeline = transformers.pipeline(
            "text-generation",
            model=model,
            tokenizer=tokenizer,
            max_length=512,  # Back to max_length for better control
            temperature=0.1,  # Reduced for more focused responses
            do_sample=False,  # Deterministic output
            top_p=0.95,
            repetition_penalty=1.1  # Prevent repetition
        )
        
        self.llm = HuggingFacePipeline(pipeline=pipeline)
        print("Model setup complete!")

    def load_vectorstore(self, persist_directory):
        """Load existing vector store with similar question retrieval."""
        print("Loading vector store...")
        self.vectorstore = Chroma(
            persist_directory=persist_directory,
            embedding_function=HuggingFaceEmbeddings(
                model_name="dmis-lab/biobert-base-cased-v1.2",
                model_kwargs={'device': 'cuda' if torch.cuda.is_available() else 'cpu'}
            )
        )
        self.retriever = SimilarQuestionRetriever(vectorstore=self.vectorstore)
        print("Vector store loaded!")

    def setup_prompt(self):
        """Create enhanced prompt template for medical QA."""
        template = """[INST] You are a medical assistant answering health-related questions.
        Use only the following medical information to answer the question.
        If no relevant information is found in the context, respond with:
        "I apologize, but I don't have any information about [topic] in my knowledge base."
        
        Medical Knowledge:
        {context}
        
        Question: {question}
        
        Please provide a clear, accurate medical response using only the information from the Medical Knowledge section above. [/INST]"""
        
        self.prompt = PromptTemplate(
            template=template,
            input_variables=["context", "question"]
        )

    def setup_qa_chain(self):
        """Setup the question-answering chain."""
        print("Setting up QA chain...")
        self.qa_chain = RetrievalQA.from_chain_type(
            llm=self.llm,
            chain_type="stuff",
            retriever=self.retriever,
            chain_type_kwargs={
                "prompt": self.prompt
            },
            return_source_documents=True
        )
        print("QA chain ready!")

    def get_answer(self, question: str):
        """Get answer for a medical question."""
        try:
            result = self.qa_chain({"query": question})
            return {
                "answer": result["result"],
                "sources": [
                    {
                        "focus": doc.metadata.get("focus", ""),
                        "question": doc.metadata.get("question", "")
                    } 
                    for doc in result["source_documents"]
                ]
            }
        except Exception as e:
            return {
                "answer": "I encountered an error processing your question. Please try again.",
                "error": str(e)
            }

In [6]:
rag_system = MedicalRAGSystem()

Loading BioMistral model and tokenizer...


Device set to use cuda:0
  self.llm = HuggingFacePipeline(pipeline=pipeline)
  embedding_function=HuggingFaceEmbeddings(


Model setup complete!
Loading vector store...


  self.vectorstore = Chroma(


Vector store loaded!
Setting up QA chain...
QA chain ready!


In [9]:
def test_medical_qa(question: str, rag_system):
    print("Question:", question)
    print("\nProcessing...")
    response = rag_system.get_answer(question)
    
    if "error" in response:
        print("\nError:", response["error"])
        return
        
    answer = response.get("answer", "")
    if "[INST]" in answer:
        answer = answer.split("[/INST]")[-1].strip()
    print("\nAnswer:", answer)
    
    if "sources" in response:
        print("\nSources:")
        for source in response["sources"]:
            print(f"\nFocus: {source['focus']}")
            print(f"Original Question: {source['question']}")

In [10]:
test_medical_qa("How to diagnose Schimke immunoosseous dysplasia ?",rag_system)

Question: How to diagnose Schimke immunoosseous dysplasia ?

Processing...


  result = self.qa_chain({"query": question})
Truncation was not explicitly activated but `max_length` is provided a specific value, please use `truncation=True` to explicitly truncate examples to max length. Defaulting to 'longest_first' truncation strategy. If you encode pairs of sequences (GLUE-style) with the tokenizer you can select this strategy more precisely by providing a specific strategy to `truncation`.
Setting `pad_token_id` to `eos_token_id`:2 for open-end generation.



Answer: Schimke immunodysplasia is a rare autosomal recessive disorder characterized by short stature, skeletal dysplasia, renal failure, T-cell deficiency, and thyroid dysfunction. Other features include shallow acetabular roofs, spondyloepiphyseal dysplasia, thoracic kyphosis, and a waddling gait. Diagnosis is based on clinical presentation and genetic testing.

Sources:

Focus: Schimke immunoosseous dysplasia
Original Question: What are the symptoms of Schimke immunoosseous dysplasia ?

Focus: Schimke immunoosseous dysplasia
Original Question: What are the symptoms of Schimke immunoosseous dysplasia ?


In [None]:
%pip install ragas sentence-transformers
%pip install ragas langchain-google-genai google-generativeai

In [1]:
import google.generativeai as genai
from ragas import evaluate
from ragas.metrics import (
    faithfulness,
    answer_relevancy,
    context_recall,
    context_precision
)
from datasets import Dataset
from langchain_google_genai import ChatGoogleGenerativeAI
from langchain_community.embeddings import HuggingFaceEmbeddings
import pandas as pd

In [11]:
class RAGEvaluator:
    def __init__(self, rag_system, test_data_path="/kaggle/input/medquad-processed/processed_medquad.csv"):
        self.rag_system = rag_system
        self.test_df = pd.read_csv(test_data_path)
        self.llm = ChatGoogleGenerativeAI(
            model="gemini-pro",
            temperature=0.1,
            google_api_key="YOUR_API_KEY"
        )
        self.embeddings = HuggingFaceEmbeddings(
            model_name="sentence-transformers/all-MiniLM-L6-v2"
        )
    
    def prepare_evaluation_data(self, num_samples=10):
        """Prepare dataset for RAGAS evaluation"""
        eval_data = []
        
        # Sample questions
        sampled_data = self.test_df.sample(n=num_samples, random_state=42)
        
        for _, row in sampled_data.iterrows():
            # Get RAG response
            response = self.rag_system.get_answer(row['Questions'])
            
            # Clean up answer
            answer = response["answer"]
            if "[INST]" in answer:
                answer = answer.split("[/INST]")[-1].strip()
            
            # Format contexts properly for RAGAS
            contexts = []
            for doc in response['sources']:
                context = f"{doc['question']}: {doc['focus']}"
                if len(context) > 500:
                    context = context[:500] + "..."
                contexts.append(context)
            
            # Truncate reference if too long
            reference = row['Answers']
            if len(reference) > 1000:
                reference = reference[:1000] + "..."
                
            data_point = {
                "question": row['Questions'],
                "answer": answer,
                "contexts": contexts,
                "reference": reference,
            }
            
            eval_data.append(data_point)
        
        dataset = Dataset.from_list(eval_data)
        return dataset

    def run_evaluation(self, num_samples=10):
        """Run RAGAS evaluation"""
        print("Preparing evaluation dataset...")
        eval_dataset = self.prepare_evaluation_data(num_samples)
        
        print("\nRunning RAGAS evaluation...")
        results = evaluate(
            eval_dataset,
            metrics=[
                faithfulness,
                answer_relevancy,
                context_recall,
                context_precision
            ],
            llm=self.llm,
            embeddings=self.embeddings,
            raise_exceptions=True
        )
        
        return results

    def print_results(self, results):
        """Print evaluation results in a readable format"""
        print("\nRAGAS Evaluation Results:")
        print("-" * 50)
        print(f"Results type: {type(results)}")
        print(f"Results content: {results}")
        
        if isinstance(results, list):
            for i, result in enumerate(results):
                print(f"\nResult {i}:")
                print(f"Type: {type(result)}")
                print(f"Dir: {dir(result)}")
                try:
                    # Try to access common attributes
                    if hasattr(result, 'name'):
                        print(f"Name: {result.name}")
                    if hasattr(result, 'score'):
                        print(f"Score: {result.score:.3f}")
                    if hasattr(result, 'metadata'):
                        print(f"Metadata: {result.metadata}")
                except Exception as e:
                    print(f"Error accessing result attributes: {str(e)}")

In [12]:
evaluator = RAGEvaluator(rag_system)

In [None]:
results = evaluator.run_evaluation(num_samples=5

In [None]:
results = evaluator.run_evaluation(num_samples=3)
# Print formatted results
evaluator.print_results(results)