In [20]:
import pyspark.sql.functions as f
from pyspark.sql.functions import col

In [21]:
# Create path for data file and load data
path = "traffic/per-vehicle-records-2021-01-15.csv"
data = spark.read.format("csv").options(header=True).load(path)

In [22]:
# Get names of columns for future reference
print(data.columns)

['cosit', 'year', 'month', 'day', 'hour', 'minute', 'second', 'millisecond', 'minuteofday', 'lane', 'lanename', 'straddlelane', 'straddlelanename', 'class', 'classname', 'length', 'headway', 'gap', 'speed', 'weight', 'temperature', 'duration', 'validitycode', 'numberofaxles', 'axleweights', 'axlespacings']


## # 1. Calculate the usage of Irish road network in terms of percentage grouped

In [4]:
# 1. Calculate the usage of Irish road network in terms of percentage grouped
# by vehicle category.

total = data.count()
q1 = data.groupby('classname').count().withColumnRenamed('classname','category')\
.withColumn('percentage', f.round(f.col('count')/total*100 , 2)).drop('count')\
.withColumn('id', f.monotonically_increasing_id())
q1.show()

+--------+----------+---+
|category|percentage| id|
+--------+----------+---+
|     CAR|     70.24|  0|
| HGV_ART|      7.57|  1|
|     BUS|      0.78|  2|
| HGV_RIG|      4.37|  3|
|    null|      0.01|  4|
| CARAVAN|      0.62|  5|
|     LGV|     15.84|  6|
|   MBIKE|      0.56|  7|
+--------+----------+---+



## Move above data frame to Cassandra

In [6]:
# Move above data frame to Cassandra
q1.select("id", "category", "percentage")\
.write.format("org.apache.spark.sql.cassandra")\
.options(table="percentage_per_vehicle_category", keyspace="sensors")\
.save(mode="append")

## Read stored data frame from Cassndra

In [7]:
# Read stored data frame from Cassndra
spark.read.format("org.apache.spark.sql.cassandra")\
.load(keyspace='traffic', table='percentage_per_vehicle_category').orderBy('id').show()

+---+--------+----------+
| id|category|percentage|
+---+--------+----------+
|  0|     CAR|     70.24|
|  1| HGV_ART|      7.57|
|  2|     BUS|      0.78|
|  3| HGV_RIG|      4.37|
|  4|    null|      0.01|
|  5| CARAVAN|      0.62|
|  6|     LGV|     15.84|
|  7|   MBIKE|      0.56|
+---+--------+----------+



## These are locations for each sensor of motorway junctions.

In [4]:
# Get cosit for each junction for motorways
# Note: Cosit for Jn01-Jn02 Dublin port to Santry is not available on the site map
m50 = {'000000001012' : 'Jn02-Jn03',
      '000000001500':'Jn03-Jn04',
      '000000001501':'Jn04-Jn05',
      '000000001502':'Jn05-Jn06',
      '000000001508':'Jn06-Jn07',
      '000000001503':'Jn07-Jn09',
      '000000001509':'Jn09-Jn10',
      '000000001504':'Jn10-Jn11',
      '000000001505':'Jn11-Jn12',
      '000000001506':'Jn12-Jn13',
      '000000001507':'Jn13-Jn14',
      '000000015010':'Jn14-Jn15',
      '000000015011':'Jn15-Jn16',
      '000000015012':'Jn16-Jn17'
      }

## # 2. Calculate the highest and lowest hourly flows on M50 - show the

In [9]:
# 2. Calculate the highest and lowest hourly flows on M50 - show the
# hours and total number of vehicle counts

hourly_flows = data.select("cosit", "hour").where(f.col('cosit').isin(list(m50.keys()))).groupBy('hour').count().sort('count')
mx = hourly_flows.agg({'count' : 'max'}).collect()[0][0] # Collect Max Value
mn = hourly_flows.agg({'count' : 'min'}).collect()[0][0] # Collect Min value

# Select row where max value
mx_flow = hourly_flows.select('hour', 'count').where(f.col('count') == mx).withColumn('flow', f.lit('highest'))
# Select row where min value
mn_flow = hourly_flows.select('hour', 'count').where(f.col('count') == mn).withColumn('flow', f.lit('lowest'))

# Union both max and min rows to form a table
motorway_hourly_flows = mx_flow.union(mn_flow).withColumnRenamed('count', 'vehicle_count')
motorway_hourly_flows.show() # add auto_increment column for ids

