# Distributed Data Processing Systems III: Practical Assignment

## Javier Pérez Clemente

### Introduction

We start by importing the PySpark and Kafka libraries needed for this assignment.

In [1]:
from datetime import datetime, timedelta
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
import kafka

In [2]:
# Load external packages programmatically (must be set before the SparkContext is created)
import os
packages = "org.apache.spark:spark-streaming-kafka-0-8_2.11:2.4.0"

os.environ["PYSPARK_SUBMIT_ARGS"] = (
    "--packages {0} pyspark-shell".format(packages)
)
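PySpark reads `PYSPARK_SUBMIT_ARGS` when it launches the JVM gateway, which happens when the first `SparkContext` is created, so the variable has to be set before cell [3]. `--packages` takes a comma-separated list of Maven coordinates, and the value ends with `pyspark-shell`, as in the cell above. The helper below, `build_submit_args`, is a hypothetical convenience (not part of PySpark) that assembles that string for one or more packages.

```python
import os


def build_submit_args(packages):
    """Hypothetical helper: build a PYSPARK_SUBMIT_ARGS value for --packages.

    `packages` is a list of Maven coordinates; they are joined with commas
    and followed by the 'pyspark-shell' token, as in the notebook's cell.
    """
    return "--packages {0} pyspark-shell".format(",".join(packages))


# Set it before any SparkContext is created in this Python process.
os.environ["PYSPARK_SUBMIT_ARGS"] = build_submit_args(
    ["org.apache.spark:spark-streaming-kafka-0-8_2.11:2.4.0"]
)
print(os.environ["PYSPARK_SUBMIT_ARGS"])
# → --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.4.0 pyspark-shell
```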

We start the Spark context and the Streaming context, with a micro-batch interval of 5 seconds.

In [3]:
sc = SparkContext(appName="PracticaPythonStreamingKafka")

In [4]:
ssc = StreamingContext(sc, 5)

Writing data to the Kafka topic "results".

In [5]:
kafkaBrokerIPPort = "127.0.0.1:9092"
kafkaProducerTopic = "test"
kafkaConsumerTopic = "results"

class KafkaProducerWrapper(object):
    # Producer created on first use and then reused within this Python process
    producer = None

    @staticmethod
    def getProducer(brokerList):
        if KafkaProducerWrapper.producer is None:
            KafkaProducerWrapper.producer = kafka.KafkaProducer(bootstrap_servers=brokerList,
                                                                key_serializer=str.encode,
                                                                value_serializer=str.encode)
        return KafkaProducerWrapper.producer

# Sends metrics to Kafka (output)
def sendResults(itr):
    prod = KafkaProducerWrapper.getProducer([kafkaBrokerIPPort])
    for m in itr:
        prod.send(kafkaConsumerTopic, key=m[0], value=m[0]+": "+str(m[1]))
    prod.flush()
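`sendResults` is called by `foreachPartition` on the executors, so building a new `KafkaProducer` for every partition of every micro-batch would set up a new client (and its connections) each time. The wrapper keeps the producer in a class attribute, so it is created once per Python worker process and reused while that process lives. The sketch below reproduces the pattern without a broker: `FakeProducer`, `ProducerCache` and `send_results` are hypothetical stand-ins for `kafka.KafkaProducer`, `KafkaProducerWrapper` and `sendResults`, kept only to show that one producer serves several partitions and what the message value looks like.

```python
class FakeProducer(object):
    """Hypothetical stand-in for kafka.KafkaProducer that records sends in memory."""
    instances = 0

    def __init__(self, bootstrap_servers):
        FakeProducer.instances += 1          # count how many producers get built
        self.bootstrap_servers = bootstrap_servers
        self.sent = []

    def send(self, topic, key=None, value=None):
        self.sent.append((topic, key, value))

    def flush(self):
        pass                                 # nothing is buffered in this fake


class ProducerCache(object):
    """Same lazy per-process singleton idea as KafkaProducerWrapper."""
    producer = None

    @staticmethod
    def get(broker_list):
        if ProducerCache.producer is None:
            ProducerCache.producer = FakeProducer(bootstrap_servers=broker_list)
        return ProducerCache.producer


def send_results(records, topic="results"):
    """Mirror of sendResults: key = metric name, value = 'metric: number'."""
    prod = ProducerCache.get(["127.0.0.1:9092"])
    for name, number in records:
        prod.send(topic, key=name, value=name + ": " + str(number))
    prod.flush()


# Two partitions processed by the same worker share a single producer.
send_results([("Average temperature by batch", 23.6)])
send_results([("Average CO2 concentration by batch", 456.875)])
print(FakeProducer.instances)              # → 1
print(ProducerCache.producer.sent[0][2])   # → Average temperature by batch: 23.6
```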

Reading data from the Kafka topic "test".

In [6]:
kafkaParams = {"metadata.broker.list": kafkaBrokerIPPort}
directKafkaStream = KafkaUtils.createDirectStream(ssc, [kafkaProducerTopic], kafkaParams)  # connect Spark Streaming to Kafka
ssc.checkpoint("checkpoint")  # checkpoint directory, required by updateStateByKey and by windowed reductions with an inverse function

Py4JJavaError: An error occurred while calling o27.createDirectStreamWithoutMessageHandler.
: org.apache.spark.SparkException: java.nio.channels.ClosedChannelException
	at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:387)
	at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:387)
	at scala.util.Either.fold(Either.scala:98)
	at org.apache.spark.streaming.kafka.KafkaCluster$.checkErrors(KafkaCluster.scala:386)
	at org.apache.spark.streaming.kafka.KafkaUtils$.getFromOffsets(KafkaUtils.scala:223)
	at org.apache.spark.streaming.kafka.KafkaUtilsPythonHelper.createDirectStream(KafkaUtils.scala:721)
	at org.apache.spark.streaming.kafka.KafkaUtilsPythonHelper.createDirectStreamWithoutMessageHandler(KafkaUtils.scala:689)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)


Preparation of the input DStream used by each of the exercises.

In [7]:
# Keep the message value, strip the double quotes and split the CSV fields
lines = directKafkaStream.map(lambda x: x[1])\
                         .map(lambda x: x.replace('"', ''))\
                         .map(lambda x: x.split(","))
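Each element of the direct stream is a `(key, value)` pair, and only the value (one CSV line) is used. The sketch below applies the same steps to a single message outside Spark; the sample record is made up, with fields in the positions the later cells read (`x[1]` date, `x[2]` temperature, `x[3]` humidity, `x[4]` light, `x[5]` CO2). A plain `split(",")` assumes no field contains a comma; the standard `csv` module, shown as an alternative, also handles commas inside quoted fields.

```python
import csv


def parse_record(message):
    """Same steps as `lines`: keep the value, drop the double quotes, split on commas."""
    _key, value = message
    return value.replace('"', '').split(",")


def parse_record_csv(message):
    """Alternative using the csv module, which also copes with commas inside quotes."""
    return next(csv.reader([message[1]]))


# Made-up message for illustration only (not taken from the assignment's data).
sample = (None, '"1","2015-02-04 17:51:00","23.18","27.272","426","721.25","0.0048","1"')

fields = parse_record(sample)
print(fields[1], float(fields[2]), float(fields[5]))
# → 2015-02-04 17:51:00 23.18 721.25
```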

## Ejercicio 1

Compute the average temperature, relative humidity and CO2 concentration for each micro-batch, and the average of those values since the application started.

We define the functions we need to transform the data.

In [8]:
# State update for updateStateByKey: running (sum, count) since start.
# new_values holds this batch's (sum, count) pairs for the key; it is empty
# when the key received no data in the batch, and the state is then kept.
def sum_since_start(new_values, current):
    total, count = current or (0.0, 0)
    for batch_sum, batch_count in new_values:
        total += batch_sum
        count += batch_count
    return total, count

# Combines two (sum, count) pairs of the same micro batch
def sum_by_batch(current, new):
    new_count = new[1] + current[1]
    new_sum = new[0] + current[0]
    return (new_sum, new_count)

# Gets the average from a (key, (sum, count)) pair
def get_average(pair):
    key = pair[0]
    total, count = pair[1]
    return key, total / count

In [9]:
# One (metric, (value, 1)) pair per variable and record
variables = lines.flatMap(lambda x: [('Average temperature', (float(x[2]), 1)),
                                     ('Average humidity ratio', (float(x[3]), 1)),
                                     ('Average CO2 concentration', (float(x[5]), 1))])

# (sum, count) per metric within the current micro batch
sums_by_batch = variables.reduceByKey(sum_by_batch)

values_since_start = sums_by_batch.updateStateByKey(sum_since_start)\
                                  .map(lambda x: (x[0] + ' since start', x[1]))\
                                  .map(get_average)

values_by_batch = sums_by_batch.map(lambda x: (x[0] + ' by batch', x[1]))\
                               .map(get_average)
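Carrying `(sum, count)` pairs instead of averages is what lets both results be combined correctly: the mean since start is the total sum over the total count, which in general differs from the mean of the per-batch means when batches have different sizes. The sketch below replays that logic in plain Python on three hypothetical micro-batches of temperature readings (the last one empty, as happens when no data arrives); `add_pairs` plays the role of `sum_by_batch` and `update_state` the role of an `updateStateByKey` update function.

```python
from functools import reduce


def add_pairs(a, b):
    """Combine two (sum, count) pairs, the role sum_by_batch plays in reduceByKey."""
    return (a[0] + b[0], a[1] + b[1])


def update_state(new_values, state):
    """updateStateByKey-style update: fold this batch's pairs into the running pair.

    new_values is empty when the key got no data in the batch; the state is kept.
    """
    total = state or (0.0, 0)
    for pair in new_values:
        total = add_pairs(total, pair)
    return total


# Hypothetical temperature readings, one list per micro-batch.
batches = [[20.0, 22.0, 24.0], [30.0], []]

by_batch, since_start = [], []
state = None
for readings in batches:
    pairs = [(t, 1) for t in readings]                       # like the (value, 1) map
    reduced = [reduce(add_pairs, pairs)] if pairs else []    # like reduceByKey
    if reduced:
        by_batch.append(reduced[0][0] / reduced[0][1])
    state = update_state(reduced, state)
    since_start.append(state[0] / state[1])

print(by_batch)                        # → [22.0, 30.0]
print(since_start)                     # → [22.0, 24.0, 24.0]
print(sum(by_batch) / len(by_batch))   # → 26.0, not the overall mean of 24.0
```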

## Ejercicio 2 

Compute the average luminosity in the room over sliding windows of 45 seconds, with a slide of 15 seconds between consecutive windows.

In [10]:
# Subtracts a (sum, count) pair that leaves the window (inverse of sum_by_batch())
def substr_by_batch(current, old):
    new_count = current[1] - old[1]
    new_sum = current[0] - old[0]
    return new_sum, new_count

In [11]:
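# filterFunc drops keys whose window has emptied (count 0), avoiding a division by zero in get_average
light_over_window = lines.map(lambda x: ('Average luminosity by batch', (float(x[4]), 1)))\
                         .reduceByKeyAndWindow(sum_by_batch, substr_by_batch, 45, 15,
                                               filterFunc=lambda kv: kv[1][1] > 0)\
                         .map(get_average)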
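With 5-second batches, a 45-second window spans 9 batches and a 15-second slide advances 3 batches; Spark requires both durations to be multiples of the batch interval. Passing `substr_by_batch` as the inverse function lets Spark update each window incrementally (add the batches that enter, subtract those that leave) instead of re-reducing all 9 batches, which is also why this operation needs the checkpoint directory. The plain-Python sketch below replays that update on hypothetical per-batch `(sum, count)` values of light, checks it against a full recomputation, and shows that once the data stops the count drops to 0: that is the case the `filterFunc` argument of `reduceByKeyAndWindow` (or a filter before `get_average`) has to exclude to avoid a division by zero.

```python
from functools import reduce

BATCH, WINDOW, SLIDE = 5, 45, 15    # seconds, as in the notebook
# Window and slide must be multiples of the batch interval.
assert WINDOW % BATCH == 0 and SLIDE % BATCH == 0
PER_WINDOW = WINDOW // BATCH        # 9 batches per window
PER_SLIDE = SLIDE // BATCH          # 3 batches per slide


def add(a, b):   # role of sum_by_batch
    return (a[0] + b[0], a[1] + b[1])


def sub(a, b):   # role of substr_by_batch
    return (a[0] - b[0], a[1] - b[1])


# Hypothetical per-batch (sum, count) of light readings; (0.0, 0) marks an empty batch.
per_batch = [(800.0, 2), (900.0, 2), (1000.0, 2)] + [(0.0, 0)] * 12

window = (0.0, 0)
history, averages = [], []
for i, pair in enumerate(per_batch, start=1):
    history.append(pair)
    if i % PER_SLIDE:
        continue                                    # a window is emitted every 3 batches
    entering = history[-PER_SLIDE:]
    leaving = history[-(PER_WINDOW + PER_SLIDE):-PER_WINDOW]
    for p in entering:
        window = add(window, p)
    for p in leaving:
        window = sub(window, p)
    # Incremental result must match reducing the last 9 batches from scratch.
    assert window == reduce(add, history[-PER_WINDOW:], (0.0, 0))
    # Skip emptied windows, as filterFunc would, instead of dividing by zero.
    averages.append(window[0] / window[1] if window[1] > 0 else None)

print(averages)   # → [450.0, 450.0, 450.0, None, None]
```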

## Ejercicio 3

Looking at the data, we can see that in many cases the interval between the original samples is not exactly 1 minute. Compute, for each micro-batch, the number of pairs of consecutive samples whose separation is not exactly 1 minute.

In [12]:
# Associative combine of per-record summaries (first_date, last_date, count), where
# count is the number of consecutive pairs whose separation is not 60 seconds.
# Assumes the left argument holds records that arrived before the right one.
def count_couples(left, right):
    FMT = '%Y-%m-%d %H:%M:%S'
    tdelta = datetime.strptime(right[0], FMT) - datetime.strptime(left[1], FMT)
    gap = 1 if tdelta != timedelta(minutes=1) else 0
    return (left[0], right[1], left[2] + right[2] + gap)

In [13]:
tim_sep = lines.map(lambda x: ('number of sample couples difference != 1 min', (x[1], x[1], 0)))\
               .reduceByKey(count_couples)\
               .map(lambda x: (x[0], x[1][2]))
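The per-batch count can be checked without Spark. `count_irregular_gaps` below is a hypothetical brute-force reference that walks the timestamps of one micro-batch in order; `merge` combines `(first_date, last_date, count)` summaries, and because it is associative it gives the same total whether the batch is reduced in one piece or in several ordered parts, which is what matters when `reduceByKey` merges partial results coming from different partitions. The timestamps are made up, in the `'%Y-%m-%d %H:%M:%S'` format the notebook parses.

```python
from datetime import datetime, timedelta
from functools import reduce

FMT = '%Y-%m-%d %H:%M:%S'
ONE_MINUTE = timedelta(minutes=1)


def count_irregular_gaps(dates):
    """Reference: consecutive pairs whose separation is not exactly one minute."""
    stamps = [datetime.strptime(d, FMT) for d in dates]
    return sum(1 for a, b in zip(stamps, stamps[1:]) if b - a != ONE_MINUTE)


def merge(a, b):
    """Combine (first_date, last_date, count) summaries; `a` precedes `b` in time."""
    gap = datetime.strptime(b[0], FMT) - datetime.strptime(a[1], FMT)
    return (a[0], b[1], a[2] + b[2] + (1 if gap != ONE_MINUTE else 0))


# Made-up timestamps for one micro-batch: gaps of 59 s, 61 s, 60 s and 60 s.
dates = ["2015-02-04 17:51:00", "2015-02-04 17:51:59", "2015-02-04 17:53:00",
         "2015-02-04 17:54:00", "2015-02-04 17:55:00"]
summaries = [(d, d, 0) for d in dates]       # one summary per record

whole = reduce(merge, summaries)
split = merge(reduce(merge, summaries[:2]), reduce(merge, summaries[2:]))
print(count_irregular_gaps(dates), whole[2], split[2])   # → 2 2 2
```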

## Resultados

The results of all exercises are combined and sent to the Kafka results topic.

In [14]:
finalStream = values_by_batch.union(values_since_start).union(tim_sep)

light_over_window.foreachRDD(lambda rdd: rdd.foreachPartition(sendResults))
finalStream.foreachRDD(lambda rdd: rdd.foreachPartition(sendResults))

ssc.start()
# ssc.awaitTermination()

We attach a Kafka consumer that prints the messages arriving on the results topic to the console.

In [None]:
print("Initialization...")

consumer = kafka.KafkaConsumer(bootstrap_servers=kafkaBrokerIPPort,
                               auto_offset_reset='earliest')
consumer.subscribe([kafkaConsumerTopic])

for message in consumer:
    print(message.value)

print("End")

Initialization...
b'Average temperature by batch: 23.67954545454546'
b'Average humidity ratio by batch: 26.814318181818184'
b'Average CO2 concentration by batch: 864.3530303030301'
b'Average temperature since start: 23.67954545454546'
b'sample couples difference != 1 min: 3'
b'Average CO2 concentration since start: 864.3530303030301'
b'Average humidity ratio since start: 26.814318181818184'
b'Average CO2 concentration by batch: 936.6666666666666'
b'Average humidity ratio by batch: 27.291416666666663'
b'Average temperature by batch: 23.604166666666668'
b'Average CO2 concentration since start: 889.8754901960783'
b'Average humidity ratio since start: 26.982705882352942'
b'Average temperature since start: 23.65294117647059'
b'sample couples difference != 1 min: 2'
b'Average luminosity by batch: 488.8175324675325'
b'Average CO2 concentration by batch: 968.9223809523808'
b'Average temperature by batch: 23.6'
b'Average humidity ratio by batch: 27.4854'
b'Average CO2 concentration since start:

b'Average temperature by batch: 20.6'
b'Average CO2 concentration since start: 535.66139657444'
b'Average humidity ratio since start: 20.590902503293805'
b'Average temperature since start: 21.10498353096177'
b'number of sample couples difference != 1 min: 1'
b'Average CO2 concentration by batch: 451.1666666666667'
b'Average humidity ratio by batch: 20.881666666666668'
b'Average temperature by batch: 20.583333333333332'
b'Average CO2 concentration since start: 533.7039897039896'
b'Average humidity ratio since start: 20.597638352638352'
b'number of sample couples difference != 1 min: 3'
b'Average temperature since start: 21.092898970398952'
b'Average CO2 concentration by batch: 456.875'
b'Average humidity ratio by batch: 20.938125'
b'Average temperature by batch: 20.54375'
b'Average CO2 concentration since start: 532.5354879594422'
b'Average humidity ratio since start: 20.60281685678073'
b'number of sample couples difference != 1 min: 2'
b'Average temperature since start: 21.084546894803
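`sendResults` encodes each value with `str.encode`, so the consumer receives bytes of the form `b'<metric>: <number>'`, as printed above. If the values are needed as numbers rather than printed, they can be split back apart; `parse_result` below is a hypothetical helper for that, shown on three of the messages above. `kafka.KafkaConsumer` also accepts a `value_deserializer` argument, which could perform the decoding step while consuming.

```python
def parse_result(raw):
    """Hypothetical helper: turn b'<metric>: <number>' into (metric, float)."""
    metric, _, number = raw.decode("utf-8").rpartition(": ")
    return metric, float(number)


# Three messages copied from the consumer output above.
received = [b'Average temperature by batch: 23.67954545454546',
            b'Average humidity ratio by batch: 26.814318181818184',
            b'sample couples difference != 1 min: 3']

latest = dict(parse_result(m) for m in received)
print(latest['sample couples difference != 1 min'])   # → 3.0
```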

In [None]:
ssc.stop(False)