In [1]:
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
from operator import add
from datetime import datetime, timedelta
import pyspark.sql.functions as F

In [2]:
import os
packages = "org.apache.spark:spark-streaming-kafka-0-8_2.11:2.2.1"
os.environ["PYSPARK_SUBMIT_ARGS"] = (
    "--packages {0} pyspark-shell".format(packages)
)

In [3]:
def parse_ambiente(line):
    line = line.replace('"', '')
    s = line.split(",")
    try:
        return [{
                #"date": str(s[1]),
                "date": datetime.strptime(s[1], "%Y-%m-%d %H:%M:%S"),
                "Temperature": float(s[2]), #
                "Humidity": float(s[3]), #
                "Light": float(s[4]),
                "CO2": float(s[5]), #
                "HumidityRatio": float(s[6]),
                "Occupancy": int(s[7])
              }
             ]
    except Exception as err:
        print("Wrong line format (%s): " % line)
    return []

In [4]:
def update_from_start(new_values, actual_values): 
    if actual_values is None:
        actual_values = (0, 0)
    new_values = new_values[0]
    new_sum = new_values[0] + actual_values[0]
    new_count = new_values[1] + actual_values[1]
    return new_sum, new_count

In [5]:
def calc_interval(current, preceding):
    print(current)
    current = current[0]
    print(current)
    if preceding is None:
        interval = 0
    else:
        print(preceding)
        interval = (current-preceding).total_seconds()
        print(interval)
    return interval, current

In [6]:
def save_all_avg_per_batch(rdd):
    elems = list(rdd.collect())
    with open('data/output/all-average-batch-log.csv', 'a') as file:
        for record in elems:    
            file.write(str(record) + "\n")

def save_all_avg_cummulated(rdd):
    elems = list(rdd.collect())
    with open('data/output/all-average-cummulated-log.csv', 'a') as file:
        for record in elems:    
            file.write(str(record) + "\n")
            
def save_light_avg_windowed(rdd):
    elems = list(rdd.collect())
    with open('data/output/light-average-windowed-log.csv', 'a') as file:
        for record in elems:    
            file.write(str(record) + "\n")

#### 1. Calcular el promedio de valores de temperatura, humedad relativa y concentración de CO2 para cada micro-batch, y el promedio de dichos valores desde el arranque de la aplicación.

#### a) promedio para cada micro-batch:

In [6]:
sc = SparkContext(appName="PracticaSparkStreaming")
micro_batch_duration = 5
ssc = StreamingContext(sc, micro_batch_duration)
kafkaParams = {"metadata.broker.list": "localhost:9092"}
directKafkaStream = KafkaUtils.createDirectStream(ssc, ["test"], kafkaParams)

In [7]:
lines = directKafkaStream.map(lambda o: str(o[1]))
parsed_lines = lines.flatMap(parse_ambiente).map(lambda o: (o["Temperature"], o["Humidity"], o["CO2"]))

Para el cálculo de la media busco calcular la suma y el count por separado y guardar ambos en una tupla que comparte una misma clave por ejemplo "t". Luego hago un join de ambos por esta clave y finalmente la tiro. 

In [8]:
t_sum = parsed_lines.map(lambda x: x[0]).reduce(lambda x, y: (x + y)).map(lambda x: ("t", x))
t_count = parsed_lines.count().map(lambda x: ("t", x))
t_tuple = t_sum.join(t_count)
t_avg = t_tuple.map(lambda x: x[1]).map(lambda x: ("avg temperature", x[0]/x[1]))

h_sum = parsed_lines.map(lambda x: x[1]).reduce(lambda x, y: (x + y)).map(lambda x: ("h", x))
h_count = parsed_lines.count().map(lambda x: ("h", x))
h_tuple = h_sum.join(h_count)
h_avg = h_tuple.map(lambda x: x[1]).map(lambda x: ("avg humidity", x[0]/x[1]))

co2_sum = parsed_lines.map(lambda x: x[2]).reduce(lambda x, y: (x + y)).map(lambda x: ("co2", x))
co2_count = parsed_lines.count().map(lambda x: ("co2", x))
co2_tuple = co2_sum.join(co2_count)
co2_avg = co2_tuple.map(lambda x: x[1]).map(lambda x: ("avg CO2", x[0]/x[1]))

all_avg = t_avg.union(h_avg).union(co2_avg)
all_avg.foreachRDD(save_all_avg_per_batch)


In [9]:
ssc.start()

In [10]:
ssc.stop(False)

#### b) promedio desde el arranque de la aplicación:

In [7]:
sc = SparkContext(appName="PracticaSparkStreaming")
micro_batch_duration = 5
ssc = StreamingContext(sc, micro_batch_duration)
kafkaParams = {"metadata.broker.list": "localhost:9092"}
directKafkaStream = KafkaUtils.createDirectStream(ssc, ["test"], kafkaParams)

In [8]:
lines = directKafkaStream.map(lambda o: str(o[1]))
parsed_lines = lines.flatMap(parse_ambiente).map(lambda o: (o["Temperature"], o["Humidity"], o["CO2"]))

Para el calculo del promedio acumulado uso un updateStateByKey, para lo cual también necesito una clace adicional. Luego dentro de la función de update la tiro esa clave. la función de update es sencilla y devuelve el count y la suma acumulados. La división la hago fuera con un map, dado que ahora ambos son parte del mismo DStream. El problema que le veo es que los valores del estado acumulados crecen y no sé si eso puede reducir eficiencia.

In [9]:
t_sum = parsed_lines.map(lambda x: x[0]).reduce(lambda x, y: (x + y)).map(lambda x: ("t", x))
t_count = parsed_lines.count().map(lambda x: ("t", x))
t_tuple = t_sum.join(t_count)

