# Прием и обработка твитов микробатчем

## Инициализация

In [17]:
import org.apache.spark.sql.types.{StructType, StringType, IntegerType, TimestampType}
import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.Row
import org.apache.toree.kernel.api
import java.util.Calendar

In [18]:
println(s"Current spark version is ${spark.version}")

Current spark version is 2.4.4


## Чтение модели

In [19]:
val modelPath = "/home/jovyan/models/spark-ml-model"
val model = PipelineModel.load(modelPath)

modelPath = /home/jovyan/models/spark-ml-model
model = pipeline_6cb85880407d


pipeline_6cb85880407d

## Определяем схему

In [21]:
val inputStreamPath = "/home/jovyan/work/events-stream"
//val outputStreamPath = "/home/jovyan/work/events-stream-out"

val dataSchema = new StructType()
    .add("tweet", StringType)
    .add("hiddentargetclue", IntegerType)
    .add("timestamp", TimestampType)

inputStreamPath = /home/jovyan/work/events-stream
getProbability = UserDefinedFunction(<function2>,DoubleType,Some(List(org.apache.spark.ml.linalg.VectorUDT@3bfc3ba7, IntegerType)))
dataSchema = StructType(StructField(tweet,StringType,true), StructField(hiddentargetclue,IntegerType,true), StructField(timestamp,TimestampType,true))


StructType(StructField(tweet,StringType,true), StructField(hiddentargetclue,IntegerType,true), StructField(timestamp,TimestampType,true))

## Определяем стриминговый датасет и применяем к нему модель для предсказания

In [52]:
// Определяем udf для предсказания негативного (0) или позитивного (1) твита
val getPrediction01 =
    udf(
        (prediction: org.apache.spark.ml.linalg.Vector) =>
        if ( prediction(0) >= prediction(1) ) 0 else 1 // При равенстве будем склоняться к 0
    )

/*
val inputDF = model.transform(
    spark
    .readStream
    .schema(dataSchema)
    .json(inputStreamPath)
    .withWatermark("timestamp", "60 seconds") // задаем время устаревания данных в потоке пока что, на всякий случай, гораздо больше, чем предположительно нужно
    )
    .select(
        window($"timestamp", "10 seconds", "5 seconds").alias("window"),
        getPrediction01($"probability").alias("prediction01"),
        ($"hiddentargetclue").alias("hiddentargetclue")
    )
    .where($"prediction01"===lit(1))
    .groupBy($"window")
    .agg(count("*").alias("qnty"))
*/

val inputDF =
    model.transform(
        spark
        .readStream
        .schema(dataSchema)
        .json(inputStreamPath)
        .withWatermark("timestamp", "60 seconds") // задаем время устаревания данных в потоке пока что, на всякий случай, гораздо больше, чем предположительно нужно
    )
    .withColumn("prediction01",getPrediction01($"probability"))
    .selectExpr(
        "window(timestamp, '10 seconds', '5 seconds') as window",
        "case when prediction01 = 0 then 1 else 0 end as pred_neg",
        "case when prediction01 = 1 then 1 else 0 end as pred_pos",
        // В этом кейсе можем себе позволить вывести также и реальные метки негатива и позитива для сравнения с предсказанными :)
        "case when hiddentargetclue = 0 then 1 else 0 end as clue_neg",
        "case when hiddentargetclue = 1 then 1 else 0 end as clue_pos"
    )
    .groupBy($"window")
    .agg(
        sum("pred_neg").alias("predicted_neg"),
        sum("pred_pos").alias("predicted_pos"),
        sum("clue_neg").alias("clue_neg"),
        sum("clue_pos").alias("clue_pos")
    )

getPrediction01 = UserDefinedFunction(<function1>,IntegerType,Some(List(org.apache.spark.ml.linalg.VectorUDT@3bfc3ba7)))
inputDF = [window: struct<start: timestamp, end: timestamp>, predicted_neg: bigint ... 3 more fields]


lastException: Throwable = null


[window: struct<start: timestamp, end: timestamp>, predicted_neg: bigint ... 3 more fields]

## Определяем потоковый запрос и стартуем его, обрабатывая батчами

In [55]:
var fRuns = 0
var fTweets:Long = 0

// Микробатч для вывода результата предсказания
// Выводится вероятность негативного твита
// В задании написано, что это последняя колонка, но она здесь вроде первая (в позиции 0)
val stream = inputDF.writeStream.outputMode("complete").foreachBatch {
    (batchDF: DataFrame, batchId: Long) => {
        try {
            fRuns += 1
            batchDF.orderBy(($"window").desc).show(false)
        } catch {
            case e:Throwable => {
                println(e.getMessage)
            }
        }
    }
}.start()

fRuns = 0
fTweets = 0
stream = org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@797580f3


org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@797580f3

+------------------------------------------+-------------+-------------+--------+--------+
|window                                    |predicted_neg|predicted_pos|clue_neg|clue_pos|
+------------------------------------------+-------------+-------------+--------+--------+
|[2020-01-29 16:51:10, 2020-01-29 16:51:20]|14           |28           |23      |19      |
|[2020-01-29 16:51:05, 2020-01-29 16:51:15]|18           |39           |32      |25      |
|[2020-01-29 16:51:00, 2020-01-29 16:51:10]|8            |22           |17      |13      |
|[2020-01-29 16:50:55, 2020-01-29 16:51:05]|22           |24           |21      |25      |
|[2020-01-29 16:50:50, 2020-01-29 16:51:00]|23           |19           |16      |26      |
|[2020-01-29 16:50:45, 2020-01-29 16:50:55]|18           |24           |16      |26      |
|[2020-01-29 16:50:40, 2020-01-29 16:50:50]|18           |29           |20      |27      |
|[2020-01-29 16:50:35, 2020-01-29 16:50:45]|11           |20           |16      |15      |

## Останов чтения потока

In [56]:
stream.stop()