In [1]:
from pyspark.sql import SparkSession

In [2]:
# Initialize SparkSession
spark = (SparkSession.builder
        .appName("cs744")
        .master("spark://master:7077")
        .config("spark.driver.memory", "20g")
        .config("spark.executor.memory", "20g")
        .config("spark.executor.cores", "4")
        .getOrCreate())

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


25/01/30 21:21:32 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


# Reading Data

In [3]:
holidays = spark.read.csv("hdfs://nn:9000/holidays2.csv", inferSchema=True, header=True)

                                                                                

In [4]:
holidays.limit(3).toPandas()

                                                                                

Unnamed: 0,date,holiday
0,01/01/2013,New Year's Day
1,01/01/2014,New Year's Day
2,01/01/2015,New Year's Day


In [5]:
calls = spark.read.csv("hdfs://nn:9000/sf.csv", inferSchema=True, header=True)

                                                                                

In [6]:
calls.limit(3).toPandas()

25/01/30 21:22:00 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


Unnamed: 0,Call Number,Unit ID,Incident Number,Call Type,Call Date,Watch Date,Received DtTm,Entry DtTm,Dispatch DtTm,Response DtTm,...,Call Type Group,Number of Alarms,Unit Type,Unit sequence in call dispatch,Fire Prevention District,Supervisor District,Neighborhooods - Analysis Boundaries,RowID,case_location,Analysis Neighborhoods
0,221210313,E36,22054955,Outside Fire,05/01/2022,04/30/2022,05/01/2022 02:58:25 AM,05/01/2022 02:59:15 AM,05/01/2022 02:59:25 AM,05/01/2022 03:01:06 AM,...,Fire,1,ENGINE,1,2,5,Hayes Valley,221210313-E36,POINT (-122.42316555403964 37.77781524520032),9
1,220190150,E29,22008871,Alarms,01/19/2022,01/18/2022,01/19/2022 01:42:12 AM,01/19/2022 01:44:13 AM,01/19/2022 01:44:28 AM,01/19/2022 01:46:47 AM,...,Alarm,1,ENGINE,1,3,10,Potrero Hill,220190150-E29,POINT (-122.39469970274361 37.76460987856451),26
2,211233271,T07,21053032,Alarms,05/03/2021,05/03/2021,05/03/2021 09:28:12 PM,05/03/2021 09:28:12 PM,05/03/2021 09:28:17 PM,05/03/2021 09:29:10 PM,...,Alarm,1,TRUCK,2,2,9,Mission,211233271-T07,POINT (-122.42057572093252 37.76418194637148),20


# Filtering

In [7]:
from pyspark.sql.functions import col

In [8]:
holidays_14 = holidays.filter(col("date").contains("2014"))

In [9]:
holidays_14_birth = holidays_14.filter(col("holiday").contains("Birthday"))

In [10]:
holidays_14_birth.count()

2

In [11]:
holidays_14_birth.show()

+----------+--------------------+
|      date|             holiday|
+----------+--------------------+
|01/20/2014|Birthday of Marti...|
|02/17/2014|Washington's Birt...|
+----------+--------------------+



In [12]:
from pyspark.sql.functions import expr

In [13]:
holidays.filter(expr("holiday like '%Birthday%'")).count()

24

In [14]:
holidays.filter(expr("holiday like '%Birthday%' AND date like '%2014'")).count()

2

# Create New Column

In [15]:
from pyspark.sql.functions import lower

In [16]:
holidays_14 = holidays_14.withColumn("holiday_lower", lower(col("holiday")))

In [17]:
holidays_14

DataFrame[date: string, holiday: string, holiday_lower: string]

In [18]:
holidays_14_birth = holidays_14.filter(col("holiday_lower").contains("birthday"))

In [19]:
holidays_14_birth.count()

2

# Select

In [20]:
calls.select("Call Date")

DataFrame[Call Date: string]

In [21]:
calls.select("Call Date", "Call Type").limit(3).toPandas()

Unnamed: 0,Call Date,Call Type
0,05/01/2022,Outside Fire
1,01/19/2022,Alarms
2,05/03/2021,Alarms


# Grouping

In [22]:
# SELECT SUM(`Number of Alarms`) as "Total Alarms"
# FROM calls
# GROUP BY `Call Type`
# WHERE `Call Date` = '01/01/2014'

In [23]:
calls_date = calls.filter(expr("`Call Date` = '01/01/2014'"))

In [24]:
calls_date.limit(3).toPandas()

                                                                                

