
# Step 1 - Data Generation

### Cells 2-20 run the Synthea synthetic data generator. Skip these cells if you already have data to work with.

Synthea runs on Java Development Kit (JDK) 17, so use a cluster that has DBR 16.0 or above, as JDK 17 is the default. Check that you have JDK 17 installed:

In [0]:
%sh
# Print the installed Java version so it can be compared with the JDK 17 requirement stated above.
java -version

In [0]:
# Text widgets for the target catalog, the target schema and Synthea's export base directory.
# Each entry is (widget name, default value, label shown in the notebook UI).
widget_specs = (
    ("catalog_name", "", "Catalog Name"),
    ("schema_name", "", "Schema Name"),
    ("destination", "./output/", "Base Directory"),
)
for widget_name, default_value, widget_label in widget_specs:
    dbutils.widgets.text(name=widget_name, defaultValue=default_value, label=widget_label)

In [0]:
# Pull the three widget values in one pass, then derive the folder of the
# synthetic_files_raw volume that the rest of the notebook reads from and writes to.
catalog_name, schema_name, destination = (
    dbutils.widgets.get(name=widget_key)
    for widget_key in ("catalog_name", "schema_name", "destination")
)
volume_path = f"/Volumes/{catalog_name}/{schema_name}/synthetic_files_raw/"

In [0]:
# Probe the volume for an existing Synthea configuration file.
# `result` is deliberately the string "True" / "False" (not a bool): it is
# published unchanged as a job task value by the next cell.
try:
    dbutils.fs.ls(f"{volume_path}synthea_config.txt")
except Exception:
    # Any failure to list the file is treated as "config file not present".
    # `except Exception` (instead of a bare `except`) lets KeyboardInterrupt
    # and SystemExit propagate, so cancelling the cell still works.
    result = "False"
else:
    result = "True"

result  # Show the flag as the cell output

In [0]:
# Publish the "True"/"False" flag computed above as a job task value under the key 'result'.
dbutils.jobs.taskValues.set(key = 'result', value = result)

In [0]:
%sql
-- Create the catalog named by the catalog_name parameter if it is missing, then make it current.
CREATE CATALOG IF NOT EXISTS IDENTIFIER(:catalog_name);
USE CATALOG IDENTIFIER(:catalog_name)

In [0]:
%sql
-- Create the schema named by the schema_name parameter if it is missing, then make it current.
create schema if not exists IDENTIFIER(:schema_name);
use schema IDENTIFIER(:schema_name);

In [0]:
%sql
-- Volume that holds the Synthea JAR, its configuration file and the generated output.
create volume if not exists synthetic_files_raw;

In [0]:
# Download the most recent Synthea build straight into the volume
from urllib.request import urlretrieve

synthea_jar_url = "https://github.com/synthetichealth/synthea/releases/download/master-branch-latest/synthea-with-dependencies.jar"
synthea_jar_target = f"{volume_path}synthea-with-dependencies.jar"
urlretrieve(url=synthea_jar_url, filename=synthea_jar_target)

In [0]:
# Execute the Synthea JAR one time to initialize
# NOTE(review): this cell only assembles the shell command text; nothing in this
# cell runs it, and data_generator() below builds its own command string.
# Confirm whether an explicit initialization run was intended here.
command = f"""
cd {volume_path}
java -jar synthea-with-dependencies.jar
"""

In [0]:
# Create a Synthea configuration file and write it to the volume.
# The settings below switch off the C-CDA and FHIR exporters, switch on the CSV
# and clinical-note exporters (one CSV folder per run), and point Synthea's
# export base directory at the `destination` widget value.
config_file_text = (
f"""# synthea streaming simulation configuration file
exporter.ccda.export = false
exporter.fhir.export = false
exporter.csv.export = true
exporter.csv.folder_per_run = true
exporter.years_of_history = 5
exporter.baseDirectory = {destination}
generate.append_numbers_to_person_names = false
generate.default_population = 100
exporter.clinical_note.export = true
generate.costs.default_encounter_cost = 1250.00
""")

filename = f"{volume_path}synthea_config.txt"

