# Scout Spark Delta Lake Quickstart

In [1]:
from pyspark.sql import SparkSession
import pandas as pd

In [2]:
# Start spark session
spark = SparkSession.builder.appName("hl7-searches").getOrCreate()

In [3]:
# Load and create sql view of report data
spark.read.format("delta") \
    .load("s3a://lake/delta_tables/reports") \
    .createOrReplaceTempView("reports")

In [4]:
# Count how many reports are a
count_reports = spark.sql("""
    SELECT COUNT(*) FROM reports
""")

In [5]:
count_reports.show()

+--------+
|count(1)|
+--------+
|16115804|
+--------+



In [6]:
# Show available modalities and count
modality_count = spark.sql("""
    SELECT modality, COUNT(*) as count
    FROM reports
    GROUP BY modality
    ORDER BY count DESC
""")

In [None]:
modality_count.show()

In [None]:
# Print out the schema and show an example report
show_schema = spark.sql("""
    SELECT * FROM reports LIMIT 10
""")

In [None]:
show_schema.printSchema()
show_schema.show(1, vertical=True)

In [None]:
# View Sex Distribution in CT and MR Reports
sex_distribution_query = spark.sql("""
    SELECT sex, modality, COUNT(*) as count
    FROM reports
    WHERE modality IN ('CT', 'MR')
    GROUP BY sex, modality
    ORDER BY modality, sex, count DESC
""")

In [None]:
sex_distribution_query.show()

In [None]:
# Import a list of study instance uids from a csv to search for
csv = spark.read.csv(
    '/home/jovyan/Scout/study_instance_uids.csv', 
    inferSchema=True, 
    header = True
)

csv.createOrReplaceTempView("siuids")

In [None]:
# Search for study instance uids from csv
search = spark.sql("""
    SELECT * 
    FROM reports 
    INNER JOIN siuids 
    ON reports.study_instance_uid=siuids.study_instance_uid
""")

In [None]:
search.count()

In [None]:
# Write joined data to csv
search.select(
        'birth_date',
        'sex',
        'race',
        'reports.study_instance_uid',
        'report_text',
        'abc_mr',
        'epic_mrn',
    ).write.csv('/home/jovyan/Scout/search_siuids_csv', header=True)

In [None]:
# Select 50k random CTs
search = spark.sql("""
    SELECT * 
    FROM reports 
    WHERE modality='CT'
    ORDER BY RAND() 
    LIMIT 50000
""")

In [None]:
search.count()

In [None]:
# Write to csv
search.select(
        'birth_date',
        'sex',
        'race',
        'reports.study_instance_uid',
        'report_text',
        'abc_mr',
        'epic_mrn',
        'modality'
    ).write.csv('/home/jovyan/Scout/50k_ct_csv', header=True)

In [None]:
# Select 300k random MGs
search = spark.sql("""
    SELECT * 
    FROM reports 
    WHERE modality='MG'
    ORDER BY RAND() 
    LIMIT 300000
""")

In [None]:
search.count()

In [None]:
# Write to csv
search.select(
        'birth_date',
        'sex',
        'race',
        'reports.study_instance_uid',
        'report_text',
        'abc_mr',
        'epic_mrn',
        'modality'
    ).write.csv('/home/jovyan/Scout/300k_mg_csv', header=True)