+----+-------------+-------+
|hour|vehicle_count|   flow|
+----+-------------+-------+
|  16|        38655|highest|
|   2|         1167| lowest|
+----+-------------+-------+



## Move above data frame to Cassandra

In [11]:
motorway_hourly_flows.select("flow", "hour", "vehicle_count")\
.write.format("org.apache.spark.sql.cassandra")\
.options(table="motorway_hourly_flows", keyspace="sensors")\
.save(mode="append")

In [12]:
spark.read.format("org.apache.spark.sql.cassandra")\
.load(keyspace='traffic', table='motorway_hourly_flows').show()

+-------+----+-------------+
|   flow|hour|vehicle_count|
+-------+----+-------------+
| lowest|   2|         1167|
|highest|  16|        38655|
+-------+----+-------------+



## 3. Calculate the evening and morning rush hours on M50

In [8]:
# 3. Calculate the evening and morning rush hours on M50 - show the
# hours and the total counts.

morning_hours = ['6','7','8','9','10']
evening_hours = ['16','17','18','19','20']

data.head()
morning_flows = data.select('hour', 'cosit').where(f.col('cosit').isin(list(m50.keys())))\
.where(f.col('hour').isin(morning_hours))\
.groupby('hour').count().withColumn('time', f.lit('morning'))

evening_flows = data.select('hour', 'cosit').where(f.col('cosit').isin(list(m50.keys())))\
.where(f.col('hour').isin(evening_hours))\
.groupby('hour').count().withColumn('time', f.lit('evening'))

rush_hours = morning_flows.union(evening_flows)

rush_hours.show()


+----+-----+-------+
|hour|count|   time|
+----+-----+-------+
|   7|22528|morning|
|   8|27180|morning|
|   6|18728|morning|
|   9|29992|morning|
|  10|29279|morning|
|  16|38655|evening|
|  18|18173|evening|
|  17|36016|evening|
|  19|13788|evening|
|  20|11647|evening|
+----+-----+-------+



In [9]:
rush_hours.select("hour", "count", "time")\
.write.format("org.apache.spark.sql.cassandra")\
.options(table="motorway_rush_hour", keyspace="sensors")\
.save(mode="append")

In [11]:
spark.read.format("org.apache.spark.sql.cassandra")\
.load(keyspace='sensors', table='motorway_rush_hour').show()

+----+-----+-------+
|hour|count|   time|
+----+-----+-------+
|  18|18173|evening|
|   9|29992|morning|
|  17|36016|evening|
|  20|11647|evening|
|   7|22528|morning|
|  10|29279|morning|
|  16|38655|evening|
|  19|13788|evening|
|   8|27180|morning|
|   6|18728|morning|
+----+-----+-------+



## # 4. Calculate average speed between each junction on M50

In [12]:
# 4. Calculate average speed between each junction on M50 (e.g., junction
# 1 - junction2, junction 2 - junction 3, etc.).

motorway_data = data.where(f.col('cosit').isin(list(m50.keys())))
average_speed = motorway_data.select('cosit',col('speed').cast('double').alias('speed')).groupby('cosit').mean('speed')
average_speed = average_speed.withColumnRenamed('avg(speed)', 'average')
average_speed.show()

+------------+------------------+
|       cosit|           average|
+------------+------------------+
|000000001500| 88.83526554404145|
|000000015011|104.02299711199059|
|000000001505| 98.92545893412945|
|000000001503| 98.45699912510936|
|000000001509| 94.73736586836881|
|000000001502| 99.01588546773877|
|000000001507|102.64251095162643|
|000000001506|102.11667798306114|
|000000001501| 98.10988853617204|
|000000001012| 84.09989342515166|
|000000015010|106.05619648259243|
|000000015012|106.45533712709087|
|000000001504|100.41781593019984|
|000000001508| 96.13615310118321|
+------------+------------------+



In [13]:
average_speed.createTempView('average_speed')
location = spark.createDataFrame(data=m50.items(), schema=['cosit', 'location'])
location.createTempView('location')
location.show()

