In [1]:
# comment out after initial run

# !apt-get install openjdk-8-jdk-headless -qq > /dev/null
# !wget -q https://downloads.apache.org/spark/spark-3.1.2/spark-3.1.2-bin-hadoop2.7.tgz
# !tar -xvf spark-3.1.2-bin-hadoop2.7.tgz
# !pip install -q findspark
# !pip install pyspark
# import os
# os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
# os.environ["SPARK_HOME"] = "/content/spark-3.1.2-bin-hadoop2.7"

In [2]:
import pandas as pd 
import numpy as np
import pyspark
from pyspark.sql import SparkSession 
from pyspark.sql.functions import *  
import findspark
findspark.init() 

In [3]:
spark = SparkSession.builder \
    .master("local") \
    .appName("JB_APP") \
    .getOrCreate()

In [4]:
spark.getActiveSession()

In [5]:
Parquet_DF = spark.read.parquet("/content/drive/MyDrive/df_pandas.parquet.gzip")

In [6]:
Parquet_DF.show()

+-------------+-------------------+----------+--------+--------------+-----------------+----+----------+-----+--------------------+-----+------+--------------+---------------------+-----------+---------+---------+------------------+---------+----------+---------+---------+---------+---------+---------+---------+-----------------+----------+
|Ticket_number|         Issue_Date|Issue_time|Meter_Id|RP_State_Plate|Plate_Expiry_Date|Make|Body_Style|Color|            Location|Route|Agency|Violation_code|Violation_Description|Fine_amount| Latitude|Longitude|Distance_to_pointA|address_0| address_1|address_2|address_3|address_4|address_5|address_6|address_7|__index_level_0__|Issue_year|
+-------------+-------------------+----------+--------+--------------+-----------------+----+----------+-----+--------------------+-----+------+--------------+---------------------+-----------+---------+---------+------------------+---------+----------+---------+---------+---------+---------+---------+---------+-

In [7]:
# print size of the dataframe 

print((Parquet_DF.count(), len(Parquet_DF.columns)))

(748312, 28)


##### 2. Count number distinct values for each column

In [8]:
# first time running this cell doesn't work, but will work after restarting and running all.

for x in Parquet_DF.columns:
  Parquet_DF.select(countDistinct(x)).show()

+-----------------------------+
|count(DISTINCT Ticket_number)|
+-----------------------------+
|                       748312|
+-----------------------------+

+--------------------------+
|count(DISTINCT Issue_Date)|
+--------------------------+
|                       359|
+--------------------------+

+--------------------------+
|count(DISTINCT Issue_time)|
+--------------------------+
|                      1440|
+--------------------------+

+------------------------+
|count(DISTINCT Meter_Id)|
+------------------------+
|                   28660|
+------------------------+

+------------------------------+
|count(DISTINCT RP_State_Plate)|
+------------------------------+
|                            50|
+------------------------------+

+---------------------------------+
|count(DISTINCT Plate_Expiry_Date)|
+---------------------------------+
|                              344|
+---------------------------------+

+--------------------+
|count(DISTINCT Make)|
+-----------------

##### 3. Count number of null in every column. Feel free to change the data types if needed.

In [9]:
# checking data types
Parquet_DF.dtypes

[('Ticket_number', 'string'),
 ('Issue_Date', 'timestamp'),
 ('Issue_time', 'double'),
 ('Meter_Id', 'string'),
 ('RP_State_Plate', 'string'),
 ('Plate_Expiry_Date', 'double'),
 ('Make', 'string'),
 ('Body_Style', 'string'),
 ('Color', 'string'),
 ('Location', 'string'),
 ('Route', 'string'),
 ('Agency', 'double'),
 ('Violation_code', 'string'),
 ('Violation_Description', 'string'),
 ('Fine_amount', 'double'),
 ('Latitude', 'double'),
 ('Longitude', 'double'),
 ('Distance_to_pointA', 'double'),
 ('address_0', 'string'),
 ('address_1', 'string'),
 ('address_2', 'string'),
 ('address_3', 'string'),
 ('address_4', 'string'),
 ('address_5', 'string'),
 ('address_6', 'string'),
 ('address_7', 'string'),
 ('__index_level_0__', 'bigint'),
 ('Issue_year', 'int')]

In [10]:
# convert time stamp to double

from pyspark.sql.types import DoubleType
Parquet_DF = Parquet_DF.withColumn('Issue_Date', col('Issue_Date').cast('Double'))


In [11]:
# show count of null values for each column in df

Parquet_DF.select([count(when(col(c).isNull(), c)).alias(c) for c in Parquet_DF.columns]).show()


