# Notes

- I used databricks to code out the assignment since it has some pre-installed packages for accessing data on s3

# Get Data

In [None]:
from pyspark.sql.functions import to_date, date_format, col, last_day

s3_bucket_name = 'waymark-assignment'
file_name_1 = 'patient_id_month_year.csv'
file_name_2 = 'outpatient_visits_file.csv'
patient_id_month_year = f's3://{s3_bucket_name}/{file_name_1}'
outpatient_visits_file = f's3://{s3_bucket_name}/{file_name_2}'

# Set up AWS credentials, removed keys for github upload
spark.conf.set('spark.hadoop.fs.s3.access.key', 'aws_access_key_id')
spark.conf.set('spark.hadoop.fs.s3.secret.key', 'aws_secret_access_key')

# Load enrollment data
enroll = spark.read.options(header='True').csv(patient_id_month_year)
# Fix date formatting
enroll = enroll.withColumn("enrollment_start_date", date_format(to_date(col("month_year"), "M/d/yy"), "yyyy-MM-dd"))
enroll = enroll.withColumn("enrollment_end_date", last_day(date_format(to_date(col("month_year"), "M/d/yy"), "yyyy-MM-dd")))

# Load outpatient visits
op_visits = spark.read.options(header='True').csv(outpatient_visits_file)
# Fix date formatting
op_visits = op_visits.withColumn("visit_date", date_format(to_date(col("date"), "M/d/yy"), "yyyy-MM-dd"))

# Enrollment Span

In [None]:
# Viewing data to make sure the date transformations are behaving the way I expect

enroll.show()

+----------+----------+----+---------------------+-------------------+
|patient_id|month_year| _c2|enrollment_start_date|enrollment_end_date|
+----------+----------+----+---------------------+-------------------+
|    ID0001|    1/1/23|null|           2023-01-01|         2023-01-31|
|    ID0001|    2/1/23|null|           2023-02-01|         2023-02-28|
|    ID0001|    3/1/23|null|           2023-03-01|         2023-03-31|
|    ID0001|    6/1/23|null|           2023-06-01|         2023-06-30|
|    ID0001|    8/1/23|null|           2023-08-01|         2023-08-31|
|    ID0001|   11/1/23|null|           2023-11-01|         2023-11-30|
|    ID0001|   12/1/23|null|           2023-12-01|         2023-12-31|
|    ID0002|    2/1/23|null|           2023-02-01|         2023-02-28|
|    ID0002|    3/1/23|null|           2023-03-01|         2023-03-31|
|    ID0002|    4/1/23|null|           2023-04-01|         2023-04-30|
|    ID0002|    7/1/23|null|           2023-07-01|         2023-07-31|
|    I

### Check for Nulls/Blanks

#### Nulls

In [None]:
from pyspark.sql.functions import col

enroll.filter(col("patient_id").isNull()).count()

Out[232]: 1039817

In [None]:
from pyspark.sql.functions import col

enroll.filter(col("month_year").isNull()).count()

Out[233]: 1039817

In [None]:
from pyspark.sql.functions import col

enroll.filter(col("enrollment_start_date").isNull()).count()

Out[234]: 1039817

In [None]:
from pyspark.sql.functions import col

enroll.filter(col("enrollment_end_date").isNull()).count()

Out[235]: 1039817

#### Blanks

In [None]:
from pyspark.sql.functions import col

enroll.filter(col("patient_id") == "").count()

Out[236]: 0

In [None]:
from pyspark.sql.functions import col

enroll.filter(col("month_year") == "").count()

Out[237]: 0

In [None]:
from pyspark.sql.functions import col

enroll.filter(col("enrollment_start_date") == "").count()

Out[238]: 0

In [None]:
from pyspark.sql.functions import col

enroll.filter(col("enrollment_end_date") == "").count()

Out[239]: 0

### Total Row Count

In [None]:
# Total row count

enroll.count()

Out[240]: 1047126

### Clean Data

- Drop nulls, no longer needed columns, and duplicates

In [None]:
# Drop extra null columns
from pyspark.sql.functions import col

enroll_upd = enroll.drop('_c2', 'month_year')

# Remove duplicates and remove rows where patient ID is null
enroll_clean = enroll_upd.dropDuplicates().filter(col("patient_id").isNotNull())

#### Double checks after data cleaing

- Final schema contains the columns we want and in the right format

In [None]:
enroll_clean.count()

Out[242]: 7309

In [None]:
enroll_clean.show()

