In [0]:
#%pip install -U llama-index llama-index-llms-databricks mlflow fhir.resources
#%restart_python

In [0]:
from pyspark.sql import functions as F
from pyspark.sql.types import StringType
from llama_index.core.llms import ChatMessage
from llama_index.llms.databricks import Databricks
from databricks.sdk import WorkspaceClient
import random
import json
from pprint import pprint
##  import #pprint
from library import get_all_fhir_resource_info_with_descriptions
import json
##  import #pprint
from llama_index.core.llms import ChatMessage

# NOTE: 'llm' is not initialized yet at this point; it is created at the end of this cell.
# Project-local helper import (duplicate of the identical import above):
from library import get_all_fhir_resource_info_with_descriptions
import json
##  import #pprint

# Import specific FHIR resource classes from fhir.resources library
# Ensure you have installed it: pip install fhir.resources
from fhir.resources.location import Location
from fhir.resources.organization import Organization
from fhir.resources.address import Address
from fhir.resources.codeableconcept import CodeableConcept
from fhir.resources.coding import Coding


# Workspace client for the current Databricks context; used below to mint a
# token and to resolve the workspace host for the serving endpoint URL.
w = WorkspaceClient()

# Short-lived token handed to the LLM client as its API key.
TOKEN_LIFETIME_SECONDS = 14400
tmp_token = w.tokens.create(
    lifetime_seconds=TOKEN_LIFETIME_SECONDS,
    comment="for model serving",
)

# Chat model served from this workspace's model-serving endpoints.
serving_endpoints_url = f"{w.config.host}/serving-endpoints/"
llm = Databricks(
    api_base=serving_endpoints_url,
    api_key=tmp_token.token_value,
    model="databricks-llama-4-maverick",
)

# Optional smoke test (left disabled):
# llm.complete("Hello, world")

In [0]:
# --- Configuration (can be moved to a config file in a real project) ---
endpoint_loc = "workspace.default.fhir_endpoints"
us_states = ["alabama", "alaska", "arizona", "arkansas", "california", "colorado", "connecticut", "delaware",
             "florida", "georgia", "hawaii", "idaho", "illinois", "indiana", "iowa", "kansas", "kentucky",
             "louisiana", "maine", "maryland", "massachusetts", "michigan", "minnesota", "mississippi",
             "missouri", "montana", "nebraska", "nevada", "new hampshire", "new jersey", "new mexico",
             "new york", "north carolina", "north dakota", "ohio", "oklahoma", "oregon", "pennsylvania",
             "rhode island", "south carolina", "south dakota", "tennessee", "texas", "utah", "vermont",
             "virginia", "washington", "west virginia", "wisconsin", "wyoming"]

target_column = "api_information_source_name" # Column containing the unstructured text

# --- Step 1: Get data and prepare for LLM ---
# Alternation of all state names. The match is an unanchored substring match on
# the lower-cased column, so a state name embedded in a longer word also matches.
us_state_pattern = '|'.join(us_states)

# Load the endpoint table, keeping only rows whose source name is present,
# is not the literal '200', and mentions a US state.
df_ep = spark.table(endpoint_loc).filter(
    (F.col(target_column).isNotNull()) &
    (F.col(target_column) != '200') & # Filter out non-meaningful entries
    (F.lower(F.col(target_column)).rlike(us_state_pattern)) # Ensure it contains a US state
)

# Convert to Pandas DataFrame for easier sampling (for demonstration)
# For larger datasets, consider using Spark's sample or UDFs for LLM calls directly
df_ep_pd = df_ep.toPandas()

# Preview up to 5 random rows. The sample size is capped at the row count because
# DataFrame.sample raises ValueError when asked for more rows than exist.
df_ep_pd.sample(n=min(5, len(df_ep_pd)))

