In [1]:
import org.apache.spark.sql.types.{StructType, StringType,StructField, IntegerType, LongType}
import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.DataFrame


In [2]:
println(s"Current spark version is ${spark.version}")

Current spark version is 2.4.4


In [3]:
val inputStreamPath = "/home/jovyan/data/events-stream"
val modelPath = "/home/jovyan/models/spark-ml-model"
val model = PipelineModel.load(modelPath)

val dataSchema = new StructType()
    .add("tweet", StringType)
val inputDF = spark
    .readStream
    .schema(dataSchema)
    .option("maxFilesPerTrigger", 1)
    .json(inputStreamPath)

inputStreamPath = /home/jovyan/data/events-stream
modelPath = /home/jovyan/models/spark-ml-model
model = pipeline_0712a1d840e0
dataSchema = StructType(StructField(tweet,StringType,true))
inputDF = [tweet: string]


[tweet: string]

In [4]:
import java.time.Instant
val getProbability = udf((prediction: org.apache.spark.ml.linalg.Vector) => prediction(1))
val getTimestamp = udf(() => Instant.now.getEpochSecond )
val dataSchema = new StructType()
    .add("tweet", StringType)
    .add("prediction", IntegerType)
    .add("clean_probability", LongType)
    .add("TIMESTAMP", LongType)

var oldDF = Seq.empty[(String, Int, Long, Long)].toDF("tweet", "prediction", "clean_probability", "TIMESTAMP")

inputDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
    
    val predictionsDF = model.transform(batchDF)
    val TenMins = getTimestamp() - 10
    val DF = predictionsDF.select($"tweet", $"prediction", getProbability($"probability").alias("clean_probability"), getTimestamp().alias("TIMESTAMP"))
        
    oldDF = oldDF.union(DF)
    DF.show(100)
    println(oldDF.count())  
    oldDF.filter($"TIMESTAMP" >= TenMins).groupBy("prediction").count().show()
}.start()



+--------------------+----------+-------------------+----------+
|               tweet|prediction|  clean_probability| TIMESTAMP|
+--------------------+----------+-------------------+----------+
|Revising latin ve...|       1.0| 0.5063874178122819|1583408179|
|My Dad had a stro...|       1.0| 0.5272337637446153|1583408179|
|@FSOnline Do you ...|       0.0|0.44711200702403436|1583408179|
|Golly it's a prop...|       0.0|0.45750550378135324|1583408179|
|oy NK DRAMA KIDS!...|       0.0| 0.4720856221457628|1583408179|
|       At workkk... |       1.0| 0.5335873473819716|1583408179|
|callin a cab. wor...|       1.0| 0.5335873473819716|1583408179|
|BNP get an MEP? I...|       1.0| 0.5335873473819716|1583408179|
|@Bionic1 we need ...|       1.0| 0.5335873473819716|1583408179|
|@billbeckett i lo...|       0.0| 0.4045377706885748|1583408179|
|feeling very happ...|       0.0|  0.499594950360727|1583408179|
|@Pippi43 Hey!  Gl...|       1.0| 0.5615444265398342|1583408179|
|listening to rent...|   

getProbability = UserDefinedFunction(<function1>,DoubleType,Some(List(org.apache.spark.ml.linalg.VectorUDT@3bfc3ba7)))
getTimestamp = UserDefinedFunction(<function0>,LongType,Some(List()))
dataSchema = StructType(StructField(tweet,StringType,true), StructField(prediction,IntegerType,true), StructField(clean_probability,LongType,true), StructField(TIMESTAMP,LongType,true))
oldDF = [tweet: string, prediction: int ... 2 more fields]


org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@4107d264

+----------+-----+
|prediction|count|
+----------+-----+
|       0.0|    6|
|       1.0|   13|
+----------+-----+

+--------------------+----------+-------------------+----------+
|               tweet|prediction|  clean_probability| TIMESTAMP|
+--------------------+----------+-------------------+----------+
|@NotoriousTIMP bl...|       0.0| 0.4750251093358478|1583408183|
|@KILA21 ok I saww...|       0.0| 0.4348257203107105|1583408183|
|Bloody bollocking...|       1.0| 0.5287849050889146|1583408183|
|@maynaseric Ya, I...|       0.0|0.45115344186973616|1583408183|
|Life is so differ...|       0.0|  0.389210498496237|1583408183|
|@Anthonyyyy i can...|       0.0|0.45825571625270933|1583408183|
|ohh its so clod a...|       0.0|   0.43778405489319|1583408183|
|@al_ice I was act...|       0.0| 0.4469177594859313|1583408183|
|study study study...|       1.0| 0.5335873473819716|1583408183|
|Finally got her r...|       1.0| 0.5335873473819716|1583408183|
|@Mr_PaulEvans  ba...|       1.0| 0.5188

124
+----------+-----+
|prediction|count|
+----------+-----+
|       0.0|   41|
|       1.0|   83|
+----------+-----+

+--------------------+----------+-------------------+----------+
|               tweet|prediction|  clean_probability| TIMESTAMP|
+--------------------+----------+-------------------+----------+
|@schaeferj89 Its ...|       1.0| 0.5335873473819716|1583408198|
|@AubreyODay  Aubr...|       0.0| 0.4397077249872849|1583408198|
|leaving sunny Sco...|       1.0| 0.5335873473819716|1583408198|
|@lil_feisty i don...|       0.0|0.45678593084795843|1583408198|
|Lied about the re...|       1.0| 0.5335873473819716|1583408198|
|Theres something ...|       1.0| 0.5287849050889146|1583408198|
|@misskiki904 I KNOW |       0.0|0.45825571625270933|1583408198|
|@phanxhunter oh i...|       0.0|0.48253309209206263|1583408198|
|@laceybenz Sorry ...|       1.0| 0.5168118490760507|1583408198|
|Wake up call at 7...|       1.0| 0.5335873473819716|1583408198|
|@KellyG5 thanks  ...|       1.0| 0.

221


In [5]:
spark.stop()


lastException: Throwable = null
lastException: Throwable = null
error:
  unrecoverable error
     while compiling: <console>
        during phase: erasure
     library version: version 2.11.12
    compiler version: version 2.11.12
  reconstructed args: -classpath file:/usr/lib/jvm/java-8-openjdk-amd64/jre/lib/ext/sunjce_provider.jar:file:/usr/lib/jvm/java-8-openjdk-amd64/jre/lib/ext/localedata.jar:file:/usr/lib/jvm/java-8-openjdk-amd64/jre/lib/ext/sunpkcs11.jar:file:/usr/lib/jvm/java-8-openjdk-amd64/jre/lib/ext/sunec.jar:file:/usr/lib/jvm/java-8-openjdk-amd64/jre/lib/ext/icedtea-sound.jar:file:/usr/lib/jvm/java-8-openjdk-amd64/jre/lib/ext/nashorn.jar:file:/usr/lib/jvm/java-8-openjdk-amd64/jre/lib/ext/dnsns.jar:file:/usr/lib/jvm/java-8-openjdk-amd64/jre/lib/ext/cldrdata.jar:file:/usr/lib/jvm/java-8-openjdk-amd64/jre/lib/ext/jaccess.jar:file:/usr/lib/jvm/java-8-openjdk-amd64/jre/lib/ext/zipfs.jar:file:/usr/local/spark-2.4.4-bin-hadoop2.7/conf/:file:/usr/local/spark-2.4.4-bin-hadoop2.7/ja