# The `with` block closes the file on exit, so no explicit close() call is needed.
# The encoding is pinned so the file content does not depend on the driver's locale.
with open(filename, "w", encoding="utf-8") as f:
    f.write(config_file_text)

In [0]:
def data_generator(volume_path: str = volume_path, config_file_path: str = f"{volume_path}synthea_config.txt", additional_options: str = "", verbose: bool = False):
  """Run the Synthea JAR that lives in ``volume_path`` and capture its output.

  The JAR is started with a fixed seed (``-s 666``), clinician seed
  (``-cs 777``), reference date (``-r 20200626``) and keep file
  (``-k keep.json``), plus the given configuration file.

  Args:
    volume_path: Directory containing ``synthea-with-dependencies.jar``; used
      as the working directory of the Java process.
    config_file_path: Path passed to Synthea's ``-c`` option.
    additional_options: Extra command-line arguments appended after the fixed
      ones (for example ``"Utah"``). The text is split into words with
      shell-like quoting rules; it is not interpreted by a shell.
    verbose: When true, print the command before running it.

  Returns:
    The ``subprocess.CompletedProcess`` of the run, with ``stdout`` and
    ``stderr`` captured as text. The exit status is not checked here; callers
    should inspect ``returncode``.

  Raises:
    OSError: If ``volume_path`` does not exist or ``java`` cannot be started.
  """
  # Imported locally so the function does not rely on a later cell having run.
  import shlex
  import subprocess

  # An argument list with cwd= avoids shell quoting problems in the paths and
  # guarantees java never runs from the wrong directory after a failed `cd`.
  command = [
    "java", "-jar", "synthea-with-dependencies.jar",
    "-s", "666", "-cs", "777", "-r", "20200626", "-k", "keep.json",
    "-c", config_file_path,
    *shlex.split(additional_options),
  ]
  if verbose:
    print(f"cd {shlex.quote(volume_path)}\n{shlex.join(command)}")
  return subprocess.run(command, cwd=volume_path, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)

In [0]:
import subprocess
# Generate the synthetic data set; "Utah" is handed to data_generator as the
# extra text appended to the Synthea command line.
run_results = data_generator(
    volume_path=volume_path,
    config_file_path=f"{volume_path}synthea_config.txt",
    additional_options="Utah",
    verbose=True,
)

In [0]:
# Source: the raw Synthea volume. Target: the landing volume that the ingestion cells read from.
# Both paths end with a trailing slash.
source_volume_path = f"/Volumes/{catalog_name}/{schema_name}/synthetic_files_raw/"
target_volume_path = f"/Volumes/{catalog_name}/{schema_name}/landing/"

In [0]:
# Make sure the landing-zone volume exists before any file is copied into it
create_landing_volume_sql = f'CREATE VOLUME IF NOT EXISTS {catalog_name}.{schema_name}.landing'
spark.sql(create_landing_volume_sql)

In [0]:
# Copy new files from synthetic_files_raw/output/csv folder to landing zone.
# Each CSV is copied to <landing>/<csv base name>/<run folder>_<csv base name>.csv,
# and files already present at the destination are skipped.

import os

# Entry name that must never be copied into the landing zone
file_exception = 'data_quality_output_data_quality_output/'

# source_volume_path already ends with '/', so strip it to avoid a '//' in the path
csv_root = f"{source_volume_path.rstrip('/')}/output/csv"

# get directories and order by path (the folder name is the run timestamp) in
# ascending order, so runs are processed in the order they were generated
directories = dbutils.fs.ls(csv_root)
directories_sorted = sorted(directories, key=lambda entry: entry.path)

