In [1]:
from pyspark.sql import SparkSession
import getpass
username = getpass.getuser()
spark = SparkSession. \
 builder. \
 config('spark.ui.port', '0'). \
 config("spark.sql.warehouse.dir", f"/user/{username}/warehouse"). \
 enableHiveSupport(). \
 master('yarn'). \
 getOrCreate()

In [2]:
spark

#### Question 1

#### Create a DataFrame using the sample data and schema

In [9]:
data = [("Spring",12.3),
("Summer",10.5),
("Autumn",8.2),
("Winter",15.1)]

In [10]:
df = spark.createDataFrame(data,schema=["season","windspeed"])

In [11]:
df.printSchema()

root
 |-- season: string (nullable = true)
 |-- windspeed: double (nullable = true)



In [12]:
df.show()

+------+---------+
|season|windspeed|
+------+---------+
|Spring|     12.3|
|Summer|     10.5|
|Autumn|      8.2|
|Winter|     15.1|
+------+---------+



#### Question 2

In [13]:
from pyspark.sql.types import *

In [14]:
schema = StructType([
StructField("library_name", StringType()),
StructField("location", StringType()),
StructField("books", ArrayType(
StructType([
StructField("book_id", StringType()),
StructField("book_name", StringType()),
StructField("author", StringType()),
StructField("copies_available", IntegerType())
])
)),
StructField("members", ArrayType(
StructType([
StructField("member_id", StringType()),
StructField("member_name", StringType()),
StructField("age", IntegerType()),
StructField("books_borrowed", ArrayType(StringType()))
])
))
])

In [15]:
library_df = spark.read.schema(schema).json("/public/trendytech/datasets/library_data.json")

In [16]:
library_df.printSchema()

root
 |-- library_name: string (nullable = true)
 |-- location: string (nullable = true)
 |-- books: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- book_id: string (nullable = true)
 |    |    |-- book_name: string (nullable = true)
 |    |    |-- author: string (nullable = true)
 |    |    |-- copies_available: integer (nullable = true)
 |-- members: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- member_id: string (nullable = true)
 |    |    |-- member_name: string (nullable = true)
 |    |    |-- age: integer (nullable = true)
 |    |    |-- books_borrowed: array (nullable = true)
 |    |    |    |-- element: string (containsNull = true)



In [18]:
library_df.show()