Unnamed: 0,url,api_information_source_name,created_at,updated,list_source,certified_api_developer_name,capability_fhir_version,format,http_response,http_response_time_second,smart_http_response,errors,cap_stat_exists,kind,requested_fhir_version,is_chpl
540,https://fhir-usa.unify.chbase.com/org/wyoming-...,WYOMING COUNTY COMMUNITY,24/5/2025 6:40,9/6/2025 8:24,https://fhir-usa.unify.chbase.com/.well-known/...,"TruBridge, Inc.",4.0.1,"application/fhir+json,application/fhir+xml",200.0,0.3144,200.0,,true*,capability,4.0,True
1322,https://haiku.sparrow.org/fhir-prd/api/FHIR/R4/,University of Michigan Health-Sparrow,10/5/2025 23:35,9/6/2025 3:16,https://open.epic.com/Endpoints/Brands,Epic Systems Corporation,4.0.1,"xml,json",200.0,0.1819,200.0,,TRUE,instance,,True
1725,https://api.platform.athenahealth.com/21519/br...,Texas Online Primary Care,24/5/2025 13:26,9/6/2025 3:25,https://service-base-urls.api.fhir.athena.io/a...,"athenahealth, Inc.",4.0.1,"json,application/json,application/json+fhir,ap...",200.0,0.0862,200.0,,TRUE,instance,,True
1069,https://api.platform.athenahealth.com/21499/br...,Texas Gynecology,24/5/2025 5:53,8/6/2025 23:52,https://service-base-urls.api.fhir.athena.io/a...,"athenahealth, Inc.",4.0.1,"json,application/json,application/json+fhir,ap...",200.0,0.0892,200.0,,TRUE,instance,,True
1112,https://fhir-myrecord.cerner.com/r4/329afbd4-f...,New York Comprehensive Care P.C.,11/5/2025 8:34,9/6/2025 4:58,https://raw.githubusercontent.com/oracle-sampl...,Cerner Corporation,4.0.1,"json,application/fhir+json",200.0,0.5025,200.0,,TRUE,instance,,True


In [0]:

# Pick one source name at random to run through the pipeline; when the filtered
# table is empty, a fixed placeholder sentence is used instead.
if df_ep_pd.empty:
    random_entry_name = "No matching entries found in the table." # Handle empty DataFrame case
else:
    sampled_names = df_ep_pd[target_column].sample(n=1)
    random_entry_name = sampled_names.iloc[0]

print(f"**Processing the following unstructured text:** '{random_entry_name}'\n")

# --- Step 1: ask the LLM to extract FHIR-style JSON from the raw text ---
# Uses the 'llm' chat model created earlier in this notebook.

# System message: deliberately generic so the model picks the FHIR resource type
# itself and extracts whichever fields the input text supports.
system_message = "".join([
    "You are an expert at extracting FHIR data from unstructured healthcare text. ",
    "Your goal is to parse the provided text and identify information relevant to a FHIR resource. ",
    "You should determine the most appropriate FHIR resource type (e.g., Location, Practitioner, Organization, Patient) ",
    "based on the input text. ",
    "Then, extract key fields for that resource, such as 'name', 'address' (with 'city', 'state', 'country'), 'identifier', 'telecom', etc. ",
    "For 'address', always include 'city', 'state', and 'country'. ",
    "Infer and guess data based on common knowledge (e.g., 'California' for 'Los Angeles', 'USA' for US states). ",
    "Return the extracted information as a JSON array, where each element is a FHIR-compliant JSON object. ",
    "Use empty strings or nulls for fields if the information is not present or cannot be reasonably inferred.",
])

# User prompt: only carries the text to be parsed.
user_prompt = f"Extract FHIR resource information from this text:\n'{random_entry_name}'"

# Conversation sent to the model: system instructions first, then the request.
messages = [
    ChatMessage(role=role, content=content)
    for role, content in (("system", system_message), ("user", user_prompt))
]

output = llm.chat(messages)

# --- Process LLM Output (assuming JSON response) ---
# Bind the result up front so that, if parsing fails, later steps see None
# instead of raising NameError or silently reusing a value left in the kernel
# by a previous run.
extracted_data_step1 = None
try:
    # LLMs sometimes wrap JSON in markdown blocks, so we need to extract it
    raw_llm_output = output.message.content
    fence_open = raw_llm_output.find('```json')
    if fence_open != -1:
        json_start = fence_open + len('```json')
        json_end = raw_llm_output.rfind('```')
        if json_end < json_start:
            # Opening fence without a closing one: take everything after it.
            json_end = len(raw_llm_output)
        json_string = raw_llm_output[json_start:json_end].strip()
    else:
        json_string = raw_llm_output.strip() # Assume it's just JSON if no markdown

    extracted_data_step1 = json.loads(json_string)

