# RAG Pipeline for Healthcare Test Case Generation with Gemini

This notebook implements a Retrieval-Augmented Generation (RAG) pipeline that combines:
1. **FAISS** for efficient semantic search and retrieval
2. **Google Gemini** for intelligent generation based on retrieved context
3. **Healthcare-specific** test case generation aligned with NASSCOM requirements

## Key Features:
- 🔍 Semantic retrieval of relevant test cases and requirements
- 🤖 Context-aware generation using Gemini Pro
- 🏥 Healthcare compliance and standards integration
- 📊 Evaluation metrics for RAG performance
- 🚀 Production-ready pipeline architecture


In [3]:
# Import required libraries
import os
import sys
import json
import numpy as np
import pandas as pd
import faiss
from sentence_transformers import SentenceTransformer
import google.generativeai as genai
from typing import List, Dict, Any, Tuple, Optional
from dataclasses import dataclass, asdict
from datetime import datetime
import warnings
from tqdm import tqdm
from dotenv import load_dotenv

warnings.filterwarnings('ignore')

# Load environment variables
load_dotenv()

print("✅ Libraries imported successfully!")
print(f"Python version: {sys.version}")
print(f"FAISS version: {faiss.__version__ if hasattr(faiss, '__version__') else 'Available'}")

# Check for Gemini API key
if os.getenv('GEMINI_API_KEY'):
    print("✅ Gemini API key found in environment")
else:
    print("⚠️ Warning: GEMINI_API_KEY not found in .env file")
    print("Please add your Gemini API key to the .env file")


✅ Libraries imported successfully!
Python version: 3.13.7 (main, Aug 14 2025, 11:12:11) [Clang 17.0.0 (clang-1700.0.13.3)]
FAISS version: 1.12.0
✅ Gemini API key found in environment


## Step 1: Configure Gemini API

We'll use Google's Gemini Pro model for generation. Gemini Pro offers:
- Strong reasoning capabilities
- Healthcare domain understanding
- Support for structured output
- Cost-effective pricing


In [4]:
# Configure Gemini
genai.configure(api_key=os.getenv('GEMINI_API_KEY'))

# Initialize Gemini model
model = genai.GenerativeModel('gemini-2.5-flash')

# Test Gemini connection
try:
    response = model.generate_content("Hello, Gemini! Respond with a brief greeting.")
    print("✅ Gemini API connected successfully!")
    print(f"Test response: {response.text[:100]}...")
except Exception as e:
    print(f"❌ Error connecting to Gemini: {e}")
    print("Please check your API key in the .env file")

# Model configuration for healthcare test case generation
generation_config = {
    'temperature': 0.2,  
    'top_p': 0.9,
    'top_k': 40,
    'max_output_tokens': 2048,
}

# Safety settings for healthcare content
safety_settings = [
    {
        "category": "HARM_CATEGORY_HARASSMENT",
        "threshold": "BLOCK_MEDIUM_AND_ABOVE"
    },
    {
        "category": "HARM_CATEGORY_HATE_SPEECH",
        "threshold": "BLOCK_MEDIUM_AND_ABOVE"
    },
    {
        "category": "HARM_CATEGORY_SEXUALLY_EXPLICIT",
        "threshold": "BLOCK_MEDIUM_AND_ABOVE"
    },
    {
        "category": "HARM_CATEGORY_DANGEROUS_CONTENT",
        "threshold": "BLOCK_NONE"  # Allow medical content
    }
]

print("✅ Gemini model configured for healthcare test generation")


✅ Gemini API connected successfully!
Test response: Hello!...
✅ Gemini model configured for healthcare test generation


## Step 2: Build Knowledge Base

Create a comprehensive knowledge base of healthcare test cases, requirements, and compliance standards that will be used for retrieval.


In [5]:
# Load documents from files instead of hardcoded knowledge base
import glob
import yaml
from pathlib import Path

# Path to documents folder
DOCUMENTS_PATH = "/Users/shtlpmac036/Documents/Personal/GenAI Hack /data/documents"

