### Install PySpark and the Java JDK as described in the Installation folder of the GitHub repository
https://github.com/yubarajkafle/Course-materials-for-BDA601-Big-Data-Analytics-Torrens-University-/tree/main/Installation/Pyspark

### Download the data from the link below into the same folder as this lab
https://www.kaggle.com/datasets/isadoraamorim/trafficcrasheschicago?resource=download

In [1]:
# PySpark: Spark's API for Python.
!pip install pyspark



In [2]:
# Import the necessary packages
from pyspark.sql import SparkSession

In [3]:
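import os
# Point Spark at the local Java installation before the SparkSession is created.
# This is a Windows path; change it to match the JDK installed on your machine.
os.environ["JAVA_HOME"] = r"C:\Program Files\Java\jdk-17"
os.environ["PATH"] = os.path.join(os.environ["JAVA_HOME"], "bin") + os.pathsep + os.environ["PATH"]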

### What is SparkSession?
Spark Session: a unified entry point for the DataFrame and Dataset APIs. Its object `spark` is available by default in the interactive shells (`spark-shell`, `pyspark`), and it can be created programmatically using the SparkSession builder pattern.

In Apache Spark 2.x and later, the `SparkSession` is the entry point to any Spark functionality. When you want to run a Spark application, you first need to create a SparkSession. 

`SparkSession.builder.getOrCreate()` ensures that a SparkSession is created only once in an application.

`SparkSession.builder`: in PySpark, `builder` is accessed as an attribute (in Scala it is the method `SparkSession.builder()`) and returns a `SparkSession.Builder` object. With the builder you can configure the session, for example `appName`, `master`, and other Spark settings through the `config` method.

`getOrCreate()`: when called on a `SparkSession.Builder` object, this method:
- returns the existing SparkSession if one already exists;
- creates a new SparkSession if none exists.

In [5]:
# Create a Spark session
spark = SparkSession.builder.getOrCreate()

### Create a DataFrame from a CSV file
`read`:
`read` is a property of SparkSession that returns a DataFrameReader, which reads data from various sources (CSV, JSON, Parquet, and others) into a Spark DataFrame.

`option('header', 'true')`:
The `option` method sets a reader option. Here the `'header'` option is set to `'true'`, so the first row of the CSV file (for example, Traffic_Crashes_-_Crashes.csv) is treated as a header and its values are used as the DataFrame's column names.

If this option is not set (or is set to `'false'`), the first row is read as an ordinary data row and Spark assigns default column names (`_c0`, `_c1`, and so on).

**Q1: Load the data from the csv files into DataFrames.**

In [6]:
# Load the data from the CSV files into DataFrames.
crashes = spark.read.option('header', 'true').csv('Traffic_Crashes_-_Crashes.csv')
vehicles = spark.read.option('header', 'true').csv('Traffic_Crashes_-_Vehicles.csv')
peoples = spark.read.option('header', 'true').csv('Traffic_Crashes_-_People.csv')

In [7]:
# Check the type of the crashes object
print(type(crashes))

<class 'pyspark.sql.classic.dataframe.DataFrame'>


In [8]:
# Inspect the data type of each column in the DataFrame
crashes.dtypes

[('RD_NO', 'string'),
 ('CRASH_DATE_EST_I', 'string'),
 ('CRASH_DATE', 'string'),
 ('POSTED_SPEED_LIMIT', 'string'),
 ('TRAFFIC_CONTROL_DEVICE', 'string'),
 ('DEVICE_CONDITION', 'string'),
 ('WEATHER_CONDITION', 'string'),
 ('LIGHTING_CONDITION', 'string'),
 ('FIRST_CRASH_TYPE', 'string'),
 ('TRAFFICWAY_TYPE', 'string'),
 ('LANE_CNT', 'string'),
 ('ALIGNMENT', 'string'),
 ('ROADWAY_SURFACE_COND', 'string'),
 ('ROAD_DEFECT', 'string'),
 ('REPORT_TYPE', 'string'),
 ('CRASH_TYPE', 'string'),
 ('INTERSECTION_RELATED_I', 'string'),
 ('NOT_RIGHT_OF_WAY_I', 'string'),
 ('HIT_AND_RUN_I', 'string'),
 ('DAMAGE', 'string'),
 ('DATE_POLICE_NOTIFIED', 'string'),
 ('PRIM_CONTRIBUTORY_CAUSE', 'string'),
 ('SEC_CONTRIBUTORY_CAUSE', 'string'),
 ('STREET_NO', 'string'),
 ('STREET_DIRECTION', 'string'),
 ('STREET_NAME', 'string'),
 ('BEAT_OF_OCCURRENCE', 'string'),
 ('PHOTOS_TAKEN_I', 'string'),
 ('STATEMENTS_TAKEN_I', 'string'),
 ('DOORING_I', 'string'),
 ('WORK_ZONE_I', 'string'),
 ('WORK_ZONE_TYPE', '

In [9]:
vehicles.dtypes

[('CRASH_UNIT_ID', 'string'),
 ('RD_NO', 'string'),
 ('CRASH_DATE', 'string'),
 ('UNIT_NO', 'string'),
 ('UNIT_TYPE', 'string'),
 ('NUM_PASSENGERS', 'string'),
 ('VEHICLE_ID', 'string'),
 ('CMRC_VEH_I', 'string'),
 ('MAKE', 'string'),
 ('MODEL', 'string'),
 ('LIC_PLATE_STATE', 'string'),
 ('VEHICLE_YEAR', 'string'),
 ('VEHICLE_DEFECT', 'string'),
 ('VEHICLE_TYPE', 'string'),
 ('VEHICLE_USE', 'string'),
 ('TRAVEL_DIRECTION', 'string'),
 ('MANEUVER', 'string'),
 ('TOWED_I', 'string'),
 ('FIRE_I', 'string'),
 ('OCCUPANT_CNT', 'string'),
 ('EXCEED_SPEED_LIMIT_I', 'string'),
 ('TOWED_BY', 'string'),
 ('TOWED_TO', 'string'),
 ('AREA_00_I', 'string'),
 ('AREA_01_I', 'string'),
 ('AREA_02_I', 'string'),
 ('AREA_03_I', 'string'),
 ('AREA_04_I', 'string'),
 ('AREA_05_I', 'string'),
 ('AREA_06_I', 'string'),
 ('AREA_07_I', 'string'),
 ('AREA_08_I', 'string'),
 ('AREA_09_I', 'string'),
 ('AREA_10_I', 'string'),
 ('AREA_11_I', 'string'),
 ('AREA_12_I', 'string'),
 ('AREA_99_I', 'string'),
 ('FIRST_

In [10]:
peoples.dtypes

[('PERSON_ID', 'string'),
 ('PERSON_TYPE', 'string'),
 ('RD_NO', 'string'),
 ('VEHICLE_ID', 'string'),
 ('CRASH_DATE', 'string'),
 ('SEAT_NO', 'string'),
 ('CITY', 'string'),
 ('STATE', 'string'),
 ('ZIPCODE', 'string'),
 ('SEX', 'string'),
 ('AGE', 'string'),
 ('DRIVERS_LICENSE_STATE', 'string'),
 ('DRIVERS_LICENSE_CLASS', 'string'),
 ('SAFETY_EQUIPMENT', 'string'),
 ('AIRBAG_DEPLOYED', 'string'),
 ('EJECTION', 'string'),
 ('INJURY_CLASSIFICATION', 'string'),
 ('HOSPITAL', 'string'),
 ('EMS_AGENCY', 'string'),
 ('EMS_RUN_NO', 'string'),
 ('DRIVER_ACTION', 'string'),
 ('DRIVER_VISION', 'string'),
 ('PHYSICAL_CONDITION', 'string'),
 ('PEDPEDAL_ACTION', 'string'),
 ('PEDPEDAL_VISIBILITY', 'string'),
 ('PEDPEDAL_LOCATION', 'string'),
 ('BAC_RESULT', 'string'),
 ('BAC_RESULT VALUE', 'string'),
 ('CELL_PHONE_USE', 'string')]

In PySpark, the `pyspark.sql.types` module provides a collection of data types that you can use to specify the schema of a DataFrame. When you're working with data in Spark, sometimes you might need to explicitly define or cast data to a specific type. This is where these imports come into play.

In [11]:
from pyspark.sql.types import StringType, IntegerType

**Q2: Find the ratio of the number of crashes where the person involved was using a cell phone to the number where the person was not using a cell phone.**

In [12]:
peoples.groupBy(peoples.DRIVER_ACTION).count().orderBy("count").show(n=50, truncate=False)

+---------------------------------+------+
|DRIVER_ACTION                    |count |
+---------------------------------+------+
|LICENSE RESTRICTIONS             |14    |
|STOPPED SCHOOL BUS               |63    |
|TEXTING                          |222   |
|EMERGENCY VEHICLE ON CALL        |369   |
|EVADING POLICE VEHICLE           |744   |
|CELL PHONE USE OTHER THAN TEXTING|881   |
|WRONG WAY/SIDE                   |1531  |
|IMPROPER PARKING                 |1858  |
|DISREGARDED CONTROL DEVICES      |6503  |
|TOO FAST FOR CONDITIONS          |7349  |
|IMPROPER PASSING                 |10451 |
|IMPROPER TURN                    |12293 |
|IMPROPER LANE CHANGE             |13216 |
|IMPROPER BACKING                 |15535 |
|FOLLOWED TOO CLOSELY             |32437 |
|OTHER                            |36694 |
|FAILED TO YIELD                  |45429 |
|UNKNOWN                          |97801 |
|NULL                             |109742|
|NONE                             |171168|
+----------

#### Displaying DataFrames with `.show()`
In PySpark, the `.show()` method prints the rows of a DataFrame as a table, mainly for visual inspection during development or debugging. By default it prints the first 20 rows (`n=20`) and cuts off any cell value longer than 20 characters.

`truncate=False`: every cell is shown in full, however long. Long values can make the table wide and harder to read, but nothing is hidden.

In [13]:
from pyspark.sql.functions import col
# Keep only the two phone-related driver actions, then count each one
phone = peoples.filter(col('DRIVER_ACTION').isin('CELL PHONE USE OTHER THAN TEXTING', 'TEXTING')).groupBy('DRIVER_ACTION').count()
phone.show(truncate=False)


+---------------------------------+-----+
|DRIVER_ACTION                    |count|
+---------------------------------+-----+
|CELL PHONE USE OTHER THAN TEXTING|881  |
|TEXTING                          |222  |
+---------------------------------+-----+



In [14]:
phone_crashes = phone.groupBy().sum('count').collect()[0][0]
print("Crashes involving phone use:", phone_crashes)

Crashes involving phone use: 1103


In [15]:
from pyspark.sql.functions import col
# Keep the specific, non-phone driver actions; missing values are NULL
# (not the string 'null'), so they are dropped with isNotNull()
excluded = ['NONE', 'UNKNOWN', 'OTHER', 'CELL PHONE USE OTHER THAN TEXTING', 'TEXTING']
no_phone = peoples.filter(col('DRIVER_ACTION').isNotNull() & ~col('DRIVER_ACTION').isin(excluded)).groupBy('DRIVER_ACTION').count()
no_phone.show(truncate=False)
no_phone_crashes = no_phone.groupBy().sum('count').collect()[0][0]
print("Crashes not involving phone use:", no_phone_crashes)

+---------------------------+-----+
|DRIVER_ACTION              |count|
+---------------------------+-----+
|LICENSE RESTRICTIONS       |14   |
|EVADING POLICE VEHICLE     |744  |
|FOLLOWED TOO CLOSELY       |32437|
|IMPROPER LANE CHANGE       |13216|
|IMPROPER PARKING           |1858 |
|TOO FAST FOR CONDITIONS    |7349 |
|DISREGARDED CONTROL DEVICES|6503 |
|IMPROPER TURN              |12293|
|IMPROPER BACKING           |15535|
|WRONG WAY/SIDE             |1531 |
|IMPROPER PASSING           |10451|
|STOPPED SCHOOL BUS         |63   |
|FAILED TO YIELD            |45429|
|EMERGENCY VEHICLE ON CALL  |369  |
+---------------------------+-----+

Crashes not involving phone use: 147792


In [16]:
ratio = phone_crashes / no_phone_crashes
# The ratio is small, so also show it as a percentage
print(f"Ratio of phone to non-phone crashes: {phone_crashes} / {no_phone_crashes} = {ratio:.4f} ({ratio:.2%})")

Ratio of phone to non-phone crashes: 1103 / 147792 = 0.0075 (0.75%)


**Q3: Find which three age groups were involved in the highest number of crashes.**

In [17]:
from pyspark.sql.functions import col, isnull

In [18]:
# Mathematical comparisons and operations need a numeric type (int, double, float, ...).
# AGE was read as a string (text), so convert it to an integer first.
peoples = peoples.withColumn("AGE", col("AGE").cast("integer"))
peoples.dtypes

[('PERSON_ID', 'string'),
 ('PERSON_TYPE', 'string'),
 ('RD_NO', 'string'),
 ('VEHICLE_ID', 'string'),
 ('CRASH_DATE', 'string'),
 ('SEAT_NO', 'string'),
 ('CITY', 'string'),
 ('STATE', 'string'),
 ('ZIPCODE', 'string'),
 ('SEX', 'string'),
 ('AGE', 'int'),
 ('DRIVERS_LICENSE_STATE', 'string'),
 ('DRIVERS_LICENSE_CLASS', 'string'),
 ('SAFETY_EQUIPMENT', 'string'),
 ('AIRBAG_DEPLOYED', 'string'),
 ('EJECTION', 'string'),
 ('INJURY_CLASSIFICATION', 'string'),
 ('HOSPITAL', 'string'),
 ('EMS_AGENCY', 'string'),
 ('EMS_RUN_NO', 'string'),
 ('DRIVER_ACTION', 'string'),
 ('DRIVER_VISION', 'string'),
 ('PHYSICAL_CONDITION', 'string'),
 ('PEDPEDAL_ACTION', 'string'),
 ('PEDPEDAL_VISIBILITY', 'string'),
 ('PEDPEDAL_LOCATION', 'string'),
 ('BAC_RESULT', 'string'),
 ('BAC_RESULT VALUE', 'string'),
 ('CELL_PHONE_USE', 'string')]

In [19]:
age_groups = peoples.filter(col("AGE").isNotNull()).groupBy("AGE").count().orderBy("count", ascending=False).limit(3)
age_groups.show()

+---+-----+
|AGE|count|
+---+-----+
| 25|11614|
| 27|11516|
| 26|11361|
+---+-----+



**Q4. Find which month of the year has the most crashes.**

In [None]:
crashes.select("CRASH_DATE").show(n=100,truncate=False)

In [None]:
from pyspark.sql.functions import month, dayofweek, to_date, substring

In [None]:
# Keep the first 10 characters (the MM/dd/yyyy date part) and convert them to a date column
crashes = crashes.withColumn("CRASH_DATE", to_date(substring(crashes["CRASH_DATE"], 1, 10), 'MM/dd/yyyy'))

crashes.dtypes

In [None]:
crashes.select("CRASH_DATE").show(10, truncate=False)

In [None]:
crashes_month = crashes.withColumn("Month", month(crashes["CRASH_DATE"])).groupBy("Month").count().orderBy("count", ascending=False).limit(1)
crashes_month.show()

**Q5. Find which day of the week has the fewest crashes.**

The `dayofweek` function in PySpark represents the days of the week as integers from 1 (Sunday) to 7 (Saturday):

1. Sunday
2. Monday
3. Tuesday
4. Wednesday
5. Thursday
6. Friday
7. Saturday

In [None]:
# Ascending order puts the day with the fewest crashes first
crashes_day = crashes.withColumn("Day", dayofweek(crashes["CRASH_DATE"])).groupBy("Day").count().orderBy("count", ascending=True).limit(1)
crashes_day.show()

# End of Lab 04