except Exception as e:
    # Best-effort: report the problem and show the raw reply for debugging.
    print(f"\nAn unexpected error occurred: {e}")
    print("\n--- Raw LLM Output (For Debugging) ---")
    print(output.message.content)
# --- Step 2 consumes 'extracted_data_step1', populated by the Step 1 parsing above ---
# No sample data is substituted here; the commented-out override below is kept for reference.
#extracted_data_step1 = extracted_data

# Example of the expected shape of 'extracted_data_step1':
# # [
#   {
#     "resourceType": "Organization", # This would come from LLM's initial extraction in Step 1
#     "name": "Nevada Medical and Pain Institute",
#     "address": {
#       "city": "", # Example of a missing city
#       "state": "Nevada",
#       "country": "USA"
#     },
#     "identifier": None,
#     "telecom": None
#   }
# ]

# --- Build FHIR Context for the LLM ---
# Resource names and descriptions are read from a Delta table. (The imported
# get_all_fhir_resource_info_with_descriptions() helper is not called here.)
table_name = "workspace.default.fhir_resource_descriptions"

# Spark DataFrame -> pandas DataFrame -> list of row dictionaries.
fhir_df = spark.read.format("delta").table(table_name)
fhir_pandas_df = fhir_df.toPandas()
fhir_resource_descriptions = fhir_pandas_df.to_dict(orient='records')

# Prompt fragment: an instruction header, one bullet per resource type, and a
# trailing blank line.
context_header = (
    "Refer to the following list of FHIR Resource Types and their short descriptions "
    "to help you categorize the extracted information. Pay attention to the purpose "
    "of each resource to make the best classification:\n\n"
)
context_bullets = [
    f"- **{row['fhir_resource_name']}**: {row['fhir_resource_description']}\n"
    for row in fhir_resource_descriptions
]
fhir_context_string = context_header + "".join(context_bullets) + "\n"

# --- Step 2: have the LLM categorise the Step 1 JSON against the FHIR resource list ---

# System message: asks for a natural-language categorisation (explicitly not JSON).
system_message_step2_final = "".join([
    "You are an expert AI assistant specializing in analyzing structured healthcare data. ",
    "Your primary goal is to review the provided JSON data (extracted from an unstructured source) ",
    "and categorize its content into meaningful, FHIR-relevant buckets. ",
    "You MUST use the provided list of FHIR Resource Types and their descriptions as a precise guide ",
    "to identify the most appropriate FHIR resource type(s) for the extracted information. ",
    "For each categorized piece of information, describe what it represents, its value, ",
    "and explicitly suggest the most suitable FHIR resource type (e.g., 'Organization', 'Location', 'Practitioner', etc.) ",
    "and specific fields that would hold this information. ",
    "Also, infer any potential relationships or 'edges' to other FHIR resources ",
    "(e.g., an 'Organization' operates at a 'Location'). ",
    "Do NOT generate JSON output. Instead, provide a clear, natural language summary or a bulleted list of your findings.",
])

# User prompt: Step 1 JSON plus the FHIR context string, or a fixed notice when
# Step 1 produced nothing usable.
step1_available = 'extracted_data_step1' in locals() and extracted_data_step1
if not step1_available:
    user_prompt_step2_final = "No structured data was extracted in Step 1 to analyze for categorization."
else:
    step1_json = json.dumps(extracted_data_step1, indent=2)
    user_prompt_step2_final = (
        "Here is the extracted information from Step 1:\n"
        f"```json\n{step1_json}\n```\n\n"
        f"{fhir_context_string}"
        "Please analyze the extracted information and categorize it into meaningful buckets, "
        "explicitly mapping it to the most relevant FHIR resource types and their key fields from the examples. "
        "Also, note any potential relationships or 'edges' between these categorized pieces of information "
        "that would be useful for a FHIR database."
    )

# --- Construct and Execute LLM Chat ---
messages_step2_final = [
    ChatMessage(role=role, content=content)
    for role, content in (
        ("system", system_message_step2_final),
        ("user", user_prompt_step2_final),
    )
]

