In [1]:
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql.window import Window

from pyspark.sql import SparkSession

# Create (or reuse an already-running) Spark session named "BDF" for this notebook.
spark = (
    SparkSession.builder
    .appName("BDF")
    .getOrCreate()
)

In [2]:
vehicle_records_csv_url = "per-vehicle-records-2020-01-01.csv"

# The CSV's first line holds the column names; inferSchema asks Spark to derive
# column types from the data instead of reading every column as a string.
vehicle_records_df = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv(vehicle_records_csv_url)
)


In [3]:
# Solution 1 : Calculate the usage of Irish road network in terms of percentage grouped by vehicle category.

# Number of records per vehicle category; rows with a null classname form their own group.
derived_grp_by_cls_df = (
    vehicle_records_df
    .groupBy("classname")
    .count()
    .withColumnRenamed("classname", "category_name")
    .withColumnRenamed("count", "category_count")
)

# Total number of records, used as the denominator of every category's share.
vehicle_records_count = vehicle_records_df.count()

# Share of all records per category, in percent, rounded to 2 decimals
# (Spark's round, brought in by the functions star import).
category_share_col = round(col("category_count") * 100 / vehicle_records_count, 2)

t1_df = derived_grp_by_cls_df.withColumn("percentage_grp_by_category", category_share_col)

t1_df.show(10)

+-------------+--------------+--------------------------+
|category_name|category_count|percentage_grp_by_category|
+-------------+--------------+--------------------------+
|          CAR|       3934595|                     91.21|
|      HGV_ART|         37816|                      0.88|
|          BUS|         27931|                      0.65|
|      HGV_RIG|         37309|                      0.86|
|         null|           522|                      0.01|
|      CARAVAN|          6757|                      0.16|
|          LGV|        255782|                      5.93|
|        MBIKE|         13207|                      0.31|
+-------------+--------------+--------------------------+



In [4]:
#Solution 2 : Calculate the highest and lowest hourly flows on M50 - show the hours and total number of vehicle counts.

# Total number of vehicle records observed in each hour value.
hr_count_df = vehicle_records_df.groupBy("hour").count().withColumnRenamed("count", "hr_count")

# One row per extreme ("min" / "max") holding its numeric hourly total.
# A global aggregate is used instead of describe(): describe() returns every statistic
# as a string (and also computes count/mean/stddev), which made the join below depend on
# an implicit string-to-long cast. stack() turns the single aggregate row into two rows.
highest_lowest_data_df = hr_count_df \
    .selectExpr("min(hr_count) AS min_count", "max(hr_count) AS max_count") \
    .selectExpr("stack(2, 'min', min_count, 'max', max_count) AS (summary, summary_count)")

# Attach the hour(s) whose total equals each extreme; a tie yields one row per matching hour.
highest_lowest_data_with_hr_df = highest_lowest_data_df.join(hr_count_df, col("summary_count") == col("hr_count"), 'left_outer') \
    .select("summary", "hour", "hr_count")

highest_lowest_data_with_hr_df.show()

+-------+----+--------+
|summary|hour|hr_count|
+-------+----+--------+
|    min|   4|   31103|
|    max|  14|  423811|
+-------+----+--------+



In [5]:
## Solution 3 : Calculate the evening and morning rush hours on M50 - show the hours and the total counts.

# Window spanning all hours that share the same Morning/Evening label.
hrOverCategory = Window.partitionBy("hr_flag")

# Hours before 12 are labelled "Morning"; every other hour value is labelled "Evening".
hr_flag_col = when(col("hour") < 12, "Morning").otherwise("Evening").alias("hr_flag")
hourly_with_flag_df = hr_count_df.select("*", hr_flag_col)

# Keep, per label, the hour(s) whose count equals that label's maximum (ties keep all).
evening_and_morning_rush_hr_df = (
    hourly_with_flag_df
    .withColumn("max_hr_count", max("hr_count").over(hrOverCategory))
    .where(col("hr_count") == col("max_hr_count"))
    .select("hr_flag", "hour", "hr_count")
)

evening_and_morning_rush_hr_df.show()

+-------+----+--------+
|hr_flag|hour|hr_count|
+-------+----+--------+
|Evening|  14|  423811|
|Morning|  11|  229176|
+-------+----+--------+



In [22]:
#Solution 4: Calculate average speed between each junction on M50 (e.g., junction1 - junction2, junction 3 - junction 4, etc.).

# Average recorded speed at each counter site (cosit), rounded to 2 decimals.
speed_avg_by_cosit_df = vehicle_records_df.groupBy("cosit") \
    .agg(round(avg("speed"), 2).alias("speed_avg"))

# Pair each site with the next one in ascending cosit order. The frame holds one row per
# site, so an unpartitioned window is cheap here (Spark may log a single-partition warning);
# it replaces the previous constant dummy partition column, which had the same effect.
# NOTE(review): ascending cosit id is assumed to follow junction order along the road, and
# sites from every road in the file (M01, N07, a "test" site, ...) are included — confirm.
next_cosit_window = Window.orderBy("cosit")