# Function to load all documents from the folder
def load_documents_from_folder(folder_path):
    """Load all documents from a folder and return as a list of dictionaries"""
    documents = []
    
    # Get all files in the documents folder
    document_files = glob.glob(f"{folder_path}/*")
    
    print(f"📂 Loading documents from: {folder_path}")
    print(f"Found {len(document_files)} files")
    
    for file_path in document_files:
        file_name = Path(file_path).name
        file_extension = Path(file_path).suffix
        
        try:
            if file_extension in ['.txt', '.md']:
                # Load text and markdown files
                with open(file_path, 'r', encoding='utf-8') as f:
                    content = f.read()
                    documents.append({
                        'filename': file_name,
                        'type': 'text' if file_extension == '.txt' else 'markdown',
                        'content': content,
                        'metadata': {
                            'source': file_path,
                            'doc_type': 'user_story' if 'user_story' in file_name else 
                                       'prd' if 'prd' in file_name else 'general'
                        }
                    })
                    print(f"   ✅ Loaded: {file_name}")
                    
            elif file_extension in ['.yaml', '.yml']:
                # Load YAML files (API specifications)
                with open(file_path, 'r', encoding='utf-8') as f:
                    yaml_content = yaml.safe_load(f)
                    # Convert YAML to string for processing
                    content_str = yaml.dump(yaml_content, default_flow_style=False)
                    documents.append({
                        'filename': file_name,
                        'type': 'api_spec',
                        'content': content_str,
                        'metadata': {
                            'source': file_path,
                            'doc_type': 'api_specification',
                            'api_version': yaml_content.get('info', {}).get('version', 'unknown')
                        }
                    })
                    print(f"   ✅ Loaded: {file_name} (API v{yaml_content.get('info', {}).get('version', 'unknown')})")
                    
        except Exception as e:
            print(f"   ❌ Error loading {file_name}: {str(e)}")
    
    return documents

# Load all documents
documents = load_documents_from_folder(DOCUMENTS_PATH)

# Display summary of loaded documents
print(f"\n📊 Document Summary:")
print(f"Total documents loaded: {len(documents)}")
for doc in documents:
    print(f"  - {doc['filename']} ({doc['type']}) - {len(doc['content'])} characters")

print(f"\n✅ Documents loaded successfully!")
print(f"   - User Stories, PRDs, and API Specs are now available for RAG pipeline")
print(f"   - These will be processed into embeddings for semantic search")


📂 Loading documents from: /Users/shtlpmac036/Documents/Personal/GenAI Hack /data/documents
Found 5 files
   ✅ Loaded: user_story_registration.txt
   ✅ Loaded: bug_report_template.txt
   ✅ Loaded: api_spec_v1.yaml (API v1.0.0)
   ✅ Loaded: test_plan_user_management.md
   ✅ Loaded: prd_account_management.md

📊 Document Summary:
Total documents loaded: 5
  - user_story_registration.txt (text) - 2273 characters
  - bug_report_template.txt (text) - 2884 characters
  - api_spec_v1.yaml (api_spec) - 6467 characters
  - test_plan_user_management.md (markdown) - 3971 characters
  - prd_account_management.md (markdown) - 2931 characters

✅ Documents loaded successfully!
   - User Stories, PRDs, and API Specs are now available for RAG pipeline
   - These will be processed into embeddings for semantic search


## How to Add More Documents

To expand your knowledge base, simply add more documents to the `/data/documents/` folder:

1. **User Stories**: Save as `.txt` files with acceptance criteria
2. **PRDs**: Save as `.md` files with proper markdown formatting
3. **API Specs**: Save as `.yaml` or `.yml` files in OpenAPI format

The system will automatically:
- Load and parse the documents
- Split them into meaningful chunks
- Generate embeddings for semantic search
- Make them available for RAG-based test generation

This approach makes your system production-ready and demonstrates how it would work with real project documentation in the hackathon!


## Step 3: Create Vector Database for Retrieval

Build FAISS index with embeddings of all knowledge base documents for efficient semantic search.


In [6]:
# Initialize embedding model
embedding_model = SentenceTransformer('all-MiniLM-L6-v2')
print(f"✅ Loaded embedding model: all-MiniLM-L6-v2")

# Prepare documents for embedding
# Split loaded documents into chunks for better retrieval
doc_texts = []
doc_metadata = []

