#  4. Ejemplo End-to-End Streaming

## 4.1 Instrucciones iniciales y setup

2. Crea un directorio `checkpoint` dentro del subdirectorio `data`.

3. Asegúrate de que tienes permisos suficientes para manipular archivos dentro del directorio (debería ser así ya, si has ejecutado los ejemplos previos). Si fuese necesario, ejecuta `sudo chmod -R 777 data`.

**Entrada: cola de Kafka**

4. Arranca el broker de Kafka, o bien localmente instalado o en una MV local o en un contenedor local (e.g. Docker).

5. Modifica el script de Python `4-kafka_producer.py` para que envíe los datos al broker de Kafka (indicar la IP y puerto correctos).

6. Activa si es necesario el entorno de Anaconda Python (**importante, usando Python v3.6+**). Ejecuta el productor de Kafka con `python p_kafka_producer.py 0.6 1.3 test data/occupancy_data.csv`.

7. A partir de ese momento ya estás listo para ejecutar los *jobs* de Spark Streaming de este notebook. ¡Empecemos con el análisis!

**Consumo de resultados desde Kafka**

Los resultados del Ejercicio 2 y el 3 se escriben en una cola de Kafka de nombre `metrics`. Podemos usar `python 4-kafka_consumer.py` para consumir los datos de dicha cola y presentarlos por pantalla.

**WebUI**: Mientras el contexto de Spark Streaming esté activo, podemos acceder a la interfaz de monitorización de los *jobs* en http://localhost:4040.

## 4.2 Importaciones y creación del contexto

###  Creación del SparkContext (solo la primera vez)

In [1]:
# Importación de dependencias y funciones
from __future__ import print_function
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
from operator import add
from operator import sub

In [2]:
# Load external packages programatically
import os
# THIS IS MANDATORY
# You must provide the information about the Maven artifact for the
# Spark Streaming connector to Kafka
# At present time, only the 0.8.2 version (deprecated) has
# Python support
packages = "org.apache.spark:spark-streaming-kafka-0-8_2.11:2.4.5"
os.environ["PYSPARK_SUBMIT_ARGS"] = (
    "--packages {0} pyspark-shell".format(packages)
)
# THIS IS COMPULSORY
# Comment the line below if JAVA_HOME is already set up or you
# only have a single JVM version in your system
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"

# OPTIONAL: Check setup of environment variables
print("PYSPARK_SUBMIT_ARGS = ",os.environ["PYSPARK_SUBMIT_ARGS"],"\n")
print("JAVA_HOME = ", os.environ["JAVA_HOME"])

PYSPARK_SUBMIT_ARGS =  --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.4.5 pyspark-shell 

JAVA_HOME =  /usr/lib/jvm/java-8-openjdk-amd64


In [3]:
sc = SparkContext(appName="KafkaStreamingEndtoEnd")

### Creación del streaming context (en cada ejecución de ejercicio)

In [4]:
# Crear el contexto de Spark Streaming
# Intervalo de actualización de micro-batches (triggers): 5s
ssc = StreamingContext(sc, 5)

## 4.3 Métodos auxiliares

### 4.3.1 Método de parseo de datos de órdenes sobre acciones de bolsa

Este método nos ayuda a parsear cada línea que llega por la cola de Kafka con datos sobre órdenes de compra y venta de acciones en bolsa (formato CSV). Lo utilizamos para acceder a los datos de cada evento (orden) del *stream* de entrada de datos.

In [5]:
from datetime import datetime

def parseOrder(line):
  s = line.split(",")
  try:
      return [{"id": s[0],
               "date": s[1],
               "Temperature": s[2], 
               "Humidity": s[3],
               "Light": s[4],
               "CO2": s[5],
               "HumidityRatio": s[6],
               "Occupancy": s[7]}]

  except Exception as err:
      print("Wrong line format (%s): " % line)
      return []

### 4.3.2 Métodos de escritura - Envío de datos a Kafka

Este método contiene un *productor singleton* (para evitar tener más de un productor enviando datos al broker de Kafka) y un método para serializar los resultados en formato CSV.

In [6]:
# Configura el endpoint para localizar el broker de Kafka
# kafkaBrokerIPPort = "172.20.1.21:9092"
kafkaBrokerIPPort = "127.0.0.1:9092"

# Productor simple (Singleton!)
# from kafka import KafkaProducer
import kafka
class KafkaProducerWrapper(object):
  producer = None
  @staticmethod
  def getProducer(brokerList):
    if KafkaProducerWrapper.producer != None:
      return KafkaProducerWrapper.producer
    else:
      KafkaProducerWrapper.producer = kafka.KafkaProducer(bootstrap_servers=brokerList,
                                                          key_serializer=str.encode,
                                                          value_serializer=str.encode)
      return KafkaProducerWrapper.producer