+----------+---------------------+-------------------+
|patient_id|enrollment_start_date|enrollment_end_date|
+----------+---------------------+-------------------+
|    ID0002|           2023-09-01|         2023-09-30|
|    ID0002|           2023-03-01|         2023-03-31|
|    ID0001|           2023-01-01|         2023-01-31|
|    ID0003|           2023-06-01|         2023-06-30|
|    ID0003|           2023-04-01|         2023-04-30|
|    ID0003|           2023-03-01|         2023-03-31|
|    ID0003|           2023-12-01|         2023-12-31|
|    ID0001|           2023-02-01|         2023-02-28|
|    ID0001|           2023-06-01|         2023-06-30|
|    ID0002|           2023-10-01|         2023-10-31|
|    ID0002|           2023-02-01|         2023-02-28|
|    ID0002|           2023-12-01|         2023-12-31|
|    ID0001|           2023-11-01|         2023-11-30|
|    ID0003|           2023-07-01|         2023-07-31|
|    ID0001|           2023-12-01|         2023-12-31|
|    ID000

## Create Table

In [None]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Use lead and lag to prep for comparison
df = enroll_clean.withColumn("next_start_date", F.lead("enrollment_start_date", 1).over(Window.partitionBy("patient_id").orderBy("enrollment_start_date")))

df = df.withColumn("prev_end_date", F.lag("enrollment_end_date").over(Window.partitionBy("patient_id").orderBy("enrollment_start_date")))

# df.select("*").filter(col("patient_id") == "ID0001").show()

df = df.withColumn("shift", F.when(F.col("enrollment_start_date") > F.date_add("prev_end_date", 1), 1).otherwise(0))

# df.select("*").filter(col("patient_id") == "ID0001").show()

df = df.withColumn("group_by", F.sum("shift").over(Window.partitionBy("patient_id").orderBy("enrollment_start_date").rowsBetween(Window.unboundedPreceding, Window.currentRow)))

# df.select("*").filter(col("patient_id") == "ID0001").show()

enrollment_span_prep = df.groupBy("patient_id", "group_by").agg(F.min("enrollment_start_date").alias("enrollment_start_date"), F.max("enrollment_end_date").alias("enrollment_end_date"))

# enrollment_span_prep.select("*").filter(col("patient_id") == "ID0001").show()

patient_enrollment_span = enrollment_span_prep.drop('group_by')

# patient_enrollment_span.select("*").filter(col("patient_id") == "ID0001").show()

## Write & Read Enrollment Span CSV

In [None]:
# s3_output_path = "s3://waymark-assignment/enrollment_span.csv"
# enroll_clean.write.option("header", True).mode("overwrite").csv(s3_output_path)

local_output_path = "/Users/saratometich/Downloads/patient_enrollment_span.csv"
patient_enrollment_span.write.option("header", True).mode("overwrite").csv(local_output_path)

In [None]:
enrollment_span_csv = spark.read.options(header='True').csv("/Users/saratometich/Downloads/patient_enrollment_span.csv")

# Answer 1

Report the number of rows in patient_enrollment_span.csv

ANSWER: 3105

In [None]:
enrollment_span_csv.count()

Out[247]: 3105

# Outpatient Visits

In [None]:
# Viewing data to make sure the date transformations are behaving the way I expect

op_visits.show()

+----------+--------+----------------------+----+----+----+----+----+----------+
|patient_id|    date|outpatient_visit_count| _c3| _c4| _c5| _c6| _c7|visit_date|
+----------+--------+----------------------+----+----+----+----+----+----------+
|    ID0001|  1/3/23|                     2|null|null|null|null|null|2023-01-03|
|    ID0001|  1/8/23|                     2|null|null|null|null|null|2023-01-08|
|    ID0001|  1/9/23|                     2|null|null|null|null|null|2023-01-09|
|    ID0001| 1/15/23|                     2|null|null|null|null|null|2023-01-15|
|    ID0001| 1/21/23|                     4|null|null|null|null|null|2023-01-21|
|    ID0001| 1/25/23|                     1|null|null|null|null|null|2023-01-25|
|    ID0001| 3/11/23|                     3|null|null|null|null|null|2023-03-11|
|    ID0001| 3/22/23|                     1|null|null|null|null|null|2023-03-22|
|    ID0001|  6/3/23|                     2|null|null|null|null|null|2023-06-03|
|    ID0001|  6/3/23|       

### Clean Data

In [None]:
# Drop extra null columns
from pyspark.sql.functions import col

op_visits_upd = op_visits.drop('_c3', '_c4', '_c5', '_c6', '_c7', 'date')

# Remove duplicates and remove rows where patient ID is null
op_visits_clean = op_visits_upd.dropDuplicates().filter(col("patient_id").isNotNull())

#### Double checks after data cleaing

