<a href="https://colab.research.google.com/github/saitzaw/apache-spark-colab/blob/main/Spark_03.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

### Simple data analysis using Apache Spark
- Read the Parquet file in Google Drive
- Analyze the data using Apache Spark

In [None]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://archive.apache.org/dist/spark/spark-3.1.1/spark-3.1.1-bin-hadoop3.2.tgz
!tar -xf spark-3.1.1-bin-hadoop3.2.tgz
!pip install -q findspark
!pip install pyspark
!pip install pyarrow



In [70]:
import os

import findspark
from pyarrow import parquet
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

In [71]:
# Export the locations of the JDK and of the unpacked Spark distribution
# before the Spark session is started.
os.environ.update(
    {
        "JAVA_HOME": "/usr/lib/jvm/java-8-openjdk-amd64",
        "SPARK_HOME": "/content/spark-3.1.1-bin-hadoop3.2",
    }
)

In [72]:
# Initialise findspark, then create (or reuse) a local Spark session named
# "colab" with the Spark UI on port 4050.
findspark.init()
spark = (
    SparkSession.builder
    .master("local")
    .appName("colab")
    .config("spark.ui.port", "4050")
    .getOrCreate()
)
# Turn on eager evaluation for the REPL/notebook.
spark.conf.set("spark.sql.repl.eagerEval.enabled", True)

### Data source 
https://data.cityofnewyork.us/Public-Safety/Motor-Vehicle-Collisions-Crashes/h9gi-nx95

### remark 
- Convert the CSV file to Parquet format to reduce the file size

#### method
```
import pandas as pd
df = pd.read_csv('path/file.csv', low_memory=False)
df.to_parquet('path/file.parquet')
```
Then upload the Parquet file to Google Drive.


In [73]:
from google.colab import drive

# Mount Google Drive under /content/gdrive; the dataset path used later
# lives below this mount point.
GDRIVE_MOUNT_POINT = '/content/gdrive'
drive.mount(GDRIVE_MOUNT_POINT)

Drive already mounted at /content/gdrive; to attempt to forcibly remount, call drive.mount("/content/gdrive", force_remount=True).


In [75]:
# Location of the collisions Parquet file below the /content/gdrive mount
# point; adjust the folder name if the file is stored elsewhere in Drive.
data_file = '/content/gdrive/MyDrive/ColabDataset/MotorVehicleCollisionsCrashes.parquet'

In [76]:
df = spark.read.parquet(data_file)

In [77]:
df.head(5)

