In [6]:
from pyspark.sql import SparkSession

# Cluster sizing: 5 machines x (8 cores, 16 GB each) = 40 cores in total

# New API: build (or reuse) the SparkSession for this notebook
spark_session = (
    SparkSession.builder
    .master("spark://192.168.1.153:7077")
    .appName("hadoop_example")
    # Dynamic allocation is switched on together with the shuffle service
    .config("spark.dynamicAllocation.enabled", True)
    .config("spark.shuffle.service.enabled", True)
    # Executor idle timeout used by dynamic allocation
    .config("spark.dynamicAllocation.executorIdleTimeout", "30s")
    # Cores per executor
    .config("spark.executor.cores", 4)
    .getOrCreate()
)

# Old API (RDD): the SparkContext that belongs to the session above
spark_context = spark_session.sparkContext

In [7]:
# Read the CSV file from HDFS, treating its first line as the column header
DataFrame = (
    spark_session.read
    .format("csv")
    .option("header", "true")
    .load("hdfs://192.168.1.153:9000/parking-citations.csv")
)

In [8]:
# Preview the first 20 rows (long cell values truncated) to verify the load
DataFrame.show(n=20, truncate=True)

+-------------+-------------------+----------+--------+-----------+--------------+-----------------+----+----+----------+-----+--------------------+-----+------+--------------+---------------------+-----------+---------+---------+
|Ticket number|         Issue Date|Issue time|Meter Id|Marked Time|RP State Plate|Plate Expiry Date| VIN|Make|Body Style|Color|            Location|Route|Agency|Violation code|Violation Description|Fine amount| Latitude|Longitude|
+-------------+-------------------+----------+--------+-----------+--------------+-----------------+----+----+----------+-----+--------------------+-----+------+--------------+---------------------+-----------+---------+---------+
|   1103341116|2015-12-21T00:00:00|      1251|    null|       null|            CA|           200304|null|HOND|        PA|   GY|     13147 WELBY WAY|01521|     1|        4000A1|   NO EVIDENCE OF REG|         50|    99999|    99999|
|   1103700150|2015-12-21T00:00:00|      1435|    null|       null|         

In [9]:
# Print the schema for the DataFrame: each column's name, type and nullability
DataFrame.printSchema()

root
 |-- Ticket number: string (nullable = true)
 |-- Issue Date: string (nullable = true)
 |-- Issue time: string (nullable = true)
 |-- Meter Id: string (nullable = true)
 |-- Marked Time: string (nullable = true)
 |-- RP State Plate: string (nullable = true)
 |-- Plate Expiry Date: string (nullable = true)
 |-- VIN: string (nullable = true)
 |-- Make: string (nullable = true)
 |-- Body Style: string (nullable = true)
 |-- Color: string (nullable = true)
 |-- Location: string (nullable = true)
 |-- Route: string (nullable = true)
 |-- Agency: string (nullable = true)
 |-- Violation code: string (nullable = true)
 |-- Violation Description: string (nullable = true)
 |-- Fine amount: string (nullable = true)
 |-- Latitude: string (nullable = true)
 |-- Longitude: string (nullable = true)



In [10]:
# Count the number of rows in the DataFrame loaded from the CSV file
# (the file was read with header=true, so the header line is not a data row)
DataFrame.count()

9257460

In [11]:
# Count the number of partitions in the RDD underlying the loaded DataFrame
DataFrame.rdd.getNumPartitions()

12

In [12]:
# Remove the VIN, Latitude and Longitude columns, then confirm via the schema
unwanted_columns = ("Latitude", "Longitude", "VIN")
DataFrame_d = DataFrame.drop(*unwanted_columns)
DataFrame_d.printSchema()

root
 |-- Ticket number: string (nullable = true)
 |-- Issue Date: string (nullable = true)
 |-- Issue time: string (nullable = true)
 |-- Meter Id: string (nullable = true)
 |-- Marked Time: string (nullable = true)
 |-- RP State Plate: string (nullable = true)
 |-- Plate Expiry Date: string (nullable = true)
 |-- Make: string (nullable = true)
 |-- Body Style: string (nullable = true)
 |-- Color: string (nullable = true)
 |-- Location: string (nullable = true)
 |-- Route: string (nullable = true)
 |-- Agency: string (nullable = true)
 |-- Violation code: string (nullable = true)
 |-- Violation Description: string (nullable = true)
 |-- Fine amount: string (nullable = true)