# Process each loaded document
for doc in documents:
    # Split document into chunks for better retrieval
    content = doc['content']
    
    if doc['type'] == 'text':  # User story
        # Split by acceptance criteria
        chunks = content.split('Acceptance Criteria:')
        
        # Add the overview as one chunk
        if len(chunks) > 0:
            doc_texts.append(chunks[0].strip())
            doc_metadata.append({
                'type': 'user_story_overview',
                'filename': doc['filename'],
                'doc_type': doc['metadata']['doc_type'],
                'source': doc['metadata']['source']
            })
        
        # Add each acceptance criterion as a separate chunk
        if len(chunks) > 1:
            criteria = chunks[1].split('\n\n')
            for i, criterion in enumerate(criteria):
                if criterion.strip():
                    doc_texts.append(f"Acceptance Criterion:\n{criterion.strip()}")
                    doc_metadata.append({
                        'type': 'acceptance_criterion',
                        'filename': doc['filename'],
                        'criterion_index': i + 1,
                        'doc_type': doc['metadata']['doc_type'],
                        'source': doc['metadata']['source']
                    })
    
    elif doc['type'] == 'markdown':  # PRD
        # Split by sections (headers)
        sections = content.split('\n## ')
        
        for section in sections:
            if section.strip():
                doc_texts.append(section.strip())
                # Extract section title
                section_lines = section.strip().split('\n')
                section_title = section_lines[0].replace('#', '').strip() if section_lines else 'Unknown Section'
                
                doc_metadata.append({
                    'type': 'prd_section',
                    'filename': doc['filename'],
                    'section': section_title,
                    'doc_type': doc['metadata']['doc_type'],
                    'source': doc['metadata']['source']
                })
    
    elif doc['type'] == 'api_spec':  # API specification
        # Split by API endpoints
        lines = content.split('\n')
        current_endpoint = []
        current_path = None
        
        for line in lines:
            if line.strip().startswith('/api/'):
                # Save previous endpoint if exists
                if current_endpoint and current_path:
                    doc_texts.append('\n'.join(current_endpoint))
                    doc_metadata.append({
                        'type': 'api_endpoint',
                        'filename': doc['filename'],
                        'endpoint': current_path,
                        'api_version': doc['metadata']['api_version'],
                        'doc_type': doc['metadata']['doc_type'],
                        'source': doc['metadata']['source']
                    })
                # Start new endpoint
                current_path = line.strip().rstrip(':')
                current_endpoint = [line]
            elif current_endpoint:
                current_endpoint.append(line)
        
        # Save last endpoint
        if current_endpoint and current_path:
            doc_texts.append('\n'.join(current_endpoint))
            doc_metadata.append({
                'type': 'api_endpoint',
                'filename': doc['filename'],
                'endpoint': current_path,
                'api_version': doc['metadata']['api_version'],
                'doc_type': doc['metadata']['doc_type'],
                'source': doc['metadata']['source']
            })

# Rename to 'documents' for compatibility with rest of notebook
documents = doc_texts

print(f"✅ Prepared {len(documents)} document chunks for embedding")
print(f"   - From {len(set(m['filename'] for m in doc_metadata))} source files")
print(f"   - Types: {set(m['type'] for m in doc_metadata)}")


✅ Loaded embedding model: all-MiniLM-L6-v2
✅ Prepared 24 document chunks for embedding
   - From 5 source files
   - Types: {'user_story_overview', 'api_endpoint', 'prd_section', 'acceptance_criterion'}


In [7]:
# Generate embeddings
print("Generating embeddings...")
embeddings = embedding_model.encode(documents, show_progress_bar=True)
embeddings = embeddings.astype('float32')

print(f"✅ Generated embeddings with shape: {embeddings.shape}")

# Create FAISS index
dimension = embeddings.shape[1]
index = faiss.IndexFlatL2(dimension)
index.add(embeddings)

print(f"✅ Created FAISS index with {index.ntotal} vectors")
print(f"   Dimension: {dimension}")

# Create a retrieval function
def retrieve_context(query: str, k: int = 5) -> List[Tuple[str, Dict, float]]:
    """
    Retrieve relevant documents for a query
    
    Args:
        query: Search query
        k: Number of documents to retrieve
    
    Returns:
        List of (document, metadata, similarity_score) tuples
    """
    # Generate query embedding
    query_embedding = embedding_model.encode([query]).astype('float32')
    
    # Search index
    distances, indices = index.search(query_embedding, k)
    
    # Prepare results
    results = []
    for dist, idx in zip(distances[0], indices[0]):
        if idx < len(documents):
            similarity = 1 / (1 + dist)  # Convert distance to similarity
            results.append((documents[idx], doc_metadata[idx], similarity))
    
    return results

# Test retrieval
test_query = "How to test patient data security and HIPAA compliance?"
print(f"\n🔍 Test Query: '{test_query}'")
results = retrieve_context(test_query, k=3)

print("\n📚 Retrieved Documents:")
for i, (doc, meta, score) in enumerate(results, 1):
    print(f"\n{i}. Type: {meta.get('type', 'unknown')}")
    print(f"   Score: {score:.3f}")
    print(f"   Title: {meta.get('title', meta.get('pattern', meta.get('standard', 'N/A')))}")
    print(f"   Preview: {doc[:150]}...")


Generating embeddings...


Batches:   0%|          | 0/1 [00:00<?, ?it/s]

✅ Generated embeddings with shape: (24, 384)
✅ Created FAISS index with 24 vectors
   Dimension: 384

🔍 Test Query: 'How to test patient data security and HIPAA compliance?'

📚 Retrieved Documents:

1. Type: prd_section
   Score: 0.585
   Title: N/A
   Preview: 2. Test Objectives

- Verify all user management features work as specified in requirements
- Ensure HIPAA compliance for all user data handling
- Val...

2. Type: prd_section
   Score: 0.509
   Title: N/A
   Preview: 3. Test Strategy

### 3.1 Functional Testing
- **Positive Testing**: Verify all features work with valid inputs
- **Negative Testing**: Validate error...