- Final schema contains the columns we want and in the right format

In [None]:
op_visits.count()

Out[250]: 1043905

In [None]:
op_visits_clean.count()

Out[251]: 15196

In [None]:
op_visits_clean.show()

+----------+----------------------+----------+
|patient_id|outpatient_visit_count|visit_date|
+----------+----------------------+----------+
|    ID0001|                     1|2023-03-22|
|    ID0001|                     1|2023-09-17|
|    ID0001|                     3|2023-06-14|
|    ID0002|                     1|2023-01-15|
|    ID0001|                     2|2023-09-06|
|    ID0001|                     4|2023-07-17|
|    ID0001|                     2|2023-11-13|
|    ID0001|                     3|2023-08-31|
|    ID0001|                     2|2023-12-15|
|    ID0001|                     4|2023-01-21|
|    ID0001|                     1|2023-01-25|
|    ID0001|                     2|2023-06-03|
|    ID0001|                     2|2023-01-03|
|    ID0001|                     2|2023-01-15|
|    ID0001|                     2|2023-01-08|
|    ID0001|                     2|2023-01-09|
|    ID0001|                     4|2023-09-06|
|    ID0001|                     1|2023-06-03|
|    ID0001| 

## ct_outpatient_visits

The number of outpatient visits a patient had within the enrollment period (between enrollment_start_date and enrollment_end_date)

### Code

In [None]:
from pyspark.sql.functions import col, coalesce, lit, sum as pyspark_sum

joined_df = enrollment_span_csv.join(op_visits,
                                 (op_visits.patient_id == enrollment_span_csv.patient_id) &
                                 (op_visits.visit_date <= enrollment_span_csv.enrollment_end_date) &
                                 (op_visits.visit_date >= enrollment_span_csv.enrollment_start_date),
                                 "left")

grouped_df = joined_df.groupBy("enrollment_start_date", "enrollment_end_date", enrollment_span_csv["patient_id"].alias("patient_id"))

ct_outpatient_visits = grouped_df.agg(coalesce(pyspark_sum("outpatient_visit_count"), lit(0)).alias("ct_outpatient_visits"))

### Checks

In [None]:
# Expect no records lost from enrollment span to this point

ct_outpatient_visits.count()

Out[254]: 3105

## ct_days_with_outpatient_visit

The number of distinct days within an enrollment period (between enrollment_start_date and enrollment_end_date) when the patient had one or more outpatient visit

### Code

In [None]:
from pyspark.sql.functions import col, countDistinct, sum as pyspark_sum

joined_df = enrollment_span_csv.join(op_visits,
                                 (op_visits.patient_id == enrollment_span_csv.patient_id) &
                                 (op_visits.visit_date <= enrollment_span_csv.enrollment_end_date) &
                                 (op_visits.visit_date >= enrollment_span_csv.enrollment_start_date),
                                 "left")

grouped_df = joined_df.groupBy(enrollment_span_csv["patient_id"].alias("patient_id"), "enrollment_start_date", "enrollment_end_date")

result_df = grouped_df.agg(countDistinct("visit_date").alias("ct_days_with_outpatient_visit"),
                           pyspark_sum("outpatient_visit_count").alias("sum_outpatient_visit_count"))

result_df = result_df.filter(col("sum_outpatient_visit_count") >= 1)

ct_days_with_outpatient_visit = result_df.drop("sum_outpatient_visit_count")

In [None]:
ct_days_with_outpatient_visit.show()

+----------+---------------------+-------------------+-----------------------------+
|patient_id|enrollment_start_date|enrollment_end_date|ct_days_with_outpatient_visit|
+----------+---------------------+-------------------+-----------------------------+
|    ID0786|           2023-09-01|         2023-10-31|                            4|
|    ID0138|           2023-01-01|         2023-11-30|                           30|
|    ID0928|           2023-04-01|         2023-05-31|                            3|
|    ID0070|           2023-01-01|         2023-01-31|                            3|
|    ID0714|           2023-09-01|         2023-10-31|                            3|
|    ID0276|           2023-05-01|         2023-06-30|                            2|
|    ID0118|           2023-10-01|         2023-12-31|                            4|
|    ID0489|           2023-12-01|         2023-12-31|                            1|
|    ID0456|           2023-02-01|         2023-04-30|           

### Checks

In [None]:
# Expect either no records lost OR some records lost

ct_days_with_outpatient_visit.count()

Out[257]: 2030

# Results CSV

In [None]:
# Need to join the two outpatient results to get the final schema
from pyspark.sql.functions import col

