# FHIR Agent Workflow Notebook

This notebook implements an AI agent workflow that auto-fills FHIR forms (Questionnaires) from audio recordings. The solution uses a Retrieval-Augmented Generation (RAG) approach to discover the best matching Questionnaire and then generates a FHIR `QuestionnaireResponse`. 

We provide two solutions for orchestrating the workflow as an agent:

- **Solution 1:** Uses LangChain and LangGraph to manage the task flow.
- **Solution 2:** Uses direct integrations with Gemini (`gemini-2.0-flash`) and Vertex AI APIs for full orchestration.

The public HAPI FHIR R4 test server is used as our FHIR backend: `https://hapi.fhir.org/baseR4/`.

## Setup and Installation

Install the required libraries before running the notebook. (Uncomment the pip install commands if needed.)

In [None]:
# Uncomment and run these if needed
# !pip install requests nbformat chromadb langchain langgraph json-repair
# Additional libraries are needed for the Gemini/Vertex AI integration (e.g., google-genai)

import requests
import json
import time

# For our vector DB, we use a simple in-memory store as a stand-in for a real vector database such as Chroma
from typing import List, Dict

# Dummy embedding function (replace with a real API call, e.g., OpenAI embeddings)
EMBEDDING_DIM = 50

def compute_embedding(text: str) -> List[float]:
    """
    Computes a fixed-length vector embedding for a given text.
    Replace this with a call to an embedding API like OpenAI or HuggingFace.
    """
    # Dummy placeholder: character ordinals scaled to [0, 1], truncated or zero-padded to
    # EMBEDDING_DIM so every vector has the same length (zip() in the distance function
    # would otherwise silently ignore the tail of the longer vector)
    values = [min(ord(c), 255) / 255.0 for c in text[:EMBEDDING_DIM]]
    return values + [0.0] * (EMBEDDING_DIM - len(values))

# Simple in-memory vector store for demonstration purposes
class VectorStore:
    def __init__(self):
        self.index = []  # list of tuples: (questionnaire_id, embedding, metadata)

    def add(self, questionnaire_id: str, embedding: List[float], metadata: Dict):
        self.index.append((questionnaire_id, embedding, metadata))

    def search(self, query_embedding: List[float], top_k: int = 1) -> List[Dict]:
        """
        Perform a dummy similarity search by computing Euclidean distance.
        """
        def distance(vec1, vec2):
            return sum((a - b) ** 2 for a, b in zip(vec1, vec2)) ** 0.5

        scored = []
        for q_id, emb, metadata in self.index:
            d = distance(query_embedding, emb)
            scored.append((d, q_id, metadata))
        scored.sort(key=lambda x: x[0])
        results = []
        for score, q_id, metadata in scored[:top_k]:
            results.append({"questionnaire_id": q_id, "score": score, "metadata": metadata})
        return results

# Initialize our in-memory vector store
vector_store = VectorStore()

print("Setup complete.")
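Real text embeddings are usually compared with cosine similarity rather than Euclidean distance, because cosine similarity ignores vector magnitude and depends only on direction. The following is a minimal sketch of a cosine-based search over the same `(questionnaire_id, embedding, metadata)` tuples that `VectorStore` keeps in `self.index`. `cosine_similarity` and `cosine_search` are hypothetical helpers, not part of Chroma or any other library. Note that here a higher score means a better match, which is the opposite of the distance-based `VectorStore.search`.

```python
import math
from typing import Dict, List, Tuple

def cosine_similarity(a: List[float], b: List[float]) -> float:
    """Cosine similarity between two equal-length vectors (0.0 if either is all zeros)."""
    if len(a) != len(b):
        raise ValueError("vectors must have the same dimension")
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    if norm_a == 0.0 or norm_b == 0.0:
        return 0.0
    return dot / (norm_a * norm_b)

def cosine_search(index: List[Tuple[str, List[float], Dict]],
                  query: List[float], top_k: int = 1) -> List[Dict]:
    """Rank (id, embedding, metadata) tuples by descending cosine similarity to `query`."""
    scored = [(cosine_similarity(query, emb), q_id, meta) for q_id, emb, meta in index]
    # Sort on the score only, so metadata dicts are never compared on ties
    scored.sort(key=lambda item: item[0], reverse=True)
    return [{"questionnaire_id": q_id, "score": s, "metadata": meta}
            for s, q_id, meta in scored[:top_k]]

# Usage with toy 2-D vectors (made-up data)
toy_index = [("q-vitals", [1.0, 0.0], {}), ("q-phq9", [0.0, 1.0], {})]
print(cosine_search(toy_index, [0.9, 0.1]))
```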

## Indexing Setup: Pull and Index FHIR Questionnaires

This cell fetches the first page (up to 100) of Questionnaire resources from the HAPI FHIR server, extracts key metadata, computes embeddings, and indexes them into our vector store.

In [None]:
FHIR_BASE = "https://hapi.fhir.org/baseR4"

def fetch_questionnaires() -> List[Dict]:
    """
    Fetches the first page (up to 100) of Questionnaire resources from the HAPI FHIR server.
    Further pages, linked from the search Bundle, are not followed here.
    """
    url = f"{FHIR_BASE}/Questionnaire?_count=100"
    response = requests.get(url, headers={"Accept": "application/fhir+json"}, timeout=30)
    if response.status_code != 200:
        raise Exception(f"Failed to fetch questionnaires: {response.text}")
    data = response.json()
    return data.get('entry', [])

def extract_metadata(questionnaire: Dict) -> str:
    """
    Extracts identifying information from a Questionnaire resource.
    """
    resource = questionnaire.get('resource', {})
    # Fall back to the machine-friendly `name` when no human-readable `title` is present
    title = resource.get('title') or resource.get('name', "")
    description = resource.get('description', "")
    # Combine title and description for embedding
    return f"Title: {title}. Description: {description}"

def index_questionnaires():
    questionnaires = fetch_questionnaires()
    print(f"Fetched {len(questionnaires)} questionnaires")
    for entry in questionnaires:
        resource = entry.get('resource', {})
        q_id = resource.get('id')
        if not q_id:
            continue  # skip entries without a logical id; they cannot be fetched later
        metadata_str = extract_metadata(entry)
        emb = compute_embedding(metadata_str)
        vector_store.add(q_id, emb, {"metadata": metadata_str})
    print("Indexing complete.")

# Run indexing
index_questionnaires()
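FHIR search results are returned as paged Bundles, and the URL of the next page appears in `Bundle.link` with relation `"next"`. A minimal sketch of following those links to collect every entry, assuming a caller-supplied `get_json` function (for example one wrapping `requests.get(...).json()`). `next_page_url`, `iter_bundle_entries`, and the `max_pages` safety cap are hypothetical helpers, not part of any FHIR client library.

```python
from typing import Callable, Dict, Iterator, Optional

def next_page_url(bundle: Dict) -> Optional[str]:
    """Return the URL of the Bundle link whose relation is "next", if any."""
    for link in bundle.get("link", []):
        if link.get("relation") == "next":
            return link.get("url")
    return None

def iter_bundle_entries(first_url: str, get_json: Callable[[str], Dict],
                        max_pages: int = 50) -> Iterator[Dict]:
    """Yield every entry across a paged search result by following "next" links.

    `get_json` GETs a URL and returns the parsed Bundle, e.g.
    lambda u: requests.get(u, timeout=30).json()
    """
    url: Optional[str] = first_url
    pages = 0
    while url and pages < max_pages:
        bundle = get_json(url)
        yield from bundle.get("entry", [])
        url = next_page_url(bundle)
        pages += 1

# Usage with an in-memory stand-in for the server (made-up Bundles)
fake_pages = {
    "page1": {"entry": [{"resource": {"id": "q1"}}],
              "link": [{"relation": "next", "url": "page2"}]},
    "page2": {"entry": [{"resource": {"id": "q2"}}], "link": []},
}
print([e["resource"]["id"] for e in iter_bundle_entries("page1", fake_pages.__getitem__)])
```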

## 1. Transcribe and Diarize Audio

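This function takes the path to an audio file and returns a transcription and speaker segments. The logic below is a placeholder: replace it with real model calls, e.g., Whisper for transcription plus a speaker-diarization model such as pyannote.audio (Whisper on its own does not label speakers).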

In [None]:
def transcribe_and_diarize(audio_path: str) -> dict:
    """
    Transcribes and diarizes the input audio file.
    
    Args:
        audio_path (str): Path to the audio file.
    
    Returns:
        dict: Contains 'transcription' and 'speaker_segments'.
    """
    # TODO: Replace with actual ASR (e.g., Whisper) plus a diarization model (e.g., pyannote.audio)
    transcription = "This is a sample transcription from the audio file."
    speaker_segments = ["Speaker 1", "Speaker 2"]
    return {"transcription": transcription, "speaker_segments": speaker_segments}

# Example usage:
audio_data = transcribe_and_diarize("path/to/audio.wav")
print(audio_data)
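When transcription and diarization come from separate models, their outputs have to be aligned: the ASR model yields timestamped text segments and the diarization model yields timestamped speaker turns. A common approach is to label each text segment with the speaker whose turn overlaps it the most. A minimal sketch, assuming both inputs are lists of dicts with `start`/`end` times in seconds (these shapes are assumptions, not the exact output format of Whisper or pyannote.audio); `assign_speakers` is a hypothetical helper.

```python
from typing import Dict, List

def assign_speakers(asr_segments: List[Dict], speaker_turns: List[Dict]) -> List[Dict]:
    """Label each ASR segment with the speaker whose turn overlaps it the most.

    asr_segments:  [{"start": float, "end": float, "text": str}, ...]
    speaker_turns: [{"start": float, "end": float, "speaker": str}, ...]
    """
    labeled = []
    for seg in asr_segments:
        best_speaker, best_overlap = "unknown", 0.0
        for turn in speaker_turns:
            # Length of the intersection of the two time intervals (negative if disjoint)
            overlap = min(seg["end"], turn["end"]) - max(seg["start"], turn["start"])
            if overlap > best_overlap:
                best_speaker, best_overlap = turn["speaker"], overlap
        labeled.append({**seg, "speaker": best_speaker})
    return labeled

# Usage with made-up timings
asr = [{"start": 0.0, "end": 2.0, "text": "How are you feeling?"},
       {"start": 2.1, "end": 4.0, "text": "A bit dizzy."}]
turns = [{"start": 0.0, "end": 2.05, "speaker": "Clinician"},
         {"start": 2.05, "end": 4.5, "speaker": "Patient"}]
for s in assign_speakers(asr, turns):
    print(f'{s["speaker"]}: {s["text"]}')
```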

## 2. Discover Relevant Questionnaire via RAG Retrieval

This function embeds the user prompt, runs a vector search against the indexed Questionnaires, and retrieves the best match from the HAPI FHIR server. Because the search always returns the nearest neighbour, the function raises an exception only when the index is empty or when the best match is farther away than the optional `max_distance` threshold.

In [None]:
def discover_questionnaire(prompt: str, max_distance: float = float("inf")) -> dict:
    """
    Discovers the most relevant Questionnaire from the pre-indexed embeddings using a RAG approach.
    
    Args:
        prompt (str): The user text input describing the desired form.
        max_distance (float): Reject the best match if its distance exceeds this threshold.
    
    Returns:
        dict: The Questionnaire resource in JSON format.
              If no relevant questionnaire is found, raises an Exception.
    """
    # Compute embedding for the prompt
    prompt_embedding = compute_embedding(prompt)
    # Search our in-memory vector store (a smaller score means a closer match)
    results = vector_store.search(prompt_embedding, top_k=1)
    if not results or results[0]["score"] > max_distance:
        raise Exception("No matching questionnaire found for the provided prompt.")
    best_match = results[0]
    questionnaire_id = best_match["questionnaire_id"]

    # Retrieve the full Questionnaire from the HAPI FHIR server
    url = f"{FHIR_BASE}/Questionnaire/{questionnaire_id}"
    response = requests.get(url, timeout=30)
    if response.status_code != 200:
        raise Exception(f"Failed to retrieve Questionnaire {questionnaire_id}: {response.text}")
    questionnaire = response.json()
    return questionnaire

# Example usage:
try:
    questionnaire = discover_questionnaire("Record patient vitals")
    print("Questionnaire found:", questionnaire.get('id'))
except Exception as e:
    print(e)
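The retrieved Questionnaire only helps generation if its questions are exposed to the model. `Questionnaire.item` is a tree (items may contain nested `item` arrays, e.g. inside a group), so a flattening step is useful. A minimal sketch that walks the tree and renders `linkId`, `type`, and `text` as indented lines; the same text could also be appended to the `extract_metadata` output to index more than title and description. `flatten_items` and `describe_questionnaire` are hypothetical helpers.

```python
from typing import Dict, List

def flatten_items(items: List[Dict], depth: int = 0) -> List[Dict]:
    """Depth-first walk over Questionnaire.item, including nested child items."""
    flat = []
    for item in items:
        flat.append({
            "linkId": item.get("linkId"),
            "text": item.get("text", ""),
            "type": item.get("type"),
            "depth": depth,
        })
        # Recurse into nested child items, one indentation level deeper
        flat.extend(flatten_items(item.get("item", []), depth + 1))
    return flat

def describe_questionnaire(questionnaire: Dict) -> str:
    """Render the flattened items as indented lines for an LLM prompt or richer index text."""
    return "\n".join(
        f'{"  " * it["depth"]}- [{it["linkId"]}] ({it["type"]}) {it["text"]}'
        for it in flatten_items(questionnaire.get("item", []))
    )

# Usage with a small made-up Questionnaire
example_q = {
    "resourceType": "Questionnaire",
    "item": [{"linkId": "vitals", "type": "group", "text": "Vital signs", "item": [
        {"linkId": "hr", "type": "integer", "text": "Heart rate"},
        {"linkId": "temp", "type": "decimal", "text": "Temperature"}]}],
}
print(describe_questionnaire(example_q))
```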

## 3. Generate QuestionnaireResponse

This function generates a FHIR `QuestionnaireResponse` from the transcription and the retrieved Questionnaire. In a production system, this would likely use a generative model such as the `gemini-2.0-flash` model.

In [None]:
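import html

def generate_questionnaire_response(transcription: str, questionnaire: dict) -> dict:
    """
    Generates a FHIR QuestionnaireResponse based on the transcription and the retrieved Questionnaire.
    
    Args:
        transcription (str): The transcribed text from the audio.
        questionnaire (dict): The Questionnaire resource JSON.
        
    Returns:
        dict: A generated QuestionnaireResponse resource (dummy implementation here).
    """
    # TODO: Replace with a call to a generative model (e.g., gemini-2.0-flash) that fills in the answers
    # `questionnaire` is a canonical reference: prefer the Questionnaire's url, else a relative reference
    canonical = questionnaire.get('url') or f"Questionnaire/{questionnaire.get('id', 'unknown')}"
    response = {
        "resourceType": "QuestionnaireResponse",
        "questionnaire": canonical,
        "status": "completed",
        # `text` is a Narrative (status + XHTML div), not a plain string
        "text": {
            "status": "generated",
            "div": f'<div xmlns="http://www.w3.org/1999/xhtml">{html.escape(transcription)}</div>',
        },
        # "item" (the answers) is omitted until populated: FHIR JSON does not allow empty arrays
    }
    return response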

# Example usage:
dummy_q = {"id": "example-id"}
q_response = generate_questionnaire_response(audio_data["transcription"], dummy_q)
print(q_response)
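Once a model has extracted answers, they must be placed into `QuestionnaireResponse.item` using the `answer.value[x]` field that matches each question's type. A minimal sketch, assuming the model returns a dict that maps `linkId` to a value; the type-to-field table covers only part of FHIR R4 (attachment, reference, quantity, and open-choice are left out), and `build_response_items` is a hypothetical helper. Children of a group are nested under `item.item`, while children of an answered question go under `answer.item`.

```python
import json
from typing import Any, Dict, List

# Questionnaire item type -> QuestionnaireResponse answer value[x] field (FHIR R4, partial)
VALUE_FIELD = {
    "boolean": "valueBoolean",
    "decimal": "valueDecimal",
    "integer": "valueInteger",
    "date": "valueDate",
    "dateTime": "valueDateTime",
    "time": "valueTime",
    "string": "valueString",
    "text": "valueString",
    "url": "valueUri",
    "choice": "valueCoding",
}

def build_response_items(q_items: List[Dict], answers: Dict[str, Any]) -> List[Dict]:
    """Mirror the Questionnaire item tree, keeping only items that received an answer.

    `answers` maps linkId -> value (e.g. parsed from the model's JSON output);
    for "choice" items the value is assumed to be a Coding dict.
    """
    out = []
    for q_item in q_items:
        link_id = q_item["linkId"]
        node: Dict[str, Any] = {"linkId": link_id}
        if q_item.get("text"):
            node["text"] = q_item["text"]
        children = build_response_items(q_item.get("item", []), answers)
        field = VALUE_FIELD.get(q_item.get("type", ""))
        if field and link_id in answers:
            answer: Dict[str, Any] = {field: answers[link_id]}
            if children:
                answer["item"] = children  # items nested under a question go in answer.item
            node["answer"] = [answer]
        elif children and q_item.get("type") == "group":
            node["item"] = children  # items nested under a group go in item.item
        # FHIR JSON forbids empty arrays, so unanswered items are omitted entirely
        if "answer" in node or "item" in node:
            out.append(node)
    return out

# Usage with made-up items and answers
example_items = [{"linkId": "vitals", "type": "group", "text": "Vital signs", "item": [
    {"linkId": "hr", "type": "integer", "text": "Heart rate"},
    {"linkId": "note", "type": "string", "text": "Notes"}]}]
print(json.dumps(build_response_items(example_items, {"hr": 72}), indent=2))
```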

## 4. Repair and Validate QuestionnaireResponse

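This function repairs any malformed JSON and validates the QuestionnaireResponse against the HAPI FHIR server using the `$validate` operation. The repair step is a placeholder: `repair_json` from the `json-repair` package works on raw JSON strings, so in practice it would be applied to the model's text output before that text is parsed into a dict.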
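LLM output often arrives wrapped in a Markdown code fence or with trailing commas. The `json-repair` package (imported as `json_repair`) handles many such cases; the following is only a stdlib sketch of two common fixes, offered as a hypothetical stand-in rather than a full JSON repairer. `parse_model_json` is a hypothetical helper.

```python
import json
import re
from typing import Any

FENCE = "`" * 3  # a Markdown code-fence marker, built here to keep the example self-contained

def parse_model_json(raw: str) -> Any:
    """Parse JSON from LLM output after two cheap repairs (not a full JSON repairer)."""
    text = raw.strip()
    # 1. Strip a surrounding Markdown code fence, optionally tagged "json"
    fence = re.match("^" + FENCE + r"(?:json)?\s*(.*?)\s*" + FENCE + "$", text, re.DOTALL)
    if fence:
        text = fence.group(1)
    try:
        return json.loads(text)
    except json.JSONDecodeError:
        # 2. Remove trailing commas before a closing brace/bracket and retry.
        #    Caveat: this regex could also alter a ", }" sequence inside a string value.
        return json.loads(re.sub(r",\s*([}\]])", r"\1", text))

# Usage with made-up model output
raw_output = FENCE + 'json\n{"resourceType": "QuestionnaireResponse", "status": "completed",}\n' + FENCE
print(parse_model_json(raw_output))
```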

In [None]:
def repair_and_validate_response(questionnaire_response: dict) -> dict:
    """
    Repairs and validates the QuestionnaireResponse against the HAPI FHIR server.
    
    Args:
        questionnaire_response (dict): The QuestionnaireResponse resource.
    
    Returns:
        dict: The result of the validation.
        If validation fails, raises an Exception.
    """
    # Dummy repair step: In practice, you might call repair_json here
    repaired_response = questionnaire_response  # Assuming it's correct for this demo

    # Validate against HAPI FHIR using the $validate operation
    validate_url = f"{FHIR_BASE}/QuestionnaireResponse/$validate"
    headers = {"Content-Type": "application/fhir+json"}
    response = requests.post(validate_url, headers=headers, data=json.dumps(repaired_response), timeout=30)
    if response.status_code != 200:
        raise Exception(f"Validation failed: {response.text}")
    validation_result = response.json()
    return validation_result

# Example usage:
try:
    validation = repair_and_validate_response(q_response)
    print("Validation successful:", validation)
except Exception as e:
    print(e)
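`$validate` returns an OperationOutcome whose `issue` entries carry a `severity` of `fatal`, `error`, `warning`, or `information`. Because the HTTP status code a server uses for a failed validation can vary by server and version, inspecting the issue severities is a more robust check than the status code alone. A minimal sketch that extracts the blocking issues; `blocking_issues` is a hypothetical helper and the sample OperationOutcome is made up.

```python
from typing import Dict, List

BLOCKING_SEVERITIES = {"fatal", "error"}

def blocking_issues(outcome: Dict) -> List[str]:
    """Return readable messages for fatal/error issues in an OperationOutcome."""
    messages = []
    for issue in outcome.get("issue", []):
        if issue.get("severity") in BLOCKING_SEVERITIES:
            # Prefer the free-text diagnostics; fall back to details.text, then the issue code
            text = (issue.get("diagnostics")
                    or issue.get("details", {}).get("text")
                    or issue.get("code", ""))
            location = ", ".join(issue.get("expression", issue.get("location", [])))
            messages.append(f"{location}: {text}" if location else text)
    return messages

# Usage with a made-up OperationOutcome
sample_outcome = {
    "resourceType": "OperationOutcome",
    "issue": [
        {"severity": "information", "code": "informational", "diagnostics": "No issues detected"},
        {"severity": "error", "code": "structure", "diagnostics": "Unknown element 'foo'",
         "expression": ["QuestionnaireResponse"]},
    ],
}
print(blocking_issues(sample_outcome))
```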

## 5. Save QuestionnaireResponse

This function saves the validated QuestionnaireResponse to the HAPI FHIR server using a POST request.

In [None]:
def save_questionnaire_response(questionnaire_response: dict) -> str:
    """
    Saves the QuestionnaireResponse resource to the HAPI FHIR server.
    
    Args:
        questionnaire_response (dict): The validated QuestionnaireResponse resource.
    
    Returns:
        str: A confirmation message if saving is successful.
    """
    url = f"{FHIR_BASE}/QuestionnaireResponse"
    headers = {"Content-Type": "application/fhir+json"}
    response = requests.post(url, headers=headers, data=json.dumps(questionnaire_response), timeout=30)
    if response.status_code not in [200, 201]:
        raise Exception(f"Failed to save QuestionnaireResponse: {response.text}")
    return "QuestionnaireResponse saved successfully."

# Example usage:
try:
    save_message = save_questionnaire_response(q_response)
    print(save_message)
except Exception as e:
    print(e)
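Per the FHIR REST specification, a successful create returns `201 Created` with a `Location` header of the form `[base]/[type]/[id]/_history/[vid]`, which carries the server-assigned logical id. A minimal sketch that parses that header so the saved resource can be referenced later; `parse_location` is a hypothetical helper, and the header value would come from `response.headers.get("Location")` after the POST.

```python
import re
from typing import Optional, Tuple

# [type]/[id] with an optional /_history/[vid]; ids follow FHIR's [A-Za-z0-9\-.]{1,64} rule
LOCATION_PATTERN = re.compile(
    r"(?:^|/)(?P<type>[A-Za-z]+)/(?P<id>[A-Za-z0-9\-.]{1,64})"
    r"(?:/_history/(?P<vid>[A-Za-z0-9\-.]{1,64}))?/?$"
)

def parse_location(location: str) -> Optional[Tuple[str, str, Optional[str]]]:
    """Split a FHIR Location header into (resource type, logical id, version id)."""
    match = LOCATION_PATTERN.search(location)
    if not match:
        return None
    return match.group("type"), match.group("id"), match.group("vid")

# Usage with a made-up header value
print(parse_location("https://hapi.fhir.org/baseR4/QuestionnaireResponse/12345/_history/1"))
```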

## 6. Agent Orchestration

We implement two approaches for orchestrating the end-to-end workflow as an agent:

### 6.1 Agent Orchestration using LangChain & LangGraph

In this approach, each workflow step is meant to become an individual chain or LangGraph node, with a LangChain agent coordinating the flow. The placeholder below simply calls the steps in sequence to demonstrate the design.

In [None]:
def langchain_agent_workflow(audio_path: str, prompt: str):
    """
    Runs the workflow steps in sequence (placeholder for a LangChain/LangGraph agent).
    
    Args:
        audio_path (str): The path to the audio file.
        prompt (str): The user prompt for questionnaire discovery.
    
    Returns:
        str: Final confirmation message after the workflow completes.
    """
    # Step 1: Transcribe & Diarize
    audio_data = transcribe_and_diarize(audio_path)

    # Step 2: Discover Questionnaire via RAG Retrieval
    questionnaire = discover_questionnaire(prompt)

    # Step 3: Generate QuestionnaireResponse
    q_response = generate_questionnaire_response(audio_data["transcription"], questionnaire)

    # Step 4: Repair and Validate
    repair_and_validate_response(q_response)  # will raise exception if invalid

    # Step 5: Save the QuestionnaireResponse
    result = save_questionnaire_response(q_response)

    return result

# Example usage of LangChain Agent
try:
    result_msg = langchain_agent_workflow("path/to/audio.wav", "Record patient vitals")
    print("LangChain Agent Workflow Result:", result_msg)
except Exception as e:
    print(e)
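The function above runs each step once in a fixed order. The main benefit of a graph framework such as LangGraph is conditional routing, for example looping back to generation when validation fails. The following dependency-free sketch shows that pattern: nodes return partial state updates and router functions pick the next node. It mimics the shape of a LangGraph `StateGraph` (nodes registered with `add_node`, routing with `add_conditional_edges`) but is not LangGraph; `run_graph`, the stub nodes, and `MAX_ATTEMPTS` are hypothetical.

```python
from typing import Any, Callable, Dict, Optional

State = Dict[str, Any]

def run_graph(nodes: Dict[str, Callable[[State], State]],
              edges: Dict[str, Callable[[State], Optional[str]]],
              start: str, state: State, max_steps: int = 20) -> State:
    """Run nodes until a router returns None. Each node returns a partial state update."""
    current: Optional[str] = start
    steps = 0
    while current is not None:
        if steps >= max_steps:
            raise RuntimeError("graph did not terminate within max_steps")
        state = {**state, **nodes[current](state)}  # merge the node's update into the state
        current = edges[current](state)             # router decides the next node (or stop)
        steps += 1
    return state

MAX_ATTEMPTS = 3

def generate(state: State) -> State:  # stand-in for generate_questionnaire_response
    attempt = state.get("attempts", 0) + 1
    return {"attempts": attempt, "response": {"ok": attempt >= 2}}  # succeeds on the 2nd try

def validate(state: State) -> State:  # stand-in for repair_and_validate_response
    return {"valid": bool(state["response"]["ok"])}

def save(state: State) -> State:  # stand-in for save_questionnaire_response
    return {"saved": True}

nodes = {"generate": generate, "validate": validate, "save": save}
edges = {
    "generate": lambda s: "validate",
    # Conditional edge: retry generation after a failed validation, up to MAX_ATTEMPTS
    "validate": lambda s: "save" if s["valid"] else ("generate" if s["attempts"] < MAX_ATTEMPTS else None),
    "save": lambda s: None,
}
final = run_graph(nodes, edges, "generate", {})
print(final["attempts"], final.get("saved"))
```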

### 6.2 Agent Orchestration using Gemini + Vertex APIs

In this approach, we directly call the Gemini (`gemini-2.0-flash`) model and Vertex AI APIs to orchestrate the workflow. 

Note: In this placeholder implementation, we simulate API calls with dummy functions. Replace these with actual API integration as needed.

In [None]:
# Dummy functions to simulate Gemini and Vertex API calls
def call_gemini_model(task: str, payload: dict) -> dict:
    """
    Simulates a call to the gemini-2.0-flash model via an API.
    """
    # Placeholder response: in practice, send an API request
    print(f"Calling Gemini for task '{task}' with payload: {payload}")
    return {"result": f"Gemini result for {task}"}

def call_vertex_api(task: str, payload: dict) -> dict:
    """
    Simulates a call to a Vertex AI endpoint.
    """
    print(f"Calling Vertex API for task '{task}' with payload: {payload}")
    return {"result": f"Vertex result for {task}"}

def gemini_vertex_agent_workflow(audio_path: str, prompt: str) -> str:
    """
    Orchestrates the workflow by directly calling Gemini and Vertex AI APIs.
    
    Args:
        audio_path (str): The path to the audio file.
        prompt (str): The user prompt for questionnaire discovery.
    
    Returns:
        str: Final confirmation message after the workflow completes.
    """
    # Step 1: Transcribe & Diarize via Gemini
    gemini_transcription = call_gemini_model("transcribe_and_diarize", {"audio_path": audio_path})
    # Simulate extraction of transcription
    transcription = "This is the transcription from Gemini."

    # Step 2: Discover Questionnaire via RAG (simulate using Vertex API for retrieval)
    gemini_prompt = {"prompt": prompt}
    vertex_result = call_vertex_api("discover_questionnaire", gemini_prompt)
    # For demo, retrieve a dummy Questionnaire
    questionnaire = {"id": "example-id", "resourceType": "Questionnaire"}

    # Step 3: Generate QuestionnaireResponse via Gemini
    gen_payload = {"transcription": transcription, "questionnaire": questionnaire}
    gemini_gen = call_gemini_model("generate_questionnaire_response", gen_payload)
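    # Placeholder resource: `text` would need to be a Narrative object (not a plain string),
    # and FHIR JSON does not allow an empty "item" array, so both are omitted here
    q_response = {
        "resourceType": "QuestionnaireResponse",
        "questionnaire": f"Questionnaire/{questionnaire.get('id')}",
        "status": "completed",
    }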

    # Step 4: Repair and Validate via Vertex API
    repair_payload = {"questionnaire_response": q_response}
    vertex_validation = call_vertex_api("repair_and_validate_response", repair_payload)
    # Assuming validation passed

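    # Step 5: Save QuestionnaireResponse (simulated; persisting data is a FHIR REST call,
    # e.g., save_questionnaire_response, which Gemini could request as a tool call)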
    save_payload = {"questionnaire_response": q_response}
    gemini_save = call_gemini_model("save_questionnaire_response", save_payload)

    return "QuestionnaireResponse saved via Gemini+Vertex Agent Workflow."

# Example usage of Gemini + Vertex API Agent
try:
    result_msg2 = gemini_vertex_agent_workflow("path/to/audio.wav", "Record patient vitals")
    print("Gemini+Vertex Agent Workflow Result:", result_msg2)
except Exception as e:
    print(e)
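In a real Gemini integration, the model does not execute workflow steps itself. With function calling it returns a function name plus arguments, and the application runs the matching Python function and sends the result back to the model. The following SDK-independent sketch shows that dispatch step. The `{"name": ..., "args": {...}}` shape mirrors a Gemini function call, but the google-genai SDK's objects would first need converting to this dict form; `dispatch_tool_call` and the registry are hypothetical.

```python
from typing import Any, Callable, Dict

def dispatch_tool_call(call: Dict[str, Any],
                       registry: Dict[str, Callable[..., Any]]) -> Dict[str, Any]:
    """Run the local function named by a model's function call and wrap the result."""
    name = call.get("name")
    if name not in registry:
        return {"name": name, "response": {"error": f"unknown tool: {name}"}}
    try:
        result = registry[name](**call.get("args", {}))
        return {"name": name, "response": {"result": result}}
    except Exception as exc:  # report failures back to the model instead of crashing the loop
        return {"name": name, "response": {"error": str(exc)}}

# Hypothetical registry: in this notebook the entries could be discover_questionnaire,
# repair_and_validate_response, and save_questionnaire_response
registry = {"add": lambda a, b: a + b}
print(dispatch_tool_call({"name": "add", "args": {"a": 2, "b": 3}}, registry))
```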

## 7. Main Orchestration Cell

This cell demonstrates the full end-to-end orchestration of the workflow using both agent solutions.

In [None]:
def main_workflow(audio_path: str, prompt: str):
    """
    Main orchestration function that runs both solutions and prints their results.
    """
    print("Running LangChain & LangGraph Agent Workflow...")
    try:
        result1 = langchain_agent_workflow(audio_path, prompt)
        print("LangChain Agent Result:", result1)
    except Exception as e:
        print("LangChain Agent failed:", e)

    print("\nRunning Gemini + Vertex Agent Workflow...")
    try:
        result2 = gemini_vertex_agent_workflow(audio_path, prompt)
        print("Gemini+Vertex Agent Result:", result2)
    except Exception as e:
        print("Gemini+Vertex Agent failed:", e)

if __name__ == "__main__":
    # Replace these with actual paths/prompts
    test_audio_path = "path/to/audio.wav"
    test_prompt = "Record patient vitals"
    main_workflow(test_audio_path, test_prompt)

## Conclusion

This notebook provides a comprehensive blueprint for a FHIR Agent workflow that processes audio recordings, discovers relevant FHIR Questionnaires using a RAG-based approach, generates a QuestionnaireResponse, repairs/validates it, and then saves it to the HAPI FHIR server.

Two agent orchestration solutions are demonstrated:

- **LangChain & LangGraph Based Agent**: Structures the workflow as modular steps intended to become LangChain chains or LangGraph nodes (currently called in sequence).
- **Gemini + Vertex APIs Based Agent**: Directly calls the Gemini (`gemini-2.0-flash`) and Vertex AI endpoints.

Replace the placeholder sections with real API calls and integration logic as needed for a production system.