In [1]:
# Build (or reuse) the notebook's SparkSession: YARN cluster, Hive support enabled,
# and a warehouse directory derived from the current OS user name.
from pyspark.sql import SparkSession
import getpass

username = getpass.getuser()

# NOTE(review): HDFS home directories are conventionally /user/<name>; confirm that
# the top-level "/<name>/warehouse" path is intended and writable on this cluster.
warehouse_dir = f"/{username}/warehouse"

spark = (
    SparkSession.builder
    # Port "0" presumably lets Spark pick a free UI port (useful on a shared gateway).
    .config("spark.ui.port", "0")
    .config("spark.sql.warehouse.dir", warehouse_dir)
    .enableHiveSupport()
    .master("yarn")
    .getOrCreate()
)

In [2]:
# Load the raw hospital admissions CSV with an explicit DDL schema, so column
# types are fixed up front rather than left to inference.
schema = (
    "patient_id integer,admission_date date,discharge_date date,"
    "diagnosis string,doctor_id integer,total_cost float"
)

hospital_df = (
    spark.read
    .schema(schema)
    # Dates in the file are written month-day-year.
    .option("dateFormat", "MM-dd-yyyy")
    # First line of the file holds the column names.
    .option("header", "true")
    .csv("/public/trendytech/datasets/hospital.csv")
)

In [3]:
hospital_df.show(n=5)  # preview the first five parsed rows

+----------+--------------+--------------+-------------+---------+----------+
|patient_id|admission_date|discharge_date|    diagnosis|doctor_id|total_cost|
+----------+--------------+--------------+-------------+---------+----------+
|         1|    2022-01-01|    2022-01-10|    Pneumonia|      101|    5000.0|
|         2|    2022-02-05|    2022-02-09| Appendicitis|      102|    7000.0|
|         3|    2022-03-12|    2022-03-18|Fractured Arm|      103|    3500.0|
|         4|    2022-04-02|    2022-04-08| Heart Attack|      104|   15000.0|
|         5|    2022-05-05|    2022-05-07|    Influenza|      105|    2500.0|
+----------+--------------+--------------+-------------+---------+----------+
only showing top 5 rows



In [4]:
# Remove the doctor reference; the remaining cells work only with patient/billing columns.
columns_to_drop = ["doctor_id"]
hospital_drop_doctor_id = hospital_df.drop(*columns_to_drop)

In [5]:
hospital_drop_doctor_id.show(n=5)  # confirm doctor_id is gone

+----------+--------------+--------------+-------------+----------+
|patient_id|admission_date|discharge_date|    diagnosis|total_cost|
+----------+--------------+--------------+-------------+----------+
|         1|    2022-01-01|    2022-01-10|    Pneumonia|    5000.0|
|         2|    2022-02-05|    2022-02-09| Appendicitis|    7000.0|
|         3|    2022-03-12|    2022-03-18|Fractured Arm|    3500.0|
|         4|    2022-04-02|    2022-04-08| Heart Attack|   15000.0|
|         5|    2022-05-05|    2022-05-07|    Influenza|    2500.0|
+----------+--------------+--------------+-------------+----------+
only showing top 5 rows



In [6]:
# total_cost is referred to as hospital_bill from here on.
hospital_col_rename = hospital_drop_doctor_id.withColumnRenamed(
    existing="total_cost",
    new="hospital_bill",
)

In [7]:
hospital_col_rename.show(n=5)  # check the renamed billing column

+----------+--------------+--------------+-------------+-------------+
|patient_id|admission_date|discharge_date|    diagnosis|hospital_bill|
+----------+--------------+--------------+-------------+-------------+
|         1|    2022-01-01|    2022-01-10|    Pneumonia|       5000.0|
|         2|    2022-02-05|    2022-02-09| Appendicitis|       7000.0|
|         3|    2022-03-12|    2022-03-18|Fractured Arm|       3500.0|
|         4|    2022-04-02|    2022-04-08| Heart Attack|      15000.0|
|         5|    2022-05-05|    2022-05-07|    Influenza|       2500.0|
+----------+--------------+--------------+-------------+-------------+
only showing top 5 rows