# for each run directory, get files and copy them to landing
for run_directory in directories_sorted:
  run_path = run_directory.path
  run_name = run_directory.name.split('/')[0]
  files = spark.sql(f"LIST '{run_path}' ")
  print(f"Copying files from directory: {run_name} \n source:{run_path}  \n target:{target_volume_path}")

  # each row of the LIST result is indexed as [0] = full path, [1] = entry name
  for file in files.collect():
    file_path = file[0]
    entry_name = file[1]

    # honour the ignore rule, which was previously defined but never applied
    if entry_name == file_exception:
      print(f'Ignoring excluded entry: {entry_name}')
      continue

    # create a folder for the csv based off of file name
    file_time = file_path.split('/')[-2]
    directory_name = entry_name.split('.')[0]
    file_name = f"{file_time}_{directory_name}"

    # check if file exists and copy file
    dst = f"{target_volume_path}{directory_name}/{file_name}.csv"

    if os.path.exists(dst):
      print(f'File already exists, skipping file: {file_name}.csv')
    else:
      print(f'Copying file: {file_name}.csv to target: {target_volume_path}')
      dbutils.fs.cp(file_path, dst)
  print(f'Successfully copied files to target \n target: {target_volume_path}')

In [0]:
# Mirror the generated clinical notes from synthetic_files_raw/output/notes into landing/notes

source_path = f"/Volumes/{catalog_name}/{schema_name}/synthetic_files_raw/output/notes"
target_path = f"/Volumes/{catalog_name}/{schema_name}/landing/notes/"

files = [entry.path for entry in dbutils.fs.ls(source_path)]

for file in files:
    # Last path segment is the note's file name
    file_name = file.rsplit('/', 1)[-1]
    dst = f"{target_path}{file_name}"

    # Guard clause: leave notes that were already copied untouched
    if os.path.exists(dst):
        print(f'File already exists, skipping file: {file_name}')
        continue

    print(f'Copying file: {file_name} to target: {target_path}')
    dbutils.fs.cp(file, dst, recurse=True)

print(f'Successfully copied files to target: {target_path}')

In [0]:
# Load every CSV folder of the landing zone into a Delta table named after the folder

# Root of the landing volume
base_path = f"/Volumes/{catalog_name}/{schema_name}/landing/"

# Keep real sub-folders only, and leave out 'notes' (it holds text files, not CSVs)
subdirectories = [
    entry
    for entry in os.listdir(base_path)
    if os.path.isdir(os.path.join(base_path, entry)) and entry != 'notes'
]

print(subdirectories)

# One table per sub-folder: <catalog>.<schema>.<folder name>, replaced on every run.
# NOTE(review): only the header option is set (no inferSchema / explicit schema) —
# confirm downstream cells cope with the resulting column types.
for subdir in subdirectories:
    file_path = os.path.join(base_path, subdir)
    table_name = f"{catalog_name}.{schema_name}.{subdir}"

    csv_reader = spark.read.format("csv").option("header", "true")
    df = csv_reader.load(file_path)

    delta_writer = df.write.format("delta").mode("overwrite")
    delta_writer.saveAsTable(table_name)

In [0]:
# Ingest the notes files (in txt format) into a Delta table with one row per
# file: (file_name, file_text).

# Define the path to the notes folder
notes_path = f"/Volumes/{catalog_name}/{schema_name}/synthetic_files_raw/output/notes/"

# Get all regular files in the notes folder (sub-folders are ignored)
note_files = [f for f in os.listdir(notes_path) if os.path.isfile(os.path.join(notes_path, f))]

# Read every note fully into memory as (file name, text) pairs.
# The encoding is pinned so decoding does not depend on the driver's locale.
data = []
for note_file in note_files:
    file_path = os.path.join(notes_path, note_file)
    with open(file_path, 'r', encoding='utf-8') as note_handle:
        data.append((note_file, note_handle.read()))

# An explicit schema keeps this working when the folder is empty; with column
# names only, Spark has to infer the types from the rows and fails on an empty list.
notes_df = spark.createDataFrame(data, schema="file_name STRING, file_text STRING")

# Write the DataFrame to a Delta table
notes_df.write.format("delta").mode("overwrite").saveAsTable(f"{catalog_name}.{schema_name}.notes")

# Step 2 - Data Governance

### Cells 22-27 show how to mask sensitive fields so that only users in certain user groups will be able to see the information.

<img src="https://raw.githubusercontent.com/databricks-demos/dbdemos-resources/main/images/hls/patient-readmission/hls-patient-readmision-flow-2.png" style="float: right; margin-left: 10px; margin-top:10px" width="650px" />

