In [1]:
from pyspark.sql import SparkSession
from pyspark import SparkContext
from pyspark.sql.types import StringType
from pyspark.streaming import StreamingContext
from pyspark.sql.functions import col, max
#from pyspark.sql.functions import max
from pyspark.sql.functions import udf
#from pyspark.sql.functions import col


# Build (or reuse) the SparkSession for this notebook on the standalone cluster.
spark_session = (
    SparkSession.builder
    .master("spark://192.168.2.250:7077")
    .appName("MitraRokni_Section_B")
    # Static allocation: executors are not added/removed at runtime.
    .config("spark.dynamicAllocation.enabled", False)
    # At most 4 cores in total for this application.
    .config("spark.cores.max", 4)
    # NOTE(review): only relevant when dynamic allocation is enabled (it is False above).
    .config("spark.dynamicAllocation.shuffleTracking.enabled", True)
    .config("spark.shuffle.service.enabled", False)
    # NOTE(review): only relevant when dynamic allocation is enabled (it is False above).
    .config("spark.dynamicAllocation.executorIdleTimeout", "30s")
    # 4 cores per executor; with spark.cores.max = 4 this presumably yields a single
    # executor -- confirm in the Spark UI.
    .config("spark.executor.cores", 4)
    # NOTE(review): "spark.cores.min" does not appear in the Spark configuration docs and is
    # probably ignored -- confirm.
    .config("spark.cores.min", 1)
    # Fixed driver / block-manager ports (presumably to match cluster firewall rules).
    .config("spark.driver.port", 9999)
    .config("spark.blockManager.port", 10005)
    .getOrCreate()
)

# Lower-level SparkContext (the RDD API entry point); from here on only ERROR logs are shown.
spark_context = spark_session.sparkContext
spark_context.setLogLevel("ERROR")

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/02/21 22:05:21 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [2]:
# B.1 Load the parking-citations dataset from HDFS.
# The first CSV line is used as the header. No schema is supplied, so every column is read
# as a string. The frame is marked for caching, so later actions can reuse it.
data_frame = (
    spark_session.read
    .option("header", "true")
    .csv('hdfs://192.168.2.250:9000/parking-citations.csv')
    .cache()
)

                                                                                

In [3]:
# Short alias for the freshly loaded frame.
# NOTE(review): `df` is not used by any later cell, and it keeps pointing at this original
# frame after `data_frame` is reassigned (drop / rename / cast) further down.
df= data_frame

In [4]:
# B.1 Display the first rows of the dataset (20 rows, long cell values truncated).
data_frame.show(n=20, truncate=True)



+-------------+--------------------+----------+--------+-----------+--------------+-----------------+----+----+----------+-----+--------------------+-----+------+--------------+---------------------+-----------+---------+---------+------------------+-----------------+----------------------+
|Ticket number|          Issue Date|Issue time|Meter Id|Marked Time|RP State Plate|Plate Expiry Date| VIN|Make|Body Style|Color|            Location|Route|Agency|Violation code|Violation Description|Fine amount| Latitude|Longitude|Agency Description|Color Description|Body Style Description|
+-------------+--------------------+----------+--------+-----------+--------------+-----------------+----+----+----------+-----+--------------------+-----+------+--------------+---------------------+-----------+---------+---------+------------------+-----------------+----------------------+
|   1103341116|2015-12-21T00:00:...|      1251|    NULL|       NULL|            CA|           200304|NULL|HOND|        PA|  

                                                                                

In [5]:
# Extra (not required by the assignment): list the column names of data_frame.
column_names = list(data_frame.columns)
print(column_names)


['Ticket number', 'Issue Date', 'Issue time', 'Meter Id', 'Marked Time', 'RP State Plate', 'Plate Expiry Date', 'VIN', 'Make', 'Body Style', 'Color', 'Location', 'Route', 'Agency', 'Violation code', 'Violation Description', 'Fine amount', 'Latitude', 'Longitude', 'Agency Description', 'Color Description', 'Body Style Description']


In [6]:
# B.2 Print the schema. Every column is a string because the CSV was read without a schema.
data_frame.printSchema()

root
 |-- Ticket number: string (nullable = true)
 |-- Issue Date: string (nullable = true)
 |-- Issue time: string (nullable = true)
 |-- Meter Id: string (nullable = true)
 |-- Marked Time: string (nullable = true)
 |-- RP State Plate: string (nullable = true)
 |-- Plate Expiry Date: string (nullable = true)
 |-- VIN: string (nullable = true)
 |-- Make: string (nullable = true)
 |-- Body Style: string (nullable = true)
 |-- Color: string (nullable = true)
 |-- Location: string (nullable = true)
 |-- Route: string (nullable = true)
 |-- Agency: string (nullable = true)
 |-- Violation code: string (nullable = true)
 |-- Violation Description: string (nullable = true)
 |-- Fine amount: string (nullable = true)
 |-- Latitude: string (nullable = true)
 |-- Longitude: string (nullable = true)
 |-- Agency Description: string (nullable = true)
 |-- Color Description: string (nullable = true)
 |-- Body Style Description: string (nullable = true)



