In [0]:
# Builds the small pathology reference (dimension) table and saves it to Unity Catalog.

# 1. Define the list of pathologies (Reference Data)
# Each tuple is (pathology_name, urgency_level, related_modality), matching `schema` below.
pathologies_data = [
    ("Intracranial Hemorrhage (ICH)", "Critical", "CT Head"),
    ("Large Vessel Occlusion (LVO)", "Critical", "CT Head"),
    ("Pulmonary Embolism (PE)", "High", "CT Chest"),
    ("C-Spine Fracture", "Medium", "C-Spine"),
    ("Negative", "None", "Any")
]

# 2. Create a DataFrame
schema = ["pathology_name", "urgency_level", "related_modality"]
pathology_df = spark.createDataFrame(pathologies_data, schema)

# 3. Save as a reference table in Unity Catalog
catalog_name = "ofer_ohana_catalog"
schema_name = "aidoc"
table_name = "pathology_reference"

full_table_path = f"{catalog_name}.{schema_name}.{table_name}"

# Make sure the target schema exists before writing. Without this the cell
# fails on a fresh workspace, because nothing above the write creates it.
spark.sql(f"CREATE SCHEMA IF NOT EXISTS {catalog_name}.{schema_name}")

print(f"Creating reference table at: {full_table_path}")

# Overwrite so that re-running the cell is idempotent.
(pathology_df.write
 .format("delta")
 .mode("overwrite")
 .saveAsTable(full_table_path)
)

print("Reference table created successfully.")
display(pathology_df)

In [0]:
# --- Dynamic Widget Loading ---
# Populates a dropdown widget from the pathology reference table and uses the
# selection to filter the radiology AI logs.

# Name the reference table explicitly instead of reading the shared
# `full_table_path` global: that variable is reassigned to the logs table by
# the data-generation cell, so depending on it breaks this cell on re-run.
reference_table_path = "ofer_ohana_catalog.aidoc.pathology_reference"

# 1. Read the valid pathologies from the reference table
# Note: Using collect() is safe here because reference tables are small (Dimension tables).
# DO NOT do this on big data tables (Fact tables).
rows = (
    spark.table(reference_table_path)
    .select("pathology_name")
    .orderBy("pathology_name")
    .collect()
)

# 2. Convert Row objects to a Python list of strings
valid_pathologies_list = [row['pathology_name'] for row in rows]

# An empty reference table would otherwise surface as a bare IndexError below.
if not valid_pathologies_list:
    raise ValueError(
        f"No pathologies found in {reference_table_path}; cannot build the pathology_filter widget."
    )

# 3. Create/Update the widget dynamically
# The first item in the list becomes the default value
dbutils.widgets.dropdown("pathology_filter", valid_pathologies_list[0], valid_pathologies_list, "Select Pathology")

# --- Using the Widget selection ---
selected_value = dbutils.widgets.get("pathology_filter")

print(f"User selected: {selected_value}")

# Apply logic...
# NOTE: this reads the radiology_ai_logs table, so that table must already
# exist when this cell runs.
df_logs = spark.table("ofer_ohana_catalog.aidoc.radiology_ai_logs")
display(df_logs.filter(df_logs.suspected_pathology == selected_value))

In [0]:
from pyspark.sql.functions import col, expr, rand, when, lit, current_timestamp, date_sub

# Generates a mock radiology AI log dataset and saves it to Unity Catalog.

# --- 1. Widget Setup (Best Practice) ---
# Allows the user to control the data volume dynamically without changing the code.
DEFAULT_ROW_COUNT = 10000
dbutils.widgets.text("row_count", str(DEFAULT_ROW_COUNT), "Number of Rows")

# Retrieve the value from the widget.
# Non-numeric or negative input falls back to the default, with a visible
# notice so the fallback is not silent.
try:
    row_count = int(dbutils.widgets.get("row_count"))
    if row_count < 0:
        raise ValueError("row_count must be non-negative")
except ValueError:
    print(f"Invalid 'row_count' widget value; falling back to {DEFAULT_ROW_COUNT}.")
    row_count = DEFAULT_ROW_COUNT

# --- 2. Define Mock Data Logic ---
# List of hospitals simulating Aidoc's clients.
# Each name is pre-wrapped in single quotes because the list is joined
# straight into a SQL elt(...) expression below.
hospitals = ["'Sheba Medical Center'", "'Ichilov Hospital'", "'Rambam Health Care'", "'Hadassah Ein Kerem'", "'Mount Sinai NYC'"]

# SQL expression to simulate distribution of Scan Modalities
# (e.g., CT Heads are more common than MRIs).
# The CASE branches on ONE uniform draw stored in the helper column
# `modality_draw`, so the thresholds are cumulative. Calling rand() separately
# in every WHEN would make each branch an independent draw and skew the split
# to roughly 40% / 42% / 16% / 2% instead of the intended 40/30/20/10.
modalities_expr = """
  CASE 
    WHEN modality_draw < 0.4 THEN 'CT Head'      -- 40% head CTs
    WHEN modality_draw < 0.7 THEN 'CT Chest'     -- 30% chest CTs
    WHEN modality_draw < 0.9 THEN 'C-Spine'      -- 20% spine
    ELSE 'Brain MRI'                             -- 10% MRI
  END
"""

