[![Open In Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/tomlincr/simSDE/blob/main/HES_APC_playground.ipynb)

# Notes
1. Press 'Open In Colab' to run the notebook in Google Colab.
2. Do NOT re-run the first two cells. If you need to start again, use the menu `Runtime/Disconnect and delete runtime` and then run the notebook from the top.

# Prepare PySpark environment (~2 mins) - ONLY RUN ONCE!
Based on this notebook: https://colab.research.google.com/drive/1fa2G3YuXx3Isqyby5kFETqmWotFwtqlH#scrollTo=hxv7w_2y2bb9

In [1]:
!sudo apt update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
# TODO: check Spark version matches SDE
# Older Spark releases are only kept on archive.apache.org; see https://archive.apache.org/dist/spark/ for other versions
!wget -q https://archive.apache.org/dist/spark/spark-3.2.1/spark-3.2.1-bin-hadoop3.2.tgz
!tar xf spark-3.2.1-bin-hadoop3.2.tgz
!pip install -q findspark
!pip install pyspark
!pip install py4j

import findspark

# Locate Spark and put it on sys.path *before* importing pyspark
findspark.init()
findspark.find()

import pyspark
from pyspark.sql import SparkSession

# Create (or reuse) the Spark session used by every cell below
spark = (
    SparkSession.builder
    .appName("SDE Simulator")
    .getOrCreate()
)

spark

[33m0% [Working][0m            Get:1 https://cloud.r-project.org/bin/linux/ubuntu jammy-cran40/ InRelease [3,626 B]
[33m0% [Connecting to archive.ubuntu.com (185.125.190.39)] [Waiting for headers] [1[0m[33m0% [Connecting to archive.ubuntu.com (185.125.190.39)] [Waiting for headers] [C[0m                                                                               Hit:2 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2204/x86_64  InRelease
Get:3 http://security.ubuntu.com/ubuntu jammy-security InRelease [129 kB]
Hit:4 http://archive.ubuntu.com/ubuntu jammy InRelease
Get:5 http://archive.ubuntu.com/ubuntu jammy-updates InRelease [128 kB]
Hit:6 https://ppa.launchpadcontent.net/c2d4u.team/c2d4u4.0+/ubuntu jammy InRelease
Hit:7 https://ppa.launchpadcontent.net/deadsnakes/ppa/ubuntu jammy InRelease
Get:8 http://archive.ubuntu.com/ubuntu jammy-backports InRelease [127 kB]
Get:9 https://ppa.launchpadcontent.net/graphics-drivers/ppa/ubuntu jammy InRelease [24.3 kB]


# Load small extract of synthetic HES APC data (~2 mins) - ONLY RUN ONCE!
1. Dataset is 'Artificial HES Admitted Patient Care sample' from: https://digital.nhs.uk/services/artificial-data
  * For speed, we use only the last 3 financial years of data (2019-20, 2020-21 and 2021-22)
  * We will load these into a spark table called `hes_apc`
2. Data dictionary is available here: https://digital.nhs.uk/data-and-information/data-tools-and-services/data-services/hospital-episode-statistics/hospital-episode-statistics-data-dictionary
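The loading cell carries a TODO to reuse a local copy of the zip instead of downloading it on every run. One way to do that is sketched here. The function name `get_zip_bytes` and the cache filenames are hypothetical, and it uses the standard library's `urllib.request` to avoid extra dependencies (`requests.get`, as in the cell, would work equally well).

```python
import os
import tempfile
import urllib.request


def get_zip_bytes(url: str, local_path: str) -> bytes:
    """Return the zip's bytes, downloading only if local_path does not exist yet."""
    if not os.path.exists(local_path):
        # First run: download once and cache the file on disk
        with urllib.request.urlopen(url) as resp:
            data = resp.read()
        with open(local_path, "wb") as fh:
            fh.write(data)
        return data
    # Later runs: reuse the cached copy, no network access needed
    with open(local_path, "rb") as fh:
        return fh.read()


# Demo with a pre-existing local file, so the (invalid) URL is never contacted
demo_path = os.path.join(tempfile.gettempdir(), "demo_cached.zip")
with open(demo_path, "wb") as fh:
    fh.write(b"PK-demo")
cached = get_zip_bytes("https://example.invalid/never-used.zip", demo_path)
```

In the notebook this would be used as `z = zipfile.ZipFile(io.BytesIO(get_zip_bytes(url, "artificial_hes_apc_sample.zip")))`.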


In [2]:
import io
import zipfile

import pandas as pd
import requests

# TODO: try to open locally, if not found download
url = 'https://files.digital.nhs.uk/assets/Services/Artificial%20data/Artificial%20HES%20final/artificial_hes_apc_202302_v1_sample.zip'
r = requests.get(url)
r.raise_for_status()  # fail loudly if the download did not succeed

z = zipfile.ZipFile(io.BytesIO(r.content))
z.extractall()

# NOTE: limit to last 3 financial years for speed.
# Select files by their FY suffix (e.g. ..._1920.csv) rather than by position in the zip.
fy_suffixes = ('_1920.csv', '_2021.csv', '_2122.csv')
dfs = []
for name in sorted(z.namelist()):
    if name.endswith(fy_suffixes):
        print(f'Loading: {name}')
        dfs.append(pd.read_csv(z.open(name)))
pdf = pd.concat(dfs, ignore_index=True)

print(f"Loaded: {pdf.shape[0]} rows")

print("Loading into Spark table hes_apc")
# For 30k rows (3 FYs) takes ~ 1 min
sdf = spark.createDataFrame(pdf)
# Register a temporary view, visible to spark.sql() for the rest of this session
sdf.createOrReplaceTempView("hes_apc")
# sdf.write.mode("overwrite").saveAsTable("hes_apc")

print("Complete! Spark table hes_apc completed")

# Drop the Python references; the temp view stays registered in the Spark session
del pdf, sdf

Loading: artificial_hes_apc_202302_v1_sample/artificial_hes_apc_1920.csv
Loading: artificial_hes_apc_202302_v1_sample/artificial_hes_apc_2021.csv
Loading: artificial_hes_apc_202302_v1_sample/artificial_hes_apc_2122.csv

Loaded: 30000 rows
Loading into Spark table hes_apc
Complete! Spark table hes_apc completed


# Example queries
We can run (almost) standard SQL, strictly Spark SQL, which is very similar, by wrapping the query in `spark.sql()` as follows:
```py
spark.sql("""
<SQL QUERY GOES HERE>
""").show()
```

Alternatively, we can use PySpark's DataFrame API (the Python API for Spark).

Examples of both are shown below.

In [3]:
spark.sql("""
SELECT * FROM hes_apc LIMIT 5
""").show()


In [4]:
spark.sql("""
SELECT
  COUNT(*) as n,
  COUNT(DISTINCT PSEUDO_HESID) as ids
FROM
  hes_apc
""").show()

+-----+-----+
|    n|  ids|
+-----+-----+
|30000|13934|
+-----+-----+



In [8]:
# Import Spark's SQL functions module (count, countDistinct, sum, when, ...)
import pyspark.sql.functions as F

# Get a DataFrame backed by the hes_apc view (lazy: nothing runs until .show())
df = spark.table('hes_apc')

(
    df
    .select(
        F.count("*").alias("n"),
        F.countDistinct("PSEUDO_HESID").alias("ids"),
    )
    .show()
)

+-----+-----+
|    n|  ids|
+-----+-----+
|30000|13934|
+-----+-----+



# COVID example

In [9]:
spark.sql("""
SELECT
  SUM(CASE WHEN DIAG_4_01 = 'U071' THEN 1 ELSE 0 END) as covid_confirmed_primary,
  SUM(CASE WHEN DIAG_4_01 = 'U072' THEN 1 ELSE 0 END) as covid_suspected_primary,
  SUM(CASE WHEN DIAG_4_CONCAT LIKE '%U071%' OR DIAG_4_CONCAT LIKE '%U072%' THEN 1 ELSE 0 END) as covid_any
FROM
  hes_apc
""").show()

+-----------------------+-----------------------+---------+
|covid_confirmed_primary|covid_suspected_primary|covid_any|
+-----------------------+-----------------------+---------+
|                    343|                     26|      525|
+-----------------------+-----------------------+---------+



In [11]:
# Import Spark's SQL functions module (count, countDistinct, sum, when, ...)
import pyspark.sql.functions as F

# Get a DataFrame backed by the hes_apc view (lazy: nothing runs until .show())
df = spark.table('hes_apc')

(
    df
    .select(
        # Each F.when(...).otherwise(0) is the DataFrame form of CASE WHEN ... THEN 1 ELSE 0 END
        F.sum(F.when(F.col("DIAG_4_01") == "U071", 1).otherwise(0)).alias("covid_confirmed_primary"),
        F.sum(F.when(F.col("DIAG_4_01") == "U072", 1).otherwise(0)).alias("covid_suspected_primary"),
        F.sum(
            F.when(
                F.col("DIAG_4_CONCAT").like("%U071%") | F.col("DIAG_4_CONCAT").like("%U072%"), 1
            ).otherwise(0)
        ).alias("covid_any"),
    )
    .show()
)

+-----------------------+-----------------------+---------+
|covid_confirmed_primary|covid_suspected_primary|covid_any|
+-----------------------+-----------------------+---------+
|                    343|                     26|      525|
+-----------------------+-----------------------+---------+



In [14]:
spark.sql("""
SELECT
  SEX,
  MEAN(STARTAGE)
FROM
  hes_apc
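WHERE
  DIAG_4_01 = 'U071'  -- confirmed COVID-19 as primary diagnosis
AND
  STARTAGE < 120      -- excludes HES codes for ages under 1 year (7001-7007) and implausible ages
AND
  SEX IN (1, 2)       -- 1 = male, 2 = female
GROUP BY
  SEX
ORDER BY
  SEX
""").show()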

+---+-----------------+
|SEX|   mean(STARTAGE)|
+---+-----------------+
|  1|50.46308724832215|
|  2| 55.1948051948052|
+---+-----------------+



In [12]:
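# Assumes `df` (the hes_apc DataFrame) and `F` were created in the cells above
(
    df
    .filter(
        (F.col("DIAG_4_01") == "U071")  # confirmed COVID-19 as primary diagnosis
        & (F.col("STARTAGE") < 120)     # drop infant age codes (7001-7007) and implausible ages
        & F.col("SEX").isin(1, 2)       # 1 = male, 2 = female
    )
    .groupBy("SEX")
    .agg(F.avg("STARTAGE").alias("MEAN_STARTAGE"))
    .orderBy("SEX")  # make the row order deterministic
    .show()
)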

+---+-----------------+
|SEX|    MEAN_STARTAGE|
+---+-----------------+
|  1|50.46308724832215|
|  2| 55.1948051948052|
+---+-----------------+



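So, in this artificial sample, episodes with confirmed COVID-19 (U071) as the primary diagnosis had a younger mean age for SEX = 1 (male, about 50.5 years) than for SEX = 2 (female, about 55.2 years). Because the data are artificial, this is illustrative only and should not be read as a real-world finding.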

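# Pull results into a pandas DataFrame using the `.toPandas()` method
The resulting pandas DataFrame can then be used for downstream analysis or visualisation in Python. (In the SDE you can do something similar in R, but not on Colab.) Note that `.toPandas()` collects the whole result onto the driver, so use it on small, aggregated results rather than on the full table.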

In [15]:
df = spark.sql("""
SELECT
  SEX,
  MEAN(STARTAGE)
FROM
  hes_apc
WHERE
  DIAG_4_01 = 'U071'
AND
  STARTAGE < 120
AND
  (SEX = 1 OR SEX = 2)
GROUP BY
  SEX
  """).toPandas()

df.head()

   SEX  mean(STARTAGE)
0    1       50.463087
1    2       55.194805
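Once the result is in pandas, ordinary pandas operations apply. As an example, this sketch relabels the SEX codes (assuming the HES coding 1 = male, 2 = female), renames the Spark-generated column, and computes the gap between the two means. The values are typed in from the `toPandas()` output above rather than re-queried, so it runs without Spark.

```python
import pandas as pd

# Values copied from the toPandas() output above (rounded to 6 d.p.)
df = pd.DataFrame({"SEX": [1, 2], "mean(STARTAGE)": [50.463087, 55.194805]})

# Assumption: HES SEX coding 1 = male, 2 = female
df["sex_label"] = df["SEX"].map({1: "male", 2: "female"})

# Give the awkward Spark-generated column a Python-friendly name
df = df.rename(columns={"mean(STARTAGE)": "mean_startage"})

gap = (
    df.loc[df["sex_label"] == "female", "mean_startage"].iloc[0]
    - df.loc[df["sex_label"] == "male", "mean_startage"].iloc[0]
)
print(f"Female minus male mean age: {gap:.1f} years")
```

Aliasing the column in SQL instead (e.g. `AVG(STARTAGE) AS mean_startage`) avoids the rename step altogether.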