joined_df = ct_outpatient_visits.join(ct_days_with_outpatient_visit,
                                 (ct_days_with_outpatient_visit.patient_id == ct_outpatient_visits.patient_id) &
                                 (ct_days_with_outpatient_visit.enrollment_start_date == ct_outpatient_visits.enrollment_start_date) &
                                 (ct_days_with_outpatient_visit.enrollment_end_date == ct_outpatient_visits.enrollment_end_date),
                                 "left")

result_df = joined_df.select(ct_outpatient_visits.patient_id, ct_outpatient_visits.enrollment_start_date, ct_outpatient_visits.enrollment_end_date, "ct_outpatient_visits", coalesce("ct_days_with_outpatient_visit", lit(0)).alias("ct_days_with_outpatient_visit")
).distinct()

In [None]:
result_df.show()

+----------+---------------------+-------------------+--------------------+-----------------------------+
|patient_id|enrollment_start_date|enrollment_end_date|ct_outpatient_visits|ct_days_with_outpatient_visit|
+----------+---------------------+-------------------+--------------------+-----------------------------+
|    ID0104|           2023-02-01|         2023-06-30|                81.0|                            8|
|    ID0120|           2023-01-01|         2023-01-31|                 8.0|                            3|
|    ID0207|           2023-01-01|         2023-01-31|                 0.0|                            0|
|    ID0255|           2023-06-01|         2023-08-31|                 0.0|                            0|
|    ID0302|           2023-06-01|         2023-07-31|                11.0|                            4|
|    ID0391|           2023-07-01|         2023-10-31|                 0.0|                            0|
|    ID0511|           2023-02-01|         202

In [None]:
result_df.count()

Out[260]: 3105

### Write & Read Result CSV

In [None]:
# s3_output_path = "s3://waymark-assignment/result.csv"
# result_df.write.option("header", True).mode("overwrite").csv(s3_output_path)

local_output_path = "/Users/saratometich/Downloads/result.csv"
result_df.write.option("header", True).mode("overwrite").csv(local_output_path)

In [None]:
result_csv = spark.read.options(header='True').csv("/Users/saratometich/Downloads/result.csv")

## Checks

In [None]:
result_csv.count()

Out[263]: 3105

In [None]:
count_check = result_csv.filter(col("ct_days_with_outpatient_visit") > 0).select("patient_id", "enrollment_start_date", "enrollment_end_date").distinct()

count_check.count()

Out[264]: 2030

In [None]:
count_check_2 = result_csv.filter(col("ct_days_with_outpatient_visit") == 0).select("patient_id", "enrollment_start_date", "enrollment_end_date").distinct()

count_check_2.count()

Out[265]: 1075

## Check example patient ID

In [None]:
example_check_result = result_csv.filter(col("patient_id") == 'ID0001').select("*").distinct().orderBy("enrollment_start_date")

example_check_result.show()

+----------+---------------------+-------------------+--------------------+-----------------------------+
|patient_id|enrollment_start_date|enrollment_end_date|ct_outpatient_visits|ct_days_with_outpatient_visit|
+----------+---------------------+-------------------+--------------------+-----------------------------+
|    ID0001|           2023-01-01|         2023-03-31|                17.0|                            8|
|    ID0001|           2023-06-01|         2023-06-30|                 6.0|                            2|
|    ID0001|           2023-08-01|         2023-08-31|                 6.0|                            2|
|    ID0001|           2023-11-01|         2023-12-31|                 5.0|                            3|
+----------+---------------------+-------------------+--------------------+-----------------------------+



In [None]:
example_check_span = enrollment_span_csv.filter(col("patient_id") == 'ID0001').select("*").distinct().orderBy("enrollment_start_date")

example_check_span.show()

+----------+---------------------+-------------------+
|patient_id|enrollment_start_date|enrollment_end_date|
+----------+---------------------+-------------------+
|    ID0001|           2023-01-01|         2023-03-31|
|    ID0001|           2023-06-01|         2023-06-30|
|    ID0001|           2023-08-01|         2023-08-31|
|    ID0001|           2023-11-01|         2023-12-31|
+----------+---------------------+-------------------+




# Answer 2

Report the number of distinct values of ct_days_with_outpatient_visit in
result.csv

ANSWER: 33 (including 0 as a distinct value)

In [None]:
distinct_values = result_csv.select("ct_days_with_outpatient_visit").distinct()

distinct_values.count() # includes 0

Out[268]: 33

## Checks

In [None]:
distinct_values = result_csv.select("ct_days_with_outpatient_visit").distinct().orderBy("ct_days_with_outpatient_visit")

distinct_values.show(50)