# Envía métricas a Kafka! (salida)  
def sendMetrics(itr):
  prod = KafkaProducerWrapper.getProducer([kafkaBrokerIPPort])
  for m in itr:
    prod.send("metrics", key=m[0], value=m[0]+","+str(m[1]))
  prod.flush()

## 4.4 Fuente de datos - Lectura

### Monitorización de un directorio

In [7]:
# Fichero de texto: Lectura de fuente de datos de fichero (no se usa en este ejemplo, en su lugar 
# enviamos los datos a Kafka para crear una simulación más realista)
stream = ssc.textFileStream("data/input")

In [8]:
#"id","date","Temperature","Humidity","Light","CO2","HumidityRatio","Occupancy"

### Entrada de datos desde Kafka

In [9]:
# Kafka: Lectura de datos
kafkaParams = {"metadata.broker.list": kafkaBrokerIPPort}
stream = KafkaUtils.createDirectStream(ssc, ["test"], kafkaParams)
stream = stream.map(lambda o: str(o[1]))

## SOLO PARA SALIDA A ARCHIVO DE TEXTO
## Ejercicio 1: Calcular el promedio de valores de Temperatura, humedad relativa y concentración de CO2 para cada micro-batch y el promedio de dichos valores desde el arranque

In [10]:
valores = stream.flatMap(parseOrder)

In [11]:
valores.pprint()

In [11]:
#Temp_mean = valores.map(lambda o: (o['Temperature'], 1)).reduceByKey(add)
#Temp_mean = (valores.updateStateByKey(lambda vals, 
#                                      totalOpt: sum(vals) + totalOpt if totalOpt != None else sum(vals)))

def mean_score(col):
    return pd.Series([np.mean(col)] * len(col))

#Temp_mean.repartition(1).saveAsTextFiles("data/output/metrics", "csv")

#lines = valores.map(lambda x: x[1])
#counts = lines.map(lambda line: line.split("\t")) \
#              .reduceByKey(lambda a, b: a+b)

Temp_mean.pprint()

In [12]:
ssc.start()

-------------------------------------------
Time: 2020-07-04 13:12:30
-------------------------------------------
{'id': '"94"', 'date': '"2015-02-04 19:24:00"', 'Temperature': '22.2', 'Humidity': '27.2', 'Light': '0', 'CO2': '580', 'HumidityRatio': '0.00450228274276464', 'Occupancy': '0\n'}
{'id': '"95"', 'date': '"2015-02-04 19:25:00"', 'Temperature': '22.1', 'Humidity': '27.15', 'Light': '0', 'CO2': '578', 'HumidityRatio': '0.00446645358141236', 'Occupancy': '0\n'}
{'id': '"96"', 'date': '"2015-02-04 19:25:59"', 'Temperature': '22.1', 'Humidity': '27.2', 'Light': '0', 'CO2': '576.5', 'HumidityRatio': '0.00447473826972372', 'Occupancy': '0\n'}
{'id': '"97"', 'date': '"2015-02-04 19:27:00"', 'Temperature': '22.1', 'Humidity': '27.2', 'Light': '0', 'CO2': '580', 'HumidityRatio': '0.00447473826972372', 'Occupancy': '0\n'}

