In [1]:
import org.apache.spark.sql.types.{StructType, StringType}
import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.DataFrame

In [2]:
// Record the Spark runtime version in the notebook output.
{
    val runtimeVersion = spark.version
    println(s"Current spark version is $runtimeVersion")
}

Current spark version is 2.4.4


In [3]:
// Directory watched for incoming JSON event files, and the location of the saved Spark ML pipeline.
val inputStreamPath = "/home/jovyan/data/events-stream"
val modelPath = "/home/jovyan/models/spark-ml-model"

// Schema handed to the streaming file source: one nullable string field, `tweet`.
val dataSchema = new StructType().add("tweet", StringType, nullable = true)

// Streaming JSON source over `inputStreamPath`.
// `maxFilesPerTrigger` = 1 limits each micro-batch to at most one new file.
val inputDF = spark.readStream
    .format("json")
    .schema(dataSchema)
    .option("maxFilesPerTrigger", 1)
    .load(inputStreamPath)

inputStreamPath = /home/jovyan/data/events-stream
modelPath = /home/jovyan/models/spark-ml-model
dataSchema = StructType(StructField(tweet,StringType,true))
inputDF = [tweet: string]


[tweet: string]

In [4]:
// Pulls element 1 out of an ML probability vector. For the binary classifier loaded below, this is
// presumably the probability of label 1.0 (published as `clean_probability`) -- TODO confirm label encoding.
val getProbability = udf((probability: org.apache.spark.ml.linalg.Vector) => probability(1))

// Fitted pipeline previously saved to `modelPath`.
// Transforming the stream with it yields the `prediction` and `probability` columns selected below.
val model = PipelineModel.load(modelPath)

// Score each incoming tweet.
// `timestamp` comes from current_timestamp(), i.e. the time Spark processes the row,
// not when the tweet was produced.
val predictionsDF = model
    .transform(inputDF)
    .select(current_timestamp().as("timestamp"),
            $"tweet",
            $"prediction",
            getProbability($"probability").alias("clean_probability"))

getProbability = UserDefinedFunction(<function1>,DoubleType,Some(List(org.apache.spark.ml.linalg.VectorUDT@3bfc3ba7)))
model = pipeline_755002f04345
predictionsDF = [timestamp: timestamp, tweet: string ... 2 more fields]


[timestamp: timestamp, tweet: string ... 2 more fields]

In [5]:
// Print each scored tweet to the console as micro-batches arrive.
// The StreamingQuery handle is kept so the query can later be stopped
// (`predictionsQuery.stop()`) or awaited, instead of running unreferenced.
val predictionsQuery = predictionsDF
    .writeStream
    .outputMode("append")
    .format("console")
    .start()

org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@1f54615

-------------------------------------------
Batch: 0
-------------------------------------------
+--------------------+--------------------+----------+-------------------+
|           timestamp|               tweet|prediction|  clean_probability|
+--------------------+--------------------+----------+-------------------+
|2020-02-15 18:10:...|designing a mobil...|       1.0| 0.5110175201989817|
|2020-02-15 18:10:...|feels really ill....|       1.0| 0.5110175201989817|
|2020-02-15 18:10:...|Watching the Canu...|       1.0| 0.5017604908907871|
|2020-02-15 18:10:...|@ZoeeLove I'm sor...|       0.0|0.49036663265941344|
|2020-02-15 18:10:...|Pen &amp; Brush T...|       0.0|0.48223241889832397|
|2020-02-15 18:10:...|@mkerrigan7 No Le...|       0.0|0.44136829050705195|
|2020-02-15 18:10:...|     Im being bored |       1.0| 0.5110175201989817|
|2020-02-15 18:10:...|@DPMHAUS  It's No...|       1.0| 0.5110175201989817|
|2020-02-15 18:10:...|wants to watch La...|       0.0| 0.4543895280905085|
|20

In [6]:
// Count predictions per class over 60-second windows that slide every 30 seconds.
// The zero-second watermark on `timestamp` lets append mode finalize a window once the
// watermark passes the window's end; rows arriving behind the watermark are dropped.
val windowsPredictionsDF = {
    val slidingWindow = window($"timestamp", "60 seconds", "30 seconds")
    predictionsDF
        .withWatermark("timestamp", "0 seconds")
        .groupBy(slidingWindow, $"prediction")
        .count()
}


windowsPredictionsDF = [window: struct<start: timestamp, end: timestamp>, prediction: double ... 1 more field]


[window: struct<start: timestamp, end: timestamp>, prediction: double ... 1 more field]

In [7]:
// Print the windowed per-class counts to the console.
// In append mode a window is emitted only after the watermark finalizes it,
// so the first batches may print empty tables.
// The StreamingQuery handle is kept so the query can be stopped or awaited later.
val windowedCountsQuery = windowsPredictionsDF
    .writeStream
    .outputMode("append")
    .format("console")
    .start()

org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@70b7d1b3

-------------------------------------------
Batch: 0
-------------------------------------------
+------+----------+-----+
|window|prediction|count|
+------+----------+-----+
+------+----------+-----+

-------------------------------------------
Batch: 1
-------------------------------------------
+------+----------+-----+
|window|prediction|count|
+------+----------+-----+
+------+----------+-----+

-------------------------------------------
Batch: 2
-------------------------------------------
+--------------------+----------+-----+
|              window|prediction|count|
+--------------------+----------+-----+
|[2020-02-15 18:10...|       0.0|    6|
|[2020-02-15 18:10...|       1.0|   12|
|[2020-02-15 18:10...|       0.0|    6|
|[2020-02-15 18:10...|       1.0|   12|
+--------------------+----------+-----+

-------------------------------------------
Batch: 3
-------------------------------------------
+--------------------+----------+-----+
|              window|prediction|count|
+