# The reply text is available as output_step2.message.content.
output_step2 = llm.chat(messages_step2_final)

# --- Step 3: Extract FHIR Resource, Reasoning, and Relevant Data ---
# Inputs produced earlier in this cell:
#   extracted_data_step1: parsed JSON from Step 1.
#   output_step2: chat response from Step 2; its message content is the
#                 natural-language categorisation.

# Canned example of what a Step 2 reply looks like. Retained for reference only:
# it describes a fixed sample organisation, so it must not be sent to the LLM in
# place of the live Step 2 reply.
sample_output_step2_content = """
Based on the extracted information:

-   **Nevada Medical and Pain Institute**: This strongly suggests an **Organization** resource.
    -   **Name**: "Nevada Medical and Pain Institute" (maps to `Organization.name`)
    -   **Address**: Contains 'Nevada' (State, maps to `Organization.address.state`) and 'USA' (Country, maps to `Organization.address.country`). The city is not explicitly provided, which aligns with `Organization.address.city` being potentially absent.
    -   **Identifier**: Not present (maps to `Organization.identifier` as null).
    -   **Telecom**: Not present (maps to `Organization.telecom` as null).
    -   **Reasoning**: The name clearly indicates a medical institution, which is best represented as a FHIR Organization.
    -   **Potential Edge**: This Organization would likely be associated with one or more **Location** resources.
"""

# Live Step 2 analysis for the entry currently being processed. Using this
# (rather than the canned sample) keeps Step 3 consistent with Steps 1 and 2.
step2_summary = output_step2.message.content or ""

system_message_step3 = (
    "You are an expert AI assistant specialized in converting categorized healthcare data "
    "into a structured JSON format that directly informs FHIR resource creation. "
    "You will be given two inputs: "
    "1. The initial JSON data extracted from an unstructured source (Step 1 output). "
    "2. A natural language summary/categorization of that data, informed by FHIR schemas (Step 2 output). "
    "Your task is to identify each major FHIR resource suggested in the Step 2 output "
    "(ignoring inferred relationships/edges for now) and provide the following for each: "
    "- **`fhir_resource_type`**: This key **MUST** contain the specific FHIR resource type "
    " (e.g., 'Organization', 'Location', 'Practitioner') identified by Step 2. "
    "- **`relevant_data`**: A JSON object containing the key-value pairs of data from the Step 1 output that are relevant to this FHIR resource. Only include fields that directly map to standard FHIR properties of this resource type. If a field from Step 1's output is not a direct FHIR property (e.g., a custom field), omit it. "
    "- **`reasoning`**: A concise explanation (1-2 sentences) of why this resource type is chosen and how the data maps. "
    "**CRITICAL**: Your output MUST be ONLY a JSON array, with absolutely no other text, commentary, "
    "markdown fences (```json), or other characters before or after the JSON. "
    "The JSON should be structured as: `[{'fhir_resource_type': '...', 'relevant_data': {...}, 'reasoning': '...'}]`."
)

# Combine both inputs for the LLM's user prompt
user_prompt_step3 = (
    "Here is the initial extracted JSON data from Step 1:\n"
    f"```json\n{json.dumps(extracted_data_step1, indent=2)}\n```\n\n"
    "Here is the natural language categorization and reasoning from Step 2:\n"
    f"```\n{step2_summary}\n```\n\n"
    "Please extract the core FHIR resources from these inputs, providing the resource type, "
    "the relevant data mapped from Step 1's JSON, and the reasoning for each, in a JSON array format. "
    "Remember, your response MUST be ONLY the JSON, and the resource type key MUST be `fhir_resource_type`."
)

messages_step3 = [
    ChatMessage(role="system", content=system_message_step3),
    ChatMessage(role="user", content=user_prompt_step3),
]

# Call the LLM; the raw reply text is parsed by the next section.
output_step3 = llm.chat(messages_step3)
raw_llm_output_step3 = output_step3.message.content

