In [1]:
from pyspark.sql import SparkSession
from operator import add

# Build (or reuse) the Spark session for this notebook. It connects to the
# standalone master at 192.168.2.250, requests at most 4 cores in total with
# 2 cores per executor, and pins the driver / block-manager ports.
# NOTE(review): the shuffleTracking and executorIdleTimeout settings are
# dynamic-allocation options, but dynamic allocation itself is disabled
# here — confirm that this combination is intended.
spark_session = (
    SparkSession.builder
    .master("spark://192.168.2.250:7077")
    .appName("AlmaLundbergSparkapplicationA3_B")
    .config("spark.dynamicAllocation.enabled", False)
    .config("spark.cores.max", 4)
    .config("spark.dynamicAllocation.shuffleTracking.enabled", True)
    .config("spark.shuffle.service.enabled", False)
    .config("spark.dynamicAllocation.executorIdleTimeout", "30s")
    .config("spark.executor.cores", 2)
    .config("spark.driver.port", 9999)
    .config("spark.blockManager.port", 10005)
    .getOrCreate()
)

# Handle to the lower-level RDD API that backs the session.
spark_context = spark_session.sparkContext

# Silence everything below ERROR so cell output stays readable.
spark_context.setLogLevel("ERROR")

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/03/06 22:23:55 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [2]:
# Section B - Working with DataFrames and SQL

# Load the parking citations CSV from HDFS. The first line of the file is
# used as the header and column types are inferred from the data.
citations_path = "hdfs://192.168.2.250:9000/parking-citations.csv"
dataframe = (
    spark_session.read
    .options(header=True, inferSchema=True)
    .csv(citations_path)
)

# Preview the first rows of the loaded data.
dataframe.show()

                                                                                

+-------------+-------------------+----------+--------+-----------+--------------+-----------------+----+----+----------+-----+--------------------+-----+------+--------------+---------------------+-----------+---------+---------+------------------+-----------------+----------------------+
|Ticket number|         Issue Date|Issue time|Meter Id|Marked Time|RP State Plate|Plate Expiry Date| VIN|Make|Body Style|Color|            Location|Route|Agency|Violation code|Violation Description|Fine amount| Latitude|Longitude|Agency Description|Color Description|Body Style Description|
+-------------+-------------------+----------+--------+-----------+--------------+-----------------+----+----+----------+-----+--------------------+-----+------+--------------+---------------------+-----------+---------+---------+------------------+-----------------+----------------------+
|   1103341116|2015-12-21 00:00:00|    1251.0|    NULL|       NULL|            CA|         200304.0|NULL|HOND|        PA|   GY|

In [3]:
# Print the column names and the types inferred while reading the CSV
dataframe.printSchema()

root
 |-- Ticket number: string (nullable = true)
 |-- Issue Date: timestamp (nullable = true)
 |-- Issue time: double (nullable = true)
 |-- Meter Id: string (nullable = true)
 |-- Marked Time: double (nullable = true)
 |-- RP State Plate: string (nullable = true)
 |-- Plate Expiry Date: double (nullable = true)
 |-- VIN: string (nullable = true)
 |-- Make: string (nullable = true)
 |-- Body Style: string (nullable = true)
 |-- Color: string (nullable = true)
 |-- Location: string (nullable = true)
 |-- Route: string (nullable = true)
 |-- Agency: double (nullable = true)
 |-- Violation code: string (nullable = true)
 |-- Violation Description: string (nullable = true)
 |-- Fine amount: double (nullable = true)
 |-- Latitude: double (nullable = true)
 |-- Longitude: double (nullable = true)
 |-- Agency Description: string (nullable = true)
 |-- Color Description: string (nullable = true)
 |-- Body Style Description: string (nullable = true)



In [4]:
# Count the rows of the DataFrame loaded from the CSV file, then report it.
row_count = dataframe.count()
print(f"Number of rows: {row_count}")



Number of rows: 13077724


                                                                                

In [5]:
# Number of partitions in the underlying RDD
partition_count = dataframe.rdd.getNumPartitions()
print(f"Number of partitions: {partition_count}")

Number of partitions: 16


In [6]:
# Remove the columns that are not needed for the rest of the analysis:
# VIN, Latitude, and Longitude.
columns_to_drop = ["VIN", "Latitude", "Longitude"]
dataframe = dataframe.drop(*columns_to_drop)

In [7]:
from pyspark.sql.functions import col

# Derive a float-typed copy of "Fine amount" and append it as FineAmountFloat.
fine_as_float = dataframe["Fine amount"].cast("float")
dataframe = dataframe.withColumn("FineAmountFloat", fine_as_float)

# Preview the rows so the new column can be inspected.
dataframe.show()

+-------------+-------------------+----------+--------+-----------+--------------+-----------------+----+----------+-----+--------------------+-----+------+--------------+---------------------+-----------+------------------+-----------------+----------------------+---------------+
|Ticket number|         Issue Date|Issue time|Meter Id|Marked Time|RP State Plate|Plate Expiry Date|Make|Body Style|Color|            Location|Route|Agency|Violation code|Violation Description|Fine amount|Agency Description|Color Description|Body Style Description|FineAmountFloat|
+-------------+-------------------+----------+--------+-----------+--------------+-----------------+----+----------+-----+--------------------+-----+------+--------------+---------------------+-----------+------------------+-----------------+----------------------+---------------+
|   1103341116|2015-12-21 00:00:00|    1251.0|    NULL|       NULL|            CA|         200304.0|HOND|        PA|   GY|     13147 WELBY WAY|01521|   1.