Unnamed: 0,Call Number,Unit ID,Incident Number,Call Type,Call Date,Watch Date,Received DtTm,Entry DtTm,Dispatch DtTm,Response DtTm,...,Call Type Group,Number of Alarms,Unit Type,Unit sequence in call dispatch,Fire Prevention District,Supervisor District,Neighborhooods - Analysis Boundaries,RowID,case_location,Analysis Neighborhoods
0,140010567,E36,14000479,Structure Fire,01/01/2014,01/01/2014,01/01/2014 09:32:54 PM,01/01/2014 09:33:31 PM,01/01/2014 09:36:27 PM,01/01/2014 09:37:00 PM,...,Fire,1,ENGINE,12,4,5,Western Addition,140010567-E36,POINT (-122.427815582933 37.784448089516),39
1,140010061,KM12,14000046,Structure Fire,01/01/2014,12/31/2013,01/01/2014 12:43:09 AM,01/01/2014 12:44:07 AM,01/01/2014 12:47:31 AM,01/01/2014 12:47:31 AM,...,Alarm,1,PRIVATE,7,4,2,Marina,140010061-KM12,POINT (-122.429500504516 37.801174806033),13
2,140010523,B08,14000439,Alarms,01/01/2014,01/01/2014,01/01/2014 06:47:53 PM,01/01/2014 06:50:10 PM,01/01/2014 06:50:17 PM,01/01/2014 06:50:59 PM,...,Alarm,1,CHIEF,2,8,7,Inner Sunset,140010523-B08,POINT (-122.47140400613 37.761010743685),14


In [25]:
from pyspark.sql.functions import sum

In [26]:
calls_grouped = calls_date.groupBy("Call Type").agg(sum("Number of Alarms").alias("Num_Alarms"))

In [27]:
calls_grouped.show()



+--------------------+----------+
|           Call Type|Num_Alarms|
+--------------------+----------+
|      Structure Fire|       164|
|    Medical Incident|       818|
|              Alarms|        82|
|Elevator / Escala...|         1|
|Citizen Assist / ...|        10|
|        Vehicle Fire|         2|
|               Other|        25|
|        Outside Fire|        35|
|   Traffic Collision|        54|
|Gas Leak (Natural...|         7|
|        Water Rescue|        13|
|          Fuel Spill|         5|
|Smoke Investigati...|         2|
+--------------------+----------+



                                                                                

# Caching

In [28]:
calls_high = calls_grouped.filter(expr("Num_Alarms > 100"))

In [29]:
%%time
calls_high.count()



CPU times: user 21.5 ms, sys: 11.3 ms, total: 32.7 ms
Wall time: 7.09 s


                                                                                

2

In [30]:
calls_grouped.cache()

DataFrame[Call Type: string, Num_Alarms: bigint]

In [31]:
%%time
calls_high.count()



CPU times: user 16.7 ms, sys: 9.38 ms, total: 26.1 ms
Wall time: 5.58 s


                                                                                

2

In [32]:
%%time
calls_high.count()

CPU times: user 0 ns, sys: 3.28 ms, total: 3.28 ms
Wall time: 610 ms


2

In [33]:
calls_grouped.unpersist()

DataFrame[Call Type: string, Num_Alarms: bigint]

# Join

In [34]:
joined_df = holidays.join(calls, holidays['date'] == calls['Call Date'], "inner")

In [35]:
joined_df.limit(3).toPandas()

Unnamed: 0,date,holiday,Call Number,Unit ID,Incident Number,Call Type,Call Date,Watch Date,Received DtTm,Entry DtTm,...,Call Type Group,Number of Alarms,Unit Type,Unit sequence in call dispatch,Fire Prevention District,Supervisor District,Neighborhooods - Analysis Boundaries,RowID,case_location,Analysis Neighborhoods
0,01/17/2022,"Birthday of Martin Luther King, Jr.",220173023,67,22008351,Medical Incident,01/17/2022,01/17/2022,01/17/2022 09:23:31 PM,01/17/2022 09:24:26 PM,...,Potentially Life-Threatening,1,MEDIC,2,10,10,Bayview Hunters Point,220173023-67,POINT (-122.39096789028375 37.73401387414893),1
1,01/17/2022,"Birthday of Martin Luther King, Jr.",220170811,B02,22008104,Alarms,01/17/2022,01/17/2022,01/17/2022 09:28:15 AM,01/17/2022 09:29:17 AM,...,Alarm,1,CHIEF,2,2,9,Mission,220170811-B02,POINT (-122.4207461905871 37.765041354178884),20
2,07/05/2021,Independence Day,211862605,E07,21079789,Medical Incident,07/05/2021,07/05/2021,07/05/2021 05:19:43 PM,07/05/2021 05:24:46 PM,...,Non Life-threatening,1,ENGINE,1,2,9,Mission,211862605-E07,POINT (-122.42173999514011 37.76411273839148),20


In [36]:
birth_df = joined_df.filter(expr("lower(holiday) like '%birthday%'"))

In [37]:
birth_df = birth_df.agg(sum("Number of Alarms"))

In [38]:
birth_df.show()



+---------------------+
|sum(Number of Alarms)|
+---------------------+
|                18646|
+---------------------+



                                                                                

# Partition

In [63]:
# df.unpersist()

DataFrame[Call Number: int, Unit ID: string, Incident Number: int, Call Type: string, Call Date: string, Watch Date: string, Received DtTm: string, Entry DtTm: string, Dispatch DtTm: string, Response DtTm: string, On Scene DtTm: string, Transport DtTm: string, Hospital DtTm: string, Call Final Disposition: string, Available DtTm: string, Address: string, City: string, Zipcode of Incident: int, Battalion: string, Station Area: string, Box: string, Original Priority: string, Priority: string, Final Priority: int, ALS Unit: boolean, Call Type Group: string, Number of Alarms: int, Unit Type: string, Unit sequence in call dispatch: int, Fire Prevention District: string, Supervisor District: string, Neighborhooods - Analysis Boundaries: string, RowID: string, case_location: string, Analysis Neighborhoods: int]

In [64]:
calls.count()

                                                                                

6016056

In [65]:
df = calls.sample(0.01)

In [66]:
%%time
df.count()



CPU times: user 6.07 ms, sys: 18.7 ms, total: 24.8 ms
Wall time: 3.9 s


                                                                                

60590

## Only Caching

In [67]:
df.cache()

DataFrame[Call Number: int, Unit ID: string, Incident Number: int, Call Type: string, Call Date: string, Watch Date: string, Received DtTm: string, Entry DtTm: string, Dispatch DtTm: string, Response DtTm: string, On Scene DtTm: string, Transport DtTm: string, Hospital DtTm: string, Call Final Disposition: string, Available DtTm: string, Address: string, City: string, Zipcode of Incident: int, Battalion: string, Station Area: string, Box: string, Original Priority: string, Priority: string, Final Priority: int, ALS Unit: boolean, Call Type Group: string, Number of Alarms: int, Unit Type: string, Unit sequence in call dispatch: int, Fire Prevention District: string, Supervisor District: string, Neighborhooods - Analysis Boundaries: string, RowID: string, case_location: string, Analysis Neighborhoods: int]

In [68]:
%%time
df.count()



CPU times: user 12.7 ms, sys: 6.77 ms, total: 19.5 ms
Wall time: 8.84 s


                                                                                

60590

In [74]:
%%timeit
df.count()

125 ms ± 15.4 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)