h_sum = parsed_lines.map(lambda x: x[1]).reduce(lambda x, y: (x + y)).map(lambda x: ("h", x))
h_count = parsed_lines.count().map(lambda x: ("h", x))
h_tuple = h_sum.join(h_count)

co2_sum = parsed_lines.map(lambda x: x[2]).reduce(lambda x, y: (x + y)).map(lambda x: ("co2", x))
co2_count = parsed_lines.count().map(lambda x: ("co2", x))
co2_tuple = co2_sum.join(co2_count)

sc.setCheckpointDir("data/checkpoint/")

cummulated = t_tuple.updateStateByKey(update_from_start)
t_avg_actual = cummulated.map(lambda x: x[1]).map(lambda x: ("avg cummulated temperature", x[0]/x[1]))

cummulated = h_tuple.updateStateByKey(update_from_start)
h_avg_actual = cummulated.map(lambda x: x[1]).map(lambda x: ("avg cummulated humidity", x[0]/x[1]))

cummulated = co2_tuple.updateStateByKey(update_from_start)
co2_avg_actual = cummulated.map(lambda x: x[1]).map(lambda x: ("avg cummulated CO2", x[0]/x[1]))

all_avg_actual = t_avg_actual.union(h_avg_actual).union(co2_avg_actual)
all_avg_actual.foreachRDD(save_all_avg_cummulated)


In [10]:
ssc.start()

In [9]:
ssc.stop(False)

#### 2. Calcular el promedio de luminosidad en la estancia en ventanas deslizantes de tamaño 45 segundos, con un valor de deslizamiento de 15 segundos entre ventanas consecutivas.

In [7]:
sc = SparkContext(appName="PracticaSparkStreaming")
micro_batch_duration = 5
ssc = StreamingContext(sc, micro_batch_duration)
kafkaParams = {"metadata.broker.list": "localhost:9092"}
directKafkaStream = KafkaUtils.createDirectStream(ssc, ["test"], kafkaParams)

In [8]:
lines = directKafkaStream.map(lambda o: str(o[1]))
parsed_lines = lines.flatMap(parse_ambiente).map(lambda o: o["Light"])

In [9]:
sc.setCheckpointDir("data/checkpoint/")
l_sum = parsed_lines.reduceByWindow(lambda x, y: x+y, lambda x, y: x-y, 45, 15).map(lambda x: ("l", x))
l_count = parsed_lines.countByWindow(45, 15).map(lambda x: ("l", x))
l_tuple = l_sum.join(l_count)
l_avg_window = l_tuple.map(lambda x: x[1])\
                    .map(lambda x: x[0]/x[1])\
                    .map( lambda x: 'avg light windowed %s' % x)
l_avg_window.foreachRDD(save_light_avg_windowed)


In [10]:
ssc.start()

In [11]:
ssc.stop(False)

#### 3. Examinando los datos, podemos apreciar que el intervalo entre muestras originales no es exactamente de 1 minuto en muchos casos. Calcular el número de parejas de muestras consecutivas en cada micro-batch entre las cuales el intervalo de separación no es exactamente de 1 minuto.

[Fallida] Mi idea es usar la función reduce que hace una operación de 2 a 1 de forma escalonada, es decir, el 1 con el 2, luego el resultado con el 3, etc. Pero quizá sea la aproximación erronea. El caso es que da errores de tipo al operar con datetime, y el reduce de un lambda normal me daba error.

Una vez obtenidos los intervalos entre cada muestra, haría un filter.

In [7]:
sc = SparkContext(appName="PracticaSparkStreaming")
micro_batch_duration = 5
ssc = StreamingContext(sc, micro_batch_duration)
kafkaParams = {"metadata.broker.list": "localhost:9092"}
directKafkaStream = KafkaUtils.createDirectStream(ssc, ["test"], kafkaParams)

In [8]:
lines = directKafkaStream.map(lambda o: str(o[1]))
parsed_lines = lines.flatMap(parse_ambiente).map(lambda o: o["date"])
parsed_lines.pprint()

In [9]:
sc.setCheckpointDir("data/checkpoint/")
interval = parsed_lines.reduce(calc_interval)
interval.pprint()

In [10]:
# parsed_lines.reduce(lambda x, y: x-y).pprint()

In [11]:
ssc.start()

-------------------------------------------
Time: 2019-07-15 23:22:50
-------------------------------------------
2015-02-05 08:43:00
2015-02-05 08:44:00
2015-02-05 08:44:59
2015-02-05 08:45:59
2015-02-05 08:47:00
2015-02-05 08:48:00

-------------------------------------------
Time: 2019-07-15 23:22:55
-------------------------------------------
2015-02-05 08:49:00
2015-02-05 08:50:00
2015-02-05 08:51:00
2015-02-05 08:51:59
2015-02-05 08:53:00
2015-02-05 08:54:00

-------------------------------------------
Time: 2019-07-15 23:23:00
-------------------------------------------
2015-02-05 08:55:00
2015-02-05 08:55:59
2015-02-05 08:57:00
2015-02-05 08:57:59
2015-02-05 08:58:59
2015-02-05 09:00:00

-------------------------------------------
Time: 2019-07-15 23:23:05
-------------------------------------------
2015-02-05 09:01:00
2015-02-05 09:02:00
2015-02-05 09:03:00
2015-02-05 09:04:00
2015-02-05 09:04:59
2015-02-05 09:06:00

-------------------------------------------
Time: 2019-07-15

In [12]:
ssc.stop(False)