+-----------------------------+
|ct_days_with_outpatient_visit|
+-----------------------------+
|                            0|
|                            1|
|                           10|
|                           11|
|                           12|
|                           13|
|                           14|
|                           15|
|                           16|
|                           17|
|                           18|
|                           19|
|                            2|
|                           20|
|                           21|
|                           22|
|                           23|
|                           24|
|                           25|
|                           26|
|                           27|
|                           28|
|                            3|
|                           30|
|                           31|
|                           32|
|                           33|
|                            4|
|       

# QA Section

I used SQL to check my results

## Enrollment Span Content Checks

In [None]:
enroll_clean.createOrReplaceTempView("enrollment_span_prep")

In [None]:
enrollment_span_csv.createOrReplaceTempView("enrollment_span_csv")

### Code Check using SQL

In [None]:
spark.sql("""

select
patient_id
, enrollment_start_date
, enrollment_end_date
, lead(enrollment_start_date, 1) over (partition by patient_id order by enrollment_start_date asc) as next_start_date
, lag(enrollment_end_date) over (partition by patient_id order by enrollment_start_date asc) as previous_end_date

from enrollment_span_prep

where patient_id = 'ID0001'

"""
).show()

+----------+---------------------+-------------------+---------------+-----------------+
|patient_id|enrollment_start_date|enrollment_end_date|next_start_date|previous_end_date|
+----------+---------------------+-------------------+---------------+-----------------+
|    ID0001|           2023-01-01|         2023-01-31|     2023-02-01|             null|
|    ID0001|           2023-02-01|         2023-02-28|     2023-03-01|       2023-01-31|
|    ID0001|           2023-03-01|         2023-03-31|     2023-06-01|       2023-02-28|
|    ID0001|           2023-06-01|         2023-06-30|     2023-08-01|       2023-03-31|
|    ID0001|           2023-08-01|         2023-08-31|     2023-11-01|       2023-06-30|
|    ID0001|           2023-11-01|         2023-11-30|     2023-12-01|       2023-08-31|
|    ID0001|           2023-12-01|         2023-12-31|           null|       2023-11-30|
+----------+---------------------+-------------------+---------------+-----------------+



In [None]:
spark.sql("""

with cte1 as (

    select
    patient_id
    , enrollment_start_date
    , enrollment_end_date
    , lead(enrollment_start_date, 1) over (partition by patient_id order by enrollment_start_date asc) as next_start_date
    , lag(enrollment_end_date) over (partition by patient_id order by enrollment_start_date asc) as previous_end_date

    from enrollment_span_prep

),

cte2 as (
    select
    patient_id
    , enrollment_start_date
    , enrollment_end_date
    , next_start_date
    , previous_end_date
    , date_add(previous_end_date, 1) as test

    from cte1

)

select * from cte2 where patient_id = 'ID0001'

"""
).show()

+----------+---------------------+-------------------+---------------+-----------------+----------+
|patient_id|enrollment_start_date|enrollment_end_date|next_start_date|previous_end_date|      test|
+----------+---------------------+-------------------+---------------+-----------------+----------+
|    ID0001|           2023-01-01|         2023-01-31|     2023-02-01|             null|      null|
|    ID0001|           2023-02-01|         2023-02-28|     2023-03-01|       2023-01-31|2023-02-01|
|    ID0001|           2023-03-01|         2023-03-31|     2023-06-01|       2023-02-28|2023-03-01|
|    ID0001|           2023-06-01|         2023-06-30|     2023-08-01|       2023-03-31|2023-04-01|
|    ID0001|           2023-08-01|         2023-08-31|     2023-11-01|       2023-06-30|2023-07-01|
|    ID0001|           2023-11-01|         2023-11-30|     2023-12-01|       2023-08-31|2023-09-01|
|    ID0001|           2023-12-01|         2023-12-31|           null|       2023-11-30|2023-12-01|


In [None]:
spark.sql("""

with cte1 as (
    select
    patient_id
    , enrollment_start_date
    , enrollment_end_date
    , lead(enrollment_start_date, 1) over (partition by patient_id order by enrollment_start_date asc) as next_start_date
    , lag(enrollment_end_date, 1) over (partition by patient_id order by enrollment_start_date asc) as previous_end_date

    from enrollment_span_prep

    where patient_id = 'ID0001'

),

cte2 as (

    select
    patient_id
    , enrollment_start_date
    , enrollment_end_date
    , next_start_date
    , previous_end_date
    , case when enrollment_start_date > date_add(previous_end_date, 1) then 1 else 0 end as shift

    from cte1

)


select
patient_id
, enrollment_start_date
, enrollment_end_date
, next_start_date
, previous_end_date
, sum(shift) over (partition by patient_id rows unbounded preceding) as group_by

from cte2

"""
).show()