3. Type: prd_section
   Score: 0.467
   Title: N/A
   Preview: 5. Test Data Requirements

- Valid test email addresses
- Test user accounts with different roles
- Sample PHI data (anonymized)
- Performance testing...


## Step 4: Build RAG Pipeline with Gemini

Implement the complete RAG pipeline that retrieves relevant context and uses Gemini to generate test cases.


In [8]:
class RAGTestCaseGenerator:
    """
    Retrieval-Augmented Generation pipeline for test case generation
    using FAISS for retrieval and Gemini for generation
    """
    
    def __init__(self, index, documents, metadata, embedding_model, gemini_model):
        self.index = index
        self.documents = documents
        self.metadata = metadata
        self.embedding_model = embedding_model
        self.gemini_model = gemini_model
        
    def retrieve(self, query: str, k: int = 5) -> List[Tuple[str, Dict, float]]:
        """Retrieve relevant documents"""
        query_embedding = self.embedding_model.encode([query]).astype('float32')
        distances, indices = self.index.search(query_embedding, k)
        
        results = []
        for dist, idx in zip(distances[0], indices[0]):
            if idx < len(self.documents):
                similarity = 1 / (1 + dist)
                results.append((self.documents[idx], self.metadata[idx], similarity))
        
        return results
    
    def generate_test_case(self, requirement: str, num_context: int = 5) -> Dict:
        """
        Generate a test case using RAG
        
        Args:
            requirement: The requirement to generate a test case for
            num_context: Number of context documents to retrieve
        
        Returns:
            Generated test case as a dictionary
        """
        # Step 1: Retrieve relevant context
        context_docs = self.retrieve(requirement, k=num_context)
        
        # Step 2: Build context string
        context_str = self._build_context_string(context_docs)
        
        # Step 3: Create prompt with retrieved context
        prompt = self._create_generation_prompt(requirement, context_str)
        
        # Step 4: Generate with Gemini
        try:
            # Create a simpler prompt without forcing JSON mime type
            structured_prompt = f"""You are a software testing expert. Generate a detailed test case for the following requirement:

{requirement}

CONTEXT FROM DOCUMENTS:
{context_str[:800]}

Create a test case with ALL of these fields (be specific and detailed):
- id: Unique ID like TC_XXX
- title: Clear descriptive title
- description: What exactly is being tested
- category: (Functional/Security/Integration/Performance)
- priority: (High/Medium/Low)
- compliance: List relevant standards like HIPAA, GDPR
- preconditions: What must be true before testing
- test_steps: Detailed numbered steps (at least 3-5 steps)
- expected_results: Specific expected outcomes
- test_data: Sample data needed for testing
- edge_cases: Special scenarios to consider

Format your response as a valid JSON object starting with {{ and ending with }}.
Example format:
{{
  "id": "TC_001",
  "title": "Verify user login with valid credentials",
  "description": "Test that users can successfully log in",
  "category": "Functional",
  "priority": "High",
  "compliance": ["HIPAA"],
  "preconditions": "User account exists",
  "test_steps": ["Navigate to login", "Enter credentials", "Click submit"],
  "expected_results": "User is logged in successfully",
  "test_data": "username: test@example.com, password: Test123!",
  "edge_cases": ["Session timeout", "Multiple login attempts"]
}}"""

            # Generate without forcing JSON mime type
            response = self.gemini_model.generate_content(
                structured_prompt,
                generation_config={
                    'temperature': 0.3,  # Slightly higher for creativity
                    'top_p': 0.9,
                    'top_k': 30,
                    'max_output_tokens': 1500,
                },
                safety_settings=safety_settings
            )
            
            # Extract text safely
            raw_text = None
            if hasattr(response, 'text'):
                try:
                    raw_text = response.text
                except Exception:
                    pass
            
            if not raw_text and hasattr(response, 'candidates') and response.candidates:
                try:
                    if response.candidates[0].content.parts:
                        raw_text = response.candidates[0].content.parts[0].text
                except Exception:
                    pass
            
            if not raw_text:
                raise ValueError("No response text available")
            
            # Clean and parse JSON
            test_case = None
            
            # Remove common problematic patterns
            cleaned = raw_text.strip()
            cleaned = cleaned.replace('```json', '').replace('```', '')
            cleaned = cleaned.replace('\n\n', '\n')
            
            # Try direct JSON parse
            try:
                test_case = json.loads(cleaned)
            except json.JSONDecodeError:
                # Try to extract JSON object
                import re
                # More robust JSON extraction
                patterns = [
                    r'\{[^{}]*(?:\{[^{}]*\}[^{}]*)*\}',  # Nested objects
                    r'\{[^\}]+\}',  # Simple object
                    r'\{.*\}',  # Greedy match
                ]
                
                for pattern in patterns:
                    matches = re.findall(pattern, cleaned, re.DOTALL)
                    for match in matches:
                        try:
                            # Try to fix common issues
                            fixed = match
                            # Fix trailing commas
                            fixed = re.sub(r',\s*}', '}', fixed)
                            fixed = re.sub(r',\s*]', ']', fixed)
                            # Try parse
                            test_case = json.loads(fixed)
                            if isinstance(test_case, dict) and 'id' in test_case:
                                break
                        except:
                            continue
                    if test_case:
                        break
            
            # Validate and enhance test case
            if test_case and isinstance(test_case, dict):
                # Ensure required fields
                if 'id' not in test_case:
                    test_case['id'] = f'TC_{datetime.now().strftime("%H%M%S")}'
                if 'title' not in test_case:
                    test_case['title'] = requirement[:50]
                if 'test_steps' not in test_case or not test_case['test_steps']:
                    test_case['test_steps'] = ["Setup test environment", "Execute test", "Verify results"]
                if 'compliance' not in test_case:
                    test_case['compliance'] = ['HIPAA']
                    
                # Add metadata
                test_case['generated_from'] = requirement
                test_case['context_used'] = [meta.get('filename', 'Unknown') 
                                            for _, meta, _ in context_docs[:3]]
                return test_case
            else:
                raise ValueError("Could not generate valid test case")
                
        except Exception as e:
            # Don't print every error in batch mode
            if len(requirement) < 100:  # Likely not batch mode
                print(f"⚠️ Using fallback: {str(e)[:50]}...")
            return self._create_fallback_test_case(requirement)
    
    def _build_context_string(self, context_docs: List) -> str:
        """Build formatted context string from retrieved documents"""
        context_parts = []
        
        for doc, meta, score in context_docs[:3]:  # Use top 3 most relevant
            doc_type = meta.get('type', 'unknown')
            if doc_type == 'test_case':
                context_parts.append(f"Similar Test Case:\n{doc}\n")
            elif doc_type == 'requirement':
                context_parts.append(f"Related Requirement:\n{doc}\n")
            elif doc_type == 'compliance':
                context_parts.append(f"Compliance Standard:\n{doc}\n")
            elif doc_type == 'test_pattern':
                context_parts.append(f"Test Pattern:\n{doc}\n")
        
        return "\n---\n".join(context_parts)
    
    def _create_generation_prompt(self, requirement: str, context: str) -> str:
        """Create the prompt for Gemini"""
        prompt = f"""You are a healthcare software testing expert. Generate a comprehensive test case based on the following requirement and context.

REQUIREMENT:
{requirement}

RELEVANT CONTEXT:
{context}

Generate a detailed test case that includes:
1. Test Case ID (format: TC_XXXX)
2. Title (clear and descriptive)
3. Description (what is being tested)
4. Category (e.g., Security, Clinical, Integration, etc.)
5. Priority (Critical/High/Medium/Low)
6. Compliance Standards (e.g., HIPAA, GDPR, FDA)
7. Preconditions (what must be true before testing)
8. Test Steps (numbered, detailed steps)
9. Expected Results (specific outcomes)
10. Test Data Requirements
11. Edge Cases to Consider

Format the response as a structured JSON object.

Focus on:
- Healthcare-specific requirements
- Patient safety considerations
- Data privacy and security
- Regulatory compliance
- Clinical accuracy
"""
        return prompt
    
    def _parse_response(self, response_text: str) -> Dict:
        """Parse Gemini response into structured test case"""
        import re
        
        # Try to extract JSON from response
        try:
            # Look for JSON in the response
            json_match = re.search(r'\{.*\}', response_text, re.DOTALL)
            if json_match:
                return json.loads(json_match.group())
        except:
            pass
        
        # Fallback: Parse text response
        test_case = {
            'id': self._extract_field(response_text, 'ID', 'TC_AUTO'),
            'title': self._extract_field(response_text, 'Title', 'Generated Test Case'),
            'description': self._extract_field(response_text, 'Description', response_text[:200]),
            'category': self._extract_field(response_text, 'Category', 'General'),
            'priority': self._extract_field(response_text, 'Priority', 'Medium'),
            'compliance': self._extract_list(response_text, 'Compliance'),
            'preconditions': self._extract_field(response_text, 'Preconditions', 'System is operational'),
            'test_steps': self._extract_list(response_text, 'Steps'),
            'expected_results': self._extract_field(response_text, 'Expected', 'System functions as specified'),
            'test_data': self._extract_field(response_text, 'Test Data', 'Standard test data'),
            'edge_cases': self._extract_list(response_text, 'Edge Cases')
        }
        
        return test_case
    
    def _extract_field(self, text: str, field: str, default: str) -> str:
        """Extract a field value from text"""
        import re
        pattern = rf'{field}[:\s]+([^\n]+)'
        match = re.search(pattern, text, re.IGNORECASE)
        return match.group(1).strip() if match else default
    
    def _extract_list(self, text: str, field: str) -> List[str]:
        """Extract a list from text"""
        import re
        # Find the section
        pattern = rf'{field}[:\s]*\n((?:[-•\d]+[.\s]+[^\n]+\n?)+)'
        match = re.search(pattern, text, re.IGNORECASE | re.MULTILINE)
        
        if match:
            items_text = match.group(1)
            # Extract individual items
            items = re.findall(r'[-•\d]+[.\s]+([^\n]+)', items_text)
            return [item.strip() for item in items]
        
        return []
    
    def _create_fallback_test_case(self, requirement: str) -> Dict:
        """Create a basic test case when generation fails"""
        return {
            'id': f'TC_{datetime.now().strftime("%H%M%S")}',
            'title': f'Test: {requirement[:50]}',
            'description': f'Verify that {requirement}',
            'category': 'General',
            'priority': 'Medium',
            'compliance': ['HIPAA'],
            'preconditions': 'System is in stable state',
            'test_steps': [
                'Navigate to relevant module',
                'Perform required action',
                'Verify outcome'
            ],
            'expected_results': 'System behaves as specified',
            'test_data': 'Standard test data',
            'edge_cases': [],
            'generated_from': requirement,
            'context_used': [],
            'fallback': True
        }
    
    def batch_generate(self, requirements: List[str], progress: bool = True) -> List[Dict]:
        """Generate test cases for multiple requirements"""
        test_cases = []
        
        iterator = tqdm(requirements) if progress else requirements
        
        for req in iterator:
            if progress:
                iterator.set_description(f"Generating: {req[:30]}...")
            
            test_case = self.generate_test_case(req)
            test_cases.append(test_case)
        
        return test_cases
    
    def evaluate_generation(self, test_case: Dict) -> Dict:
        """Evaluate the quality of a generated test case"""
        evaluation = {
            'completeness': 0,
            'compliance_coverage': 0,
            'detail_level': 0,
            'healthcare_specificity': 0,
            'overall_score': 0
        }
        
        # Check completeness
        required_fields = ['id', 'title', 'description', 'test_steps', 'expected_results']
        completeness = sum(1 for field in required_fields if test_case.get(field)) / len(required_fields)
        evaluation['completeness'] = completeness
        
        # Check compliance coverage
        if test_case.get('compliance'):
            evaluation['compliance_coverage'] = min(len(test_case['compliance']) / 3, 1.0)
        
        # Check detail level
        if test_case.get('test_steps'):
            evaluation['detail_level'] = min(len(test_case['test_steps']) / 5, 1.0)
        
        # Check healthcare specificity
        healthcare_keywords = ['patient', 'clinical', 'HIPAA', 'medical', 'healthcare', 
                              'diagnosis', 'treatment', 'PHI', 'provider']
        text = str(test_case).lower()
        keyword_count = sum(1 for keyword in healthcare_keywords if keyword in text)
        evaluation['healthcare_specificity'] = min(keyword_count / 3, 1.0)
        
        # Calculate overall score
        evaluation['overall_score'] = sum(evaluation.values()) / 4
        
        return evaluation

