In [0]:
%pip install faker

In [0]:
import pandas as pd
import random
from datetime import datetime, timedelta
import faker
from pyspark.sql.functions import expr
import json

In [0]:
# Parameters
# Catalog and schema are interpolated into every table name read or written below.
catalog, schema = "mzervou", "healthcare"

# Serving endpoint name handed to ai_query as its `endpoint` argument.
model_name = "databricks-meta-llama-3-70b-instruct"

# Date window bounds (not referenced by the cells visible here — presumably used
# together with random_datetime; confirm against the rest of the notebook).
start_date = datetime(year=2024, month=1, day=1)
end_date = datetime(year=2025, month=6, day=1)

In [0]:
# Build the generator from the Faker class instead of through the `faker` module
# attribute. This cell rebinds the name `faker` to the instance, so the previous
# `faker.Faker()` looked up `Faker` on that instance (not the module) whenever
# the cell was executed a second time without re-running the imports cell.
from faker import Faker

faker = Faker()

# # Load patient visit data to use as base
# df_visits = spark.read.table(f"{catalog}.{schema}.synthetic_patient_visits").toPandas()

# Helper to randomize timestamp
def random_datetime(start, end):
    """Return a random datetime between ``start`` and ``end`` (both inclusive).

    The offset added to ``start`` is a whole number of seconds drawn with
    ``random.randint`` from 0 up to the truncated number of seconds that
    separate the two datetimes.
    """
    span_seconds = int((end - start).total_seconds())
    offset = timedelta(seconds=random.randint(0, span_seconds))
    return start + offset



In [0]:
# Load the synthetic patient visits table as a Spark DataFrame; the LLM
# generation cells below add their columns on top of it.
df_spark_visits = spark.read.table(
    ".".join((catalog, schema, "synthetic_patient_visits"))
)

In [0]:
# ---------- LAB RESULTS (via LLM) ----------
# Re-read the visits table so this cell does not depend on the previous one.
df_spark_visits = spark.read.table(f"{catalog}.{schema}.synthetic_patient_visits")

# Structured-output specification passed to ai_query as `responseFormat`.
# `strict` is a field of the `json_schema` wrapper (a sibling of `schema`), not a
# keyword of the JSON Schema body, so it is placed next to `schema` here.
lab_schema = json.dumps({
    "type": "json_schema",
    "json_schema": {
        "name": "lab_results_extraction",
        "schema": {
            "type": "object",
            "properties": {
                "lab_results": {
                    "type": "array",
                    "items": {
                        "type": "object",
                        "properties": {
                            "lab_name": {"type": "string"},
                            "lab_value": {"type": "number"},
                            "lab_unit": {"type": "string"},
                            "abnormal_flag": {"type": "string"}
                        },
                        # Every field of a lab result is mandatory, matching how
                        # the diagnosis and medication schemas list all of
                        # their properties as required.
                        "required": ["lab_name", "lab_value", "lab_unit", "abnormal_flag"]
                    }
                }
            },
            "required": ["lab_results"]
        },
        "strict": True
    }
})

# Add a `lab_results` column holding the ai_query response for each visit row.
# `department` and `visit_datetime` are columns of the visits table that the SQL
# expression splices into the prompt; the endpoint name and the schema JSON are
# interpolated into the SQL text as single-quoted literals.
df_lab = df_spark_visits.withColumn(
    "lab_results",
    expr(
        f"""
        ai_query(
            endpoint => '{model_name}',
            request => concat(
                'You are generating realistic lab results for a patient seen in the ', department,
                ' department on ', visit_datetime, '. Return two lab results as JSON.'
            ),
            responseFormat => '{lab_schema}'
        )
        """
    )
)
# Persist the raw responses, replacing any previous run's table.
df_lab.write.mode("overwrite").saveAsTable(f"{catalog}.{schema}.lab_results_raw")

print("✅ Lab results with valid json_schema format generated and saved.")


In [0]:
# ---------- DIAGNOSES (via LLM) ----------
# Structured-output specification passed to ai_query as `responseFormat`.
# `strict` is a field of the `json_schema` wrapper (a sibling of `schema`), not a
# keyword of the JSON Schema body, so it is placed next to `schema` here.
diagnosis_schema = json.dumps({
    "type": "json_schema",
    "json_schema": {
        "name": "diagnosis_extraction",
        "schema": {
            "type": "object",
            "properties": {
                "icd10_code": {"type": "string"},
                "diagnosis_text": {"type": "string"},
                "diagnosis_status": {"type": "string"}
            },
            "required": ["icd10_code", "diagnosis_text", "diagnosis_status"]
        },
        "strict": True
    }
})

# Add a `diagnosis` column holding the ai_query response for each visit row.
# `department` is a column of the visits table that the SQL expression splices
# into the prompt. Relies on `df_spark_visits` loaded by an earlier cell.
df_diag = df_spark_visits.withColumn(
    "diagnosis",
    expr(
        f"""
        ai_query(
            endpoint => '{model_name}',
            request => concat(
                'The patient visited for ', department,
                '. Return one ICD-10 diagnosis and description in structured JSON.'
            ),
            responseFormat => '{diagnosis_schema}'
        )
        """
    )
)
# Persist the raw responses, replacing any previous run's table.
df_diag.write.mode("overwrite").saveAsTable(f"{catalog}.{schema}.diagnoses_raw")

# ---------- MEDICATIONS (via LLM) ----------
# Structured-output specification passed to ai_query as `responseFormat`.
# `strict` is a field of the `json_schema` wrapper (a sibling of `schema`), not a
# keyword of the JSON Schema body, so it is placed next to `schema` here.
med_schema = json.dumps({
    "type": "json_schema",
    "json_schema": {
        "name": "medications_extraction",
        "schema": {
            "type": "object",
            "properties": {
                "medication_name": {"type": "string"},
                "dose": {"type": "string"},
                "frequency": {"type": "string"},
                "route": {"type": "string"}
            },
            "required": ["medication_name", "dose", "frequency", "route"]
        },
        "strict": True
    }
})

# Add a `medications` column holding the ai_query response for each visit row.
# Relies on `df_spark_visits` loaded by an earlier cell.
# NOTE(review): the prompt references no column of the visit row, so every row
# sends the same request text — confirm whether department/diagnosis context
# was meant to be included here.
df_meds = df_spark_visits.withColumn(
    "medications",
    expr(
        f"""
        ai_query(
            endpoint => '{model_name}',
            request => concat(
                'After seeing this patient, the doctor prescribed a medication. Return details as structured JSON.'
            ),
            responseFormat => '{med_schema}'
        )
        """
    )
)
# Persist the raw responses, replacing any previous run's table.
df_meds.write.mode("overwrite").saveAsTable(f"{catalog}.{schema}.medications_raw")

print("✅ Structured data (diagnoses, medications) generated using ai_query with enforced schema are created.")