+-------------+----------+----------+--------+--------------+-----------------+----+----------+-----+--------+-----+------+--------------+---------------------+-----------+--------+---------+------------------+---------+---------+---------+---------+---------+---------+---------+---------+-----------------+----------+
|Ticket_number|Issue_Date|Issue_time|Meter_Id|RP_State_Plate|Plate_Expiry_Date|Make|Body_Style|Color|Location|Route|Agency|Violation_code|Violation_Description|Fine_amount|Latitude|Longitude|Distance_to_pointA|address_0|address_1|address_2|address_3|address_4|address_5|address_6|address_7|__index_level_0__|Issue_year|
+-------------+----------+----------+--------+--------------+-----------------+----+----------+-----+--------+-----+------+--------------+---------------------+-----------+--------+---------+------------------+---------+---------+---------+---------+---------+---------+---------+---------+-----------------+----------+
|            0|         0|       136|  5

In [12]:
# check df shape

print((Parquet_DF.count(), len(Parquet_DF.columns)))

(748312, 28)


##### 4. Write a function to drop columns with more than 1% missing values

In [13]:
percent = 0.01
def drop_null_columns(df, percent):
  null_counts = df.select([count(when(col(c).isNull(), c)).alias(c) for c in df.columns]).collect()[0].asDict()
  dropped_columns = [k for k, v in null_counts.items() if v > percent]
  df = df.drop(*dropped_columns)
  return df, dropped_columns

Parquet_DF, dropped_columns = drop_null_columns(Parquet_DF, percent)

In [14]:
dropped_columns

['Issue_time',
 'Meter_Id',
 'Plate_Expiry_Date',
 'Make',
 'Body_Style',
 'Color',
 'Route',
 'Violation_Description',
 'address_1',
 'address_2',
 'address_3',
 'address_4',
 'address_5',
 'address_6',
 'address_7']

In [15]:
# check df shape
print((Parquet_DF.count(), len(Parquet_DF.columns)))

(748312, 13)


##### 5. Impute missing values for "Fine_amount" column with "median" using "Imputer" package from Pyspark

In [16]:
# Check missing values in fine_amount column

Parquet_DF.select([count(when(isnan('Fine_amount'),True))]).show()

+-------------------------------------------------+
|count(CASE WHEN isnan(Fine_amount) THEN true END)|
+-------------------------------------------------+
|                                                0|
+-------------------------------------------------+



In [17]:
# no missing values - no need to impute

##### 6.  Drop "rows" with at least two null values

In [18]:
# drop rows with at least 2 NA values 
Parquet_DF = Parquet_DF.na.drop(thresh=2)

In [19]:
# no change in df shape(?)
print((Parquet_DF.count(), len(Parquet_DF.columns)))

(748312, 13)


##### 7. Extract day and month from Issue_Date column and then drop the "Issue_Date", and "__index_level_0__" column (if exist)

In [20]:
# Check issue_date column type
dict(Parquet_DF.dtypes)['Issue_Date']

'double'

In [21]:
# Inspect issue_date column
Parquet_DF.select('Issue_Date').show()

+-----------+
| Issue_Date|
+-----------+
|1.4518656E9|
| 1.452384E9|
|1.4522976E9|
|1.4516064E9|
|1.4516064E9|
|1.4516064E9|
|1.4516064E9|
|1.4516064E9|
|1.4516064E9|
|1.4516064E9|
|1.4516064E9|
|1.4516064E9|
|1.4516064E9|
|1.4516064E9|
|1.4516064E9|
|1.4516064E9|
|1.4516064E9|
|1.4516064E9|
|1.4516064E9|
|1.4516064E9|
+-----------+
only showing top 20 rows



In [22]:
# convert to timestamp type column

Parquet_DF = Parquet_DF.withColumn("Issue_Date", col("Issue_Date").cast("timestamp"))

In [23]:
# confirm conversion

dict(Parquet_DF.dtypes)['Issue_Date']

'timestamp'

In [24]:
# Create Month Column
Parquet_DF = Parquet_DF.withColumn('Month',month(Parquet_DF.Issue_Date))


In [25]:
# Create Day Column
Parquet_DF = Parquet_DF.withColumn('Day', dayofmonth(Parquet_DF.Issue_Date))

In [26]:
# check for issue_date and index_level_0 columns
Parquet_DF.columns

['Ticket_number',
 'Issue_Date',
 'RP_State_Plate',
 'Location',
 'Agency',
 'Violation_code',
 'Fine_amount',
 'Latitude',
 'Longitude',
 'Distance_to_pointA',
 'address_0',
 '__index_level_0__',
 'Issue_year',
 'Month',
 'Day']

In [27]:
# Drop Issue Date and Index_level_0 columns

Parquet_DF = Parquet_DF.drop("Issue_Date","__index_level_0__")

In [28]:
# confirm columns have been dropped
Parquet_DF.columns

['Ticket_number',
 'RP_State_Plate',
 'Location',
 'Agency',
 'Violation_code',
 'Fine_amount',
 'Latitude',
 'Longitude',
 'Distance_to_pointA',
 'address_0',
 'Issue_year',
 'Month',
 'Day']

### Read the questions below and Export three reports based on them:

##### 9. First Report:
Create a report table to get the Sum of "Fine_Amouns" for each State Plates ("RP_State_Plate" column) for each selected vehicle make company ("Make" column). 
The selected brands are: 