## Repartition

In [75]:
df = calls.sample(0.01).repartition(1)

In [77]:
%%timeit
df.count()



3.97 s ± 1.03 s per loop (mean ± std. dev. of 7 runs, 1 loop each)


                                                                                

## Repartition + Caching

In [78]:
df.cache()

DataFrame[Call Number: int, Unit ID: string, Incident Number: int, Call Type: string, Call Date: string, Watch Date: string, Received DtTm: string, Entry DtTm: string, Dispatch DtTm: string, Response DtTm: string, On Scene DtTm: string, Transport DtTm: string, Hospital DtTm: string, Call Final Disposition: string, Available DtTm: string, Address: string, City: string, Zipcode of Incident: int, Battalion: string, Station Area: string, Box: string, Original Priority: string, Priority: string, Final Priority: int, ALS Unit: boolean, Call Type Group: string, Number of Alarms: int, Unit Type: string, Unit sequence in call dispatch: int, Fire Prevention District: string, Supervisor District: string, Neighborhooods - Analysis Boundaries: string, RowID: string, case_location: string, Analysis Neighborhoods: int]

In [79]:
%%time
df.count()

[Stage 384:>                                                        (0 + 1) / 1]

CPU times: user 5.81 ms, sys: 21.1 ms, total: 27 ms
Wall time: 9.66 s


                                                                                

59691

In [80]:
%%timeit
df.count()

45.1 ms ± 3.83 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)


In [81]:
df.rdd.getNumPartitions()

1

# Explain

```
SELECT SUM(`Number of Alarms`)
FROM holidays JOIN calls on holidays.date = calls.`Call Date`
WHERE lower(holiday) like '%birthday%'
```

In [82]:
joined_df = holidays.join(calls, holidays['date'] == calls['Call Date'], "inner")
birth_df = joined_df.filter(expr("lower(holiday) like '%birthday%'"))
birth_df = birth_df.agg(sum("Number of Alarms"))

In [83]:
birth_df.show()



+---------------------+
|sum(Number of Alarms)|
+---------------------+
|                18646|
+---------------------+



                                                                                

In [84]:
birth_df.explain("formatted")

== Physical Plan ==
AdaptiveSparkPlan (12)
+- HashAggregate (11)
   +- Exchange (10)
      +- HashAggregate (9)
         +- Project (8)
            +- BroadcastHashJoin Inner BuildLeft (7)
               :- BroadcastExchange (4)
               :  +- Project (3)
               :     +- Filter (2)
               :        +- Scan csv  (1)
               +- Filter (6)
                  +- Scan csv  (5)


(1) Scan csv 
Output [2]: [date#17, holiday#18]
Batched: false
Location: InMemoryFileIndex [hdfs://nn:9000/holidays2.csv]
PushedFilters: [IsNotNull(holiday), IsNotNull(date)]
ReadSchema: struct<date:string,holiday:string>

(2) Filter
Input [2]: [date#17, holiday#18]
Condition : ((isnotnull(holiday#18) AND Contains(lower(holiday#18), birthday)) AND isnotnull(date#17))

(3) Project
Output [1]: [date#17]
Input [2]: [date#17, holiday#18]

(4) BroadcastExchange
Input [1]: [date#17]
Arguments: HashedRelationBroadcastMode(List(input[0, string, true]),false), [plan_id=5949]

(5) Scan csv 
Output [