In [8]:
# Check that the new FineAmountFloat column was added (and that the dropped
# VIN / Latitude / Longitude columns are gone) by printing the schema
dataframe.printSchema()

root
 |-- Ticket number: string (nullable = true)
 |-- Issue Date: timestamp (nullable = true)
 |-- Issue time: double (nullable = true)
 |-- Meter Id: string (nullable = true)
 |-- Marked Time: double (nullable = true)
 |-- RP State Plate: string (nullable = true)
 |-- Plate Expiry Date: double (nullable = true)
 |-- Make: string (nullable = true)
 |-- Body Style: string (nullable = true)
 |-- Color: string (nullable = true)
 |-- Location: string (nullable = true)
 |-- Route: string (nullable = true)
 |-- Agency: double (nullable = true)
 |-- Violation code: string (nullable = true)
 |-- Violation Description: string (nullable = true)
 |-- Fine amount: double (nullable = true)
 |-- Agency Description: string (nullable = true)
 |-- Color Description: string (nullable = true)
 |-- Body Style Description: string (nullable = true)
 |-- FineAmountFloat: float (nullable = true)



In [9]:
# Aggregate the whole DataFrame down to a single row holding the largest
# FineAmountFloat, then pull that value out of the row.
max_fine_row = dataframe.agg({"FineAmountFloat": "max"}).collect()[0]
max_fine = max_fine_row[0]

# Report the maximum fine amount.
print(f"Maximum fine amount: {max_fine}")

                                                                                

Maximum fine amount: 1100.0


In [10]:
# Count the citations whose fine equals the maximum found in the previous cell.
is_max_fine = dataframe["FineAmountFloat"] == max_fine
max_fine_count = dataframe.where(is_max_fine).count()

print(f"Count of maximum fines: {max_fine_count}")



Count of maximum fines: 626


                                                                                

In [11]:
from pyspark.sql import functions as pyfuncs

# Show the top 20 most frequent vehicle makes, and their frequencies.
# groupBy(...).count() yields one row per make with a "count" column.
vehicle_makes = dataframe.groupBy("Make").count()

# Keep the sorted DataFrame itself: show() only prints and returns None, so
# chaining it onto the assignment would leave sorted_makes bound to None.
# The sort key uses the column's actual (lower-case) name, "count".
sorted_makes = vehicle_makes.orderBy("count", ascending=False)
sorted_makes.show(20)



+----+-------+
|Make|  count|
+----+-------+
|TOYT|2150768|
|HOND|1479996|
|FORD|1116235|
|NISS| 945133|
|CHEV| 892676|
| BMW| 603092|
|MERZ| 543298|
|VOLK| 432030|
|HYUN| 404917|
|DODG| 391686|
|LEXS| 368420|
| KIA| 328155|
|JEEP| 316300|
|AUDI| 255395|
|MAZD| 242344|
|OTHR| 205546|
| GMC| 184889|
|INFI| 174315|
|CHRY| 159948|
|SUBA| 154640|
+----+-------+
only showing top 20 rows



                                                                                

In [12]:
# Expand abbreviations in the color column using a User Defined Function
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

# Lookup table from the color abbreviations used in the data to full names.
# Several abbreviations map to the same color (e.g. both WT and WH are White).
COLORS = {
    'AL': 'Aluminum',
    'AM': 'Amber',
    'BG': 'Beige',
    'BK': 'Black',
    'BL': 'Blue',
    'BN': 'Brown',
    'BR': 'Brown',
    'BZ': 'Bronze',
    'CH': 'Charcoal',
    'DK': 'Dark',
    'GD': 'Gold',
    'GO': 'Gold',
    'GN': 'Green',
    'GY': 'Gray',
    'GT': 'Granite',
    'IV': 'Ivory',
    'LT': 'Light',
    'OL': 'Olive',
    'OR': 'Orange',
    'MR': 'Maroon',
    'PK': 'Pink',
    'RD': 'Red',
    'RE': 'Red',
    'SI': 'Silver',
    'SL': 'Silver',
    'SM': 'Smoke',
    'TN': 'Tan',
    'VT': 'Violet',
    'WT': 'White',
    'WH': 'White',
    'YL': 'Yellow',
    'YE': 'Yellow',
    'UN': 'Unknown',
}


def expand_color_abbreviation(color_abbr):
    """Return the full color name for an abbreviation.

    Values that are not keys of COLORS (including None) are returned
    unchanged, so unrecognised codes pass straight through.
    """
    if color_abbr in COLORS:
        return COLORS[color_abbr]
    return color_abbr

# Wrap the Python function as a Spark UDF whose return type is StringType.
expand_color_udf = udf(f=expand_color_abbreviation, returnType=StringType())

# Run the UDF over the Color column and store the expanded names in a new
# ColorLong column.
color_long = expand_color_udf(dataframe["Color"])
dataframe = dataframe.withColumn("ColorLong", color_long)

In [13]:
# Find the most frequent color value for Toyotas (TOYT):
# keep only TOYT citations, count them per expanded color name, and show the
# single color with the highest count.
toyota_citations = dataframe.where(dataframe["Make"] == "TOYT")
toyota_color_counts = toyota_citations.groupBy("ColorLong").count()
toyota_color_counts.orderBy("count", ascending=False).show(1)



+---------+------+
|ColorLong| count|
+---------+------+
|     Gray|489697|
+---------+------+
only showing top 1 row



                                                                                