[Row(CRASH_DATE='04/14/2021', CRASH_TIME='5:32', BOROUGH=None, ZIP_CODE=None, LATITUDE=None, LONGITUDE=None, LOCATION=None, ON_STREET_NAME='BRONX WHITESTONE BRIDGE', CROSS_STREET_NAME=None, OFF_STREET_NAME=None, NUMBER_OF_PERSONS_INJURED=0.0, NUMBER_OF_PERSONS_KILLED=0.0, NUMBER_OF_PEDESTRIANS_INJURED=0, NUMBER_OF_PEDESTRIANS_KILLED=0, NUMBER_OF_CYCLIST_INJURED=0, NUMBER_OF_CYCLIST_KILLED=0, NUMBER_OF_MOTORIST_INJURED=0, NUMBER_OF_MOTORIST_KILLED=0, CONTRIBUTING_FACTOR_VEHICLE_1='Following Too Closely', CONTRIBUTING_FACTOR_VEHICLE_2='Unspecified', CONTRIBUTING_FACTOR_VEHICLE_3=None, CONTRIBUTING_FACTOR_VEHICLE_4=None, CONTRIBUTING_FACTOR_VEHICLE_5=None, COLLISION_ID=4407480, VEHICLE_TYPE_CODE_1='Sedan', VEHICLE_TYPE_CODE_2='Sedan', VEHICLE_TYPE_CODE_3=None, VEHICLE_TYPE_CODE_4=None, VEHICLE_TYPE_CODE_5=None),
 Row(CRASH_DATE='04/13/2021', CRASH_TIME='21:35', BOROUGH='BROOKLYN', ZIP_CODE=11217.0, LATITUDE=40.68358, LONGITUDE=-73.97617, LOCATION='(40.68358, -73.97617)', ON_STREET_NAME=No

In [78]:
df.show(5, truncate=True)

+----------+----------+--------+--------+--------+---------+--------------------+--------------------+-----------------+--------------------+-------------------------+------------------------+-----------------------------+----------------------------+-------------------------+------------------------+--------------------------+-------------------------+-----------------------------+-----------------------------+-----------------------------+-----------------------------+-----------------------------+------------+--------------------+-------------------+-------------------+-------------------+-------------------+
|CRASH_DATE|CRASH_TIME| BOROUGH|ZIP_CODE|LATITUDE|LONGITUDE|            LOCATION|      ON_STREET_NAME|CROSS_STREET_NAME|     OFF_STREET_NAME|NUMBER_OF_PERSONS_INJURED|NUMBER_OF_PERSONS_KILLED|NUMBER_OF_PEDESTRIANS_INJURED|NUMBER_OF_PEDESTRIANS_KILLED|NUMBER_OF_CYCLIST_INJURED|NUMBER_OF_CYCLIST_KILLED|NUMBER_OF_MOTORIST_INJURED|NUMBER_OF_MOTORIST_KILLED|CONTRIBUTING_FACTOR_VEHIC

In [79]:
df.printSchema()

root
 |-- CRASH_DATE: string (nullable = true)
 |-- CRASH_TIME: string (nullable = true)
 |-- BOROUGH: string (nullable = true)
 |-- ZIP_CODE: double (nullable = true)
 |-- LATITUDE: double (nullable = true)
 |-- LONGITUDE: double (nullable = true)
 |-- LOCATION: string (nullable = true)
 |-- ON_STREET_NAME: string (nullable = true)
 |-- CROSS_STREET_NAME: string (nullable = true)
 |-- OFF_STREET_NAME: string (nullable = true)
 |-- NUMBER_OF_PERSONS_INJURED: double (nullable = true)
 |-- NUMBER_OF_PERSONS_KILLED: double (nullable = true)
 |-- NUMBER_OF_PEDESTRIANS_INJURED: long (nullable = true)
 |-- NUMBER_OF_PEDESTRIANS_KILLED: long (nullable = true)
 |-- NUMBER_OF_CYCLIST_INJURED: long (nullable = true)
 |-- NUMBER_OF_CYCLIST_KILLED: long (nullable = true)
 |-- NUMBER_OF_MOTORIST_INJURED: long (nullable = true)
 |-- NUMBER_OF_MOTORIST_KILLED: long (nullable = true)
 |-- CONTRIBUTING_FACTOR_VEHICLE_1: string (nullable = true)
 |-- CONTRIBUTING_FACTOR_VEHICLE_2: string (nullable = tru

### Null check and count

In [80]:
# Null count for every column, computed in ONE aggregation instead of one
# filter+count Spark job per column.
# when() without otherwise() yields null for rows that do not match, and
# count() only counts non-null values, so each aggregate equals the number of
# rows in which that column is null.
_null_counts = df.select(
    [F.count(F.when(df[name].isNull(), 1)).alias(name) for name in df.columns]
).first()
# Same result shape as before: {column name: number of null rows}.
Dict_Null = _null_counts.asDict()

In [81]:
Dict_Null

{'BOROUGH': 377582,
 'COLLISION_ID': 0,
 'CONTRIBUTING_FACTOR_VEHICLE_1': 3645,
 'CONTRIBUTING_FACTOR_VEHICLE_2': 175068,
 'CONTRIBUTING_FACTOR_VEHICLE_3': 972302,
 'CONTRIBUTING_FACTOR_VEHICLE_4': 1031042,
 'CONTRIBUTING_FACTOR_VEHICLE_5': 1043709,
 'CRASH_DATE': 0,
 'CRASH_TIME': 0,
 'CROSS_STREET_NAME': 541455,
 'LATITUDE': 75849,
 'LOCATION': 75849,
 'LONGITUDE': 75849,
 'NUMBER_OF_CYCLIST_INJURED': 0,
 'NUMBER_OF_CYCLIST_KILLED': 0,
 'NUMBER_OF_MOTORIST_INJURED': 0,
 'NUMBER_OF_MOTORIST_KILLED': 0,
 'NUMBER_OF_PEDESTRIANS_INJURED': 0,
 'NUMBER_OF_PEDESTRIANS_KILLED': 0,
 'NUMBER_OF_PERSONS_INJURED': 17,
 'NUMBER_OF_PERSONS_KILLED': 30,
 'OFF_STREET_NAME': 794513,
 'ON_STREET_NAME': 255438,
 'VEHICLE_TYPE_CODE_1': 8721,
 'VEHICLE_TYPE_CODE_2': 243280,
 'VEHICLE_TYPE_CODE_3': 976623,
 'VEHICLE_TYPE_CODE_4': 1031949,
 'VEHICLE_TYPE_CODE_5': 1043919,
 'ZIP_CODE': 377763}

In [82]:
df.count()

1048575

In [84]:
# Percentage of null rows per column.
# The total row count is fetched once, and the per-column null counts are
# taken from Dict_Null, rather than re-running filter+count and df.count()
# for every column. The arithmetic (count / total * 100) is unchanged.
total_rows = df.count()
Dict_Null_per = {
    name: null_count / total_rows * 100
    for name, null_count in Dict_Null.items()
}

In [85]:
Dict_Null_per

{'BOROUGH': 36.009059914646066,
 'COLLISION_ID': 0.0,
 'CONTRIBUTING_FACTOR_VEHICLE_1': 0.34761461984121306,
 'CONTRIBUTING_FACTOR_VEHICLE_2': 16.695801444817967,
 'CONTRIBUTING_FACTOR_VEHICLE_3': 92.72603294947905,
 'CONTRIBUTING_FACTOR_VEHICLE_4': 98.32792122642634,
 'CONTRIBUTING_FACTOR_VEHICLE_5': 99.53594163507617,
 'CRASH_DATE': 0.0,
 'CRASH_TIME': 0.0,
 'CROSS_STREET_NAME': 51.637221944066944,
 'LATITUDE': 7.233531220942709,
 'LOCATION': 7.233531220942709,
 'LONGITUDE': 7.233531220942709,
 'NUMBER_OF_CYCLIST_INJURED': 0.0,
 'NUMBER_OF_CYCLIST_KILLED': 0.0,
 'NUMBER_OF_MOTORIST_INJURED': 0.0,
 'NUMBER_OF_MOTORIST_KILLED': 0.0,
 'NUMBER_OF_PEDESTRIANS_INJURED': 0.0,
 'NUMBER_OF_PEDESTRIANS_KILLED': 0.0,
 'NUMBER_OF_PERSONS_INJURED': 0.0016212478840330925,
 'NUMBER_OF_PERSONS_KILLED': 0.0028610256777054574,
 'OFF_STREET_NAME': 75.7707364756932,
 'ON_STREET_NAME': 24.360489235390887,
 'VEHICLE_TYPE_CODE_1': 0.8317001645089765,
 'VEHICLE_TYPE_CODE_2': 23.201010895739458,
 'VEHICLE_TY

### Convert the date string to a date
- requires the `to_date` function from `pyspark.sql.functions`

In [93]:
from pyspark.sql.functions import date_format, when, col, to_date

### Note
- If the Spark version is 3.0 or later, use this setting to fall back to the legacy (pre-3.0) date/time parser:

`spark.conf.set("spark.sql.legacy.timeParserPolicy","LEGACY")`

In [102]:
spark.conf.set("spark.sql.legacy.timeParserPolicy","LEGACY")

In [107]:
# CRASH_DATE is month/day/year text (e.g. '04/14/2021').
# In Spark datetime patterns 'MM' is month-of-year while 'mm' is
# minute-of-hour, so the month must be upper case both when parsing and when
# formatting; the year is four digits.
ndf = df.select(
    date_format(
        to_date(col('CRASH_DATE'), 'MM/dd/yyyy'), 'dd-MM-yyyy'
        ).alias('date'))

In [109]:
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, StringType

In [110]:
def splitUDF(row):
    """Split a date string into ``[yyyy, mm, dd]`` string parts.

    Two layouts are recognised:

    * ``mm/dd/yyyy`` (slash separated, month first)
    * ``yyyy-mm-dd`` (dash separated, year first)

    Args:
        row: The date text, or ``None`` for a null cell.

    Returns:
        A list ``[yyyy, mm, dd]`` of strings, or ``None`` when the input is
        ``None``, uses neither separator, or does not split into exactly
        three parts. Returning ``None`` lets a malformed value become a null
        instead of raising and failing the whole job.
    """
    if row is None:
        return None

    if "/" in row:
        parts = row.split("/")
        if len(parts) != 3:
            return None
        mm, dd, yyyy = parts
    elif "-" in row:
        parts = row.split("-")
        if len(parts) != 3:
            return None
        yyyy, mm, dd = parts
    else:
        # Neither known separator: nothing sensible to return.
        return None

    return [yyyy, mm, dd]

In [111]:
# Register splitUDF as a Spark UDF that returns an array of strings.
datSplitterUDF = udf(splitUDF, ArrayType(StringType()))

# Preview: split CRASH_DATE into 'dt' and expose its three elements as
# integer year / month / day columns.
date_parts = df.select(datSplitterUDF(df.CRASH_DATE).alias("dt"))
(
    date_parts
    .withColumn('year', col('dt').getItem(0).cast('int'))
    .withColumn('month', col('dt').getItem(1).cast('int'))
    .withColumn('day', col('dt').getItem(2).cast('int'))
    .show()
)

+--------------+----+-----+---+
|            dt|year|month|day|
+--------------+----+-----+---+
|[2021, 04, 14]|2021|    4| 14|
|[2021, 04, 13]|2021|    4| 13|
|[2021, 04, 15]|2021|    4| 15|
|[2021, 04, 13]|2021|    4| 13|
|[2021, 04, 12]|2021|    4| 12|
|[2021, 04, 13]|2021|    4| 13|
|[2021, 04, 13]|2021|    4| 13|
|[2021, 04, 16]|2021|    4| 16|
|[2021, 04, 11]|2021|    4| 11|
|[2021, 04, 16]|2021|    4| 16|
|[2021, 04, 11]|2021|    4| 11|
|[2021, 04, 15]|2021|    4| 15|
|[2021, 04, 10]|2021|    4| 10|
|[2019, 05, 21]|2019|    5| 21|
|[2020, 01, 21]|2020|    1| 21|
|[2021, 02, 26]|2021|    2| 26|
|[2021, 03, 09]|2021|    3|  9|
|[2021, 03, 31]|2021|    3| 31|
|[2021, 04, 06]|2021|    4|  6|
|[2021, 04, 09]|2021|    4|  9|
+--------------+----+-----+---+
only showing top 20 rows



In [112]:
df.columns

['CRASH_DATE',
 'CRASH_TIME',
 'BOROUGH',
 'ZIP_CODE',
 'LATITUDE',
 'LONGITUDE',
 'LOCATION',
 'ON_STREET_NAME',
 'CROSS_STREET_NAME',
 'OFF_STREET_NAME',
 'NUMBER_OF_PERSONS_INJURED',
 'NUMBER_OF_PERSONS_KILLED',
 'NUMBER_OF_PEDESTRIANS_INJURED',
 'NUMBER_OF_PEDESTRIANS_KILLED',
 'NUMBER_OF_CYCLIST_INJURED',
 'NUMBER_OF_CYCLIST_KILLED',
 'NUMBER_OF_MOTORIST_INJURED',
 'NUMBER_OF_MOTORIST_KILLED',
 'CONTRIBUTING_FACTOR_VEHICLE_1',
 'CONTRIBUTING_FACTOR_VEHICLE_2',
 'CONTRIBUTING_FACTOR_VEHICLE_3',
 'CONTRIBUTING_FACTOR_VEHICLE_4',
 'CONTRIBUTING_FACTOR_VEHICLE_5',
 'COLLISION_ID',
 'VEHICLE_TYPE_CODE_1',
 'VEHICLE_TYPE_CODE_2',
 'VEHICLE_TYPE_CODE_3',
 'VEHICLE_TYPE_CODE_4',
 'VEHICLE_TYPE_CODE_5']

In [118]:
# Keep every original column, add the split date as 'dt', then derive the
# integer year / month / day columns from its elements 0, 1 and 2.
ndf = df.select('*', datSplitterUDF(df.CRASH_DATE).alias("dt"))
for position, part_name in enumerate(('year', 'month', 'day')):
    ndf = ndf.withColumn(part_name, col('dt').getItem(position).cast('int'))

In [119]:
ndf.show(5, truncate=False)

+----------+----------+--------+--------+--------+---------+---------------------+--------------------------------+-----------------+----------------------------------------+-------------------------+------------------------+-----------------------------+----------------------------+-------------------------+------------------------+--------------------------+-------------------------+-----------------------------+-----------------------------+-----------------------------+-----------------------------+-----------------------------+------------+-----------------------------------+-------------------+-------------------+-------------------+-------------------+--------------+----+-----+---+
|CRASH_DATE|CRASH_TIME|BOROUGH |ZIP_CODE|LATITUDE|LONGITUDE|LOCATION             |ON_STREET_NAME                  |CROSS_STREET_NAME|OFF_STREET_NAME                         |NUMBER_OF_PERSONS_INJURED|NUMBER_OF_PERSONS_KILLED|NUMBER_OF_PEDESTRIANS_INJURED|NUMBER_OF_PEDESTRIANS_KILLED|NUMBER_OF_CYCLIST_

In [131]:
# Columns to drop: the raw date, the textual location fields, the per-vehicle
# factor/type columns (numbered 1..5), the collision id and the helper 'dt'.
drop_list = [
    'CRASH_DATE',
    'BOROUGH',
    'ZIP_CODE',
    'LOCATION',
    'ON_STREET_NAME',
    'CROSS_STREET_NAME',
    'OFF_STREET_NAME',
    *[f'CONTRIBUTING_FACTOR_VEHICLE_{n}' for n in range(1, 6)],
    'COLLISION_ID',
    *[f'VEHICLE_TYPE_CODE_{n}' for n in range(1, 6)],
    'dt',
]

In [132]:
slice_df = ndf.drop(*drop_list)

In [133]:
slice_df.show(5, truncate=False)

+----------+--------+---------+-------------------------+------------------------+-----------------------------+----------------------------+-------------------------+------------------------+--------------------------+-------------------------+----+-----+---+
|CRASH_TIME|LATITUDE|LONGITUDE|NUMBER_OF_PERSONS_INJURED|NUMBER_OF_PERSONS_KILLED|NUMBER_OF_PEDESTRIANS_INJURED|NUMBER_OF_PEDESTRIANS_KILLED|NUMBER_OF_CYCLIST_INJURED|NUMBER_OF_CYCLIST_KILLED|NUMBER_OF_MOTORIST_INJURED|NUMBER_OF_MOTORIST_KILLED|year|month|day|
+----------+--------+---------+-------------------------+------------------------+-----------------------------+----------------------------+-------------------------+------------------------+--------------------------+-------------------------+----+-----+---+
|5:32      |null    |null     |0.0                      |0.0                     |0                            |0                           |0                        |0                       |0                        

In [139]:
slice_df.count()

1048575

In [137]:
# Keep only rows that have both coordinates.
# The columns are taken from slice_df itself (the frame being filtered)
# rather than from the ancestor `df`, so the condition does not depend on
# Spark resolving another DataFrame's column references.
rm_na_loc_null = slice_df.filter(
    slice_df['LATITUDE'].isNotNull()
    & slice_df['LONGITUDE'].isNotNull()
)

In [138]:
rm_na_loc_null.count()

972726

In [140]:
rm_na_loc_null.show(5, truncate=False)

+----------+---------+----------+-------------------------+------------------------+-----------------------------+----------------------------+-------------------------+------------------------+--------------------------+-------------------------+----+-----+---+
|CRASH_TIME|LATITUDE |LONGITUDE |NUMBER_OF_PERSONS_INJURED|NUMBER_OF_PERSONS_KILLED|NUMBER_OF_PEDESTRIANS_INJURED|NUMBER_OF_PEDESTRIANS_KILLED|NUMBER_OF_CYCLIST_INJURED|NUMBER_OF_CYCLIST_KILLED|NUMBER_OF_MOTORIST_INJURED|NUMBER_OF_MOTORIST_KILLED|year|month|day|
+----------+---------+----------+-------------------------+------------------------+-----------------------------+----------------------------+-------------------------+------------------------+--------------------------+-------------------------+----+-----+---+
|21:35     |40.68358 |-73.97617 |1.0                      |0.0                     |1                            |0                           |0                        |0                       |0                

In [141]:

# Short aliases for the remaining columns (original name -> short name).
short_names = {
    "CRASH_TIME": "time",
    "LATITUDE": "lat",
    "LONGITUDE": "lon",
    "NUMBER_OF_PERSONS_INJURED": "npi",
    "NUMBER_OF_PERSONS_KILLED": "npk",
    "NUMBER_OF_PEDESTRIANS_INJURED": "npedi",
    "NUMBER_OF_PEDESTRIANS_KILLED": "npedk",
    "NUMBER_OF_CYCLIST_INJURED": "nci",
    "NUMBER_OF_CYCLIST_KILLED": "nck",
    "NUMBER_OF_MOTORIST_INJURED": "nmi",
    "NUMBER_OF_MOTORIST_KILLED": "nmk",
}

# Apply the renames one after another, in the order listed above.
rename_df = rm_na_loc_null
for long_name, short_name in short_names.items():
    rename_df = rename_df.withColumnRenamed(long_name, short_name)


In [143]:
rename_df.show(5, truncate=False)

+-----+---------+----------+---+---+-----+-----+---+---+---+---+----+-----+---+
|time |lat      |lon       |npi|npk|npedi|npedk|nci|nck|nmi|nmk|year|month|day|
+-----+---------+----------+---+---+-----+-----+---+---+---+---+----+-----+---+
|21:35|40.68358 |-73.97617 |1.0|0.0|1    |0    |0  |0  |0  |0  |2021|4    |13 |
|8:25 |0.0      |0.0       |0.0|0.0|0    |0    |0  |0  |0  |0  |2021|4    |12 |
|22:50|40.69754 |-73.98312 |0.0|0.0|0    |0    |0  |0  |0  |0  |2019|5    |21 |
|14:50|40.843464|-73.836   |0.0|0.0|0    |0    |0  |0  |0  |0  |2021|2    |26 |
|11:00|40.692547|-73.990974|1.0|0.0|0    |0    |0  |0  |1  |0  |2021|3    |9  |
+-----+---------+----------+---+---+-----+-----+---+---+---+---+----+-----+---+
only showing top 5 rows



In [144]:
rename_df.columns

['time',
 'lat',
 'lon',
 'npi',
 'npk',
 'npedi',
 'npedk',
 'nci',
 'nck',
 'nmi',
 'nmk',
 'year',
 'month',
 'day']

In [146]:
# Column order: date parts first, then time and position, then the
# injured/killed counters.
ordered_columns = [
    "year", "month", "day", "time",
    "lat", "lon",
    "npi", "npk",
    "npedi", "npedk",
    "nci", "nck",
    "nmi", "nmk",
]
reorder_df = rename_df.select(*ordered_columns)

In [149]:
reorder_df.show(5, truncate=False)

+----+-----+---+-----+---------+----------+---+---+-----+-----+---+---+---+---+
|year|month|day|time |lat      |lon       |npi|npk|npedi|npedk|nci|nck|nmi|nmk|
+----+-----+---+-----+---------+----------+---+---+-----+-----+---+---+---+---+
|2021|4    |13 |21:35|40.68358 |-73.97617 |1.0|0.0|1    |0    |0  |0  |0  |0  |
|2021|4    |12 |8:25 |0.0      |0.0       |0.0|0.0|0    |0    |0  |0  |0  |0  |
|2019|5    |21 |22:50|40.69754 |-73.98312 |0.0|0.0|0    |0    |0  |0  |0  |0  |
|2021|2    |26 |14:50|40.843464|-73.836   |0.0|0.0|0    |0    |0  |0  |0  |0  |
|2021|3    |9  |11:00|40.692547|-73.990974|1.0|0.0|0    |0    |0  |0  |1  |0  |
+----+-----+---+-----+---------+----------+---+---+-----+-----+---+---+---+---+
only showing top 5 rows



In [150]:
reorder_df.printSchema()

root
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- time: string (nullable = true)
 |-- lat: double (nullable = true)
 |-- lon: double (nullable = true)
 |-- npi: double (nullable = true)
 |-- npk: double (nullable = true)
 |-- npedi: long (nullable = true)
 |-- npedk: long (nullable = true)
 |-- nci: long (nullable = true)
 |-- nck: long (nullable = true)
 |-- nmi: long (nullable = true)
 |-- nmk: long (nullable = true)



### Keep only the 2020 data

In [151]:
# Restrict the data to crashes whose 'year' column equals 2020.
df_20 = reorder_df.where(col('year') == 2020)

In [152]:
df_20.count()

103989

In [155]:
# Print summary statistics for the injured/killed counters of the 2020 rows.
casualty_columns = [
    "npi", "npk",
    "npedi", "npedk",
    "nci", "nck",
    "nmi", "nmk",
]
df_20.select(*casualty_columns).describe().show()

+-------+------------------+--------------------+-------------------+--------------------+-------------------+--------------------+------------------+--------------------+
|summary|               npi|                 npk|              npedi|               npedk|                nci|                 nck|               nmi|                 nmk|
+-------+------------------+--------------------+-------------------+--------------------+-------------------+--------------------+------------------+--------------------+
|  count|            103989|              103989|             103989|              103989|             103989|              103989|            103989|              103989|
|   mean|0.3909355797247786|0.002288703612882...|0.06161228591485638|9.039417630710941E-4|0.05073613555279886|2.307936416351729...|0.2785871582571233|0.001153968208175...|
| stddev|0.7416904881029619|0.051280417306981005| 0.2520113741506671|0.030370484449129483| 0.2243563481228609|0.015190213785317286|0.7065035