+-----------------+-----------+--------------------+--------------------+
|     library_name|   location|               books|             members|
+-----------------+-----------+--------------------+--------------------+
|  Central Library|City Center|[[B001, The Great...|[[M001, John Smit...|
|Community Library|     Suburb|[[B003, 1984, Geo...|[[M003, Michael B...|
+-----------------+-----------+--------------------+--------------------+



#### Question 3

In [7]:
train_df=spark.read \
.format("csv") \
.option("header","true") \
.option("inferSchema","true") \
.load("/public/trendytech/datasets/train.csv")

In [8]:
train_df.printSchema()

root
 |-- train_number: integer (nullable = true)
 |-- train_name: string (nullable = true)
 |-- seats_available: integer (nullable = true)
 |-- passenger_name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- ticket_number: string (nullable = true)
 |-- seat_number: string (nullable = true)



In [9]:
train_df.show()

+------------+----------+---------------+--------------+---+-------------+-----------+
|train_number|train_name|seats_available|passenger_name|age|ticket_number|seat_number|
+------------+----------+---------------+--------------+---+-------------+-----------+
|         123|   Express|            100|          John| 25|         T123|         A1|
|         123|   Express|            100|          Emma| 30|         T124|         B2|
|         456| Superfast|            150|       Michael| 35|         T125|         C3|
|         456| Superfast|            150|        Sophia| 40|         T126|         D4|
|         789|     Local|             50|       William| 28|         T127|         E5|
|         789|     Local|             50|        Sophia| 32|         T128|         F6|
|         789|     Local|             50|        Oliver| 45|         T129|         G7|
+------------+----------+---------------+--------------+---+-------------+-----------+



In [10]:
dropped_df = train_df.drop("passenger_name","age")

In [11]:
dropped_df.show()

+------------+----------+---------------+-------------+-----------+
|train_number|train_name|seats_available|ticket_number|seat_number|
+------------+----------+---------------+-------------+-----------+
|         123|   Express|            100|         T123|         A1|
|         123|   Express|            100|         T124|         B2|
|         456| Superfast|            150|         T125|         C3|
|         456| Superfast|            150|         T126|         D4|
|         789|     Local|             50|         T127|         E5|
|         789|     Local|             50|         T128|         F6|
|         789|     Local|             50|         T129|         G7|
+------------+----------+---------------+-------------+-----------+



In [15]:
drop_df = dropped_df.dropDuplicates(["train_number", "ticket_number"])

In [16]:
num_rows = drop_df.count()

In [17]:
print("Number of rows after removing duplicates:", num_rows)

Number of rows after removing duplicates: 7


In [18]:
distinct_departments = df.select("train_name").distinct()
num_departments = distinct_departments.count()
print("Number of unique train names:", num_departments)

Number of unique train names: 3


#### Question 4

In [19]:
schema="store_id integer,product string,quantity integer,revenue double"
df_permissive =spark.read.schema(schema).option("mode","permissive").json("/public/trendytech/datasets/sales_data.json")
num_records_permissive = df_permissive.count()
print("Number of records read:", num_records_permissive)

Number of records read: 22


In [21]:
df_dropmalformed = spark.read.option("mode","dropmalformed").schema(schema).json("/public/trendytech/datasets/sales_data.json")
df_dropmalformed.show()
num_records_dropmalformed = df_dropmalformed.count()
num_corrupt_records_dropmalformed = num_records_permissive - num_records_dropmalformed
print("Number of records read:", num_records_dropmalformed)
print("Number of dropped malformed records:",num_corrupt_records_dropmalformed)

+--------+----------+--------+-------+
|store_id|   product|quantity|revenue|
+--------+----------+--------+-------+
|       1|     Apple|      10|  100.0|
|       2|    Banana|      15|   75.0|
|       3|    Orange|      12|   90.0|
|       4|     Mango|       8|  120.0|
|       5|     Grape|      20|  150.0|
|       6|Watermelon|       5|   50.0|
|       7|Strawberry|      18|  108.0|
|       8| Pineapple|      14|  140.0|
|       9|    Cherry|       7|  105.0|
|      10|      Pear|       9|   81.0|
|      11| Blueberry|      11|   88.0|
|      12|      Kiwi|      16|  128.0|
|      13|     Peach|      13|   91.0|
|      14|      Plum|       6|   54.0|
|      15|     Lemon|      10|   70.0|
|      16| Raspberry|      17|  136.0|
|      17|   Coconut|       4|   80.0|
|      18|   Avocado|      11|   99.0|
|      19|Blackberry|       8|   64.0|
+--------+----------+--------+-------+

Number of records read: 21
Number of dropped malformed records: 1


In [None]:
df_failfast =spark.read.option("mode","failfast").schema(schema).json("/user/itv005357/sales_data.json")
df_failfast.show

#### Question 5

In [35]:
schema = "patient_id integer,admission_date date,discharge_date date,diagnosis string,doctor_id integer,total_cost float"

In [36]:
hosp_df=spark.read \
.format("csv") \
.option("header","true") \
.schema(schema) \
.option("dateFormat","MM-dd-yyyy") \
.load("/public/trendytech/datasets/hospital.csv")

In [40]:
hosp_df.show(5)

+----------+--------------+--------------+-------------+---------+----------+
|patient_id|admission_date|discharge_date|    diagnosis|doctor_id|total_cost|
+----------+--------------+--------------+-------------+---------+----------+
|         1|    2022-01-01|    2022-01-10|    Pneumonia|      101|    5000.0|
|         2|    2022-02-05|    2022-02-09| Appendicitis|      102|    7000.0|
|         3|    2022-03-12|    2022-03-18|Fractured Arm|      103|    3500.0|
|         4|    2022-04-02|    2022-04-08| Heart Attack|      104|   15000.0|
|         5|    2022-05-05|    2022-05-07|    Influenza|      105|    2500.0|
+----------+--------------+--------------+-------------+---------+----------+
only showing top 5 rows



In [38]:
hospital_df = hosp_df.drop("doctor_id")
hospital_df.show(5)

+----------+--------------+--------------+-------------+----------+
|patient_id|admission_date|discharge_date|    diagnosis|total_cost|
+----------+--------------+--------------+-------------+----------+
|         1|    2022-01-01|    2022-01-10|    Pneumonia|    5000.0|
|         2|    2022-02-05|    2022-02-09| Appendicitis|    7000.0|
|         3|    2022-03-12|    2022-03-18|Fractured Arm|    3500.0|
|         4|    2022-04-02|    2022-04-08| Heart Attack|   15000.0|
|         5|    2022-05-05|    2022-05-07|    Influenza|    2500.0|
+----------+--------------+--------------+-------------+----------+
only showing top 5 rows



In [42]:
hospital_new_df = hospital_df.withColumnRenamed("total_cost","hospital_bill")
hospital_new_df.show(5)

+----------+--------------+--------------+-------------+-------------+
|patient_id|admission_date|discharge_date|    diagnosis|hospital_bill|
+----------+--------------+--------------+-------------+-------------+
|         1|    2022-01-01|    2022-01-10|    Pneumonia|       5000.0|
|         2|    2022-02-05|    2022-02-09| Appendicitis|       7000.0|
|         3|    2022-03-12|    2022-03-18|Fractured Arm|       3500.0|
|         4|    2022-04-02|    2022-04-08| Heart Attack|      15000.0|
|         5|    2022-05-05|    2022-05-07|    Influenza|       2500.0|
+----------+--------------+--------------+-------------+-------------+
only showing top 5 rows



In [48]:
from pyspark.sql.functions import expr

In [49]:
hospital_expr_df = hospital_new_df.withColumn("duration_of_stay",expr("datediff(discharge_date, admission_date)"))
hospital_expr_df.show(5)

+----------+--------------+--------------+-------------+-------------+----------------+
|patient_id|admission_date|discharge_date|    diagnosis|hospital_bill|duration_of_stay|
+----------+--------------+--------------+-------------+-------------+----------------+
|         1|    2022-01-01|    2022-01-10|    Pneumonia|       5000.0|               9|
|         2|    2022-02-05|    2022-02-09| Appendicitis|       7000.0|               4|
|         3|    2022-03-12|    2022-03-18|Fractured Arm|       3500.0|               6|
|         4|    2022-04-02|    2022-04-08| Heart Attack|      15000.0|               6|
|         5|    2022-05-05|    2022-05-07|    Influenza|       2500.0|               2|
+----------+--------------+--------------+-------------+-------------+----------------+
only showing top 5 rows



In [51]:
hospital_price_df = hospital_expr_df.withColumn("adjusted_total_cost",expr("CASE WHEN diagnosis LIKE 'Heart Attack' THEN hospital_bill * 1.5 WHEN diagnosis LIKE 'Appendicitis' THEN hospital_bill * 1.2 ELSE hospital_bill END"))

In [52]:
hospital_price_df.show(5)

+----------+--------------+--------------+-------------+-------------+----------------+-------------------+
|patient_id|admission_date|discharge_date|    diagnosis|hospital_bill|duration_of_stay|adjusted_total_cost|
+----------+--------------+--------------+-------------+-------------+----------------+-------------------+
|         1|    2022-01-01|    2022-01-10|    Pneumonia|       5000.0|               9|             5000.0|
|         2|    2022-02-05|    2022-02-09| Appendicitis|       7000.0|               4|             8400.0|
|         3|    2022-03-12|    2022-03-18|Fractured Arm|       3500.0|               6|             3500.0|
|         4|    2022-04-02|    2022-04-08| Heart Attack|      15000.0|               6|            22500.0|
|         5|    2022-05-05|    2022-05-07|    Influenza|       2500.0|               2|             2500.0|
+----------+--------------+--------------+-------------+-------------+----------------+-------------------+
only showing top 5 rows



In [53]:
hospital_final_df = hospital_price_df.select("patient_id", "diagnosis","hospital_bill", "adjusted_total_cost")
hospital_final_df.show(5)

+----------+-------------+-------------+-------------------+
|patient_id|    diagnosis|hospital_bill|adjusted_total_cost|
+----------+-------------+-------------+-------------------+
|         1|    Pneumonia|       5000.0|             5000.0|
|         2| Appendicitis|       7000.0|             8400.0|
|         3|Fractured Arm|       3500.0|             3500.0|
|         4| Heart Attack|      15000.0|            22500.0|
|         5|    Influenza|       2500.0|             2500.0|
+----------+-------------+-------------+-------------------+
only showing top 5 rows

