# Persistencia del estado entre batches - `updateStateByKey`

In [None]:
import org.apache.spark._

In [None]:
import org.apache.spark.streaming._

In [None]:
val streamctx = new StreamingContext(sc, Seconds(30))

In [None]:
val datos = streamctx.socketTextStream("localhost", 19000)

In [None]:
// Obligatorio al trabajar con estados.
streamctx.checkpoint("/eoi/streaming/checkpoint")

In [None]:
datos.print()

In [None]:
case class Medicion(nodo:String, sensor:String, valor:String)

In [None]:
val medidas = datos.flatMap(x => {
    val campos = x.split(";")
    try {
        List(Medicion(campos(0), campos(1), campos(2)))
    }
    catch {
        case e : Throwable => {
            // LLamar al sistema de notificación en caso de fallo en la entrada. 
        }
        List ()
    }
})

In [None]:
// Calcula cuántos datos de cada sensor se recibe en cada batch - Acumulado entre batches.
val aux_sensor = medidas.map(x => (x.sensor, 1)).reduceByKey(_ + _)

In [None]:
val resultado_sensor = aux_sensor.updateStateByKey((entrada, estado:Option[Int]) => {
                                                   if (estado.isEmpty == true) {
                                                       Some(entrada.sum)
                                                   } else {
                                                       Some(entrada.sum + estado.getOrElse(0))
                                                   }})

In [None]:
resultado_sensor.print

In [None]:
// Calcula cuántos datos de cada nodo se recibe en cada batch - Acumulado entre batches.
val aux_nodo = medidas.map(x => (x.nodo, 1)).reduceByKey(_ + _)

In [None]:
val resultado_nodo = aux_nodo.updateStateByKey((entrada, estado:Option[Int]) => {
                                               estado match {
                                                    case None => Some(entrada.sum)
                                                    case Some(acum) => Some(acum + entrada.sum)   
                                               }})

In [None]:
resultado_nodo.print

In [None]:
// Mostrar el nodo que más medidas ha transmitido.
val resultado_max = resultado_nodo.transform(x => x.sortBy(x => x._2, false))

In [None]:
resultado_max.print

In [None]:
streamctx.start()

In [None]:
streamctx.stop()

# Persistencia del estado entre batches - `mapWithState`

In [None]:
import org.apache.spark._

In [None]:
import org.apache.spark.streaming._

In [None]:
val streamctx = new StreamingContext(sc, Seconds(30))

In [None]:
val datos = streamctx.socketTextStream("localhost", 19000)

In [None]:
// Obligatorio al trabajar con estados.
streamctx.checkpoint("/eoi/streaming/checkpoint")

In [None]:
datos.print()

In [None]:
case class Medicion(nodo:String, sensor:String, valor:String)

In [None]:
val medidas = datos.flatMap(x => {
    val campos = x.split(";")
    try {
        List(Medicion(campos(0), campos(1), campos(2)))
    }
    catch {
        case e : Throwable => {
            // LLamar al sistema de notificación en caso de fallo en la entrada. 
        }
        List ()
    }
})

In [None]:
// Calcula cuántos datos de cada sensor se recibe en cada batch - Acumulado entre batches.
val aux_sensor = medidas.map(x => (x.sensor, 1)).reduceByKey(_ + _)

In [None]:
val actualiza_sensor = (sensor:String, value:Option[Int], estado:State[Int]) => {
    // Valor incial: el obtenido del RDD.
    var total = value.getOrElse(0)
    
    // Si existe el estado, se actualiza.
    if (estado.exists == true) {
        total = total + estado.get()
    } 
    // Actualiza el estado.
    estado.update(total)
    // Devuelve el nuevo valor.
    Some((sensor, total))
}

In [None]:
// Acumulado del número de medidas de cada sensor.
val resultado_sensor = aux_sensor.mapWithState(StateSpec.function(actualiza_sensor)).stateSnapshots()

In [None]:
resultado_sensor.print

In [None]:
// Calcula la media de las medidas de temperatura.
val sum_temp = medidas.filter(x => x.sensor == "Temp").map(x => (x.sensor, x.valor.toDouble)).
            transform(x => x.aggregateByKey((0.0, 0.0))((acc, valor) => (acc._1 + valor, acc._2 + 1),
                                                        (acc1, acc2) => (acc1._1 + acc2._1, acc1._2 + acc2._2)).
                            mapValues(sumCount => 1.0 * sumCount._1 / sumCount._2))

In [None]:
val actualiza_avg_temp = (sensor:String, value:Option[Double], estado:State[Double]) => {
    // Valor incial: el obtenido del RDD.
    var total = value.getOrElse(0.0)
    
    // Si existe el estado, se actualiza.
    if (estado.exists == true && estado.isTimingOut == false) {
        val aux = estado.get()
        if (total < aux) {
            total = aux
        }
    } 
    // Actualiza el estado.
    estado.update(total)
    // Devuelve el nuevo valor.
    Some((sensor, total))
}

In [None]:
// Máxima de la media de temperaturas en cada batch.
val resultado_max_avg = sum_temp.mapWithState(StateSpec.function(actualiza_avg_temp).timeout(Seconds(45))).stateSnapshots()

In [None]:
resultado_max_avg.print

In [None]:
streamctx.start()

In [None]:
streamctx.stop()