# --- Robust JSON Parsing Logic ---
# Result is always a list of dicts; it stays empty when parsing fails so the
# Step 4 loop can iterate over it safely.
extracted_resources_for_step4 = []
try:
    fence_open = raw_llm_output_step3.find('```json')
    if fence_open != -1:
        # 1. JSON wrapped in a markdown fence.
        json_start = fence_open + len('```json')
        json_end = raw_llm_output_step3.rfind('```')
        if json_end < json_start:
            # Opening fence without a closing one: take everything after it.
            json_end = len(raw_llm_output_step3)
        json_string = raw_llm_output_step3[json_start:json_end].strip()
    else:
        # 2. No fence: slice from the outermost opening delimiter to its matching
        # closer. Whichever of '[' / '{' appears FIRST is treated as the top-level
        # container, so a single object that contains nested arrays is not
        # truncated to its first inner array.
        openers = sorted(
            (position, closer)
            for position, closer in (
                (raw_llm_output_step3.find('['), ']'),
                (raw_llm_output_step3.find('{'), '}'),
            )
            if position != -1
        )
        json_string = raw_llm_output_step3.strip() # Last resort: parse the whole reply
        for open_position, closer in openers:
            close_position = raw_llm_output_step3.rfind(closer)
            if close_position > open_position:
                json_string = raw_llm_output_step3[open_position : close_position + 1].strip()
                break

    parsed_step3 = json.loads(json_string)
    if isinstance(parsed_step3, dict):
        # The model returned one object instead of an array: wrap it.
        parsed_step3 = [parsed_step3]
    if not isinstance(parsed_step3, list):
        raise ValueError(
            f"expected a JSON array of resources, got {type(parsed_step3).__name__}"
        )
    extracted_resources_for_step4 = parsed_step3

except Exception as e:
    # Best-effort: report the problem and show the raw reply for debugging.
    print(f"\nAn unexpected error occurred during parsing: {e}")
    print(f"Full raw LLM output:\n---\n{raw_llm_output_step3}\n---")


# --- Step 4: Generate one FHIR JSON resource per item identified in Step 3 ---
# Inputs:
#   extracted_resources_for_step4: Step 3 result; each item is expected to be a
#       dict with 'fhir_resource_type', 'relevant_data' and 'reasoning'.
#   schema_df: pandas DataFrame loaded below; read via its 'name' and 'schema' columns.
# Output:
#   final_fhir_json_outputs: list of parsed FHIR JSON objects, one per item
#       that was generated and parsed successfully.

final_fhir_json_outputs = []

schema_df = spark.table("workspace.default.fhir_definitions_table_2").toPandas()

# The system prompt does not depend on the resource, so it is built once.
system_message_step4 = (
    "You are an expert AI assistant specialized in generating FHIR HL7 compliant JSON. "
    "Your task is to produce a single FHIR resource JSON object based on the provided data and schema. "
    "Adhere strictly to the given FHIR JSON Schema. "
    "Ensure all data from 'relevant_data' is mapped to the correct FHIR fields. "
    "If a field from 'relevant_data' maps to a FHIR property that can accept null or an empty string/array, "
    "and the value is missing or empty, it is acceptable to include it as `null` or `[]` (empty array/object) if appropriate for the FHIR type. "
    "Do NOT include any extra fields that are not part of the FHIR schema. "
    "**CRITICAL**: Your output MUST be ONLY a single FHIR JSON object, with absolutely no other text, "
    "commentary, markdown fences (```json), or other characters before or after the JSON."
)

# Step 3 may have failed (None) or produced a single object; normalise to a list
# so the loop below never iterates over None or over a dict's keys.
step4_inputs = extracted_resources_for_step4 or []
if isinstance(step4_inputs, dict):
    step4_inputs = [step4_inputs]