In [8]:
from pyspark.sql.functions import expr

# Length of stay in days: datediff(end, start) counts the days from admission to
# discharge (e.g. 2022-01-01 -> 2022-01-10 gives 9).
stay_length = expr("datediff(discharge_date,admission_date)")
hospital_col_added = hospital_col_rename.withColumn("duration_of_stay", stay_length)

In [9]:
hospital_col_added.show(n=5)  # inspect the derived duration_of_stay column

+----------+--------------+--------------+-------------+-------------+----------------+
|patient_id|admission_date|discharge_date|    diagnosis|hospital_bill|duration_of_stay|
+----------+--------------+--------------+-------------+-------------+----------------+
|         1|    2022-01-01|    2022-01-10|    Pneumonia|       5000.0|               9|
|         2|    2022-02-05|    2022-02-09| Appendicitis|       7000.0|               4|
|         3|    2022-03-12|    2022-03-18|Fractured Arm|       3500.0|               6|
|         4|    2022-04-02|    2022-04-08| Heart Attack|      15000.0|               6|
|         5|    2022-05-05|    2022-05-07|    Influenza|       2500.0|               2|
+----------+--------------+--------------+-------------+-------------+----------------+
only showing top 5 rows



In [14]:
# Apply diagnosis-specific multipliers to the bill:
#   Heart Attack -> x1.5, Appendicitis -> x1.2, anything else (incl. NULL) unchanged.
# Exact equality (=) is used instead of LIKE: the patterns had no wildcards, so LIKE
# only obscured the intent, and a literal containing '_' or '%' would silently
# match extra diagnoses.
adjusted_cost_sql = """
    CASE
        WHEN diagnosis = 'Heart Attack' THEN hospital_bill * 1.5
        WHEN diagnosis = 'Appendicitis' THEN hospital_bill * 1.2
        ELSE hospital_bill
    END
"""
hospital_bill_adjusted = hospital_col_added.withColumn(
    "adjusted_total_cost", expr(adjusted_cost_sql)
)

In [15]:
hospital_bill_adjusted.show(n=5)  # compare hospital_bill against adjusted_total_cost

+----------+--------------+--------------+-------------+-------------+----------------+-------------------+
|patient_id|admission_date|discharge_date|    diagnosis|hospital_bill|duration_of_stay|adjusted_total_cost|
+----------+--------------+--------------+-------------+-------------+----------------+-------------------+
|         1|    2022-01-01|    2022-01-10|    Pneumonia|       5000.0|               9|             5000.0|
|         2|    2022-02-05|    2022-02-09| Appendicitis|       7000.0|               4|             8400.0|
|         3|    2022-03-12|    2022-03-18|Fractured Arm|       3500.0|               6|             3500.0|
|         4|    2022-04-02|    2022-04-08| Heart Attack|      15000.0|               6|            22500.0|
|         5|    2022-05-05|    2022-05-07|    Influenza|       2500.0|               2|             2500.0|
+----------+--------------+--------------+-------------+-------------+----------------+-------------------+
only showing top 5 rows



In [17]:
# select() is a lazy transformation: on its own the cell only renders the
# DataFrame's schema repr (unless eager evaluation is enabled). show() is the
# action that computes and prints the selected rows.
hospital_bill_adjusted.select("patient_id", "diagnosis", "hospital_bill").show()

patient_id,diagnosis,hospital_bill
1,Pneumonia,5000.0
2,Appendicitis,7000.0
3,Fractured Arm,3500.0
4,Heart Attack,15000.0
5,Influenza,2500.0
6,Appendicitis,8000.0
7,Pneumonia,5500.0
8,Heart Attack,20000.0
9,Fractured Leg,6000.0
10,Appendicitis,7500.0
