In [133]:
from pyspark.sql import SparkSession

spark_session = SparkSession.builder\
.master("spark://192.168.2.70:7077") \
.appName("ludvigwesterholm_B")\
.config("spark.dynamicAllocation.enabled", True)\
.config("spark.dynamicAllocation.shuffleTracking.enabled",True)\
.config("spark.shuffle.service.enabled", True)\
.config("spark.dynamicAllocation.executorIdleTimeout","30s")\
.config("spark.cores.max", 4)\
.getOrCreate()

In [134]:
#B.1 reading the csv
df = spark_session.read.csv("hdfs://192.168.2.70:9000/parking-citations.csv")
df.show()

                                                                                

+-------------+--------------------+----------+--------+-----------+--------------+-----------------+----+----+----------+-----+--------------------+-----+------+--------------+--------------------+-----------+---------+---------+------------------+-----------------+--------------------+
|          _c0|                 _c1|       _c2|     _c3|        _c4|           _c5|              _c6| _c7| _c8|       _c9| _c10|                _c11| _c12|  _c13|          _c14|                _c15|       _c16|     _c17|     _c18|              _c19|             _c20|                _c21|
+-------------+--------------------+----------+--------+-----------+--------------+-----------------+----+----+----------+-----+--------------------+-----+------+--------------+--------------------+-----------+---------+---------+------------------+-----------------+--------------------+
|Ticket number|          Issue Date|Issue time|Meter Id|Marked Time|RP State Plate|Plate Expiry Date| VIN|Make|Body Style|Color|     

In [135]:
#B.2 printing the schema
df.printSchema()

root
 |-- _c0: string (nullable = true)
 |-- _c1: string (nullable = true)
 |-- _c2: string (nullable = true)
 |-- _c3: string (nullable = true)
 |-- _c4: string (nullable = true)
 |-- _c5: string (nullable = true)
 |-- _c6: string (nullable = true)
 |-- _c7: string (nullable = true)
 |-- _c8: string (nullable = true)
 |-- _c9: string (nullable = true)
 |-- _c10: string (nullable = true)
 |-- _c11: string (nullable = true)
 |-- _c12: string (nullable = true)
 |-- _c13: string (nullable = true)
 |-- _c14: string (nullable = true)
 |-- _c15: string (nullable = true)
 |-- _c16: string (nullable = true)
 |-- _c17: string (nullable = true)
 |-- _c18: string (nullable = true)
 |-- _c19: string (nullable = true)
 |-- _c20: string (nullable = true)
 |-- _c21: string (nullable = true)



In [136]:
#B.3 coutning number of rows
df.count()

                                                                                

13077725

In [137]:
#B.4 counting number of partitions
df.rdd.getNumPartitions()

16

In [138]:
#B.5 dropping columns
df = df.drop('_c7', '_c17', '_c18')

In [139]:
#B.6 finding biggest fine
from pyspark.sql.types import FloatType

#creating new column with fines with floats
df = df.withColumn("fines", df["_c16"].cast(FloatType()))

#selecting fines column
fines = df.select("fines")

#dropping all not available
fines = fines.dropna()

#finding the biggest fine
max_fine = fines.rdd.max()[0]

#counting how many times it has been given
df.where(df["fines"]==max_fine).count()

                                                                                

626

In [140]:
#B.7 

#selecting all the makes
makes = df.select("_c8")

#dropping all not availables
makes = makes.dropna()

#creating a dictionary
my_dict = makes.rdd.countByKey()

                                                                                

In [141]:
#printing the top 20 cars and their frequency
sorted_dict = dict(sorted(my_dict.items(), key=lambda x: x[1], reverse=True))
dict_keys = list(sorted_dict.keys())

for x in range(20):
    print(f"Make: {dict_keys[x]} \t Frequency: {sorted_dict[dict_keys[x]]}")

Make: TOYT 	 Frequency: 2150768
Make: HOND 	 Frequency: 1479996
Make: FORD 	 Frequency: 1116235
Make: NISS 	 Frequency: 945133
Make: CHEV 	 Frequency: 892676
Make: BMW 	 Frequency: 603092
Make: MERZ 	 Frequency: 543298
Make: VOLK 	 Frequency: 432030
Make: HYUN 	 Frequency: 404917
Make: DODG 	 Frequency: 391686
Make: LEXS 	 Frequency: 368420
Make: KIA 	 Frequency: 328155
Make: JEEP 	 Frequency: 316300
Make: AUDI 	 Frequency: 255395
Make: MAZD 	 Frequency: 242344
Make: OTHR 	 Frequency: 205546
Make: GMC 	 Frequency: 184889
Make: INFI 	 Frequency: 174315
Make: CHRY 	 Frequency: 159948
Make: SUBA 	 Frequency: 154640


In [142]:
#B.8
from pyspark.sql.functions import col, udf
from pyspark.sql.types import StringType
COLORS = {
    'AL':'Aluminum', 'AM':'Amber', 'BG':'Beige', 'BK':'Black', 
    'BL':'Blue', 'BN':'Brown', 'BR':'Brown', 'BZ':'Bronze', 
    'CH':'Charcoal', 'DK':'Dark', 'GD':'Gold', 'GO':'Gold', 
    'GN':'Green', 'GY':'Gray', 'GT':'Granite', 'IV':'Ivory',
    'LT':'Light', 'OL':'Olive', 'OR':'Orange', 'MR':'Maroon',
    'PK':'Pink', 'RD':'Red', 'RE':'Red', 'SI':'Silver',
    'SL':'Silver', 'SM':'Smoke', 'TN':'Tan', 'VT':'Violet',
    'WT':'White', 'WH':'White', 'YL':'Yellow', 'YE':'Yellow',
    'UN':'Unknown'
}


# creating UDF  
color_long_func = udf(lambda x: COLORS.get(x),StringType())

#creating the new column color_long and applying the color_long_func to fill the new column with the longer name
#using values from the color columns as keys for dictionary
df = df.withColumn("color_long", color_long_func("_c10"))

#printing the result
df.select("color_long").show()


+----------+
|color_long|
+----------+
|      null|
|      Gray|
|     White|
|     Black|
|     White|
|     Black|
|      Gray|
|      Blue|
|     Black|
|     Brown|
|    Silver|
|     White|
|      Gold|
|     Black|
|     Black|
|     Black|
|     Black|
|     White|
|      null|
|     Black|
+----------+
only showing top 20 rows



In [143]:
#B.9

#Filter to see only toyotas and then we group by the color_long and order it by count and biggest first
df.where(df["_c8"] == "TOYT").groupBy("color_long").count().orderBy("count", ascending=False).show()

#as we can see, the most frequent color is Gray.



+----------+------+
|color_long| count|
+----------+------+
|      Gray|489697|
|     White|434595|
|     Black|353812|
|    Silver|347894|
|      Blue|180091|
|       Red|119074|
|     Green| 74968|
|      Gold| 40646|
|    Maroon| 26242|
|       Tan| 23355|
|      null| 23141|
|     Beige| 15723|
|     Brown| 11454|
|    Yellow|  4372|
|    Orange|  3575|
|   Unknown|  2012|
|      Pink|   117|
+----------+------+



                                                                                

In [144]:
spark_session.stop()