+------------+---------+
|       cosit| location|
+------------+---------+
|000000001012|Jn02-Jn03|
|000000001500|Jn03-Jn04|
|000000001501|Jn04-Jn05|
|000000001502|Jn05-Jn06|
|000000001508|Jn06-Jn07|
|000000001503|Jn07-Jn09|
|000000001509|Jn09-Jn10|
|000000001504|Jn10-Jn11|
|000000001505|Jn11-Jn12|
|000000001506|Jn12-Jn13|
|000000001507|Jn13-Jn14|
|000000015010|Jn14-Jn15|
|000000015011|Jn15-Jn16|
|000000015012|Jn16-Jn17|
+------------+---------+



In [15]:
motorway_average_speed= spark.sql("SELECT average_speed.cosit, location, ROUND(average, 2) as average_speed \
          FROM average_speed, location\
          WHERE average_speed.cosit = location.cosit ORDER BY location")
motorway_average_speed.show()

+------------+---------+-------------+
|       cosit| location|average_speed|
+------------+---------+-------------+
|000000001012|Jn02-Jn03|         84.1|
|000000001500|Jn03-Jn04|        88.84|
|000000001501|Jn04-Jn05|        98.11|
|000000001502|Jn05-Jn06|        99.02|
|000000001508|Jn06-Jn07|        96.14|
|000000001503|Jn07-Jn09|        98.46|
|000000001509|Jn09-Jn10|        94.74|
|000000001504|Jn10-Jn11|       100.42|
|000000001505|Jn11-Jn12|        98.93|
|000000001506|Jn12-Jn13|       102.12|
|000000001507|Jn13-Jn14|       102.64|
|000000015010|Jn14-Jn15|       106.06|
|000000015011|Jn15-Jn16|       104.02|
|000000015012|Jn16-Jn17|       106.46|
+------------+---------+-------------+



In [16]:
motorway_average_speed.select("cosit", "location", "average_speed")\
.write.format("org.apache.spark.sql.cassandra")\
.options(table="motorway_average_speed", keyspace="sensors")\
.save(mode="append")

In [17]:
spark.read.format("org.apache.spark.sql.cassandra")\
.load(keyspace='sensors', table='motorway_average_speed').show()

+-----+-------------+---------+
|cosit|average_speed| location|
+-----+-------------+---------+
| 1507|       102.64|Jn13-Jn14|
| 1508|        96.14|Jn06-Jn07|
| 1505|        98.93|Jn11-Jn12|
|15011|       104.02|Jn15-Jn16|
| 1504|       100.42|Jn10-Jn11|
| 1501|        98.11|Jn04-Jn05|
|15010|       106.06|Jn14-Jn15|
| 1509|        94.74|Jn09-Jn10|
| 1012|         84.1|Jn02-Jn03|
| 1502|        99.02|Jn05-Jn06|
| 1506|       102.12|Jn12-Jn13|
|15012|       106.46|Jn16-Jn17|
| 1503|        98.46|Jn07-Jn09|
| 1500|        88.84|Jn03-Jn04|
+-----+-------------+---------+



In [19]:
# 5. Calculate the top 10 locations with highest number of counts of HGVs
# (class). Map the COSITs with their names given on the map.

hgv_locations = data.select("cosit").where(f.col('classname').contains('HGV')).groupby('cosit').count().sort(col('count').desc()).take(10)
hgv_locations= spark.createDataFrame(hgv_locations)
hgv_locations.show()

Py4JJavaError: An error occurred while calling o438.collectToPython.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 57.0 failed 1 times, most recent failure: Lost task 0.0 in stage 57.0 (TID 63) (10.0.2.15 executor driver): java.io.FileNotFoundException: /tmp/blockmgr-16ee883d-85f4-42f6-b763-a25a6b4f14e3/23/temp_shuffle_0562a80d-e4bf-45e4-9871-6582ee49b5a2 (No space left on device)
	at java.io.FileOutputStream.open0(Native Method)
	at java.io.FileOutputStream.open(FileOutputStream.java:270)
	at java.io.FileOutputStream.<init>(FileOutputStream.java:213)
	at org.apache.spark.storage.DiskBlockObjectWriter.initialize(DiskBlockObjectWriter.scala:133)
	at org.apache.spark.storage.DiskBlockObjectWriter.open(DiskBlockObjectWriter.scala:152)
	at org.apache.spark.storage.DiskBlockObjectWriter.write(DiskBlockObjectWriter.scala:279)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:171)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1462)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2454)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2403)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2402)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2402)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1160)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1160)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1160)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2642)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2584)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2573)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