-------------------------------------------
Time: 2020-07-04 13:12:35
-------------------------------------------
{'id': '"98"', 'date': '"2015-02-04 19:27:59"', 'Tem

-------------------------------------------
Time: 2020-07-04 13:13:10
-------------------------------------------
{'id': '"135"', 'date': '"2015-02-04 20:05:00"', 'Temperature': '21.79', 'Humidity': '27.1', 'Light': '0', 'CO2': '536', 'HumidityRatio': '0.00437403835445405', 'Occupancy': '0\n'}
{'id': '"136"', 'date': '"2015-02-04 20:06:00"', 'Temperature': '21.745', 'Humidity': '27.1', 'Light': '0', 'CO2': '534.5', 'HumidityRatio': '0.00436194296945302', 'Occupancy': '0\n'}
{'id': '"137"', 'date': '"2015-02-04 20:06:59"', 'Temperature': '21.7', 'Humidity': '27.1', 'Light': '0', 'CO2': '532.5', 'HumidityRatio': '0.00434987709774486', 'Occupancy': '0\n'}
{'id': '"138"', 'date': '"2015-02-04 20:08:00"', 'Temperature': '21.76', 'Humidity': '27.1', 'Light': '0', 'CO2': '530', 'HumidityRatio': '0.0043659714821212', 'Occupancy': '0\n'}
{'id': '"139"', 'date': '"2015-02-04 20:09:00"', 'Temperature': '21.745', 'Humidity': '27.05', 'Light': '0', 'CO2': '529.5', 'HumidityRatio': '0.00435383877122

-------------------------------------------
Time: 2020-07-04 13:13:50
-------------------------------------------
{'id': '"176"', 'date': '"2015-02-04 20:45:59"', 'Temperature': '21.6', 'Humidity': '26.5', 'Light': '0', 'CO2': '515', 'HumidityRatio': '0.00422680303527493', 'Occupancy': '0\n'}
{'id': '"177"', 'date': '"2015-02-04 20:47:00"', 'Temperature': '21.65', 'Humidity': '26.55', 'Light': '0', 'CO2': '511', 'HumidityRatio': '0.00424789380271014', 'Occupancy': '0\n'}
{'id': '"178"', 'date': '"2015-02-04 20:48:00"', 'Temperature': '21.6', 'Humidity': '26.5', 'Light': '0', 'CO2': '514', 'HumidityRatio': '0.00422680303527493', 'Occupancy': '0\n'}
{'id': '"179"', 'date': '"2015-02-04 20:49:00"', 'Temperature': '21.6', 'Humidity': '26.445', 'Light': '0', 'CO2': '508', 'HumidityRatio': '0.00421797093546809', 'Occupancy': '0\n'}
{'id': '"180"', 'date': '"2015-02-04 20:50:00"', 'Temperature': '21.6', 'Humidity': '26.4725', 'Light': '0', 'CO2': '509.75', 'HumidityRatio': '0.0042223869542300

In [13]:
# Once you are done, stop the StreamingContext
ssc.stop(False)

In [None]:
def aggregate_tags_count(new_values, total_sum):
        return sum(new_values) + (total_sum or 0)
    
# divide cada Tweet en palabras
words = dataStream.flatMap(lambda line: line.split(" "))
# filtra las palabras para obtener solo hashtags, luego mapea cada hashtag para que sea un par de (hashtag,1)
hashtags = words.filter(lambda w: '#' in w).map(lambda x: (x, 1))
# agrega la cuenta de cada hashtag a su última cuenta
tags_totals = hashtags.updateStateByKey(aggregate_tags_count)
wordCounts.pprint()

In [12]:
ssc.start()

-------------------------------------------
Time: 2020-06-24 22:24:35
-------------------------------------------
{'id': '"1781"', 'date': '"2015-02-05 23:31:00"', 'Temperature': '20.39', 'Humidity': '21.29', 'Light': '0', 'CO2': '444.5', 'HumidityRatio': '0.00314694763447223', 'Occupancy': '0\n'}
{'id': '"1782"', 'date': '"2015-02-05 23:31:59"', 'Temperature': '20.3566666666667', 'Humidity': '21.29', 'Light': '0', 'CO2': '441.666666666667', 'HumidityRatio': '0.00314044248260255', 'Occupancy': '0\n'}
{'id': '"1783"', 'date': '"2015-02-05 23:32:59"', 'Temperature': '20.3566666666667', 'Humidity': '21.29', 'Light': '0', 'CO2': '442.333333333333', 'HumidityRatio': '0.00314044248260255', 'Occupancy': '0\n'}
{'id': '"1784"', 'date': '"2015-02-05 23:34:00"', 'Temperature': '20.39', 'Humidity': '21.29', 'Light': '0', 'CO2': '441', 'HumidityRatio': '0.00314694763447223', 'Occupancy': '0\n'}
{'id': '"1785"', 'date': '"2015-02-05 23:35:00"', 'Temperature': '20.29', 'Humidity': '21.29', 'Light': 

In [12]:
ssc.start()

-------------------------------------------
Time: 2020-06-24 22:22:00
-------------------------------------------
"1621","2015-02-05 20:51:00",21,19.7,0,463.5,0.00302284822358619,0

"1622","2015-02-05 20:51:59",21,19.7,0,467.5,0.00302284822358619,0

"1623","2015-02-05 20:53:00",21,19.7,0,476,0.00302284822358619,0

"1624","2015-02-05 20:54:00",21,19.76,0,474,0.00303209974808914,0


-------------------------------------------
Time: 2020-06-24 22:22:05
-------------------------------------------
"1625","2015-02-05 20:55:00",21,19.79,0,471,0.00303672561304647,0

"1626","2015-02-05 20:55:59",21,19.79,0,472,0.00303672561304647,0

"1627","2015-02-05 20:57:00",21,19.79,0,472,0.00303672561304647,0

"1628","2015-02-05 20:57:59",21,19.79,0,474.25,0.00303672561304647,0

"1629","2015-02-05 20:58:59",20.9725,19.79,0,473,0.00303157119548328,0


-------------------------------------------
Time: 2020-06-24 22:22:10
-------------------------------------------
"1630","2015-02-05 21:00:00",21,19.79,0,469,

## SOLO PARA SALIDA HACIA TOPIC DE KAFKA
## Ejercicio 2: Calcula los 5 clientes en el top de ranking (por volumen de órdenes)

In [11]:
orders = stream.flatMap(parseOrder)

numPerType = orders.map(lambda o: (o['buy'], 1)).reduceByKey(add)

amountPerClient = orders.map(lambda o: (o['clientId'], o['amount'] * o['price']))

amountState = (amountPerClient.updateStateByKey(lambda vals, 
                                                totalOpt: sum(vals) + totalOpt if totalOpt != None else sum(vals)))
top5clients = amountState.transform(lambda rdd: rdd.sortBy(lambda x: x[1], False).map(lambda x: x[0]).zipWithIndex().filter(lambda x: x[1] < 5))

buySellList = numPerType.map(lambda t: ("BUYS", [str(t[1])]) if t[0]=="B" else ("SELLS", [str(t[1])]) )
top5clList = top5clients.repartition(1).map(lambda x: str(x[0])).glom().map(lambda arr: ("TOP5CLIENTS", arr))

finalStream = buySellList.union(top5clList)

finalStream.foreachRDD(lambda rdd: rdd.foreachPartition(sendMetrics))

sc.setCheckpointDir("data/checkpoint/")

## Ejercicio 3: Calcular las acciones más solicitadas por duración (Pista: Operación por ventana)

In [12]:
orders = stream.flatMap(parseOrder)

# Cálculo de resumen de compra-venta
numPerType = orders.map(lambda o: (o['buy'], 1)).reduceByKey(add)

# Cálculo de 5 clientes más activo (con memoria)
amountPerClient = orders.map(lambda o: (o['clientId'], o['amount']*o['price']))

amountState = amountPerClient.updateStateByKey(lambda vals, totalOpt: sum(vals)+totalOpt if totalOpt != None else sum(vals))
top5clients = amountState.transform(lambda rdd: rdd.sortBy(lambda x: x[1], False).map(lambda x: x[0])\
                                    .zipWithIndex().filter(lambda x: x[1] < 5))

# Lista de valores más comprados/vendidos (window operation)
buySellList = numPerType.map(lambda t: ("BUYS", [str(t[1])]) if t[0]=="B" else ("SELLS", [str(t[1])]) )
top5clList = top5clients.repartition(1).map(lambda x: str(x[0])).glom().map(lambda arr: ("TOP5CLIENTS", arr))

#stocksWindow = orders.map(lambda x: (x['symbol'], x['amount'])).window(50)
#stocksPerWindow = stocksWindow.reduceByKey(add)
stocksPerWindow = orders.map(lambda x: (x['symbol'], x['amount'])).reduceByKeyAndWindow(add, sub, windowDuration=50)

topStocks = stocksPerWindow.transform(lambda rdd: rdd.sortBy(lambda x: x[1], False).map(lambda x: x[0])\
                                      .zipWithIndex().filter(lambda x: x[1] < 5)).repartition(1)\
                                      .map(lambda x: str(x[0])).glom().map(lambda arr: ("TOP5STOCKS", arr))

finalStream = buySellList.union(top5clList).union(topStocks)

finalStream.foreachRDD(lambda rdd: rdd.foreachPartition(sendMetrics))

sc.setCheckpointDir("data/checkpoint/")


## Start Streaming context

In [15]:
ssc.start()
#kafka.errors.UnrecognizedBrokerVersion: UnrecognizedBrokerVersion
#ssc.awaitTerminationOrTimeout(10)  # Espera 10 segs. antes de acabar

## Stop Streaming Context

In [16]:
ssc.stop(False)

In [None]:
from datetime import datetime

def parseOrder(line):
  s = line.split(",")
  try:
      return [{"id": s[0],
               "date": datetime.strptime(s[1], "%Y-%m-%d %H:%M:%S"),
               "Temperature": float(s[2]), 
               "Humidity": float(s[3]),
               "Light": s[4],
               "CO2": float(s[5]),
               "HumidityRatio": float(s[6]),
               "Occupancy": s[7]}]

  except Exception as err:
      print("Wrong line format (%s): " % line)
      return []