In [6]:
import requests
import pandas as pd
import time
import os
import subprocess

In [3]:
# SPARK
# Builds a local SparkContext / SparkSession configured to read gs:// paths
# through the GCS connector and to write through the BigQuery connector.
import os

import pyspark
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf
from pyspark.context import SparkContext

# Service-account key file. Honour the standard GOOGLE_APPLICATION_CREDENTIALS
# environment variable when it is set; otherwise fall back to the original path.
credentials_location = os.environ.get('GOOGLE_APPLICATION_CREDENTIALS', '/home/kelvin/.gc/gc-key.json')

# Directory holding the connector jars, defined once instead of being repeated
# inside every jar path.
spark_lib_dir = '/home/kelvin/spark/spark-3.3.1-bin-hadoop3/lib'
spark_jars = ','.join([
    f'{spark_lib_dir}/gcs-connector-hadoop3-2.2.5.jar',
    f'{spark_lib_dir}/spark-3.1-bigquery-0.28.0-preview.jar',
])

# NOTE(review): two different BigQuery connector jars are loaded (the
# "spark-3.1 ... preview" jar via spark.jars and the "with-dependencies" jar
# via the driver class path) -- confirm that both are really needed.
conf = SparkConf() \
    .setMaster('local[*]') \
    .setAppName('test') \
    .set("spark.jars", spark_jars) \
    .set("spark.hadoop.google.cloud.auth.service.account.enable", "true") \
    .set("spark.hadoop.google.cloud.auth.service.account.json.keyfile", credentials_location) \
    .set("spark.driver.extraClassPath", f"{spark_lib_dir}/spark-bigquery-with-dependencies_2.12-0.28.0.jar")

sc = SparkContext.getOrCreate(conf=conf)
print('scWebURL:', sc.uiWebUrl)

# Register the GCS filesystem implementations and credentials on the Hadoop
# configuration of the running context.
hadoop_conf = sc._jsc.hadoopConfiguration()

hadoop_conf.set("fs.AbstractFileSystem.gs.impl",  "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS")
hadoop_conf.set("fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem")
hadoop_conf.set("fs.gs.auth.service.account.json.keyfile", credentials_location)
hadoop_conf.set("fs.gs.auth.service.account.enable", "true")

# Reuse the context's configuration for the SQL session.
spark = SparkSession.builder \
    .config(conf=sc.getConf()) \
    .getOrCreate()

# Bucket option read by the BigQuery connector when writing
# (presumably used for staging the data before the load -- see connector docs).
spark.conf.set("temporaryGcsBucket","de-project-bucket")

23/02/20 02:33:29 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


scWebURL: http://de-proj.australia-southeast2-a.c.de-zoomcamp-project-377704.internal:4040


In [6]:
# NOTE(review): this tears down the Spark session created in the setup cell,
# but later cells still call spark.read and df_join.write. Run it only once the
# BigQuery write has finished; a top-to-bottom "Run All" would otherwise fail.
spark.stop()

In [2]:
def _get_json(url: str, headers: dict, timeout: float = 30):
    """GET ``url`` and return the decoded JSON body, raising on HTTP errors."""
    response = requests.get(url, headers=headers, timeout=timeout)
    # Fail loudly on 4xx/5xx instead of surfacing later as a KeyError on 'result'.
    response.raise_for_status()
    return response.json()


def _find_state(mapped_reporting_units):
    """Return the reporting unit code of the first STATE_MAPPING entry, or None."""
    return next(
        (
            mapped['mapped_reporting_unit']['reporting_unit_code']
            # Tolerate a missing / null mapping list.
            for mapped in mapped_reporting_units or []
            if mapped['map_type']['mapped_reporting_unit_code'] == "STATE_MAPPING"
        ),
        None,
    )