# Initialize RAG pipeline
rag_generator = RAGTestCaseGenerator(
    index=index,
    documents=documents,
    metadata=doc_metadata,
    embedding_model=embedding_model,
    gemini_model=model
)

print("✅ RAG Test Case Generator initialized")


✅ RAG Test Case Generator initialized


## Step 5: Generate Test Cases Using RAG

Now let's demonstrate the RAG pipeline by generating test cases for various requirements from our loaded documents.


In [9]:
# Example 1: Generate test case for user registration
requirement_1 = "Test user registration with valid email and password according to acceptance criteria"

print("🎯 Requirement:", requirement_1)
print("\n🔍 Retrieving relevant context...")

# Retrieve context
context_docs = rag_generator.retrieve(requirement_1, k=3)
print(f"Found {len(context_docs)} relevant documents")

for i, (doc, meta, score) in enumerate(context_docs, 1):
    print(f"\n📄 Document {i}:")
    print(f"   Type: {meta.get('type')}")
    print(f"   Source: {meta.get('filename', 'Unknown')}")
    print(f"   Relevance: {score:.2%}")
    print(f"   Preview: {doc[:100]}...")

print("\n⚡ Generating test case with Gemini...")
test_case_1 = rag_generator.generate_test_case(requirement_1)

# Display generated test case
print("\n✅ Generated Test Case:")
print("=" * 50)
for key, value in test_case_1.items():
    if isinstance(value, list):
        print(f"{key.upper()}:")
        for item in value:
            print(f"  - {item}")
    else:
        print(f"{key.upper()}: {value}")