# Business Logic: Mapping Pathologies to Modalities
# This creates realistic correlations (e.g., ICH is found in CT Head, PE in CT Chest).
# Each rand() here is an independent draw, so the LVO branch is only reached
# when the ICH branch did not fire (about 0.85 * 0.05 of CT Head rows).
pathology_logic = """
  CASE 
    WHEN modality = 'CT Head' AND rand() < 0.15 THEN 'Intracranial Hemorrhage (ICH)'
    WHEN modality = 'CT Head' AND rand() < 0.05 THEN 'Large Vessel Occlusion (LVO)'
    WHEN modality = 'CT Chest' AND rand() < 0.12 THEN 'Pulmonary Embolism (PE)'
    WHEN modality = 'C-Spine' AND rand() < 0.10 THEN 'C-Spine Fracture'
    ELSE 'Negative' 
  END
"""

# --- 3. Generate the DataFrame ---
print(f"Generating {row_count} records related to Aidoc radiology use cases...")

# Uniformly picks one hospital: the index is an int in 1..len(hospitals).
hospital_expr = f"elt(cast(rand()*{len(hospitals)} + 1 as int), {','.join(hospitals)})"

df = (
    spark.range(row_count)
    .withColumn("study_id", expr("uuid()"))
    .withColumn("patient_id", expr("uuid()"))
    .withColumn("hospital_name", expr(hospital_expr))
    # Single random draw per row, consumed by modalities_expr and then dropped
    # so the output schema is the same as before.
    .withColumn("modality_draw", rand())
    .withColumn("modality", expr(modalities_expr))
    .drop("modality_draw")
    .withColumn("suspected_pathology", expr(pathology_logic))
    .withColumn("ai_confidence_score", expr("round(rand(), 4)"))
    .withColumn("processing_time_ms", expr("cast(rand() * 500 + 50 as int)"))
    # NOTE(review): date_sub returns a DATE, so this column carries no
    # time-of-day despite its name; left unchanged to keep the table schema stable.
    .withColumn("scan_timestamp", expr("date_sub(current_timestamp(), cast(rand()*30 as int))"))
    # Alert only on positive findings with confidence above 0.75.
    .withColumn(
        "is_alerted",
        when(
            (col("suspected_pathology") != 'Negative') & (col("ai_confidence_score") > 0.75),
            True,
        ).otherwise(False),
    )
)

# Injecting artificial "Noise" (Data Quality issues) for demonstration purposes
# Example: 1% of rows will have a missing Hospital Name
df_final = df.withColumn("hospital_name", when(rand() > 0.99, lit(None)).otherwise(col("hospital_name")))

# --- 4. Save to Unity Catalog ---
catalog_name = "ofer_ohana_catalog"
schema_name = "aidoc"
table_name = "radiology_ai_logs"

full_table_path = f"{catalog_name}.{schema_name}.{table_name}"

# Ensure schema exists
spark.sql(f"CREATE SCHEMA IF NOT EXISTS {catalog_name}.{schema_name}")

print(f"Saving data to table: {full_table_path}...")

(df_final.write
 .format("delta")
 .mode("overwrite") # Use 'append' if you want to add to existing data
 .saveAsTable(full_table_path)
)

print("Success! Dataset created.")
display(df_final)

In [0]:
from pyspark.sql.functions import col, avg, count, lit

# Per-hospital processing-time and confidence summary for one selected pathology.

# --- Widget Section ---
# DO: Define widgets at the top for visibility.
# DON'T: Rely on these working in VS Code/Cursor locally without dbutils mocking.
# The choices must match the values stored in `suspected_pathology` exactly,
# because the selection is used in an equality filter below. The data stores
# the full name 'Large Vessel Occlusion (LVO)', so a bare "LVO" choice would
# always produce an empty result.
dbutils.widgets.dropdown("target_pathology", "Intracranial Hemorrhage (ICH)", 
                         ["Intracranial Hemorrhage (ICH)", "Pulmonary Embolism (PE)", "Large Vessel Occlusion (LVO)"], 
                         "Select Pathology")

# Fetch value from widget
selected_pathology = dbutils.widgets.get("target_pathology")

# --- Logic Section ---
# Best Practice: Use PySpark for testable, modular logic

df = spark.table("ofer_ohana_catalog.aidoc.radiology_ai_logs")

# Filter based on the widget selection (Dynamic execution)
filtered_df = df.filter(col("suspected_pathology") == selected_pathology)

# Aggregate per hospital: case count, mean processing time, mean confidence.
result_df = (filtered_df
    .groupBy("hospital_name")
    .agg(
        count("*").alias("total_cases"),
        avg("processing_time_ms").alias("avg_processing_time"),
        avg("ai_confidence_score").alias("model_confidence")
    )
    .filter(col("total_cases") > 10) # Remove noise: drop hospitals with 10 or fewer cases
    .orderBy(col("avg_processing_time").desc())  # slowest hospitals first
)

print(f"Analysis for: {selected_pathology}")
display(result_df)

In [0]:
%sql
-- Alert rates by hospital and pathology, restricted to positive findings.
-- Intended for quick dashboarding and ad-hoc analysis (e.g. in the SQL Editor).

SELECT
    hospital_name,
    suspected_pathology,
    COUNT(*) AS total_scans,
    -- Share of scans in the group that raised an alert, as a percentage (2 decimals)
    ROUND(SUM(CASE WHEN is_alerted THEN 1 ELSE 0 END) / COUNT(*) * 100, 2) AS alert_rate_pct,
    AVG(ai_confidence_score) AS avg_confidence
FROM ofer_ohana_catalog.aidoc.radiology_ai_logs
WHERE suspected_pathology <> 'Negative'  -- exclude scans with no suspected finding
GROUP BY
    hospital_name,
    suspected_pathology
ORDER BY alert_rate_pct DESC;