In [13]:
# Preview the first 20 values of the "Fine amount" column
DataFrame_d.select(DataFrame_d["Fine amount"]).show(n=20)

+-----------+
|Fine amount|
+-----------+
|         50|
|         50|
|         58|
|       null|
|         93|
|         50|
|        163|
|        163|
|         93|
|         93|
|         93|
|         93|
|         50|
|         93|
|         68|
|         68|
|         68|
|         50|
|         93|
|         73|
+-----------+
only showing top 20 rows



In [14]:
# Cast the "Fine amount" column to float and preview the converted values
from pyspark.sql.types import FloatType
fine_amount_float = DataFrame_d["Fine amount"].cast(FloatType())
changedTypedf = DataFrame_d.withColumn("Fine amount", fine_amount_float)
changedTypedf.select("Fine amount").show()

+-----------+
|Fine amount|
+-----------+
|       50.0|
|       50.0|
|       58.0|
|       null|
|       93.0|
|       50.0|
|      163.0|
|      163.0|
|       93.0|
|       93.0|
|       93.0|
|       93.0|
|       50.0|
|       93.0|
|       68.0|
|       68.0|
|       68.0|
|       50.0|
|       93.0|
|       73.0|
+-----------+
only showing top 20 rows



In [15]:
# The maximum fine amount: aggregate first, then bring the single result row
# back to the driver as a list of Row objects
max_fine_query = changedTypedf.agg({"Fine amount": "max"})
DataFrame_dmax = max_fine_query.collect()
print(DataFrame_dmax)

[Row(max(Fine amount)=505.0)]


In [16]:
# Count how many citations were issued for the maximum fine amount
import pyspark.sql.functions as f

# Reuse the maximum collected in the previous cell (first column of the first
# Row) instead of repeating it as a literal, so this cell stays correct if the
# underlying data changes.
max_fine_amount = DataFrame_dmax[0][0]

# Filter first and count the matching rows directly. Grouping by every column
# and then summing the per-group counts gives the same total with extra work,
# and reports null instead of 0 when no row matches.
changedTypedf.where(f.col('Fine amount') == max_fine_amount)\
             .groupBy()\
             .count()\
             .show()

+----------+
|sum(count)|
+----------+
|         6|
+----------+



In [17]:
# Preview the first 20 rows of the frame after the "Fine amount" cast
changedTypedf.show(n=20, truncate=True)

+-------------+-------------------+----------+--------+-----------+--------------+-----------------+----+----------+-----+--------------------+-----+------+--------------+---------------------+-----------+
|Ticket number|         Issue Date|Issue time|Meter Id|Marked Time|RP State Plate|Plate Expiry Date|Make|Body Style|Color|            Location|Route|Agency|Violation code|Violation Description|Fine amount|
+-------------+-------------------+----------+--------+-----------+--------------+-----------------+----+----------+-----+--------------------+-----+------+--------------+---------------------+-----------+
|   1103341116|2015-12-21T00:00:00|      1251|    null|       null|            CA|           200304|HOND|        PA|   GY|     13147 WELBY WAY|01521|     1|        4000A1|   NO EVIDENCE OF REG|       50.0|
|   1103700150|2015-12-21T00:00:00|      1435|    null|       null|            CA|           201512| GMC|        VN|   WH|       525 S MAIN ST| 1C51|     1|        4000A1|   NO

In [18]:
# Show the 20 most frequent vehicle makes together with their frequencies
make_counts = changedTypedf.groupBy('Make').count()
make_counts.orderBy('count', ascending=False).show()

+----+-------+
|Make|  count|
+----+-------+
|TOYT|1531949|
|HOND|1043276|
|FORD| 807498|
|NISS| 662097|
|CHEV| 631413|
| BMW| 422916|
|MERZ| 376830|
|VOLK| 316002|
|HYUN| 285286|
|DODG| 271590|
|LEXS| 263269|
| KIA| 217795|
|JEEP| 214965|
|AUDI| 179718|
|MAZD| 169811|
|OTHR| 154376|
| GMC| 132788|
|INFI| 120340|
|CHRY| 120317|
|ACUR| 111265|
+----+-------+
only showing top 20 rows



