In [1]:
from pyspark.sql import SparkSession

# Connect to the standalone cluster master. Dynamic allocation lets executors
# that stay idle for 30s be released, and spark.cores.max caps the total number
# of cores this application requests across the cluster at 2.
spark_session = (
    SparkSession.builder
    .master("spark://192.168.2.122:7077")
    .appName("sergi_olives_juan-a3-part_b")
    .config("spark.dynamicAllocation.enabled", True)
    .config("spark.dynamicAllocation.shuffleTracking.enabled", True)
    .config("spark.shuffle.service.enabled", True)
    .config("spark.dynamicAllocation.executorIdleTimeout", "30s")
    .config("spark.cores.max", 2)
    .getOrCreate()
)

# Keep notebook output readable: from here on only ERROR-level log messages are shown.
spark_context = spark_session.sparkContext
spark_context.setLogLevel("ERROR")

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/02/25 10:44:33 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


# Section B - Working with DataFrames and SQL

## Question B.1

In [2]:
# Load the parking citations CSV from HDFS, taking column names from the header row.
# No schema is inferred, so every column arrives as a string. The DataFrame is
# cached because the following cells query it repeatedly.
df = (
    spark_session.read
    .option("header", "true")
    .csv('hdfs://192.168.2.122:9000/parking-citations.csv')
    .cache()
)
df.show()

                                                                                

+-------------+--------------------+----------+--------+-----------+--------------+-----------------+----+----+----------+-----+--------------------+-----+------+--------------+---------------------+-----------+---------+---------+------------------+-----------------+----------------------+
|Ticket number|          Issue Date|Issue time|Meter Id|Marked Time|RP State Plate|Plate Expiry Date| VIN|Make|Body Style|Color|            Location|Route|Agency|Violation code|Violation Description|Fine amount| Latitude|Longitude|Agency Description|Color Description|Body Style Description|
+-------------+--------------------+----------+--------+-----------+--------------+-----------------+----+----+----------+-----+--------------------+-----+------+--------------+---------------------+-----------+---------+---------+------------------+-----------------+----------------------+
|   1103341116|2015-12-21T00:00:...|      1251|    null|       null|            CA|           200304|null|HOND|        PA|  

## Question B.2

In [3]:
# Print the column names and types Spark assigned when reading the CSV.
df.printSchema()

root
 |-- Ticket number: string (nullable = true)
 |-- Issue Date: string (nullable = true)
 |-- Issue time: string (nullable = true)
 |-- Meter Id: string (nullable = true)
 |-- Marked Time: string (nullable = true)
 |-- RP State Plate: string (nullable = true)
 |-- Plate Expiry Date: string (nullable = true)
 |-- VIN: string (nullable = true)
 |-- Make: string (nullable = true)
 |-- Body Style: string (nullable = true)
 |-- Color: string (nullable = true)
 |-- Location: string (nullable = true)
 |-- Route: string (nullable = true)
 |-- Agency: string (nullable = true)
 |-- Violation code: string (nullable = true)
 |-- Violation Description: string (nullable = true)
 |-- Fine amount: string (nullable = true)
 |-- Latitude: string (nullable = true)
 |-- Longitude: string (nullable = true)
 |-- Agency Description: string (nullable = true)
 |-- Color Description: string (nullable = true)
 |-- Body Style Description: string (nullable = true)



## Question B.3

In [4]:
# count() is an action: it runs a Spark job over the whole DataFrame.
row_count = df.count()
print(f'Number of rows: {row_count}')



Number of rows: 13077724


                                                                                

## Question B.4

In [5]:
# Number of partitions backing the DataFrame's underlying RDD.
num_partitions = df.rdd.getNumPartitions()
print(f'Number of partitions: {num_partitions}')

Number of partitions: 16


## Question B.5

In [6]:
# Remove the VIN and coordinate columns; later cells work on the reduced DataFrame.
# NOTE: DataFrame.drop is a no-op for names not in the schema, so a misspelled
# name here would not raise an error.
columns = ['VIN', 'Latitude', 'Longitude']
df = df.drop(*columns)
df.printSchema()

root
 |-- Ticket number: string (nullable = true)
 |-- Issue Date: string (nullable = true)
 |-- Issue time: string (nullable = true)
 |-- Meter Id: string (nullable = true)
 |-- Marked Time: string (nullable = true)
 |-- RP State Plate: string (nullable = true)
 |-- Plate Expiry Date: string (nullable = true)
 |-- Make: string (nullable = true)
 |-- Body Style: string (nullable = true)
 |-- Color: string (nullable = true)
 |-- Location: string (nullable = true)
 |-- Route: string (nullable = true)
 |-- Agency: string (nullable = true)
 |-- Violation code: string (nullable = true)
 |-- Violation Description: string (nullable = true)
 |-- Fine amount: string (nullable = true)
 |-- Agency Description: string (nullable = true)
 |-- Color Description: string (nullable = true)
 |-- Body Style Description: string (nullable = true)