# Evaluate the test case
evaluation = rag_generator.evaluate_generation(test_case_1)
print("\n📊 Quality Evaluation:")
for metric, score in evaluation.items():
    print(f"   {metric}: {score:.2%}")


🎯 Requirement: Test user registration with valid email and password according to acceptance criteria

🔍 Retrieving relevant context...
Found 3 relevant documents

📄 Document 1:
   Type: acceptance_criterion
   Source: user_story_registration.txt
   Relevance: 66.56%
   Preview: Acceptance Criterion:
1. **Successful Registration:** Given a user is on the registration page, when...

📄 Document 2:
   Type: prd_section
   Source: test_plan_user_management.md
   Relevance: 58.19%
   Preview: 4. Test Scenarios

### Registration Testing
1. **Valid Registration Flow**
   - Test with valid emai...

📄 Document 3:
   Type: acceptance_criterion
   Source: user_story_registration.txt
   Relevance: 57.10%
   Preview: Acceptance Criterion:
4. **Password Mismatch Error:** Given a user is on the registration page, when...

⚡ Generating test case with Gemini...

✅ Generated Test Case:
EMAIL: valid.newuser@example.com
PASSWORD: StrongP@ssw0rd123!
CONFIRM_PASSWORD: StrongP@ssw0rd123!
ID: TC_185318
TITLE: 

