In [1]:
from pyspark.sql import SparkSession

# (8 cores, 16gb per machine) x 5 = 40 cores

# New API
spark_session = SparkSession\
        .builder\
        .master("spark://192.168.1.153:7077") \
        .appName("A2_B")\
        .config("spark.dynamicAllocation.enabled", True)\
        .config("spark.shuffle.service.enabled", True)\
        .config("spark.dynamicAllocation.executorIdleTimeout","30s")\
        .config("spark.executor.cores",4)\
        .getOrCreate()

# Old API (RDD)
spark_context = spark_session.sparkContext

In [2]:
#B.1 Load the CSV file from HDFS, and call show() to verify the data is loaded correctly.

data_frame_tickets = spark_session.read\
    .option("header", "true")\
    .csv("hdfs://192.168.1.153:9000/parking-citations.csv")\
    .cache()


In [3]:
#B.2 Print the schema for the DataFrame
data_frame_tickets.show()

+-------------+-------------------+----------+--------+-----------+--------------+-----------------+----+----+----------+-----+--------------------+-----+------+--------------+---------------------+-----------+---------+---------+
|Ticket number|         Issue Date|Issue time|Meter Id|Marked Time|RP State Plate|Plate Expiry Date| VIN|Make|Body Style|Color|            Location|Route|Agency|Violation code|Violation Description|Fine amount| Latitude|Longitude|
+-------------+-------------------+----------+--------+-----------+--------------+-----------------+----+----+----------+-----+--------------------+-----+------+--------------+---------------------+-----------+---------+---------+
|   1103341116|2015-12-21T00:00:00|      1251|    null|       null|            CA|           200304|null|HOND|        PA|   GY|     13147 WELBY WAY|01521|     1|        4000A1|   NO EVIDENCE OF REG|         50|    99999|    99999|
|   1103700150|2015-12-21T00:00:00|      1435|    null|       null|         

In [9]:
# B.3 Count the number of rows in the CSV file.
data_frame_tickets.count()

9257460

In [11]:
#B.4 Count the number of partitions in the underlying RDD.
data_frame_tickets.rdd.getNumPartitions()

10

In [4]:
#B.5 Drop the columns VIN, Latitude and Longitude.

drop_list = ['VIN', 'Latitude', 'Longitude']

new_data_frame=data_frame_tickets.select([column for column in data_frame_tickets.columns if column not in drop_list])

new_data_frame.show()

+-------------+-------------------+----------+--------+-----------+--------------+-----------------+----+----------+-----+--------------------+-----+------+--------------+---------------------+-----------+
|Ticket number|         Issue Date|Issue time|Meter Id|Marked Time|RP State Plate|Plate Expiry Date|Make|Body Style|Color|            Location|Route|Agency|Violation code|Violation Description|Fine amount|
+-------------+-------------------+----------+--------+-----------+--------------+-----------------+----+----------+-----+--------------------+-----+------+--------------+---------------------+-----------+
|   1103341116|2015-12-21T00:00:00|      1251|    null|       null|            CA|           200304|HOND|        PA|   GY|     13147 WELBY WAY|01521|     1|        4000A1|   NO EVIDENCE OF REG|         50|
|   1103700150|2015-12-21T00:00:00|      1435|    null|       null|            CA|           201512| GMC|        VN|   WH|       525 S MAIN ST| 1C51|     1|        4000A1|   NO

In [5]:
#B.6  Find the maximum fine amount. How many fines have this amount?
new_data_frame.describe('Fine amount').show()
fines_max_amount_count=new_data_frame.filter(new_data_frame['Fine amount'] == 98).count()
print ("The maximum fine amount has %s fines." %(fines_max_amount_count,))

+-------+------------------+
|summary|       Fine amount|
+-------+------------------+
|  count|           9250556|
|   mean|  70.1290350547578|
| stddev|32.135865052383785|
|    min|                10|
|    max|              98.0|
+-------+------------------+

The maximum fine amount has 333 fines.


In [6]:
#B.7 Show the top 20 most frequent vehicle makes, and their frequencies.

new_data_frame.groupBy(data_frame_tickets.Make).count().orderBy('count',ascending=False).show()



+----+-------+
|Make|  count|
+----+-------+
|TOYT|1531949|
|HOND|1043276|
|FORD| 807498|
|NISS| 662097|
|CHEV| 631413|
| BMW| 422916|
|MERZ| 376830|
|VOLK| 316002|
|HYUN| 285286|
|DODG| 271590|
|LEXS| 263269|
| KIA| 217795|
|JEEP| 214965|
|AUDI| 179718|
|MAZD| 169811|
|OTHR| 154376|
| GMC| 132788|
|INFI| 120340|
|CHRY| 120317|
|ACUR| 111265|
+----+-------+
only showing top 20 rows



In [7]:
#B.8 Let’s expand some abbreviations in the color column. Create a User Defined Function to
#create a new column, ‘color long’, mapping the original colors to their corresponding values
#in the dictionary below. If there is no key matching the original color, use the original color.


from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

COLORS = {
'AL':'Aluminum', 'AM':'Amber', 'BG':'Beige', 'BK':'Black',
'BL':'Blue', 'BN':'Brown', 'BR':'Brown', 'BZ':'Bronze',
'CH':'Charcoal', 'DK':'Dark', 'GD':'Gold', 'GO':'Gold',
'GN':'Green', 'GY':'Gray', 'GT':'Granite', 'IV':'Ivory',
'LT':'Light', 'OL':'Olive', 'OR':'Orange', 'MR':'Maroon',
'PK':'Pink', 'RD':'Red', 'RE':'Red', 'SI':'Silver', 'SL':'Silver',
'SM':'Smoke', 'TN':'Tan', 'VT':'Violet', 'WT':'White',
'WH':'White', 'YL':'Yellow', 'YE':'Yellow', 'UN':'Unknown'
}

color_long =udf (lambda x: COLORS.get(x), StringType())
new_color_data_frame = data_frame_tickets.withColumn('Color',color_long(data_frame_tickets.Color))
new_color_data_frame.show()

+-------------+-------------------+----------+--------+-----------+--------------+-----------------+----+----+----------+------+--------------------+-----+------+--------------+---------------------+-----------+---------+---------+
|Ticket number|         Issue Date|Issue time|Meter Id|Marked Time|RP State Plate|Plate Expiry Date| VIN|Make|Body Style| Color|            Location|Route|Agency|Violation code|Violation Description|Fine amount| Latitude|Longitude|
+-------------+-------------------+----------+--------+-----------+--------------+-----------------+----+----+----------+------+--------------------+-----+------+--------------+---------------------+-----------+---------+---------+
|   1103341116|2015-12-21T00:00:00|      1251|    null|       null|            CA|           200304|null|HOND|        PA|  Gray|     13147 WELBY WAY|01521|     1|        4000A1|   NO EVIDENCE OF REG|         50|    99999|    99999|
|   1103700150|2015-12-21T00:00:00|      1435|    null|       null|     

In [144]:
#B.9 Using this new column, what’s the most frequent colour value for Toyotas (TOYT)?
toy_color=new_color_data_frame.filter('Make == "TOYT"')\
.groupBy(new_color_data_frame.Color).count()\
.orderBy('count',ascending=False)

print( "The most common color is: ",(toy_color.take(1)))

The most common color is:  [Row(Color='Gray', count=346822)]


In [None]:
spark_context.stop()