Caused by: java.io.FileNotFoundException: /tmp/blockmgr-16ee883d-85f4-42f6-b763-a25a6b4f14e3/23/temp_shuffle_0562a80d-e4bf-45e4-9871-6582ee49b5a2 (No space left on device)
	at java.io.FileOutputStream.open0(Native Method)
	at java.io.FileOutputStream.open(FileOutputStream.java:270)
	at java.io.FileOutputStream.<init>(FileOutputStream.java:213)
	at org.apache.spark.storage.DiskBlockObjectWriter.initialize(DiskBlockObjectWriter.scala:133)
	at org.apache.spark.storage.DiskBlockObjectWriter.open(DiskBlockObjectWriter.scala:152)
	at org.apache.spark.storage.DiskBlockObjectWriter.write(DiskBlockObjectWriter.scala:279)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:171)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1462)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)


In [30]:
hgv_locations = data.select("cosit").where(f.col('classname').contains('HGV')).groupby('cosit').count().sort(col('count').desc()).limit(10)
hgv_locations.show()


Py4JJavaError: An error occurred while calling o595.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 64.0 failed 1 times, most recent failure: Lost task 0.0 in stage 64.0 (TID 76) (10.0.2.15 executor driver): java.io.FileNotFoundException: /tmp/blockmgr-16ee883d-85f4-42f6-b763-a25a6b4f14e3/1a/temp_shuffle_97348e9c-3f37-42d8-954d-2679cf0c428b (No space left on device)
	at java.io.FileOutputStream.open0(Native Method)
	at java.io.FileOutputStream.open(FileOutputStream.java:270)
	at java.io.FileOutputStream.<init>(FileOutputStream.java:213)
	at org.apache.spark.storage.DiskBlockObjectWriter.initialize(DiskBlockObjectWriter.scala:133)
	at org.apache.spark.storage.DiskBlockObjectWriter.open(DiskBlockObjectWriter.scala:152)
	at org.apache.spark.storage.DiskBlockObjectWriter.write(DiskBlockObjectWriter.scala:279)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:171)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1462)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2454)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2403)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2402)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2402)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1160)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1160)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1160)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2642)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2584)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2573)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
Caused by: java.io.FileNotFoundException: /tmp/blockmgr-16ee883d-85f4-42f6-b763-a25a6b4f14e3/1a/temp_shuffle_97348e9c-3f37-42d8-954d-2679cf0c428b (No space left on device)
	at java.io.FileOutputStream.open0(Native Method)
	at java.io.FileOutputStream.open(FileOutputStream.java:270)
	at java.io.FileOutputStream.<init>(FileOutputStream.java:213)
	at org.apache.spark.storage.DiskBlockObjectWriter.initialize(DiskBlockObjectWriter.scala:133)
	at org.apache.spark.storage.DiskBlockObjectWriter.open(DiskBlockObjectWriter.scala:152)
	at org.apache.spark.storage.DiskBlockObjectWriter.write(DiskBlockObjectWriter.scala:279)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:171)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1462)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)


In [None]:
location_maps = {'000000001500':'Jn03-Jn04',s
          '000000001501':'Jn04-Jn05',
          '000000001503':'Jn07-Jn09',
          '000000001502':'Jn05-Jn06',
          '000000001508':'Jn06-Jn07',
          '000000001070':'Jn01-Jn1a',
          '000000001071':'Jn02-Jn03',
          '000000001072':'Jn01a-Jn02',
          '000000000997':'not_assigned',
          '000000000998':'not_assigned2'
                }


In [None]:
location_maps_df = spark.createDataFrame(data=location_maps.items(), schema=['cosit', 'location'])

In [None]:
location_maps_df.createTempView('location_maps_df')
hgv_locations.createTempView('hgv_locations')

In [None]:
q5 = spark.sql('SELECT hgv_locations.cosit, location_maps_df.location, count \
FROM location_maps_df, hgv_locations \
WHERE location_maps_df.cosit = hgv_locations.cosit')
q5.show()

In [None]:
q5.select("cosit", "location", "count")\
.write.format("org.apache.spark.sql.cassandra")\
.options(table="top_hgv_locations", keyspace="traffic")\
.save(mode="append")

In [None]:
spark.read.format("org.apache.spark.sql.cassandra")\
.load(keyspace='traffic', table='top_hgv_locations').show()