In [1]:
from pprint import pprint
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.types import FloatType

# New API
spark_session = SparkSession\
        .builder\
        .master("spark://192.168.2.113:7077") \
        .appName("CAR PARKING FINES")\
        .config("spark.executor.cores",2)\
        .getOrCreate()

#.master("spark://ben-spark-master:7077") \
# Old API (RDD)
spark_context = spark_session.sparkContext

data_frame = spark_session.read\
    .option("header", "true")\
    .csv('hdfs://192.168.2.113:9000/parking-citations.csv')\
    .cache()

In [2]:
data_frame.show

<bound method DataFrame.show of DataFrame[Ticket number: string, Issue Date: string, Issue time: string, Meter Id: string, Marked Time: string, RP State Plate: string, Plate Expiry Date: string, VIN: string, Make: string, Body Style: string, Color: string, Location: string, Route: string, Agency: string, Violation code: string, Violation Description: string, Fine amount: string, Latitude: string, Longitude: string]>

## B2 Print the schema for the DataFrame.

In [3]:
data_frame.printSchema()

root
 |-- Ticket number: string (nullable = true)
 |-- Issue Date: string (nullable = true)
 |-- Issue time: string (nullable = true)
 |-- Meter Id: string (nullable = true)
 |-- Marked Time: string (nullable = true)
 |-- RP State Plate: string (nullable = true)
 |-- Plate Expiry Date: string (nullable = true)
 |-- VIN: string (nullable = true)
 |-- Make: string (nullable = true)
 |-- Body Style: string (nullable = true)
 |-- Color: string (nullable = true)
 |-- Location: string (nullable = true)
 |-- Route: string (nullable = true)
 |-- Agency: string (nullable = true)
 |-- Violation code: string (nullable = true)
 |-- Violation Description: string (nullable = true)
 |-- Fine amount: string (nullable = true)
 |-- Latitude: string (nullable = true)
 |-- Longitude: string (nullable = true)



## B.3 Count the number of rows in the CSV file.

In [4]:
data_frame.count()

9257460

## B.4 Count the number of partitions in the underlying RDD.

In [5]:
data_frame.rdd.getNumPartitions()

40

## B.5 Drop the columns VIN, Latitude and Longitude.

In [6]:
drop_frame = data_frame.select([c for c in data_frame.columns if c not in {'VIN', 'Latitude','Longitude'}])

## B.6 Find the maximum fine amount. How many fines have this amount? You need to convert the ‘fine amount’ column to a float to do this correctly.

In [7]:
drop_frame.show()

+-------------+-------------------+----------+--------+-----------+--------------+-----------------+----+----------+-----+--------------------+-----+------+--------------+---------------------+-----------+
|Ticket number|         Issue Date|Issue time|Meter Id|Marked Time|RP State Plate|Plate Expiry Date|Make|Body Style|Color|            Location|Route|Agency|Violation code|Violation Description|Fine amount|
+-------------+-------------------+----------+--------+-----------+--------------+-----------------+----+----------+-----+--------------------+-----+------+--------------+---------------------+-----------+
|   1103341116|2015-12-21T00:00:00|      1251|    null|       null|            CA|           200304|HOND|        PA|   GY|     13147 WELBY WAY|01521|     1|        4000A1|   NO EVIDENCE OF REG|         50|
|   1103700150|2015-12-21T00:00:00|      1435|    null|       null|            CA|           201512| GMC|        VN|   WH|       525 S MAIN ST| 1C51|     1|        4000A1|   NO

In [8]:
df2 = drop_frame.withColumn("Fine amount",col("Fine amount").cast(FloatType()))
                        
df2.printSchema()

root
 |-- Ticket number: string (nullable = true)
 |-- Issue Date: string (nullable = true)
 |-- Issue time: string (nullable = true)
 |-- Meter Id: string (nullable = true)
 |-- Marked Time: string (nullable = true)
 |-- RP State Plate: string (nullable = true)
 |-- Plate Expiry Date: string (nullable = true)
 |-- Make: string (nullable = true)
 |-- Body Style: string (nullable = true)
 |-- Color: string (nullable = true)
 |-- Location: string (nullable = true)
 |-- Route: string (nullable = true)
 |-- Agency: string (nullable = true)
 |-- Violation code: string (nullable = true)
 |-- Violation Description: string (nullable = true)
 |-- Fine amount: float (nullable = true)



In [9]:
df2.select("Fine amount").show()

+-----------+
|Fine amount|
+-----------+
|       50.0|
|       50.0|
|       58.0|
|       null|
|       93.0|
|       50.0|
|      163.0|
|      163.0|
|       93.0|
|       93.0|
|       93.0|
|       93.0|
|       50.0|
|       93.0|
|       68.0|
|       68.0|
|       68.0|
|       50.0|
|       93.0|
|       73.0|
+-----------+
only showing top 20 rows



