In [None]:
!pip install kafka

In [2]:
import json
import string
import operator
import numpy as np
import time
import datetime

from kafka import KafkaConsumer
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils


# Importing Data for Real Time plot
from bokeh.models.sources import ColumnDataSource
from bokeh.models import DatetimeTickFormatter
from bokeh.plotting import figure
from bokeh.io import output_notebook, show, push_notebook

from RadiatusConstants import *

### Mostramos la lista de topics en Kafka

Usamos [Kafka Python](https://github.com/dpkp/kafka-python)

In [3]:
# List the topics visible on the brokers. The consumer is only needed for this
# metadata query, so it does not subscribe to any topic, and it is closed
# afterwards so its broker connections are not left open in the kernel.
consumer = KafkaConsumer(bootstrap_servers=kafkaBrokerList)
try:
    print("Brokers: " + kafkaBrokerList)
    print("Topics: " + str(consumer.topics()))
finally:
    consumer.close()

Brokers: 127.0.3.149:9092,127.0.3.150:9092,127.0.3.151:9092
Topics: {'sensor'}


In [3]:
# Echo the configured broker list (presumably provided by `from RadiatusConstants import *`).
print("{}".format(kafkaBrokerList))

127.0.3.149:9092,127.0.3.150:9092,127.0.3.151:9092


### Creamos el contexto de Spark Streaming y comenzamos a procesar los mensajes

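En cada lote leemos los mensajes de Kafka (un JSON con las lecturas `Temperature`, `Humidity` y `Pressure`), calculamos la media de cada magnitud y añadimos la temperatura media a una gráfica en tiempo real.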

In [4]:
def parserKafkaMessage(message):
    """Turn one Kafka record into (sensor name, reading) pairs.

    `message` is indexed as a (key, value) pair. Only the value (index 1) is
    used, and it is decoded as JSON. The decoded object must contain the keys
    "Temperature", "Humidity" and "Pressure"; a missing key raises KeyError.

    Returns a list of three (name, value) tuples in that fixed order.
    """
    payload = json.loads(message[1])
    sensor_fields = ("Temperature", "Humidity", "Pressure")
    return [(field, payload[field]) for field in sensor_fields]

def prossecingElements(sensorValues):
    """Average the readings grouped under one sensor name.

    `sensorValues` is a (name, readings) pair as produced by groupByKey. The
    readings must support both iteration (for sum) and len().

    Returns a one-element list [(name, mean)] so the result can feed flatMap.
    Empty readings raise ZeroDivisionError.
    """
    sensor_name = sensorValues[0]
    readings = sensorValues[1]
    total = sum(readings)
    count = float(len(readings))
    return [(sensor_name, total / count)]

def printRDD(rdd):
    """Append the batch's mean temperature to the live Bokeh chart.

    Intended for foreachRDD, i.e. called once per micro-batch. `rdd` holds
    (sensor name, value) pairs. Only the "Temperature" entries are plotted,
    each timestamped with datetime.datetime.now() at the moment it is read.

    Module-level state used (defined in the plotting cell):
      x, temp         -- recent plotted timestamps / temperatures, capped at
                         the last n_show entries.
      new_data        -- dict passed to ColumnDataSource.stream().
      temperatureData -- ColumnDataSource behind the chart.
      handle          -- notebook handle passed to push_notebook().
    """
    global x
    global new_data
    global temp
    global temperatureData

    n_show = 10  # points kept in the history lists and in the chart

    batch_x = []
    batch_temp = []
    for sensor_name, value in rdd.collect():
        if sensor_name == "Temperature":
            batch_temp.append(value)
            batch_x.append(datetime.datetime.now())

    # Cap both histories so neither list grows without bound.
    x = (x + batch_x)[-n_show:]
    temp = (temp + batch_temp)[-n_show:]

    if not batch_x:
        return  # no temperature in this batch: nothing new to draw

    # stream() APPENDS its argument to the source, so send only the new points;
    # re-sending the recent history would duplicate points already drawn.
    # The rollover argument (n_show) drops the oldest rows on the Bokeh side.
    new_data['x'] = batch_x
    new_data['temp'] = batch_temp
    temperatureData.stream(new_data, n_show)
    push_notebook(handle=handle)
    

In [5]:
from bokeh.models import HoverTool
output_notebook()

# Live temperature chart; the x axis holds datetime.datetime timestamps.
plot = figure(plot_width=800, plot_height=400, x_axis_type='datetime')
plot.xaxis.axis_label = "Time"
plot.yaxis.axis_label = "Temperature"

# Use the same "%T" (HH:MM:SS) format at seconds, minutes and hours scales.
plot.xaxis.formatter = DatetimeTickFormatter(seconds=["%T"], minutes=["%T"], hours=["%T"])

# Start with an empty source: a placeholder (0, 0) row would be drawn at the
# Unix epoch on the datetime axis and stretch the x range back to 1970.
temperatureData = ColumnDataSource(data=dict(x=[], temp=[]))
line = plot.line("x", "temp", source=temperatureData)
# The handle lets printRDD push incremental updates into this output cell.
handle = show(plot, notebook_handle=True)

# Buffers read and updated by printRDD on every micro-batch.
new_data = dict(x=[], temp=[])
x = []
temp = []

period = 1  # in seconds (simulate waiting for new data); NOTE(review): appears unused
n_show = 10  # number of points to keep and show



In [None]:
# Length of each Spark Streaming micro-batch, in seconds.
BATCH_INTERVAL_SECONDS = 2

# Reuse the kernel's active SparkContext (e.g. the `sc` a pyspark kernel provides)
# or create one, so this cell also works after Restart & Run All on a plain kernel.
sc = SparkContext.getOrCreate()
ssc = StreamingContext(sc, BATCH_INTERVAL_SECONDS)

# Direct stream over the 'sensor' topic. parserKafkaMessage reads each record
# as a (key, value) pair and decodes the value as JSON.
kvs = KafkaUtils.createDirectStream(ssc, ['sensor'], {'bootstrap.servers': kafkaBrokerList})

# Per batch: explode each message into (sensor, value) pairs, group by sensor,
# average every group and hand the averages to printRDD for plotting.
# foreachRDD is an output operation that returns None, so nothing is assigned.
(kvs.flatMap(parserKafkaMessage)
    .groupByKey()
    .flatMap(prossecingElements)
    .foreachRDD(printRDD))

# Start the streaming job; awaitTermination() blocks this cell until it stops.
ssc.start()
ssc.awaitTermination()