## Question B.6

In [7]:
from pyspark.sql.functions import col, max as spark_max

# The CSV was read without schema inference, so cast the fine column to a number.
# Under Spark's default (non-ANSI) cast semantics, unparsable values become null.
df = df.withColumn('Fine amount', col('Fine amount').cast('float'))

# The max aggregate ignores nulls. A global aggregation always yields exactly one
# row, whose value is None when no fine could be parsed.
max_fine = df.agg(spark_max('Fine amount')).first()[0]

if max_fine is None:
    # Guard: formatting None with ':.2f' would raise TypeError.
    num_fines = 0
    print('No fine amounts could be parsed as numbers.')
else:
    num_fines = df.filter(col('Fine amount') == max_fine).count()
    print(f'The maximum fine amount is ${max_fine:.2f}, and there are {num_fines} fines with this amount.')



The maximum fine amount is $1100.00, and there are 626 fines with this amount.


                                                                                

## Question B.7

In [8]:
from pyspark.sql.functions import desc

# Citations per vehicle make, largest first, keeping only the top 20 makes.
top_makes = (
    df.groupBy('Make')
    .count()
    .sort(desc('count'))
    .limit(20)
)
top_makes.show()



+----+-------+
|Make|  count|
+----+-------+
|TOYT|2150768|
|HOND|1479996|
|FORD|1116235|
|NISS| 945133|
|CHEV| 892676|
| BMW| 603092|
|MERZ| 543298|
|VOLK| 432030|
|HYUN| 404917|
|DODG| 391686|
|LEXS| 368420|
| KIA| 328155|
|JEEP| 316300|
|AUDI| 255395|
|MAZD| 242344|
|OTHR| 205546|
| GMC| 184889|
|INFI| 174315|
|CHRY| 159948|
|SUBA| 154640|
+----+-------+



                                                                                

## Question B.8

In [9]:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

# Maps two-letter color codes (as applied to the 'Color' column below) to full
# color names. Some colors have several codes, e.g. 'BN' and 'BR' both map to 'Brown'.
COLORS = { 
'AL':'Aluminum', 'AM':'Amber', 'BG':'Beige', 'BK':'Black', 'BL':'Blue',
'BN':'Brown', 'BR':'Brown', 'BZ':'Bronze', 'CH':'Charcoal', 'DK':'Dark',
'GD':'Gold', 'GO':'Gold', 'GN':'Green', 'GY':'Gray', 'GT':'Granite',
'IV':'Ivory', 'LT':'Light', 'OL':'Olive', 'OR':'Orange', 'MR':'Maroon',
'PK':'Pink', 'RD':'Red', 'RE':'Red', 'SI':'Silver', 'SL':'Silver',
'SM':'Smoke', 'TN':'Tan', 'VT':'Violet', 'WT':'White', 'WH':'White',
'YL':'Yellow', 'YE':'Yellow', 'UN':'Unknown'
}


# Define UDF
def expand_color(color):
    """Return the full color name for a two-letter color code.

    Codes that are not keys of COLORS, including None, are returned unchanged.
    """
    return COLORS.get(color, color)


# Wrap the Python function as a UDF for the DataFrame API, returning strings.
# udf() does not register it for SQL queries; that would require
# spark_session.udf.register(...).
expand_color_udf = udf(expand_color, StringType())

# Apply the UDF, reading the source column by its exact schema name ('Color'),
# so the lookup does not depend on case-insensitive column resolution.
df = df.withColumn('color long', expand_color_udf(df['Color']))

## Question B.9

In [10]:
from pyspark.sql.functions import desc

# Toyota citations with a recorded color. A null color is missing data, not a
# color, so it must not be reported as the most frequent one.
toyota_colors = df.filter((df['Make'] == 'TOYT') & df['color long'].isNotNull())
color_counts = toyota_colors.groupBy('color long').count()

# Sort by count, breaking ties on the color name so the answer is deterministic.
# first() returns None when there are no rows at all.
top_row = color_counts.orderBy(desc('count'), 'color long').first()
most_frequent_color = top_row['color long'] if top_row is not None else None

if most_frequent_color is None:
    print('No Toyota citations with a recorded color were found.')
else:
    print(f'The most frequent color for Toyotas is: {most_frequent_color}')



The most frequent color for Toyotas is: Gray


                                                                                

In [11]:
# Shut down the SparkSession and its underlying SparkContext, releasing this
# application's resources on the cluster.
spark_session.stop()