Make_Cars = ["ACUR", "AUDI", "DODG", "BMW", "BUIC", "CHEV", "CHRY", "NISS"
             ,"FIAT", "FORD", "GMC", "JEEP", "KIA", "VOLV", "TOYT", "SUBA", "MAZD"]

The final report should look like below:

![report_table.PNG](attachment:report_table.PNG)

In [29]:
Make_Cars = ["ACUR", "AUDI", "DODG", "BMW", "BUIC", "CHEV", "CHRY", "NISS"
             ,"FIAT", "FORD", "GMC", "JEEP", "KIA", "VOLV", "TOYT", "SUBA", "MAZD"]

In [30]:
# create new dataframe for report 

report1_df = spark.read.parquet("/content/drive/MyDrive/df_pandas.parquet.gzip")

In [31]:
#groupby state plates and pivot on the make of the car

report1_df = report1_df.groupBy('RP_State_Plate').pivot('Make', Make_Cars).sum('Fine_amount')

In [32]:
report1_df.show()

+--------------+--------+--------+------------------+---------+---------+------------------+--------+----------+--------+------------------+-----------------+---------+---------+------------------+----------+--------+--------+
|RP_State_Plate|    ACUR|    AUDI|              DODG|      BMW|     BUIC|              CHEV|    CHRY|      NISS|    FIAT|              FORD|              GMC|     JEEP|      KIA|              VOLV|      TOYT|    SUBA|    MAZD|
+--------------+--------+--------+------------------+---------+---------+------------------+--------+----------+--------+------------------+-----------------+---------+---------+------------------+----------+--------+--------+
|            AZ|  2861.0|  6198.0|           19550.0|  11594.0|   2927.0|           34152.0| 11771.0|   45855.0|  2155.0|          81257.68|         43955.67|  16560.0|  15309.0|           13916.0|   52397.0|  4341.0|  7235.0|
|            SC|   370.0|    63.0|             632.0|   1629.0|    257.0|            1506.0|

In [33]:
# export the report a CSV with header and "|" deliminer. Name it as Report_Make_State_Fine.

report1_df.coalesce(1).write.option("delimiter","|").option("header",True).csv("/content/drive/MyDrive/Colab Notebooks/JesusBaquiax_DE2/Report_Make_State_Fine")

##### 10. Second Report:
Filter the dataframe where Make = HOND and get a count of total violation descriptions per category in 2016

In [34]:
report2_df = spark.read.parquet("/content/drive/MyDrive/df_pandas.parquet.gzip")

In [35]:
report2_filtered = report2_df.filter( (report2_df.Make  == "HOND") & (report2_df.Issue_year  == 2016)) 

In [36]:
(report2_filtered.groupby('Violation_Description').count()
        .orderBy('count', ascending=False).show())

+---------------------+-----+
|Violation_Description|count|
+---------------------+-----+
| NO PARK/STREET CLEAN|19875|
|           METER EXP.|15332|
| PREFERENTIAL PARKING| 5511|
|      DISPLAY OF TABS| 3480|
|             RED ZONE| 3415|
| PARKED OVER TIME LIM| 3373|
|           NO PARKING| 3259|
|           WHITE ZONE| 2258|
|     NO STOP/STANDING| 1445|
|    DISPLAY OF PLATES| 1302|
|    BLOCKING DRIVEWAY|  842|
|  STOP/STAND PROHIBIT|  830|
|     STANDNG IN ALLEY|  827|
|        NO STOP/STAND|  804|
| NO STOPPING/ANTI-GRI|  772|
|          YELLOW ZONE|  688|
|  OUTSIDE LINES/METER|  638|
|         FIRE HYDRANT|  595|
| OFF STR/OVERTIME/MTR|  492|
|   PARKED ON SIDEWALK|  421|
+---------------------+-----+
only showing top 20 rows



In [37]:
report2_filtered.coalesce(1).write.option("delimiter","|").option("header",True).csv("/content/drive/MyDrive/Colab Notebooks/JesusBaquiax_DE2/Report_HOND_2016")

##### 11. Third Report:
Create a new dataframe from the original dataset, drop everything before 2016 date, and only use the below columns:

keep_columns=['Ticket_number', 'RP_State_Plate', 'Make', 'Body_Style', 'Fine_amount']

In [38]:
report3_df = spark.read.parquet("/content/drive/MyDrive/df_pandas.parquet.gzip")

In [39]:
# filter by Issue_Yea column

report3_filtered = report3_df.filter(report3_df.Issue_year < 2016)

In [40]:
# keep specific columns

report3_filtered = report3_filtered.select(col('Ticket_number'), col('RP_State_PLate'), col('Make'), col('Body_Style'), col('Fine_amount'))

In [41]:
report3_filtered.coalesce(1).write.option("delimiter","|").option("header",True).csv("/content/drive/MyDrive/Colab Notebooks/JesusBaquiax_DE2/Report_Parking_Fines_2016")