In [0]:
%sql
-- Switch to the catalog and schema chosen in the notebook widgets, then list their tables.
-- Named parameter markers with IDENTIFIER() are used, matching the setup cells at the top
-- of this notebook, instead of the older ${...} text substitution.
USE CATALOG IDENTIFIER(:catalog_name);
USE SCHEMA IDENTIFIER(:schema_name);
SHOW TABLES;

In [0]:
%sql
-- Let's grant our ANALYSTS a SELECT permission:
-- Note: make sure you created an analysts and dataengineers group first.
-- NOTE(review): drug_exposure and condition_occurrence are not referenced by any other
-- cell shown in this notebook — confirm these tables exist before running the grants.
GRANT SELECT ON TABLE drug_exposure TO `analysts`;
GRANT SELECT ON TABLE condition_occurrence TO `analysts`;
GRANT SELECT ON TABLE patients TO `analysts`;

-- We'll grant an extra MODIFY to our Data Engineer
-- GRANT SELECT, MODIFY ON SCHEMA dbdemos_hls_readmission TO `dataengineers`;

In [0]:
%sql
-- Work on a copy of patients; the column masks in the next cell are set on protected_patients.
CREATE OR REPLACE TABLE protected_patients AS SELECT * FROM patients;

In [0]:
%sql
-- hls_admin group will have access to all data, all other users will see a masked information.
-- simple_mask returns the original value to members of the hls_admin account group
-- and the literal **** to everyone else.
CREATE OR REPLACE FUNCTION simple_mask(column_value STRING)
   RETURN IF(is_account_group_member('hls_admin'), column_value, "****");
   
-- ALTER FUNCTION simple_mask OWNER TO `account users`; -- grant access to all user to the function for the demo - don't do it in production

-- Mask all PII information
ALTER TABLE protected_patients ALTER COLUMN FIRST SET MASK simple_mask;
ALTER TABLE protected_patients ALTER COLUMN LAST SET MASK simple_mask;
ALTER TABLE protected_patients ALTER COLUMN PASSPORT SET MASK simple_mask;
ALTER TABLE protected_patients ALTER COLUMN DRIVERS SET MASK simple_mask;
ALTER TABLE protected_patients ALTER COLUMN SSN SET MASK simple_mask;
ALTER TABLE protected_patients ALTER COLUMN ADDRESS SET MASK simple_mask;

-- Query the masked table to check the result
SELECT * FROM protected_patients

In [0]:
%sql
-- Create the share (if it does not exist yet) that the tables will be added to.
CREATE SHARE IF NOT EXISTS mcutini_diabetes_readmissions_share
  COMMENT 'Share the information in diabetes_readmissions.';
 
-- For the demo we'll grant ownership to all users. Typical deployments would have admin groups or similar.
ALTER SHARE mcutini_diabetes_readmissions_share OWNER TO `account users`;

-- Simply add the tables you want to share to your SHARE:
-- ALTER SHARE dbdemos_patient_readmission_visits ADD TABLE patients;

In [0]:
%sql
-- Inspect the share created in the previous cell.
DESCRIBE SHARE mcutini_diabetes_readmissions_share;

# Step 3 - Data Analysis

### Cells 29-35 show the experience of analyzing your data using Python and SQL.

In [0]:
%sql 
-- Preview the patients table.
select * from patients

In [0]:
%sql
-- Count patients per gender in 5-year age buckets (age in years is rounded up to the next multiple of 5).
select GENDER, ceil(months_between(current_date(),BIRTHDATE)/12/5)*5 as age, count(*) as count from patients group by GENDER, age order by age
-- Can use the built-in visualization (Area: Key: age, group: GENDER, Values: count)

In [0]:
import plotly.express as px
# Area chart of patient counts by age, one band per gender.
# `_sqldf` is presumably the result of the preceding %sql cell (it uses the same
# age / count / GENDER columns) — confirm that cell ran immediately before this one.
px.area(_sqldf.toPandas(), x="age", y="count", color="GENDER", line_group="GENDER")

In [0]:
# The same data can be explored through the DataFrame API
from pyspark.sql.functions import col, desc

