# Spark Streaming
---

## Flujos de datos

* Es continuo, la frecuencia de la llegada de los datos depende del problema
* Datos son recolectados en tiempo real
* No se almacenan para entrenar el modelo

## Fuentes

* RRSS
* Transacciones bancarias o criptomonedas
* Monitoreos de redes, sensores
* Análisis climático
* Etc.

## Estrategias para el tratamiento del flujo

* El dato se recibe, se utiliza y se descarta
* Ventana temporal para guardar los últimos n datos recibidos

## Spark streaming

* Por cuestiones de eficiencia y compatibilidad, Spark streaming guarda el stream en pequeños "chunks" ejecutando procesos batch (micro-batch)
* input data stream => **Spark Streaming** => batches of input data => **Spark Engine** => batches of processed data
* Un stream es representado como un stream discreto (DStream) el cual es una secuencia de RDDs
* Cada RDD es un snapshot de todos los datos recolectados durante un período de tiempo, el cual luego se procesa como un batch

In [None]:
import os
os.environ['JAVA_HOME'] = '/Library/Java/JavaVirtualMachines/openjdk-17.jdk/Contents/Home'

from pyspark.sql import SparkSession
from pyspark.sql.functions import explode
from pyspark.sql.functions import split

spark = SparkSession.builder.appName("Ejemplo1").getOrCreate()
spark.sparkContext.setLogLevel("ERROR")

lines = spark.readStream.format("socket").option("host", "localhost").option("port", 9999).load()

# Split the lines into words
# Explode desagrega. ej: pp ; [a, b, c]  => pp;a  pp;b  pp;c
words = lines.select(explode(split(lines.value, " ")).alias("word"))

# Generate running word count
wordCounts = words.groupBy("word").count()

# Imprime en pantalla el procesamiento
query = wordCounts.writeStream.outputMode("complete").format("console").option("numRows", "1000").start()

query.awaitTermination()

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode
from pyspark.sql.functions import split

spark = SparkSession.builder.appName("Ejemplo1").getOrCreate()
spark.sparkContext.setLogLevel("ERROR")

lines = spark.readStream.format("socket").option("host", "localhost").option("port", 9999).load()

# Split the lines into words
temp = lines.select(explode(split(lines.value, " ")).alias("temp"))

# Generate running word count
temperaturas = temp.filter("temp > 25").groupBy("temp").count()

query = temperaturas.writeStream.outputMode("complete").format("console").start()

query.awaitTermination()

In [None]:
import socket
import time
import random

s = socket.socket()

port = 9999

s.bind(('127.0.0.1', port))

s.listen(5)
i = 0
while True:
    c, addr = s.accept()
    while True:    
        c.send((str(random.randint(0, 40)) + "\r").encode())
        time.sleep(1)
        print("pasa")
    c.close()