# Iterate through each resource identified in Step 3's output
for resource_item in step4_inputs:
    print("Processing For FHIR TABULATION: ")
    pprint(resource_item)

    if not isinstance(resource_item, dict) or not resource_item.get("fhir_resource_type"):
        print("WARNING: Step 3 item has no 'fhir_resource_type'. Skipping this resource.")
        continue

    fhir_type_str = str(resource_item["fhir_resource_type"]).strip()
    # Send only the 'relevant_data' payload, kept as a Python object so it is
    # JSON-encoded exactly once when interpolated into the prompt.
    relevant_data = resource_item.get("relevant_data") or {}

    # Retrieve the specific schema for the resource type from schema_df
    matching_schema_row = schema_df[schema_df['name'].str.lower() == fhir_type_str.lower()]

    if matching_schema_row.empty:
        print(f"WARNING: No schema found in schema_df for resource type '{fhir_type_str}'. Cannot provide schema context for LLM. Skipping this resource.")
        continue # Skip to the next resource if schema is not found

    retrieved_schema_value = matching_schema_row['schema'].iloc[0]

    # --- LLM Prompt for Step 4 ---
    user_prompt_step4 = (
        f"Generate a FHIR {fhir_type_str} resource in JSON format.\n\n"
        "Here is the data relevant to this resource:\n"
        f"```json\n{json.dumps(relevant_data, indent=2)}\n```\n\n"
        "Here is the FHIR JSON Schema for a "
        f"{fhir_type_str} resource. Use this schema strictly for structure and data types:\n"
        f"```json\n{json.dumps(json.loads(retrieved_schema_value), indent=2)}\n```\n\n"
        "Please provide ONLY the FHIR JSON. Remember that nulls and empty strings/arrays are acceptable for missing data where allowed by FHIR."
    )

    messages_step4 = [
        ChatMessage(role="system", content=system_message_step4),
        ChatMessage(role="user", content=user_prompt_step4),
    ]

    # --- Call the LLM ---
    output_step4 = llm.chat(messages_step4)
    raw_llm_output_step4 = output_step4.message.content

    # --- Robust JSON Parsing Logic ---
    generated_fhir_json = None
    try:
        fence_open = raw_llm_output_step4.find('```json')
        if fence_open != -1:
            # 1. JSON wrapped in a markdown fence.
            json_start = fence_open + len('```json')
            json_end = raw_llm_output_step4.rfind('```')
            if json_end < json_start:
                # Opening fence without a closing one: take everything after it.
                json_end = len(raw_llm_output_step4)
            json_string = raw_llm_output_step4[json_start:json_end].strip()
        else:
            # 2. Fallback: find the first '{' and the last '}'
            first_brace = raw_llm_output_step4.find('{')
            last_brace = raw_llm_output_step4.rfind('}')
            if first_brace != -1 and last_brace > first_brace:
                json_string = raw_llm_output_step4[first_brace : last_brace + 1].strip()
            else:
                json_string = raw_llm_output_step4.strip() # Last resort

        generated_fhir_json = json.loads(json_string)
        final_fhir_json_outputs.append(generated_fhir_json)

    except Exception as e:
        # Best-effort: report and move on to the next resource.
        print(f"  An unexpected error occurred for '{fhir_type_str}': {e}")
        print(f"  Full raw LLM output:\n---\n{raw_llm_output_step4}\n---")
        print("-" * 60)

if not final_fhir_json_outputs:
    print("\nNo FHIR resources were successfully generated in Step 4.")
else:
    print("\n--- Summary of All Generated FHIR HL7 JSON Outputs ---")
    for idx, fhir_json in enumerate(final_fhir_json_outputs):
        print(f"\nResource {idx + 1}:")
        pprint(fhir_json)
        print("-" * 60)

**Processing the following unstructured text:** 'WESTERN ARIZONA REGIONAL MEDICAL CENTER'

Processing For FHIR TABULATION: 
{'fhir_resource_type': 'Organization',
 'reasoning': 'The provided data includes a name and address details typical '
              'of a medical institution, best represented as a FHIR '
              'Organization resource.',
 'relevant_data': {'address': [{'city': '',
                                'country': 'USA',
                                'state': 'Arizona'}],
                   'identifier': None,
                   'name': 'WESTERN ARIZONA REGIONAL MEDICAL CENTER',
                   'telecom': None}}

--- Summary of All Generated FHIR HL7 JSON Outputs ---

Resource 1:
{'active': None,
 'address': [{'city': '', 'country': 'USA', 'state': 'Arizona'}],
 'alias': [],
 'contact': [],
 'endpoint': [],
 'identifier': [],
 'name': 'WESTERN ARIZONA REGIONAL MEDICAL CENTER',
 'partOf': None,
 'qualification': [],
 'resourceType': 'Organization',
 'telecom': 