+----------+---------------------+-------------------+---------------+-----------------+--------+
|patient_id|enrollment_start_date|enrollment_end_date|next_start_date|previous_end_date|group_by|
+----------+---------------------+-------------------+---------------+-----------------+--------+
|    ID0001|           2023-01-01|         2023-01-31|     2023-02-01|             null|       0|
|    ID0001|           2023-02-01|         2023-02-28|     2023-03-01|       2023-01-31|       0|
|    ID0001|           2023-03-01|         2023-03-31|     2023-06-01|       2023-02-28|       0|
|    ID0001|           2023-06-01|         2023-06-30|     2023-08-01|       2023-03-31|       1|
|    ID0001|           2023-08-01|         2023-08-31|     2023-11-01|       2023-06-30|       2|
|    ID0001|           2023-11-01|         2023-11-30|     2023-12-01|       2023-08-31|       3|
|    ID0001|           2023-12-01|         2023-12-31|           null|       2023-11-30|       3|
+----------+--------

In [None]:
spark.sql("""

with cte1 as (

    select
    patient_id
    , enrollment_start_date
    , enrollment_end_date
    , lead(enrollment_start_date, 1) over (partition by patient_id order by enrollment_start_date asc) as next_start_date
    , lag(enrollment_end_date) over (partition by patient_id order by enrollment_start_date asc) as previous_end_date

    from enrollment_span_prep

),

cte2 as (

    select
    patient_id
    , enrollment_start_date
    , enrollment_end_date
    , next_start_date
    , previous_end_date
    , case when enrollment_start_date > date_add(previous_end_date, 1) then 1 else 0 end as shift

    from cte1
),

cte3 as (

    select
    patient_id
    , enrollment_start_date
    , enrollment_end_date
    , sum(shift) over (partition by patient_id order by enrollment_start_date rows unbounded preceding) as group_by

    from cte2

)

select
patient_id,
min(enrollment_start_date) as enrollment_start_date,
max(enrollment_end_date) as enrollment_end_date

from cte3

where patient_id = 'ID0001'

group by
patient_id
, group_by

order by
patient_id
, group_by

"""
).show()

+----------+---------------------+-------------------+
|patient_id|enrollment_start_date|enrollment_end_date|
+----------+---------------------+-------------------+
|    ID0001|           2023-01-01|         2023-03-31|
|    ID0001|           2023-06-01|         2023-06-30|
|    ID0001|           2023-08-01|         2023-08-31|
|    ID0001|           2023-11-01|         2023-12-31|
+----------+---------------------+-------------------+



### Enrollment Date Reasonability Checks

In [None]:
spark.sql("""

    select distinct
    patient_id
    , enrollment_start_date
    , enrollment_end_date

    from enrollment_span_csv

    where enrollment_start_date < enrollment_end_date

"""
).count()

Out[276]: 3105

In [None]:
spark.sql("""

    select distinct
    patient_id
    , enrollment_start_date
    , enrollment_end_date

    from enrollment_span_csv

    where enrollment_start_date = enrollment_end_date

"""
).count()

Out[277]: 0

In [None]:
spark.sql("""

    select distinct
    patient_id
    , enrollment_start_date
    , enrollment_end_date

    from enrollment_span_csv

    where enrollment_start_date > enrollment_end_date

"""
).count()

Out[278]: 0

### Duplicates

- Make sure no duplicate rows exist

In [None]:
spark.sql("""

    select distinct
    patient_id
    , enrollment_start_date
    , enrollment_end_date

    from enrollment_span_csv

    group by
    patient_id
    , enrollment_start_date
    , enrollment_end_date

    having count(*) > 1

"""
).count()

Out[279]: 0

### Min/Max Enrollment Dates

- Check max/min enrollment dates

In [None]:
spark.sql("""

    select distinct
    min(enrollment_start_date)
    , max(enrollment_start_date)
    , min(enrollment_end_date)
    , max(enrollment_end_date)

    from enrollment_span_csv

"""
).show()

+--------------------------+--------------------------+------------------------+------------------------+
|min(enrollment_start_date)|max(enrollment_start_date)|min(enrollment_end_date)|max(enrollment_end_date)|
+--------------------------+--------------------------+------------------------+------------------------+
|                2023-01-01|                2023-12-01|              2023-01-31|              2023-12-31|
+--------------------------+--------------------------+------------------------+------------------------+



## Outpatient Visits Content Checks

In [None]:
op_visits_clean.createOrReplaceTempView("op_visits_csv")

In [None]:
ct_outpatient_visits.createOrReplaceTempView("outpatient_visits")