In [7]:
# B.3 Count the number of rows (a full pass over every partition of the cached frame).
data_frame.count()

                                                                                

13077724

In [8]:
# B.4 Number of partitions in the RDD that backs data_frame.
data_frame.rdd.getNumPartitions()

16

In [9]:
# B.5 Remove the VIN, Latitude and Longitude columns, then preview the result.
# DataFrame.drop is a no-op for names the schema does not contain, so re-running is harmless.
columns_to_drop = ['VIN', 'Latitude', 'Longitude']
data_frame = data_frame.drop(*columns_to_drop)
data_frame.show(n=20, truncate=True)

+-------------+--------------------+----------+--------+-----------+--------------+-----------------+----+----------+-----+--------------------+-----+------+--------------+---------------------+-----------+------------------+-----------------+----------------------+
|Ticket number|          Issue Date|Issue time|Meter Id|Marked Time|RP State Plate|Plate Expiry Date|Make|Body Style|Color|            Location|Route|Agency|Violation code|Violation Description|Fine amount|Agency Description|Color Description|Body Style Description|
+-------------+--------------------+----------+--------+-----------+--------------+-----------------+----+----------+-----+--------------------+-----+------+--------------+---------------------+-----------+------------------+-----------------+----------------------+
|   1103341116|2015-12-21T00:00:...|      1251|    NULL|       NULL|            CA|           200304|HOND|        PA|   GY|     13147 WELBY WAY|01521|     1|        4000A1|   NO EVIDENCE OF REG|     

In [10]:
# B.6 Find the maximum fine amount.
# Rename 'Fine amount' to 'FineAmount' so the column name no longer contains a space.
data_frame = data_frame.withColumnRenamed('Fine amount', 'FineAmount')
column_names2 = data_frame.columns
print(column_names2)

# Keep a handle on the renamed but still string-typed frame; the second approach (next cell)
# performs its own cast on it.
data_frame2 = data_frame

# Convert 'FineAmount' from string to a number. Use "double" rather than "float": a 32-bit
# float is widened to a Python double on collect, so amounts that float32 cannot represent
# exactly (e.g. 68.01) would print with spurious extra digits. With Spark's default
# (non-ANSI) casting, unparsable strings become NULL, and max() ignores NULLs.
data_frame = data_frame.withColumn("FineAmount", col("FineAmount").cast("double"))

# `max` here is pyspark.sql.functions.max (imported at the top, shadowing the builtin).
# Alias the aggregate so the value is read back under a name chosen here, instead of
# relying on Spark's auto-generated column name "max(FineAmount)".
max_fine_result = data_frame.agg(max("FineAmount").alias("MaxFineAmount")).collect()
max_Fine_amount = max_fine_result[0]["MaxFineAmount"]
print("The maximum amount of fine is:", max_Fine_amount)




['Ticket number', 'Issue Date', 'Issue time', 'Meter Id', 'Marked Time', 'RP State Plate', 'Plate Expiry Date', 'Make', 'Body Style', 'Color', 'Location', 'Route', 'Agency', 'Violation code', 'Violation Description', 'FineAmount', 'Agency Description', 'Color Description', 'Body Style Description']
>>>




The maximum amount of fine is: 1100.0


                                                                                

In [11]:
# B.6 Second approach (written with help from an AI assistant): find the maximum fine and
# count how many citations carry exactly that amount.

# Cast the string-typed copy; "double" keeps collected values free of 32-bit float artefacts.
data_frame2 = data_frame2.withColumn("FineAmount", col("FineAmount").cast("double"))

# Find the maximum fine amount (`max` is pyspark.sql.functions.max, imported at the top).
max_fine_amount_result = data_frame2.agg(max("FineAmount").alias("MaxFineAmount")).collect()
max_fine_amount = max_fine_amount_result[0]["MaxFineAmount"]

# Count how many fines have the maximum fine amount. If every FineAmount is NULL the max is
# None; an equality test against NULL never matches, so skip the extra full scan and report 0.
if max_fine_amount is None:
    count_max_fine = 0
else:
    count_max_fine = data_frame2.filter(col("FineAmount") == max_fine_amount).count()

print(f"Maximum Fine Amount: {max_fine_amount}")
print(f"Count of Fines with Maximum Amount: {count_max_fine}")




Maximum Fine Amount: 1100.0
Count of Fines with Maximum Amount: 626


                                                                                