# Attach each condition to its patient, then count conditions per gender and description
patients_with_conditions = spark.table("patients").join(
    spark.table("conditions"), col("Id")==col("PATIENT")
)
condition_counts = patients_with_conditions.groupBy(['GENDER', 'conditions.DESCRIPTION']).count()

# Keep the 20 most frequent combinations and bring them to pandas for plotting
df = condition_counts.orderBy(desc('count')).limit(20).toPandas()

# Grouped bar chart: one bar per gender for each condition description
px.bar(df, x="DESCRIPTION", y="count", color="GENDER", barmode="group")

## Cohort Definition

Let's define a cohort that we can do analysis on.

In [0]:
%sql
-- (Re)create the empty cohort table that later cells append cohort members to,
-- then hand its ownership to `account users`.
CREATE OR REPLACE TABLE cohort (
  id INT,
  name STRING,
  patient STRING,
  cohort_start_date DATE,
  cohort_end_date DATE
);
ALTER TABLE cohort OWNER TO `account users`;

In [0]:
import random
from pyspark.sql.functions import lit
def create_save_cohort(name, condition_codes=None):
  """Append a cohort built from the ``conditions`` table to the ``cohort`` table.

  Every selected condition row becomes one cohort row holding the patient, the
  condition start/stop converted to dates, the cohort ``name`` and a random
  integer ``id`` that is shared by all rows written by this call.

  Args:
    name: Cohort name stored in the ``name`` column.
    condition_codes: Optional collection of condition codes. When given and
      non-empty, only conditions whose ``CODE`` is in it are kept; when omitted
      or empty, all condition rows are used.
  """
  conditions = spark.table('conditions')
  # Filter before projecting: CODE is not part of the cohort columns, so
  # filtering afterwards would reference a column the projection has dropped.
  if condition_codes:
    conditions = conditions.where(col('CODE').isin(list(condition_codes)))
  cohort1 = (conditions
                 .selectExpr('patient', 'to_date(start) as cohort_start_date', 'to_date(stop) as cohort_end_date')
                 .withColumn('id', lit(random.randint(999999, 99999999)))
                 .withColumn('name', lit(name)))
  cohort1.write.mode("append").saveAsTable('cohort')

# Cohort definitions as (cohort name, condition codes); an empty code list means
# "no code filter". For example 840539006 is the COVID code.
cohort_definitions = [
    ('COVID-19-cohort', [840539006]),
    ('heart-condition-cohort', [1505002, 32485007, 305351004, 76464004]),
    ('all_patients', []),
]
for cohort_label, cohort_codes in cohort_definitions:
    create_save_cohort(cohort_label, cohort_codes)

# Step 4 - Data Science and Machine Learning

### Cells 38-50 show how to do data science on your data.


<img src="https://raw.githubusercontent.com/databricks-demos/dbdemos-resources/main/images/hls/patient-readmission/hls-patient-readmision-flow-4.png" style="float: right; margin-left: 30px; margin-top:10px" width="650px" />

We have cleaned and secured our data. We have also created cohorts of patients to analyze. We can now predict 30-day readmissions.

In [0]:
%pip install mlflow==2.19.0
# Restart the Python process so the freshly installed mlflow version is the one that gets imported.
# NOTE(review): presumably this also clears Python variables and imports from earlier
# cells — confirm that the cells below re-create everything they need.
dbutils.library.restartPython()

## Feature engineering

In [0]:
import pyspark.sql.functions as F
# Window was used below without ever being imported, which raised a NameError.
from pyspark.sql import Window

# Let's create our label: we'll predict the 30 days readmission risk.
# An encounter is labelled 1 when it starts less than 30 days after the same
# patient's previous encounter (ordered by START) stopped, and 0 otherwise
# (including each patient's first encounter, which has no previous one).
SECONDS_IN_30_DAYS = 30 * 24 * 60 * 60