In [19]:
# Preview the first 20 values of the "Color" column (abbreviated colour codes)
changedTypedf.select(changedTypedf["Color"]).show(n=20)

+-----+
|Color|
+-----+
|   GY|
|   WH|
|   BK|
|   WH|
|   BK|
|   GY|
|   BL|
|   BK|
|   BR|
|   SI|
|   WH|
|   GO|
|   BK|
|   BK|
|   BK|
|   BK|
|   WH|
| null|
|   BK|
|   BK|
+-----+
only showing top 20 rows



In [20]:
# Creating a new column, 'color long' and mapping the colors to their corresponding values
from pyspark.sql.types import StringType
from pyspark.sql.functions import udf
# Colour abbreviation -> long colour name. Built once here rather than inside
# color_long, because the function is wrapped in a UDF below and is therefore
# called once per row.
_COLOR_NAMES = {
    'AL': 'Aluminum', 'AM': 'Amber', 'BG': 'Beige',
    'BK': 'Black', 'BL': 'Blue', 'BN': 'Brown',
    'BR': 'Brown', 'BZ': 'Bronze', 'CH': 'Charcoal',
    'DK': 'Dark', 'GD': 'Gold', 'GO': 'Gold',
    'GN': 'Green', 'GY': 'Gray', 'GT': 'Granite',
    'IV': 'Ivory', 'LT': 'Light', 'OL': 'Olive',
    'OR': 'Orange', 'MR': 'Maroon', 'PK': 'Pink',
    'RD': 'Red', 'RE': 'Red', 'SI': 'Silver',
    'SL': 'Silver', 'SM': 'Smoke', 'TN': 'Tan',
    'VT': 'Violet', 'WT': 'White', 'WH': 'White',
    'YL': 'Yellow', 'YE': 'Yellow', 'UN': 'Unknown',
}


def color_long(colors):
    """Translate a colour abbreviation into its long colour name.

    Args:
        colors: A single colour code such as ``'GY'``; may be ``None``.

    Returns:
        The long name for a known code (e.g. ``'Gray'``). Any other value,
        including ``None`` and unrecognised codes, is returned unchanged.
    """
    # dict.get with the input as fallback is the "known -> mapped,
    # unknown -> pass through" rule in a single lookup.
    return _COLOR_NAMES.get(colors, colors)
# Wrap color_long as a Spark user defined function returning a string
udf_color_long = udf(color_long, returnType=StringType())
# Add the translated colour as a new 'Color long' column and compare both
color_long_column = udf_color_long(changedTypedf['Color'])
changedTypedf_cl = changedTypedf.withColumn('Color long', color_long_column)
changedTypedf_cl.select(['Color', 'Color long']).show()

+-----+----------+
|Color|Color long|
+-----+----------+
|   GY|      Gray|
|   WH|     White|
|   BK|     Black|
|   WH|     White|
|   BK|     Black|
|   GY|      Gray|
|   BL|      Blue|
|   BK|     Black|
|   BR|     Brown|
|   SI|    Silver|
|   WH|     White|
|   GO|      Gold|
|   BK|     Black|
|   BK|     Black|
|   BK|     Black|
|   BK|     Black|
|   WH|     White|
| null|      null|
|   BK|     Black|
|   BK|     Black|
+-----+----------+
only showing top 20 rows



In [21]:
# The most frequent colour value for Toyotas (make code 'TOYT'):
# keep only Toyota rows, count rows per long colour name, show the largest
toyt = changedTypedf_cl.where(changedTypedf_cl.Make == 'TOYT')
toyt_count = toyt.groupBy('Color long').count()
toyt_count.sort('count', ascending=False).show(1)

+----------+------+
|Color long| count|
+----------+------+
|      Gray|346822|
+----------+------+
only showing top 1 row



In [None]:
# Stop the Spark session now that the analysis is finished
spark_session.stop()