# The last site has no successor (lead() yields null), and concat() returns null when any
# input is null; drop such rows rather than emitting a pair with a null "junction" label.
average_speed_between_each_junction_df = speed_avg_by_cosit_df \
    .withColumn("cosit2", lead("cosit", 1).over(next_cosit_window)) \
    .orderBy("cosit") \
    .select(concat("cosit", lit(" , "), "cosit2").alias("junction"), "speed_avg") \
    .filter(col("junction").isNotNull())

average_speed_between_each_junction_df.show(truncate=False)


+-----------+---------+
|junction   |speed_avg|
+-----------+---------+
|997 , 1011 |70.03    |
|1011 , 1012|72.22    |
|1012 , 1013|87.6     |
|1013 , 1014|67.12    |
|1014 , 1015|88.99    |
|1015 , 1016|116.89   |
|1016 , 1017|120.81   |
|1017 , 1021|112.41   |
|1021 , 1022|98.64    |
|1022 , 1023|90.21    |
|1023 , 1024|92.43    |
|1024 , 1025|91.27    |
|1025 , 1027|94.55    |
|1027 , 1031|88.51    |
|1031 , 1032|79.72    |
|1032 , 1033|90.14    |
|1033 , 1034|94.62    |
|1034 , 1035|89.22    |
|1035 , 1036|96.0     |
|1036 , 1038|66.87    |
+-----------+---------+
only showing top 20 rows



In [11]:
#Solution 5: Calculate the top 10 locations with highest number of counts of HGVs(class). Map the COSITs with their names given on the map.

#Load cosit site details; cosit is cast to int so it can be joined with the vehicle records.
cosit_sites_json_url = "cosit_sites.json"
cosit_sites_df = spark.read.json(cosit_sites_json_url).withColumn("cosit", col("cosit").cast(IntegerType()))

# Numeric vehicle "class" codes counted as HGVs.
# NOTE(review): presumably these are the rigid/articulated HGV classes (HGV_RIG / HGV_ART
# in "classname") — confirm against the dataset's data dictionary.
HGV_CLASS_CODES = [5, 6]

# Ten sites with the most HGV records.
top10_HGVS_df = vehicle_records_df.filter(col("class").isin(HGV_CLASS_CODES)) \
    .groupBy("cosit").count().orderBy(desc("count")).limit(10)

# Attach site names. Columns are qualified by their source frame so the join stays
# unambiguous, and the sort uses the exact alias so it also works with
# spark.sql.caseSensitive=true.
# NOTE(review): site 997 is named "test" in the result and dominates it — consider excluding.
top10_HGVS_with_names_df = top10_HGVS_df.join(cosit_sites_df, top10_HGVS_df['cosit'] == cosit_sites_df['cosit'], 'left_outer') \
    .select(top10_HGVS_df['cosit'], cosit_sites_df['name'].alias("cosit_name"), top10_HGVS_df['count'].alias("count_hgv")) \
    .orderBy(desc("count_hgv"))

top10_HGVS_with_names_df.show(truncate=False)

+------+----------------+---------+
|cosit |cosit_name      |count_hgv|
+------+----------------+---------+
|997   |test            |21065    |
|1015  |TMU M01 010.0 S |1760     |
|1014  |TMU M01 000.0 N |1559     |
|1508  |TMU M50 015.0 S |1349     |
|1502  |TMU M50 010.0 N |1344     |
|200723|TMU N07 015.0 W |1283     |
|1503  |TMU M50 020.0 N |1243     |
|1073  |N07 E06.5       |1195     |
|1070  |TMU N07 001.0 E |1153     |
|1501  |TMU M50 005.0 N |1075     |
+------+----------------+---------+



In [18]:
# Append the Solution 1 result to Cassandra (keyspace assignment_1, table question_1).
(
    t1_df.write
    .format("org.apache.spark.sql.cassandra")
    .option("table", "question_1")
    .option("keyspace", "assignment_1")
    .mode('append')
    .save()
)

In [19]:
# Append the Solution 2 result to Cassandra (keyspace assignment_1, table question_2).
(
    highest_lowest_data_with_hr_df.write
    .format("org.apache.spark.sql.cassandra")
    .option("table", "question_2")
    .option("keyspace", "assignment_1")
    .mode('append')
    .save()
)

In [20]:
# Append the Solution 3 result to Cassandra (keyspace assignment_1, table question_3).
(
    evening_and_morning_rush_hr_df.write
    .format("org.apache.spark.sql.cassandra")
    .option("table", "question_3")
    .option("keyspace", "assignment_1")
    .mode('append')
    .save()
)

In [24]:
# Append the Solution 4 result to Cassandra (keyspace assignment_1, table question_4).
(
    average_speed_between_each_junction_df.write
    .format("org.apache.spark.sql.cassandra")
    .option("table", "question_4")
    .option("keyspace", "assignment_1")
    .mode('append')
    .save()
)

In [13]:
# Append the Solution 5 result to Cassandra (keyspace assignment_1, table question_5).
(
    top10_HGVS_with_names_df.write
    .format("org.apache.spark.sql.cassandra")
    .option("table", "question_5")
    .option("keyspace", "assignment_1")
    .mode('append')
    .save()
)