In [12]:
# B.7 Show the top 20 most frequent vehicle makes, and their frequencies.
top_Freq_Vehicle = data_frame.groupby('Make').count()  # one row per Make with its citation count
# Sort by frequency (descending). Ties are broken alphabetically by Make so the listing is
# deterministic. The 20 is passed explicitly rather than relying on show()'s default.
top_Freq_Vehicle.orderBy(col("count").desc(), col("Make").asc()).show(20)



+----+-------+
|Make|  count|
+----+-------+
|TOYT|2150768|
|HOND|1479996|
|FORD|1116235|
|NISS| 945133|
|CHEV| 892676|
| BMW| 603092|
|MERZ| 543298|
|VOLK| 432030|
|HYUN| 404917|
|DODG| 391686|
|LEXS| 368420|
| KIA| 328155|
|JEEP| 316300|
|AUDI| 255395|
|MAZD| 242344|
|OTHR| 205546|
| GMC| 184889|
|INFI| 174315|
|CHRY| 159948|
|SUBA| 154640|
+----+-------+
only showing top 20 rows



                                                                                

In [13]:
# B.8
# Lookup from colour abbreviation to full colour name, used to build the 'Long_color' column.
# Several abbreviations share a full name (e.g. 'BN'/'BR' -> 'Brown', 'WT'/'WH' -> 'White').
COLORS = {
    'AL': 'Aluminum', 'AM': 'Amber', 'BG': 'Beige', 'BK': 'Black',
    'BL': 'Blue', 'BN': 'Brown', 'BR': 'Brown', 'BZ': 'Bronze',
    'CH': 'Charcoal', 'DK': 'Dark', 'GD': 'Gold', 'GO': 'Gold',
    'GN': 'Green', 'GY': 'Gray', 'GT': 'Granite', 'IV': 'Ivory',
    'LT': 'Light', 'OL': 'Olive', 'OR': 'Orange', 'MR': 'Maroon',
    'PK': 'Pink', 'RD': 'Red', 'RE': 'Red', 'SI': 'Silver',
    'SL': 'Silver', 'SM': 'Smoke', 'TN': 'Tan', 'VT': 'Violet',
    'WT': 'White', 'WH': 'White', 'YL': 'Yellow', 'YE': 'Yellow',
    'UN': 'Unknown',
}


def color_mapping(color):
    """Return the full colour name for an abbreviation.

    Abbreviations not present in COLORS (including None) are returned unchanged.
    """
    return COLORS.get(color, color)

# Wrap color_mapping as a string-returning Spark UDF and add the full name as 'Long_color'.
# Being a Python UDF, each row is evaluated in a Python worker rather than inside the JVM.
udf_Mapping_Colors = udf(color_mapping, returnType=StringType())
data_frame_FullAbbre = data_frame.withColumn("Long_color", udf_Mapping_Colors(col("Color")))
data_frame_FullAbbre.select(col("Color"), col("Long_color")).show(truncate=False)


[Stage 19:>                                                         (0 + 1) / 1]

+-----+----------+
|Color|Long_color|
+-----+----------+
|GY   |Gray      |
|WH   |White     |
|BK   |Black     |
|WH   |White     |
|BK   |Black     |
|GY   |Gray      |
|BL   |Blue      |
|BK   |Black     |
|BR   |Brown     |
|SI   |Silver    |
|WH   |White     |
|GO   |Gold      |
|BK   |Black     |
|BK   |Black     |
|BK   |Black     |
|BK   |Black     |
|WH   |White     |
|NULL |NULL      |
|BK   |Black     |
|BK   |Black     |
+-----+----------+
only showing top 20 rows



                                                                                

In [14]:
# B.9 Using this new column, what’s the most frequent colour value for Toyotas (TOYT)?

toyotas = data_frame_FullAbbre.filter(col('Make') == 'TOYT')  # keep only Toyota citations

# Count citations per full colour name, most frequent first. Ties are broken by colour name
# so the order is stable. A parenthesised chain is used instead of trailing backslashes:
# the original ended on a dangling "\" followed by a blank line.
most_frequent_color = (
    toyotas.groupBy('Long_color')
    .count()
    .orderBy(col("count").desc(), col("Long_color").asc())
)

# Display the five most frequent colours; the first row answers the question.
most_frequent_color.take(5)


                                                                                

[Row(Long_color='Gray', count=489697),
 Row(Long_color='White', count=434595),
 Row(Long_color='Black', count=353812),
 Row(Long_color='Silver', count=347894),
 Row(Long_color='Blue', count=180091)]

Based on the results above, the most frequent color for Toyotas (TOYT) is Gray, with 489,697 citations.

In [15]:
# Stop the SparkSession (and its underlying SparkContext) now that all questions are answered.
spark_session.stop()