In [10]:
# Example 2: Generate API test case
requirement_2 = "Test the /api/v1/users/register endpoint with invalid email format"

print("🎯 Requirement:", requirement_2)
test_case_2 = rag_generator.generate_test_case(requirement_2)

print("\n✅ Generated API Test Case:")
print(f"ID: {test_case_2.get('id')}")
print(f"Title: {test_case_2.get('title')}")
print(f"Category: {test_case_2.get('category')}")
print(f"Priority: {test_case_2.get('priority')}")

if test_case_2.get('test_steps'):
    print("\nTest Steps:")
    for i, step in enumerate(test_case_2.get('test_steps', []), 1):
        print(f"  {i}. {step}")

print(f"\nExpected Result: {test_case_2.get('expected_results')}")


🎯 Requirement: Test the /api/v1/users/register endpoint with invalid email format
⚠️ Using fallback: Could not generate valid test case...

✅ Generated API Test Case:
ID: TC_185326
Title: Test: Test the /api/v1/users/register endpoint with inva
Category: General
Priority: Medium

Test Steps:
  1. Navigate to relevant module
  2. Perform required action
  3. Verify outcome

Expected Result: System behaves as specified


## Step 6: Batch Test Case Generation

Generate multiple test cases based on different requirements extracted from our documents.


In [11]:
# Define test requirements based on our loaded documents
test_requirements = [
    "Test successful user registration with all required fields",
    "Test password reset token expiration after 24 hours",
    "Verify HIPAA compliance for patient data handling",
    "Test API authentication with invalid credentials",
    "Validate password strength requirements",
    "Test user profile data encryption at rest",
    "Verify audit trail generation for PHI access",
    "Test concurrent user registration handling"
]

print(f"🚀 Generating {len(test_requirements)} test cases...\n")

# Generate test cases in batch
generated_test_cases = rag_generator.batch_generate(test_requirements, progress=True)

print(f"\n✅ Successfully generated {len(generated_test_cases)} test cases!")

# Analyze the generated test cases
categories = {}
priorities = {}
compliance_standards = set()

for tc in generated_test_cases:
    # Count categories
    category = tc.get('category', 'Unknown')
    categories[category] = categories.get(category, 0) + 1
    
    # Count priorities
    priority = tc.get('priority', 'Unknown')
    priorities[priority] = priorities.get(priority, 0) + 1
    
    # Collect compliance standards
    if tc.get('compliance'):
        compliance_standards.update(tc['compliance'])

print("\n📊 Test Case Analysis:")
print(f"Categories: {categories}")
print(f"Priorities: {priorities}")
print(f"Compliance Standards Covered: {list(compliance_standards)}")

# Calculate average quality score
avg_scores = {'completeness': 0, 'compliance_coverage': 0, 'detail_level': 0, 'healthcare_specificity': 0}
for tc in generated_test_cases:
    eval_scores = rag_generator.evaluate_generation(tc)
    for key in avg_scores:
        avg_scores[key] += eval_scores[key]

for key in avg_scores:
    avg_scores[key] /= len(generated_test_cases)

print("\n📈 Average Quality Metrics:")
for metric, score in avg_scores.items():
    print(f"   {metric}: {score:.2%}")


🚀 Generating 8 test cases...



Generating: Verify HIPAA compliance for pa...:  25%|██▌       | 2/8 [00:17<00:53,  8.94s/it]

⚠️ Using fallback: Could not generate valid test case...