#### Get an example patient ID

In [None]:
spark.sql("""

    select *

    from enrollment_span_csv

    where patient_id = 'ID0001'

"""
).show()

+----------+---------------------+-------------------+
|patient_id|enrollment_start_date|enrollment_end_date|
+----------+---------------------+-------------------+
|    ID0001|           2023-01-01|         2023-03-31|
|    ID0001|           2023-06-01|         2023-06-30|
|    ID0001|           2023-08-01|         2023-08-31|
|    ID0001|           2023-11-01|         2023-12-31|
+----------+---------------------+-------------------+



In [None]:
spark.sql("""

    select
    *
    , sum(outpatient_visit_count) over (partition by patient_id, date_trunc('MONTH', visit_date)) as ct_outpatient_visits

    from op_visits_csv

    where patient_id = 'ID0001'

"""
).show()

+----------+----------------------+----------+--------------------+
|patient_id|outpatient_visit_count|visit_date|ct_outpatient_visits|
+----------+----------------------+----------+--------------------+
|    ID0001|                     4|2023-01-21|                13.0|
|    ID0001|                     1|2023-01-25|                13.0|
|    ID0001|                     2|2023-01-03|                13.0|
|    ID0001|                     2|2023-01-15|                13.0|
|    ID0001|                     2|2023-01-08|                13.0|
|    ID0001|                     2|2023-01-09|                13.0|
|    ID0001|                     1|2023-03-22|                 4.0|
|    ID0001|                     3|2023-03-11|                 4.0|
|    ID0001|                     3|2023-06-14|                 6.0|
|    ID0001|                     2|2023-06-03|                 6.0|
|    ID0001|                     1|2023-06-03|                 6.0|
|    ID0001|                     4|2023-07-17|  

#### Check ct_outpatient_visits

- Compare SQL output to python output

In [None]:
spark.sql("""

    select distinct
    enr.patient_id
    , enrollment_start_date
    , enrollment_end_date
    , coalesce(sum(outpatient_visit_count), 0) as ct_outpatient_visits

    from enrollment_span_csv enr

    left join op_visits_csv op
        on op.patient_id = enr.patient_id
        and op.visit_date <= enr.enrollment_end_date
        and op.visit_date >= enr.enrollment_start_date

    group by
    enr.patient_id
    , enrollment_start_date
    , enrollment_end_date

"""
).show()

+----------+---------------------+-------------------+--------------------+
|patient_id|enrollment_start_date|enrollment_end_date|ct_outpatient_visits|
+----------+---------------------+-------------------+--------------------+
|    ID0015|           2023-02-01|         2023-02-28|                 0.0|
|    ID0070|           2023-01-01|         2023-01-31|                 6.0|
|    ID0118|           2023-10-01|         2023-12-31|                26.0|
|    ID0138|           2023-01-01|         2023-11-30|               282.0|
|    ID0222|           2023-08-01|         2023-12-31|                92.0|
|    ID0276|           2023-05-01|         2023-06-30|                 6.0|
|    ID0411|           2023-02-01|         2023-07-31|                 0.0|
|    ID0419|           2023-09-01|         2023-12-31|                 0.0|
|    ID0437|           2023-07-01|         2023-07-31|                 5.0|
|    ID0456|           2023-02-01|         2023-04-30|                10.0|
|    ID0477|

In [None]:
spark.sql("""

    select *

    from outpatient_visits

    where patient_id = 'ID0015'

"""
).show()

+---------------------+-------------------+----------+--------------------+
|enrollment_start_date|enrollment_end_date|patient_id|ct_outpatient_visits|
+---------------------+-------------------+----------+--------------------+
|           2023-09-01|         2023-10-31|    ID0015|                 0.0|
|           2023-06-01|         2023-06-30|    ID0015|                 0.0|
|           2023-02-01|         2023-02-28|    ID0015|                 0.0|
|           2023-04-01|         2023-04-30|    ID0015|                 0.0|
+---------------------+-------------------+----------+--------------------+



In [None]:
spark.sql("""

    select *

    from op_visits_csv

    where patient_id = 'ID0015'

"""
).show()

+----------+----------------------+----------+
|patient_id|outpatient_visit_count|visit_date|
+----------+----------------------+----------+
+----------+----------------------+----------+



## Days with Outpatient Visits Content Checks

In [None]:
ct_days_with_outpatient_visit.createOrReplaceTempView("outpatient_days")

### Check ct_days_with_outpatient_visit

- Check output using SQL

