In [0]:
# Defining Cassandra connection details in Scala as it is only currently supported with Databricks
%scala
// define the cluster name and cassandra host name
val sparkClusterName = "test1"
val cassandraHostIP = "ec2-54-205-226-21.compute-1.amazonaws.com"

In [0]:
%scala
dbutils.fs.rm("/databricks/init/$sparkClusterName/cassandra.sh")

In [0]:
%scala
//adding the hostname to all worker nodes via init script
dbutils.fs.put(s"/databricks/init/$sparkClusterName/cassandra.sh",
  s"""
     #!/usr/bin/bash
     echo '[driver]."spark.cassandra.connection.host" = "$cassandraHostIP"' >> /home/ubuntu/databricks/common/conf/cassandra.conf
   """.trim, true)

In [0]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[1]").appName("SparkByExamples.com").getOrCreate()

In [0]:
# Test the single batch file
df1 = spark.read.format("csv").option("header", "true").load("dbfs:/FileStore/shared_uploads/email@gmail.com/spark-streaming/counterdata1.csv")
df1.show(5)

+-----+----+-----+---+----+------+------+-----------+-----------+----+--------+------------+----------------+-----+---------+------+-------+----+-----+------+-----------+--------+------------+-------------+-----------+------------+
|cosit|year|month|day|hour|minute|second|millisecond|minuteofday|lane|lanename|straddlelane|straddlelanename|class|classname|length|headway| gap|speed|weight|temperature|duration|validitycode|numberofaxles|axleweights|axlespacings|
+-----+----+-----+---+----+------+------+-----------+-----------+----+--------+------------+----------------+-----+---------+------+-------+----+-----+------+-----------+--------+------------+-------------+-----------+------------+
|  997|2021|    1| 15|   7|    30|    28|         90|        450|   1|  Test 1|           0|            null|    2|      CAR|   5.1|   1.62|1.32| 70.0|   0.0|        0.0|       0|           0|            0|       null|        null|
|  997|2021|    1| 15|   7|    30|    29|         50|        450|   1|  

In [0]:
df1.printSchema()

root
 |-- cosit: string (nullable = true)
 |-- year: string (nullable = true)
 |-- month: string (nullable = true)
 |-- day: string (nullable = true)
 |-- hour: string (nullable = true)
 |-- minute: string (nullable = true)
 |-- second: string (nullable = true)
 |-- millisecond: string (nullable = true)
 |-- minuteofday: string (nullable = true)
 |-- lane: string (nullable = true)
 |-- lanename: string (nullable = true)
 |-- straddlelane: string (nullable = true)
 |-- straddlelanename: string (nullable = true)
 |-- class: string (nullable = true)
 |-- classname: string (nullable = true)
 |-- length: string (nullable = true)
 |-- headway: string (nullable = true)
 |-- gap: string (nullable = true)
 |-- speed: string (nullable = true)
 |-- weight: string (nullable = true)
 |-- temperature: string (nullable = true)
 |-- duration: string (nullable = true)
 |-- validitycode: string (nullable = true)
 |-- numberofaxles: string (nullable = true)
 |-- axleweights: string (nullable = true)
 |--

In [0]:
from pyspark.streaming import StreamingContext
ssc = StreamingContext(sc, 5)

In [0]:
from pyspark.sql.types import *
from pyspark.sql.functions import *

In [0]:
# define the schema
TrafSchema = StructType([ StructField("cosit", StringType(), True), StructField("year", StringType(), True), StructField("month", StringType(), True), StructField("day", StringType(), True), StructField("hour", StringType(), True), StructField("minute", StringType(), True), StructField("second", StringType(), True), StructField("millisecond", StringType(), True), StructField("minuteofday", StringType(), True), StructField("lane", StringType(), True), StructField("lanename", StringType(), True), StructField("straddlelane", StringType(), True), StructField("straddlelanename", StringType(), True), StructField("class", StringType(), True), StructField("classname", StringType(), True), StructField("length", StringType(), True), StructField("headway", StringType(), True), StructField("gap", StringType(), True), StructField("speed", StringType(), True), StructField("weight", StringType(), True), StructField("temperature", StringType(), True), StructField("duration", StringType(), True), StructField("validitycode", StringType(), True),StructField("numberofaxles", StringType(), True),StructField("axleweights", StringType(), True),StructField("axlespacings", StringType(), True)])