Generating: Test API authentication with i...:  38%|███▊      | 3/8 [00:27<00:47,  9.46s/it]

⚠️ Using fallback: Could not generate valid test case...


Generating: Validate password strength req...:  50%|█████     | 4/8 [00:38<00:39,  9.96s/it]

⚠️ Using fallback: Could not generate valid test case...


Generating: Test user profile data encrypt...:  62%|██████▎   | 5/8 [00:46<00:27,  9.25s/it]

⚠️ Using fallback: Could not generate valid test case...


Generating: Test user profile data encrypt...:  62%|██████▎   | 5/8 [00:48<00:29,  9.69s/it]


KeyboardInterrupt: 

## Step 7: Export Generated Test Cases

Save the generated test cases for use in test management tools.


In [None]:
# Export test cases to different formats

# 1. Export as JSON
output_dir = "/Users/shtlpmac036/Documents/Personal/GenAI Hack /data/generated_test_cases"
os.makedirs(output_dir, exist_ok=True)

# Save as JSON
json_file = os.path.join(output_dir, f"test_cases_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json")
with open(json_file, 'w') as f:
    json.dump(generated_test_cases, f, indent=2)
print(f"✅ Saved {len(generated_test_cases)} test cases to: {json_file}")

# 2. Export as CSV for test management tools
df = pd.DataFrame(generated_test_cases)

# Flatten lists for CSV export
for col in df.columns:
    if df[col].dtype == 'object':
        df[col] = df[col].apply(lambda x: '; '.join(x) if isinstance(x, list) else x)

csv_file = os.path.join(output_dir, f"test_cases_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv")
df.to_csv(csv_file, index=False)
print(f"✅ Exported test cases to CSV: {csv_file}")

# 3. Create a formatted test document
doc_file = os.path.join(output_dir, f"test_document_{datetime.now().strftime('%Y%m%d_%H%M%S')}.md")
with open(doc_file, 'w') as f:
    f.write("# Generated Test Cases\n\n")
    f.write(f"Generated on: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
    f.write(f"Total Test Cases: {len(generated_test_cases)}\n\n")
    
    for tc in generated_test_cases:
        f.write(f"## {tc.get('id', 'TC_XXX')}: {tc.get('title', 'Untitled')}\n\n")
        f.write(f"**Category:** {tc.get('category', 'N/A')}\\n")
        f.write(f"**Priority:** {tc.get('priority', 'N/A')}\\n")
        f.write(f"**Compliance:** {', '.join(tc.get('compliance', [])) or 'N/A'}\\n\\n")
        
        f.write(f"### Description\\n{tc.get('description', 'N/A')}\\n\\n")
        
        f.write("### Test Steps\\n")
        for i, step in enumerate(tc.get('test_steps', []), 1):
            f.write(f"{i}. {step}\\n")
        
        f.write(f"\\n### Expected Results\\n{tc.get('expected_results', 'N/A')}\\n\\n")
        f.write("---\\n\\n")

print(f"✅ Created formatted test document: {doc_file}")

# Display sample of exported data
print("\n📋 Sample Test Case (First Entry):")
print(json.dumps(generated_test_cases[0], indent=2)[:500] + "...")


## 🎮 Interactive Test Case Generation

Try generating your own test cases by modifying the requirement below!


In [None]:
# Interactive test case generation
# Modify the requirement below and run the cell to generate a custom test case

# 🎯 MODIFY THIS REQUIREMENT
custom_requirement = """
Test the password complexity validation to ensure it meets the following criteria:
- Minimum 8 characters
- At least one uppercase letter
- At least one number
- At least one special character
"""

print("🎯 Custom Requirement:")
print(custom_requirement)
print("\n" + "="*50)

# Generate test case
print("\n⚡ Generating custom test case...")
custom_test_case = rag_generator.generate_test_case(custom_requirement)

# Display the generated test case in a formatted way
print("\n✅ Generated Test Case:\n")
print(f"📌 ID: {custom_test_case.get('id')}")
print(f"📝 Title: {custom_test_case.get('title')}")
print(f"📁 Category: {custom_test_case.get('category')}")
print(f"⚠️  Priority: {custom_test_case.get('priority')}")
print(f"🏥 Compliance: {', '.join(custom_test_case.get('compliance', []))}")

print("\n📋 Test Steps:")
for i, step in enumerate(custom_test_case.get('test_steps', []), 1):
    print(f"   {i}. {step}")

print(f"\n✓ Expected Result: {custom_test_case.get('expected_results')}")

# Quality check
quality = rag_generator.evaluate_generation(custom_test_case)
print(f"\n📊 Quality Score: {quality['overall_score']:.2%}")

# Tip for users
print("\n💡 TIP: Modify the 'custom_requirement' variable above with your own test requirement and re-run this cell!")