def create_tables(BASE_URL: str, headers: dict, measure_category_code: str, measure_code: str):
    """Download the data for one measure and return it as four pandas tables.

    Parameters
    ----------
    BASE_URL : str
        Root URL of the API; endpoint paths are appended to it.
    headers : dict
        HTTP headers sent with every request.
    measure_category_code : str
        Not used by this function; kept so existing callers keep working.
    measure_code : str
        Measure whose data items are fetched from ``/measures/{measure_code}/data-items``.

    Returns
    -------
    list of pandas.DataFrame
        ``[values, datasets, reported_measures, reporting_units]``. Each frame
        carries its table name in a ``name`` attribute.

    Raises
    ------
    requests.HTTPError
        If any endpoint answers with an error status.
    """
    # CREATE VALUES TABLE
    value_items = _get_json(f'{BASE_URL}/measures/{measure_code}/data-items', headers)['result']
    value_list = [
        [item['reported_measure_code'], item['reporting_unit_summary']['reporting_unit_code'], item['value'], item['data_set_id']]
        for item in value_items
    ]
    values_df = pd.DataFrame(data=value_list, columns=['reported_measure_code', 'reporting_unit_code', 'value', 'data_set_id'])
    values_df.name = 'values'

    # CREATE DATASETS TABLE
    dataset_items = _get_json(f'{BASE_URL}/datasets', headers)['result']
    dataset_list = [
        [item['data_set_id'], item['data_set_name'], item['reporting_start_date'], item['reporting_end_date']]
        for item in dataset_items
    ]
    datasets_df = pd.DataFrame(data=dataset_list, columns=['data_set_id', 'data_set_name', 'reporting_start_date', 'reporting_end_date'])
    datasets_df.name = 'datasets'

    # CREATE REPORTED MEASURES TABLE
    # One lookup per distinct code that actually occurs in the values table.
    reported_measure_list = [
        [code, _get_json(f'{BASE_URL}/reported-measures/{code}', headers)['result']['reported_measure_name']]
        for code in list(values_df['reported_measure_code'].unique())
    ]
    reported_measures_df = pd.DataFrame(data=reported_measure_list, columns=['reported_measure_code', 'reported_measure_name'])
    reported_measures_df.name = 'reported_measures'

    # CREATE REPORTING UNITS TABLE
    reporting_unit_items = _get_json(f'{BASE_URL}/reporting-units', headers)['result']
    reporting_unit_list = [
        [
            unit['reporting_unit_code'],
            unit['reporting_unit_name'],
            unit['reporting_unit_type']['reporting_unit_type_code'],
            unit['reporting_unit_type']['reporting_unit_type_name'],
            # State = first STATE_MAPPING entry among the unit's mapped reporting units.
            _find_state(unit.get('mapped_reporting_units')),
            unit['closed'],
            unit['private'],
            unit['latitude'],
            unit['longitude'],
        ]
        for unit in reporting_unit_items
    ]
    reporting_units_df = pd.DataFrame(data=reporting_unit_list, columns=['reporting_unit_code', 'reporting_unit_name', 'reporting_unit_type_code', 'reporting_unit_type_name', 'state', 'closed', 'private', 'latitude', 'longitude'])
    reporting_units_df.name = 'reporting_units'

    # CREATE LIST OF DATAFRAMES
    df_list = [values_df, datasets_df, reported_measures_df, reporting_units_df]
    return df_list

In [8]:
# CREATE MEASURE TABLE
# Lists the code and name of every measure in the configured measure category.
# Needs BASE_URL, measure_category_code and headers from the DECLARE VARIABLES cell.
# EXAMPLE URL: https://myhospitalsapi.aihw.gov.au/api/v1/measure-categories/MYH-ED-TIME/measures
http_response = requests.get(
    f'{BASE_URL}/measure-categories/{measure_category_code}/measures',
    headers=headers,
    timeout=30,
)
# Surface HTTP failures here rather than as a KeyError on 'result' below.
http_response.raise_for_status()
measure_response = http_response.json()
measure_list = [[item['measure_code'], item['measure_name']] for item in measure_response['result']]
measures_df = pd.DataFrame(data=measure_list, columns=['measure_code', 'measure_name'])
measures_df.head()

Unnamed: 0,measure_code,measure_name
0,MYH0005,Percentage of patients who depart the emergenc...
1,MYH0012,Number of patients presenting to the emergency...
2,MYH0013,Time until most patients (90%) departed the em...
3,MYH0036,Median time (50%) patients departed emergency ...


In [4]:
# DECLARE VARIABLES
# Shared configuration read by the extract, upload and load cells.
LOCAL_BASE_PATH = 'data'  # local directory the parquet files are written to
measure_category_code = 'MYH-ED-TIME'
# NOTE: despite its name this holds a measure *code*; it is passed to
# create_tables() as measure_code and is also used as the BigQuery dataset name.
measure_name = 'MYH0036'
# Kept as a '/'-joined string (not os.path.join) because it is also used as the
# object prefix inside gs:// URIs.
measure_path = f'{LOCAL_BASE_PATH}/{measure_name}'
# Single slash before "api" (the original had an accidental "//").
BASE_URL = 'https://myhospitalsapi.aihw.gov.au/api/v1'
# Browser-like User-Agent sent with every API request.
headers = {'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/109.0.0.0 Safari/537.36'}
bucket_name = "de-project-bucket"