In [0]:
streamingInputDF = (
  spark
    .readStream
    .schema(TrafSchema)
    .option("maxFilesPerTrigger", 1)  # Treat a sequence of files as a stream by picking one file at a time
    .format("csv")
    .load("dbfs:/FileStore/shared_uploads/email@gmail.com/streaming-files/")
)

In [0]:
# Total number of counts by vehicle class
streamingVehicleClassCountsDF = (
  streamingInputDF
    .groupBy(
      streamingInputDF.classname)
    .count()
)

In [0]:
streamingVehicleClassCountsDF.isStreaming

Out[21]: True

In [0]:
display(streamingVehicleClassCountsDF)

classname,count
CAR,8
HGV_ART,2
HGV_RIG,9
LGV,1
classname,2


In [0]:
# Writing the Dataframe to Cassandra 
%scala
//Writing the dataframe directly to cassandra
import org.apache.spark.sql.cassandra._


streamingVehicleClassCountsDF.write
  .format("org.apache.spark.sql.cassandra")
  .mode("overwrite")
  .options(Map( "table" -> "streamingVehicleClassCountsDF", "keyspace" -> "test_keyspace"))
  .save()

In [0]:
query = (
  streamingVehicleClassCountsDF
    .writeStream
    .format("memory")      
    .queryName("counts")  
    .outputMode("complete")
    .start()
)

In [0]:
import pyspark.sql.functions as F
streamingVehicleClassAvgSpeedCountsDF = (
  streamingInputDF
    .groupBy(
      streamingInputDF.classname)
    .agg(F.round(F.avg(streamingInputDF.speed), 3))
)

In [0]:
display(streamingVehicleClassAvgSpeedCountsDF)

classname,"round(avg(speed), 3)"
CAR,70.625
HGV_ART,70.0
HGV_RIG,70.333
LGV,76.0
classname,


In [0]:
# Writing the Dataframe to Cassandra 
%scala
//Writing the dataframe directly to cassandra
import org.apache.spark.sql.cassandra._


streamingVehicleClassAvgSpeedCountsDF.write
  .format("org.apache.spark.sql.cassandra")
  .mode("overwrite")
  .options(Map( "table" -> "streamingVehicleClassAvgSpeedCountsDF", "keyspace" -> "test_keyspace"))
  .save()

In [0]:
streamingCountCositDF = (
  streamingInputDF
    .groupBy(
      streamingInputDF.cosit)
    .count()
    .withColumnRenamed('count', 'total_count')
    .orderBy()
)

In [0]:
display(streamingCountCositDF)

cosit,total_count
cosit,1
997,10


In [0]:
streamingCountCositDF.createTempView('count_cosit')

In [0]:
streamingTop3CositDF = spark.sql('select cosit, total_count from count_cosit order by total_count desc LIMIT 3')


In [0]:
display(streamingTop3CositDF)

cosit,total_count
997,20
cosit,2


In [0]:
# Writing the Dataframe to Cassandra 
%scala
//Writing the dataframe directly to cassandra
import org.apache.spark.sql.cassandra._


streamingTop3CositDF.write
  .format("org.apache.spark.sql.cassandra")
  .mode("overwrite")
  .options(Map( "table" -> "streamingTop3CositDF", "keyspace" -> "test_keyspace"))
  .save()

In [0]:
streamingHGVCountsDF = (
  streamingInputDF
    .where((streamingInputDF.classname == 'HGV_RIG') | (streamingInputDF.classname == 'HGV_ART'))
    .groupBy(
      streamingInputDF.classname)
    .count()
)

In [0]:
display(streamingHGVCountsDF)

classname,count
HGV_ART,2
HGV_RIG,9


In [0]:
# Writing the Dataframe to Cassandra 
%scala
//Writing the dataframe directly to cassandra
import org.apache.spark.sql.cassandra._


streamingHGVCountsDF.write
  .format("org.apache.spark.sql.cassandra")
  .mode("overwrite")
  .options(Map( "table" -> "streamingHGVCountsDF", "keyspace" -> "test_keyspace"))
  .save()