# START/STOP are cast to timestamp first: the ingestion cell loads the CSVs
# without schema inference, and casting such text straight to long does not
# yield epoch seconds. The cast is a no-op when the columns already are timestamps.
start_ts = F.col('START').cast('timestamp')
windowSpec = Window.partitionBy("PATIENT").orderBy(start_ts)
previous_stop_ts = F.lag(F.col('STOP').cast('timestamp')).over(windowSpec)
seconds_since_previous_stop = start_ts.cast('long') - previous_stop_ts.cast('long')

labels = (
    spark.table('encounters').select("PATIENT", "Id", "START", "STOP")
         .withColumn('30_DAY_READMISSION', F.when(seconds_since_previous_stop < SECONDS_IN_30_DAYS, 1).otherwise(0))
)
display(labels)

In [0]:
import pyspark.pandas as ps

# Define Patient Features logic
def compute_pat_features(data):
  """One-hot encode the patient columns MARITAL, RACE, ETHNICITY and GENDER.

  Takes a Spark DataFrame, encodes the four columns as int64 indicator
  columns through the pandas-on-Spark API, and returns a Spark DataFrame.
  """
  categorical_columns = ['MARITAL', 'RACE', 'ETHNICITY', 'GENDER']
  encoded = ps.get_dummies(data.pandas_api(), columns=categorical_columns, dtype='int64')
  return encoded.to_spark()

In [0]:
cohort_name = 'all_patients' #or could be 'COVID-19-cohort'

# Patients of the chosen cohort, one row per patient id
cohort_patients_query = f"SELECT p.* FROM cohort c INNER JOIN patients p on c.patient=p.id WHERE c.name='{cohort_name}'"
cohort = spark.sql(cohort_patients_query).dropDuplicates(["id"])

# One-hot encoded patient features for that cohort
cohort_features_df = compute_pat_features(cohort)
cohort_features_df.display()

In [0]:
from pyspark.sql.functions import col

def compute_enc_features(data):
  """Build per-encounter features from an encounters Spark DataFrame.

  Steps: drop duplicate encounters by ``Id``, add ``enc_length`` (stop minus
  start, in seconds), one-hot encode ``ENCOUNTERCLASS`` as int64 indicator
  columns, and project the feature columns.

  Returns:
    A Spark DataFrame with ``ENCOUNTER_ID`` (the encounter ``Id``), the three
    cost columns, ``enc_length`` and one indicator column per expected
    encounter class.
  """
  # Indicator columns the rest of the notebook expects to exist.
  encounter_class_columns = [
    'ENCOUNTERCLASS_ambulatory',
    'ENCOUNTERCLASS_emergency',
    'ENCOUNTERCLASS_hospice',
    'ENCOUNTERCLASS_inpatient',
    'ENCOUNTERCLASS_outpatient',
    'ENCOUNTERCLASS_wellness',
  ]

  data = data.dropDuplicates(["Id"])
  # Cast to timestamp first so the length is also computed when start/stop were
  # loaded as text; the cast is a no-op for columns that already are timestamps.
  data = data.withColumn(
    'enc_length',
    F.unix_timestamp(col('stop').cast('timestamp')) - F.unix_timestamp(col('start').cast('timestamp')),
  )
  data = ps.get_dummies(data.pandas_api(), columns=['ENCOUNTERCLASS'], dtype='int64').to_spark()

  # get_dummies only creates columns for classes present in the data; a small
  # sample may lack some of them, which would make the select below fail.
  for dummy_column in encounter_class_columns:
    if dummy_column not in data.columns:
      data = data.withColumn(dummy_column, F.lit(0).cast('long'))

  return (
    data
    .select(
      col('Id').alias('ENCOUNTER_ID'),
      'BASE_ENCOUNTER_COST',
      'TOTAL_CLAIM_COST',
      'PAYER_COVERAGE',
      'enc_length',
      *encounter_class_columns,
    )
  )

# Compute the encounter features for the whole encounters table and show them.
enc_features_df = compute_enc_features(spark.table('encounters'))
display(enc_features_df)

In [0]:
# Recompute the encounter features used in the join below
enc_features_df = compute_enc_features(spark.table('encounters'))

