In [None]:
// Streaming DataFrame read from a TCP socket on 127.0.0.1:9876.
val source = {
    val socketReader = spark.readStream.format("socket")
    socketReader
        .option("host", "127.0.0.1")
        .option("port", 9876)
        .load()
}

In [None]:
// Further reading on working with complex nested data in Spark:
// https://docs.databricks.com/spark/latest/dataframes-datasets/complex-nested-data.html#complex-nested-data-notebook

import org.apache.spark.sql.types._                         // include the Spark Types to define our schema
import org.apache.spark.sql.functions._                     // include the Spark helper functions

// Schema of one incoming JSON record: five fields, every one read as a string.
val jsonSchema = Seq("date", "gender", "icd10", "patient_type", "subdistrict")
    .foldLeft(new StructType()) { (schema, fieldName) =>
        schema.add(fieldName, StringType)
    }

In [None]:
// Parse the `value` column as JSON using jsonSchema, then lift the fields of
// the parsed struct to top-level columns.
val visits = {
    val parsed = from_json(col("value"), jsonSchema).as("data")
    source
        .select(parsed)
        .select(col("data.*"))
}

In [None]:
import org.apache.spark.sql.functions._ // for `when` and `udf`

// UDF that flags a diagnosis code: 1 for "A01.1" or "C01.2", 0 for anything else
// (including a missing code).
val ili = udf { (code: String) =>
    code match {
        case "A01.1" | "C01.2" => 1
        case _ => 0
    }
}

// Derive one 0/1 indicator column per condition and patient type, then keep
// only the date, the subdistrict and the indicator columns.
val cases = {
    // 1 when the named column equals the expected value, otherwise 0
    def indicator(columnName: String, expected: String) =
        when(col(columnName) === expected, 1).otherwise(0)

    val flagged = visits
        .withColumn("flu", indicator("icd10", "C01.2"))
        .withColumn("SARI", indicator("icd10", "B01.3"))
        .withColumn("pneumonia", indicator("icd10", "A01.1"))
        .withColumn("ILI", ili(col("icd10")))
        .withColumn("IPD", indicator("patient_type", "IPD"))
        .withColumn("OPD", indicator("patient_type", "OPD"))

    flagged.select("date", "subdistrict", "flu", "SARI", "pneumonia", "ILI", "IPD", "OPD")
}


// Per-subdistrict totals of every indicator column, sorted by subdistrict.
// The aggregate order (flu, ILI, SARI, pneumonia, IPD, OPD) fixes the column
// positions of the result rows.
val summary = {
    val totals = Seq("flu", "ILI", "SARI", "pneumonia", "IPD", "OPD")
        .map(name => sum(name).as(name))

    cases
        .groupBy("subdistrict")
        .agg(totals.head, totals.tail: _*)
        .orderBy(asc("subdistrict"))
}

In [None]:

/**
 * `ForeachWriter` that upserts every row it receives into the PostgreSQL
 * table `public.test`, keyed on `subdistrict`.
 *
 * Rows are read by position: index 0 is the subdistrict and indexes 1 to 6
 * are the flu, ili, sari, pneumonia, ipd and opd values, in that order.
 *
 * The statement is parameterized, so a subdistrict containing quotes or other
 * SQL syntax is stored as data instead of being spliced into the SQL text.
 *
 * @param url  JDBC connection URL
 * @param user database user name
 * @param pwd  database password
 */
class JDBCSink(url: String, user:String, pwd:String) extends org.apache.spark.sql.ForeachWriter[org.apache.spark.sql.Row]{
    val driver = "org.postgresql.Driver"
    // Assigned in open(); both stay null until then.
    var connection:java.sql.Connection = _
    var statement:java.sql.PreparedStatement = _

    // Insert a new subdistrict row, or overwrite the counters of an existing one.
    private val upsertSql =
        """INSERT INTO public.test(subdistrict, flu, ili, sari, pneumonia, ipd, opd)
          |VALUES (?, ?, ?, ?, ?, ?, ?)
          |ON CONFLICT (subdistrict) DO UPDATE
          |SET flu = EXCLUDED.flu,
          |    ili = EXCLUDED.ili,
          |    sari = EXCLUDED.sari,
          |    pneumonia = EXCLUDED.pneumonia,
          |    ipd = EXCLUDED.ipd,
          |    opd = EXCLUDED.opd""".stripMargin

    /**
     * Loads the JDBC driver, opens a connection and prepares the upsert statement.
     *
     * @return always `true`, so every partition is processed
     */
    def open(partitionId: Long, version: Long):Boolean = {
        Class.forName(driver)
        connection = java.sql.DriverManager.getConnection(url, user, pwd)
        statement = connection.prepareStatement(upsertSql)
        true
    }

    /**
     * Binds the seven positional values of `value` to the prepared statement
     * and executes the upsert.
     */
    def process(value: org.apache.spark.sql.Row): Unit = {
        // The previous string-built SQL rendered a missing key as the text "null";
        // keep that so such rows are still written under the same key.
        statement.setString(1, Option(value.get(0)).fold("null")(_.toString))
        (1 to 6).foreach { index =>
            // setObject takes java.lang.Object; Row.get returns Any (boxed value or null).
            statement.setObject(index + 1, value.get(index).asInstanceOf[AnyRef])
        }
        statement.executeUpdate()
    }

    /**
     * Releases the statement and the connection. Either may still be null if
     * open() failed part-way, and the connection is closed even when closing
     * the statement throws.
     */
    def close(errorOrNull:Throwable):Unit = {
        try {
            Option(statement).foreach(_.close())
        } finally {
            Option(connection).foreach(_.close())
        }
    }
}

In [None]:

// Sink instance pointing at the `sss` database on 127.0.0.1:5432.
// NOTE(review): the user name and password are hard-coded in this cell.
val writer = new JDBCSink(
    url = "jdbc:postgresql://127.0.0.1:5432/sss",
    user = "pphetra",
    pwd = "pphetra"
)


In [None]:

import org.apache.spark.sql.streaming.Trigger
// Start the streaming query: every 5 seconds the complete `summary` result
// is handed to `writer`.
val query = {
    val everyFiveSeconds = Trigger.ProcessingTime("5 seconds")
    summary.writeStream
        .outputMode("complete")
        .trigger(everyFiveSeconds)
        .foreach(writer)
        .start()
}

In [None]:
// Stop the streaming query held in `query`.
query.stop()