In [79]:
# LOAD PANDAS DFs TO LOCAL DISK AS PARQUET
df_list = create_tables(BASE_URL, headers, measure_category_code, measure_name)
# exist_ok=True replaces the separate exists() check and removes the gap
# between checking for the directory and creating it.
os.makedirs(measure_path, exist_ok=True)
for df in df_list:
    # Reuse measure_path so every file lands in the directory created above;
    # the file is named after the table name stored on the frame.
    file_path = os.path.join(measure_path, f'{df.name}.parquet')
    df.to_parquet(file_path)
    print(f'{df.name}.parquet', 'created.')

values.parquet created.
datasets.parquet created.
reported_measures.parquet created.
reporting_units.parquet created.


In [98]:
# LOAD PARQUET FILES TO GCS
# Recursively copies the whole local data directory into the bucket root.
# check=True raises CalledProcessError when gsutil exits non-zero, so a failed
# upload cannot go unnoticed by the cells that read from the bucket afterwards.
subprocess.run(['gsutil', '-m', 'cp', '-r', LOCAL_BASE_PATH, f'gs://{bucket_name}/'], check=True)

Copying file://data/MYH0036/reported_measures.parquet [Content-Type=application/octet-stream]...
/ [0/4 files][    0.0 B/140.8 KiB]   0% Done                                    Copying file://data/MYH0036/datasets.parquet [Content-Type=application/octet-stream]...
/ [0/4 files][    0.0 B/140.8 KiB]   0% Done                                    Copying file://data/MYH0036/reporting_units.parquet [Content-Type=application/octet-stream]...
/ [0/4 files][    0.0 B/140.8 KiB]   0% Done                                    Copying file://data/MYH0036/values.parquet [Content-Type=application/octet-stream]...
/ [0/4 files][    0.0 B/140.8 KiB]   0% Done                                    / [1/4 files][140.8 KiB/140.8 KiB]  99% Done                                    / [2/4 files][140.8 KiB/140.8 KiB]  99% Done                                    / [3/4 files][140.8 KiB/140.8 KiB]  99% Done                                    / [4/4 files][140.8 KiB/140.8 KiB] 100% Done                       

CompletedProcess(args=['gsutil', '-m', 'cp', '-r', 'data', 'gs://de-project-bucket/'], returncode=0)

In [7]:
# CREATE SPARK DFs FROM GCS FILES AND JOIN
# Show what was uploaded for this measure before reading it back.
command = ['gsutil', 'ls', f'gs://{bucket_name}/{measure_path}/']
output = subprocess.check_output(command).decode().split('\n')
for line in filter(str.strip, output):
    print(line.strip())


def _read_measure_table(table_name):
    """Read one uploaded parquet table of the current measure into a Spark DataFrame."""
    return spark.read.parquet(f'gs://{bucket_name}/{measure_path}/{table_name}.parquet')


df_values = _read_measure_table('values')
df_reported_measures = _read_measure_table('reported_measures')
df_reporting_units = _read_measure_table('reporting_units')
df_datasets = _read_measure_table('datasets')

# Attach the three lookup tables to the values one after another; inner joins
# keep only values that have a match in every lookup.
df_join = (
    df_values
    .join(df_reported_measures, on=['reported_measure_code'], how='inner')
    .join(df_reporting_units, on=['reporting_unit_code'], how='inner')
    .join(df_datasets, on=['data_set_id'], how='inner')
)

gs://de-project-bucket/data/MYH0036/datasets.parquet
gs://de-project-bucket/data/MYH0036/reported_measures.parquet
gs://de-project-bucket/data/MYH0036/reporting_units.parquet
gs://de-project-bucket/data/MYH0036/values.parquet


                                                                                