In [10]:
df2 = df2.fillna(0, subset=['Fine amount'])
df2.select("Fine amount").show()

+-----------+
|Fine amount|
+-----------+
|       50.0|
|       50.0|
|       58.0|
|        0.0|
|       93.0|
|       50.0|
|      163.0|
|      163.0|
|       93.0|
|       93.0|
|       93.0|
|       93.0|
|       50.0|
|       93.0|
|       68.0|
|       68.0|
|       68.0|
|       50.0|
|       93.0|
|       73.0|
+-----------+
only showing top 20 rows



In [11]:
your_max_value = df2.agg({"Fine amount": "max"}).collect()[0]
print(your_max_value)
#print(linesWithSparkGDF)

Row(max(Fine amount)=505.0)


In [12]:
result = df2.where(col("Fine amount") == 505.0)

In [13]:
print(result)

DataFrame[Ticket number: string, Issue Date: string, Issue time: string, Meter Id: string, Marked Time: string, RP State Plate: string, Plate Expiry Date: string, Make: string, Body Style: string, Color: string, Location: string, Route: string, Agency: string, Violation code: string, Violation Description: string, Fine amount: float]


In [14]:
df2.filter(df2['Fine amount'] == '505.0').collect()

[Row(Ticket number='1112999020', Issue Date='2016-02-27T00:00:00', Issue time='335', Meter Id=None, Marked Time=None, RP State Plate='CA', Plate Expiry Date='201605', Make='CHRY', Body Style='PA', Color='WH', Location='6200 S AVALON', Route='13A75', Agency='1', Violation code='558', Violation Description='8755**', Fine amount=505.0),
 Row(Ticket number='1109909205', Issue Date='2014-12-24T00:00:00', Issue time='110', Meter Id=None, Marked Time=None, RP State Plate='CA', Plate Expiry Date='201410', Make='HOND', Body Style=None, Color='BK', Location='6500 S AVALON BLVD', Route='13E3', Agency='1', Violation code='558', Violation Description='8755**', Fine amount=505.0),
 Row(Ticket number='1109909220', Issue Date='2014-12-24T00:00:00', Issue time='125', Meter Id=None, Marked Time=None, RP State Plate='CA', Plate Expiry Date='201507', Make='NISS', Body Style='PA', Color='BK', Location='617 E GAGE AV', Route='13E3', Agency='1', Violation code='558', Violation Description='8755**', Fine amou

In [15]:
df2.filter(df2['Fine amount'] == '505.0').count()

6

## B.7 Show the top 20 most frequent vehicle makes, and their frequencies.

In [16]:
from pyspark.sql.functions import desc
from pyspark.sql.functions import mean
#sort_fine = df2.sort(desc("Fine amount")).show(truncate=False)
#df2.groupby('Make').count().show()
#df2.groupBy('Make','Fine amount').count().sort(desc('count')).show()
df2.groupBy('Make').count().sort(desc('count')).show(20)

+----+-------+
|Make|  count|
+----+-------+
|TOYT|1531949|
|HOND|1043276|
|FORD| 807498|
|NISS| 662097|
|CHEV| 631413|
| BMW| 422916|
|MERZ| 376830|
|VOLK| 316002|
|HYUN| 285286|
|DODG| 271590|
|LEXS| 263269|
| KIA| 217795|
|JEEP| 214965|
|AUDI| 179718|
|MAZD| 169811|
|OTHR| 154376|
| GMC| 132788|
|INFI| 120340|
|CHRY| 120317|
|ACUR| 111265|
+----+-------+
only showing top 20 rows



## B.8 Let’s expand some abbreviations in the color column. Create a User Defined Function to create a new column, ‘color long’, mapping the original colors to their corresponding values in the dictionary below. If there is no key matching the original color, use the original color. COLORS = { 'AL':'Aluminum', 'AM':'Amber', 'BG':'Beige', 'BK':'Black', 'BL':'Blue', 'BN':'Brown', 'BR':'Brown', 'BZ':'Bronze', 'CH':'Charcoal', 'DK':'Dark', 'GD':'Gold', 'GO':'Gold', 'GN':'Green', 'GY':'Gray', 'GT':'Granite', 'IV':'Ivory', 'LT':'Light', 'OL':'Olive', 'OR':'Orange', 'MR':'Maroon', 'PK':'Pink', 'RD':'Red', 'RE':'Red', 'SI':'Silver', 'SL':'Silver', 'SM':'Smoke', 'TN':'Tan', 'VT':'Violet', 'WT':'White', 'WH':'White', 'YL':'Yellow', 'YE':'Yellow', 'UN':'Unknown' }

In [17]:
df2.select("Color").show()

+-----+
|Color|
+-----+
|   GY|
|   WH|
|   BK|
|   WH|
|   BK|
|   GY|
|   BL|
|   BK|
|   BR|
|   SI|
|   WH|
|   GO|
|   BK|
|   BK|
|   BK|
|   BK|
|   WH|
| null|
|   BK|
|   BK|
+-----+
only showing top 20 rows



In [18]:
#newDf = df2.withColumn('Color', regexp_replace('Color', 'lane', 'ln'))
COLORS = { 'AL':'Aluminum', 'AM':'Amber', 'BG':'Beige', 'BK':'Black', 'BL':'Blue', 'BN':'Brown', 'BR':'Brown', 'BZ':'Bronze', 'CH':'Charcoal', 'DK':'Dark', 'GD':'Gold', 'GO':'Gold', 'GN':'Green', 'GY':'Gray', 'GT':'Granite', 'IV':'Ivory', 'LT':'Light', 'OL':'Olive', 'OR':'Orange', 'MR':'Maroon', 'PK':'Pink', 'RD':'Red', 'RE':'Red', 'SI':'Silver', 'SL':'Silver', 'SM':'Smoke', 'TN':'Tan', 'VT':'Violet', 'WT':'White', 'WH':'White', 'YL':'Yellow', 'YE':'Yellow', 'UN':'Unknown' }

In [19]:
COLORS

{'AL': 'Aluminum',
 'AM': 'Amber',
 'BG': 'Beige',
 'BK': 'Black',
 'BL': 'Blue',
 'BN': 'Brown',
 'BR': 'Brown',
 'BZ': 'Bronze',
 'CH': 'Charcoal',
 'DK': 'Dark',
 'GD': 'Gold',
 'GO': 'Gold',
 'GN': 'Green',
 'GY': 'Gray',
 'GT': 'Granite',
 'IV': 'Ivory',
 'LT': 'Light',
 'OL': 'Olive',
 'OR': 'Orange',
 'MR': 'Maroon',
 'PK': 'Pink',
 'RD': 'Red',
 'RE': 'Red',
 'SI': 'Silver',
 'SL': 'Silver',
 'SM': 'Smoke',
 'TN': 'Tan',
 'VT': 'Violet',
 'WT': 'White',
 'WH': 'White',
 'YL': 'Yellow',
 'YE': 'Yellow',
 'UN': 'Unknown'}

In [33]:
df2 = df2.withColumn("color long", df2["Color"])
df3 = df2.na.replace(COLORS,1,"color long")
#df_with_x4 = df2.withColumn("color long", df3.select("Color"))
df3.select("color long").show()

+----------+
|color long|
+----------+
|      Gray|
|     White|
|     Black|
|     White|
|     Black|
|      Gray|
|      Blue|
|     Black|
|     Brown|
|    Silver|
|     White|
|      Gold|
|     Black|
|     Black|
|     Black|
|     Black|
|     White|
|      null|
|     Black|
|     Black|
+----------+
only showing top 20 rows



In [34]:
df3.groupBy('Make','color long').count().sort(desc('count')).show()

+----+----------+------+
|Make|color long| count|
+----+----------+------+
|TOYT|      Gray|346822|
|TOYT|     White|304620|
|TOYT|     Black|252199|
|FORD|     White|251349|
|TOYT|    Silver|248685|
|HOND|      Gray|248084|
|HOND|     Black|219925|
|CHEV|     White|177794|
|HOND|    Silver|169610|
|NISS|      Gray|155402|
|HOND|     White|149061|
| BMW|     Black|144181|
|FORD|     Black|144048|
|NISS|     Black|131662|
|TOYT|      Blue|128051|
|MERZ|     Black|127613|
|NISS|     White|125416|
|CHEV|     Black|123644|
|FORD|      Gray|119537|
|NISS|    Silver|105073|
+----+----------+------+
only showing top 20 rows



## B.9 Using this new column, what’s the most frequent colour value for Toyotas (TOYT)?

In [35]:
df3.filter(df3.Make == "TOYT").groupBy('Make','color long').count().sort(desc('count')).show()

+----+----------+------+
|Make|color long| count|
+----+----------+------+
|TOYT|      Gray|346822|
|TOYT|     White|304620|
|TOYT|     Black|252199|
|TOYT|    Silver|248685|
|TOYT|      Blue|128051|
|TOYT|       Red| 84175|
|TOYT|     Green| 57627|
|TOYT|      Gold| 30154|
|TOYT|    Maroon| 19882|
|TOYT|       Tan| 17006|
|TOYT|     Beige| 11572|
|TOYT|        OT| 10805|
|TOYT|     Brown|  8466|
|TOYT|    Yellow|  3413|
|TOYT|        PR|  3010|
|TOYT|    Orange|  2527|
|TOYT|   Unknown|  1343|
|TOYT|        TU|  1077|
|TOYT|        CO|   423|
|TOYT|      Pink|    89|
+----+----------+------+
only showing top 20 rows



In [36]:
spark_context.stop()