In [None]:
spark.sql("""

    select distinct
    enr.patient_id
    , enrollment_start_date
    , enrollment_end_date
    , count(distinct visit_date) as ct_days_with_outpatient_visit

    from enrollment_span_csv enr

    left join op_visits_csv op
        on op.patient_id = enr.patient_id
        and op.visit_date <= enr.enrollment_end_date
        and op.visit_date >= enr.enrollment_start_date

    group by
    enr.patient_id
    , enrollment_start_date
    , enrollment_end_date

    having sum(outpatient_visit_count) >= 1

"""
).show()

+----------+---------------------+-------------------+-----------------------------+
|patient_id|enrollment_start_date|enrollment_end_date|ct_days_with_outpatient_visit|
+----------+---------------------+-------------------+-----------------------------+
|    ID0786|           2023-09-01|         2023-10-31|                            4|
|    ID0138|           2023-01-01|         2023-11-30|                           30|
|    ID0928|           2023-04-01|         2023-05-31|                            3|
|    ID0070|           2023-01-01|         2023-01-31|                            3|
|    ID0714|           2023-09-01|         2023-10-31|                            3|
|    ID0276|           2023-05-01|         2023-06-30|                            2|
|    ID0118|           2023-10-01|         2023-12-31|                            4|
|    ID0489|           2023-12-01|         2023-12-31|                            1|
|    ID0456|           2023-02-01|         2023-04-30|           

In [None]:
spark.sql("""

    select *

    from outpatient_days

    where patient_id = 'ID0852'

"""
).show()

+----------+---------------------+-------------------+-----------------------------+
|patient_id|enrollment_start_date|enrollment_end_date|ct_days_with_outpatient_visit|
+----------+---------------------+-------------------+-----------------------------+
|    ID0852|           2023-07-01|         2023-12-31|                           10|
|    ID0852|           2023-02-01|         2023-03-31|                            3|
+----------+---------------------+-------------------+-----------------------------+



In [None]:
spark.sql("""

    select count(*)

    from op_visits_csv

    where patient_id = 'ID0852'
    and date_trunc('MONTH', visit_date) >= '2023-07-01'
    and date_trunc('MONTH', visit_date) <= '2023-12-31'

"""
).show()

+--------+
|count(1)|
+--------+
|      10|
+--------+



### Verify patient IDs that didn't meet the criteria

- The number of distinct days within an enrollment period (between enrollment_start_date and enrollment_end_date) when the patient had one or more outpatient visit

In [None]:
spark.sql("""

select distinct patient_id from outpatient_visits
except
select distinct patient_id from outpatient_days

"""
).show()

+----------+
|patient_id|
+----------+
|    ID0639|
|    ID0795|
|    ID0981|
|    ID0207|
|    ID0627|
|    ID0815|
|    ID0783|
|    ID0507|
|    ID0963|
|    ID0107|
|    ID0375|
|    ID0079|
|    ID0427|
|    ID0979|
|    ID0911|
|    ID0691|
|    ID0879|
|    ID0607|
|    ID0759|
|    ID0203|
+----------+
only showing top 20 rows



In [None]:
spark.sql("""

    select sum(ct_outpatient_visits)

    from outpatient_visits

    where patient_id in (

        select distinct patient_id from outpatient_visits
        except
        select distinct patient_id from outpatient_days

    )

"""
).show()

+-------------------------+
|sum(ct_outpatient_visits)|
+-------------------------+
|                      0.0|
+-------------------------+



In [None]:
spark.sql("""

    select *

    from op_visits_csv

    where patient_id in (

        select distinct patient_id from outpatient_visits
        except
        select distinct patient_id from outpatient_days

    )

"""
).show()

+----------+----------------------+----------+
|patient_id|outpatient_visit_count|visit_date|
+----------+----------------------+----------+
+----------+----------------------+----------+



In [None]:
spark.sql("""

    select *

    from outpatient_visits

    where patient_id = 'ID0647'

"""
).show()

+---------------------+-------------------+----------+--------------------+
|enrollment_start_date|enrollment_end_date|patient_id|ct_outpatient_visits|
+---------------------+-------------------+----------+--------------------+
|           2023-06-01|         2023-06-30|    ID0647|                 0.0|
|           2023-01-01|         2023-03-31|    ID0647|                 0.0|
|           2023-11-01|         2023-11-30|    ID0647|                 0.0|
|           2023-09-01|         2023-09-30|    ID0647|                 0.0|
+---------------------+-------------------+----------+--------------------+



In [None]:
spark.sql("""

    select *

    from op_visits_csv

    where patient_id = 'ID0647'

"""
).show()

+----------+----------------------+----------+
|patient_id|outpatient_visit_count|visit_date|
+----------+----------------------+----------+
+----------+----------------------+----------+