# Join conditions: patient features -> labels (by patient), labels -> encounter features (by encounter)
patient_matches_label = [labels.PATIENT==cohort_features_df.Id]
label_matches_encounter = [labels.Id==enc_features_df.ENCOUNTER_ID]

# Identifier and PII columns that are removed from the training data
columns_to_drop = ("Id", "_rescued_data", "SSN", "DRIVERS", "PASSPORT", "FIRST", "LAST", "ADDRESS", "BIRTHPLACE")

training_dataset = (
    cohort_features_df
    .join(labels, patient_matches_label, "inner")
    .join(enc_features_df, label_matches_encounter, "inner")
    .drop(*columns_to_drop)
)

# Extra feature: patient age in years (days / 365.25) at the start of the encounter
training_dataset = (
    training_dataset
    .withColumnRenamed("PATIENT", "patient_id")
    .withColumn("age_at_encounter", ((F.datediff(col('START'), col('BIRTHDATE'))) / 365.25))
)

# Persist the result and display what was saved
training_dataset.write.mode('overwrite').saveAsTable("training_dataset")
display(spark.table("training_dataset"))

## AutoML

In [0]:
# Columns handed to AutoML: patient features, encounter features, the derived
# age feature and, last, the label column.
patient_feature_names = [
    'MARITAL_M', 'MARITAL_S',
    'RACE_asian', 'RACE_black', 'RACE_hawaiian', 'RACE_other', 'RACE_white',
    'ETHNICITY_hispanic', 'ETHNICITY_nonhispanic',
    'GENDER_F', 'GENDER_M',
    'INCOME',
]
encounter_feature_names = [
    'BASE_ENCOUNTER_COST', 'TOTAL_CLAIM_COST', 'PAYER_COVERAGE', 'enc_length',
    'ENCOUNTERCLASS_ambulatory', 'ENCOUNTERCLASS_emergency', 'ENCOUNTERCLASS_hospice',
    'ENCOUNTERCLASS_inpatient', 'ENCOUNTERCLASS_outpatient', 'ENCOUNTERCLASS_wellness',
]
derived_feature_names = ['age_at_encounter']
label_column_name = '30_DAY_READMISSION'

feature_names = [
    *patient_feature_names,
    *encounter_feature_names,
    *derived_feature_names,
    label_column_name,
]

In [0]:
# Train a readmission classifier with Databricks AutoML on the selected feature
# columns, falling back to a mock-up run when the automl module cannot be imported.
import mlflow
from datetime import datetime
model_name = "mcutini_diabetes_readmissions"
xp_path = "/Shared/dbdemos/experiments/lakehouse-patient-admission"
# NOTE(review): the experiment name prefix says "churn" although this notebook
# predicts readmission — possibly a leftover; confirm before renaming.
xp_name = f"automl_churn_{datetime.now().strftime('%Y-%m-%d_%H:%M:%S')}"
try:
    from databricks import automl
    # Classification run targeting the label column, optimised for ROC AUC and capped at 10 minutes.
    automl_run = automl.classify(
        experiment_name = xp_name,
        experiment_dir = xp_path,
        dataset = training_dataset.select(feature_names),
        target_col = "30_DAY_READMISSION",
        primary_metric="roc_auc",
        timeout_minutes = 10
    )
    #Make sure all users can access dbdemos shared experiment
    # NOTE(review): DBDemos is not defined or imported in the cells shown here —
    # confirm where it comes from, otherwise this line raises NameError.
    DBDemos.set_experiment_permission(f"{xp_path}/{xp_name}")
except Exception as e:
    # Only the specific "cannot import name 'automl'" failure triggers the fallback;
    # every other error (including ones raised by the AutoML run itself) is re-raised.
    if "cannot import name 'automl'" in str(e):
        # Note: cannot import name 'automl' from 'databricks' likely means you're using serverless. Dbdemos doesn't support autoML serverless API - this will be improved soon.
        # Adding a temporary workaround to make sure it works well for now - ignore this for classic run
        automl_run = DBDemos.create_mockup_automl_run(f"{xp_path}/{xp_name}", training_dataset.select(feature_names).toPandas(), model_name = model_name, target_col = "30_DAY_READMISSION")
    else:
        raise e