In [8]:
from pyspark.sql.functions import floor, col, expr, from_unixtime, concat, concat_ws, lit, hour, minute, year
# Give the generic 'value' column a descriptive name, then add a 'year' label
# of the form '<start year>-<end year>' built from the reporting period dates.
df_join = (
    df_join
    .withColumnRenamed('value', 'median_wait_time_minutes')
    .withColumn(
        'year',
        concat_ws('-', year(col('reporting_start_date')), year(col('reporting_end_date'))),
    )
)

In [9]:
# Preview: pull only the first ten joined rows to the driver for rich display.
(
    df_join
    .limit(10)
    .toPandas()
)

                                                                                

Unnamed: 0,data_set_id,reporting_unit_code,reported_measure_code,median_wait_time_minutes,reported_measure_name,reporting_unit_name,reporting_unit_type_code,reporting_unit_type_name,state,closed,private,latitude,longitude,data_set_name,reporting_start_date,reporting_end_date,year
0,2092,H0014,MYH-RM0298,355.0,Subsequently admitted patients,The Children's Hospital at Westmead,H,Hospital,NSW,False,False,-33.801554,150.991759,ED 2011-12,2011-07-01,2012-06-30,2011-2012
1,2093,H0014,MYH-RM0299,170.0,Not subsequently admitted patients,The Children's Hospital at Westmead,H,Hospital,NSW,False,False,-33.801554,150.991759,ED 2011-12,2011-07-01,2012-06-30,2011-2012
2,2094,H0014,MYH-RM0300,204.0,All patients,The Children's Hospital at Westmead,H,Hospital,NSW,False,False,-33.801554,150.991759,ED 2011-12,2011-07-01,2012-06-30,2011-2012
3,2095,H0014,MYH-RM0298,338.0,Subsequently admitted patients,The Children's Hospital at Westmead,H,Hospital,NSW,False,False,-33.801554,150.991759,ED 2012-13,2012-07-01,2013-06-30,2012-2013
4,2096,H0014,MYH-RM0299,173.0,Not subsequently admitted patients,The Children's Hospital at Westmead,H,Hospital,NSW,False,False,-33.801554,150.991759,ED 2012-13,2012-07-01,2013-06-30,2012-2013
5,2097,H0014,MYH-RM0300,207.0,All patients,The Children's Hospital at Westmead,H,Hospital,NSW,False,False,-33.801554,150.991759,ED 2012-13,2012-07-01,2013-06-30,2012-2013
6,2128,H0014,MYH-RM0298,314.0,Subsequently admitted patients,The Children's Hospital at Westmead,H,Hospital,NSW,False,False,-33.801554,150.991759,ED 2013-14,2013-07-01,2014-06-30,2013-2014
7,2129,H0014,MYH-RM0299,164.0,Not subsequently admitted patients,The Children's Hospital at Westmead,H,Hospital,NSW,False,False,-33.801554,150.991759,ED 2013-14,2013-07-01,2014-06-30,2013-2014
8,2130,H0014,MYH-RM0300,195.0,All patients,The Children's Hospital at Westmead,H,Hospital,NSW,False,False,-33.801554,150.991759,ED 2013-14,2013-07-01,2014-06-30,2013-2014
9,2137,H0014,MYH-RM0298,256.0,Subsequently admitted patients,The Children's Hospital at Westmead,H,Hospital,NSW,False,False,-33.801554,150.991759,ED 2014-15,2014-07-01,2015-06-30,2014-2015


In [154]:
# DELETE TABLE IF EXISTS
project_id = 'de-zoomcamp-project-377704'
table_id = 'median_wait_time'
# Best-effort removal (-f: no confirmation prompt); the exit status is
# deliberately not checked.
subprocess.run(['bq', 'rm', '-f', f'{project_id}:{measure_name}.{table_id}'])

# WRITE TO BIGQUERY
# The literal previously opened with four quote characters, which put a stray
# '"' at the start of the description; strip() also drops the newlines that
# surround the text inside the triple-quoted literal.
description = """
The length of time spent in the emergency department (ED) from arrival to departure, including the median time patients spent in the emergency department (time until half of the patients (50%) had departed from the emergency department).
Data are presented for all patients, patients treated and admitted to the same hospital, and patients discharged from emergency department (whether discharged, left at their own risk or referred to another hospital).
""".strip()
# Target is addressed as '<dataset>.<table>'; measure_name is used as the dataset.
df_join.write.format('bigquery') \
    .mode('overwrite') \
    .option('table', f'{measure_name}.{table_id}') \
    .option('description', description) \
    .save()

                                                                                