In [13]:
!pip install kafka-python



In [14]:
!curl -sSOL https://dlcdn.apache.org/kafka/3.6.2/kafka_2.13-3.6.2.tgz
!tar -xzf kafka_2.13-3.6.2.tgz

In [15]:
!./kafka_2.13-3.6.2/bin/zookeeper-server-start.sh -daemon ./kafka_2.13-3.6.2/config/zookeeper.properties
!./kafka_2.13-3.6.2/bin/kafka-server-start.sh -daemon ./kafka_2.13-3.6.2/config/server.properties
!echo "Esperamos 10 segundos hasta que los servicios kafka y zookeeper estén activos y funcionando"
!sleep 10

Esperamos 10 segundos hasta que los servicios kafka y zookeeper estén activos y funcionando


In [16]:
!ps -ef | grep kafka

root        2670     577  0 15:03 ?        00:00:00 /bin/bash -c ps -ef | grep kafka
root        2672    2670  0 15:03 ?        00:00:00 grep kafka


In [17]:
!./kafka_2.13-3.6.2/bin/kafka-topics.sh --create --topic invernadero --bootstrap-server localhost:9092

Created topic invernadero.


In [18]:
!./kafka_2.13-3.6.2/bin/kafka-topics.sh --describe --bootstrap-server 127.0.0.1:9092  --topic invernadero

Topic: invernadero	TopicId: YwvK_ByITvqAM7EfOQEiUw	PartitionCount: 1	ReplicationFactor: 1	Configs: 
	Topic: invernadero	Partition: 0	Leader: 0	Replicas: 0	Isr: 0


In [19]:
nombre_topico='invernadero'

In [20]:
!cat producer_invernadero.py

# producer.py
from kafka import KafkaProducer
import time
import json
import pandas as pd
import sys


class Reader:
    def __init__(self, file_name):
        self.file_name = file_name
        self.data = pd.DataFrame()
        self.read_file()
        self.process_data()

    def read_file(self):
        self.data = pd.read_csv(self.file_name,
                                sep=',',
                                header='infer',
                                encoding='iso-8859-1')

    def process_data(self):
        columns_tmp = self.data.columns
        columns_tmp = [column_name.lower() for column_name in columns_tmp]
        self.data.columns = columns_tmp


class Producer:
    def __init__(self, file_name, topic, freq):
        self.topic = topic
        self.freq = freq if isinstance(freq, int) else int(freq)
        self.producer = KafkaProducer(bootstrap_servers='localhost:9092',
                                      value_serializer=lambda x: json.dumps(x).encode('utf-

In [21]:
!nohup python -m producer_invernadero temperatures.csv invernadero 5 &

nohup: appending output to 'nohup.out'


In [22]:
!./kafka_2.13-3.6.2/bin/kafka-console-consumer.sh --topic invernadero --from-beginning --bootstrap-server localhost:9092

{"date": "1986-01-08", "temp": 15.4}
{"date": "1986-01-09", "temp": 11.9}
{"date": "1986-01-10", "temp": 13.8}
{"date": "1986-01-11", "temp": 14.4}
Processed a total of 4 messages


In [25]:
from kafka import KafkaConsumer
import json

class EstrategiaEstadisticos:
    def aplicar_estadistico(self):
        pass

class Media(EstrategiaEstadisticos):
    def aplicar_estadistico(self, datos):
        return sum(datos) / len(datos)

class DesviacionTipica(EstrategiaEstadisticos):
    def aplicar_estadistico(self, datos):
        media = sum(datos) / len(datos)
        return (sum(map(lambda x: (x - media) ** 2, datos)) / (len(datos) - 1)) ** (1 / 2)

class MinMax(EstrategiaEstadisticos):
    def aplicar_estadistico(self, datos):
        return min(datos), max(datos)

class Comprobaciones:
    def __init__(self, sucesor=None):
        self._sucesor = sucesor

    def calcular(self):
        pass

class CalcularEstadisticos(Comprobaciones):
    def __init__(self, estrategias, sucesor=None):
        super().__init__(sucesor)
        self.estrategias = estrategias

    def calcular(self, datos):
        if self._sucesor is not None:
            resultado_sucesor = self._sucesor.calcular(datos)

        resultado_estadisticos = list()

        for e in self.estrategias:
            resultado_estadisticos.append(e.aplicar_estadistico(datos))

        if self._sucesor is not None:
            return resultado_sucesor + resultado_estadisticos

        return resultado_estadisticos

class ComprobarUmbral(Comprobaciones):
    def __init__(self, sucesor, umbral):
        super().__init__(sucesor)
        self.umbral = umbral

    def calcular(self, datos):
        if self._sucesor is not None:
            return self._sucesor.calcular(datos) + [datos[11] > self.umbral]

        return [datos[11] > self.umbral]

class DiferenciaTemperatura(Comprobaciones):
    def calcular(self, datos):
        if self._sucesor is not None:
            return self._sucesor.calcular(datos) + [(datos[11] - datos[5]) > 10]

        return [(datos[11] - datos[5]) > 10]

class Publicador:
    def __init__(self, topic):
        self.subscriptores = list()
        self._consumer = KafkaConsumer(topic,
                                       bootstrap_servers='localhost:9092',
                                       value_deserializer=lambda x: json.loads(x.decode('utf-8')),
                                       group_id='inver')

    def alta(self, subscriptor):
        self.subscriptores.append(subscriptor)

    def baja(self, subscriptor):
        try:
            self.subscriptores.remove(subscriptor)
        except Exception as e:
            print(e)

    def _notificar_subscriptores(self, valor):
        for s in self.subscriptores:
            s.actualizar(valor)

    @property
    def consumer(self):
        return self._consumer

    @consumer.setter
    def consumer(self, value):
        if isinstance(value, KafkaConsumer):
            self._consumer = value

    def iniciar_consumicion(self):
        self.receive_message()

    def receive_message(self):
        message_count = 0
        for message in self._consumer:
            message = message.value
            self._notificar_subscriptores(message["temp"])
            message_count += 1


class Subscriptor:
    def actualizar(self):
        pass

class Sistema(Subscriptor):
    _unicaInstancia = None

    def __init__(self):
        self.datos = list()

        estadisticos = CalcularEstadisticos([Media(), DesviacionTipica(), MinMax()])
        umbral = ComprobarUmbral(estadisticos, 15)

        self.comprobaciones = DiferenciaTemperatura(umbral)

    @classmethod
    def obtener_instancia(cls):
        if cls._unicaInstancia is None:
            cls._unicaInstancia = cls()
        return cls._unicaInstancia

    def actualizar(self, valor):
        if len(self.datos) != 12:
            self.datos.append(valor)
        else:
            self.datos.pop(0)
            self.datos.append(valor)
            resultado = self.comprobaciones.calcular(self.datos)

            print(f"Media: {resultado[0]}; Desviación típica: {resultado[1]} \nMínimo: {resultado[2][0]}; " +
                  f"Máximo: {resultado[2][1]} \nUmbral: {resultado[3]}; Diferencia: {resultado[4]}\n\n")



In [26]:
# Hay que esperar 1 minuto hasta que se llenen los 12 datos y entonces empezará a calcular medias...
consumer = Publicador(nombre_topico)
sistema = Sistema().obtener_instancia()
consumer.alta(sistema)
consumer.iniciar_consumicion()




KeyboardInterrupt: 