In [None]:
#Instalamos la librería de kafka-python
!pip install kafka-python
#Descargamos y arrancamos Apache Kafka
!curl -sSOL https://dlcdn.apache.org/kafka/3.6.2/kafka_2.13-3.6.2.tgz
!tar -xzf kafka_2.13-3.6.2.tgz
!./kafka_2.13-3.6.2/bin/zookeeper-server-start.sh -daemon ./kafka_2.13-3.6.2/config/zookeeper.properties
!./kafka_2.13-3.6.2/bin/kafka-server-start.sh -daemon ./kafka_2.13-3.6.2/config/server.properties
!echo "Esperamos 10 segundos hasta que los servicios kafka y zookeeper están activos y funcionando"
!sleep 10
!./kafka_2.13-3.6.2/bin/kafka-topics.sh --create --topic temperaturas --bootstrap-server localhost:9092

Collecting kafka-python
  Downloading kafka_python-2.0.2-py2.py3-none-any.whl (246 kB)
[?25l     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/246.5 kB[0m [31m?[0m eta [36m-:--:--[0m[2K     [91m━━━━━━━━━━━[0m[91m╸[0m[90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m71.7/246.5 kB[0m [31m1.9 MB/s[0m eta [36m0:00:01[0m[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m246.5/246.5 kB[0m [31m3.5 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: kafka-python
Successfully installed kafka-python-2.0.2
Esperamos 10 segundos hasta que los servicios kafka y zookeeper están activos y funcionando
Created topic temperaturas.


In [None]:
from kafka import KafkaProducer, KafkaConsumer
import time
import json
import random
import time
import pandas as pd

In [None]:
from abc import ABC, abstractmethod
import numpy as np
from functools import reduce
import time

"""
Versión adaptada con funcionalidades para Apache Kafka.
Esencialmente, es idéntico al código proporcionado, así que no lleva apenas documentación.
Evidentemente, se han añadido nuevos métodos en SistemaGestor para poder leer y extraer
los datos del tópico (que se pasa como parámetro a la hora de instanciar):
  start_reading(): llama al método receive_data_and_process

  receive_data_and_process(): accede al tópico, extrae y almacena los datos para
                              realizar el procesado.
"""


#R1. Singleton
class SistemaGestor:

    _unicaInstancia = None

    def __init__(self, topic):
        self.datos = [] #Lista de datos, que en principio almacenaremos 12
        self.estadisticos = {} #Diccionario de estadísticos
        self.umbral = np.inf
        self.supera_umbral = False
        self.sobrecrecimiento = False
        self.ops = ["estadisticos", "umbral", "sobrecrecimiento"]

        self._consumer = KafkaConsumer(topic,
                                       bootstrap_servers='localhost:9092',
                                       value_deserializer=lambda x: json.loads(x.decode('utf-8')),
                                       group_id='temperaturas')
    @property
    def consumer(self):
      return self._consumer

    @consumer.setter
    def consumer(self, value):
      if isinstance(value, KafkaConsumer):
        self._consumer = value

    def start_reading(self):
      """
      llama al método receive_data_and_process. Para nada necesario este método...
      """
      self.receive_data_and_process()

    def receive_data_and_process(self):
        """
        accede al tópico, extrae y almacena los datos para realizar el procesado.
        """
        for message in self._consumer:
            self.actualizar(message.value)
            self.procesar()
            print(self.estadisticos)


    @classmethod
    def get_instance(cls, topic):
        if cls._unicaInstancia == None:
            cls._unicaInstancia = cls(topic)
        return cls._unicaInstancia

    def set_umbral(self, umbral):
        self.umbral = umbral

    def actualizar(self, dato):
        if len(self.datos) == 12:
            i = 1
            while i < 11:
                aux = self.datos[i]
                self.datos[i-1] = aux
                i = i + 1
            self.datos[-1] = dato

        else:
            self.datos.append(dato)


    def procesar(self):

        op1 = Sobrecrecimiento()
        op2 = Umbral(successor=op1)
        op3 = Estadistico(successor=op2)

        for op in self.ops:
            if op == "estadisticos":
                e1 = MeanSV()
                e2 = MaxMin()
                e3 = Quantile()
                estrategias = [e1, e2, e3]
                for e in estrategias:
                    op3.set_estrategia(e)
                    op3.realizar_operacion(op, self.estadisticos, list(zip(*self.datos))[1])

            else:
                op2.realizar_operacion(gestor= self, op = op, l = self.datos, umbral=self.umbral)


#R2. Observer
class Observable:
    def __init__(self):
        self.cliente = None

    def activar(self, observer):
        self.cliente = observer

    def desactivar(self):
        self.cliente = None

    def notificar(self, dato):
        self.cliente.actualizar(dato)

class Sensor(Observable):
    def __init__(self):
        self.dato = None

    def enviar_dato(self, dato):
        self.dato = dato
        self.notificar(self.dato)

#R3. Chain of responsibilities
class Manejador(ABC):
    def __init__(self, successor = None):
        self.successor = successor

    def realizar_operacion(self):
        pass

    def set_manejador(self):
        pass


#R4. Strategy (dentro del R3)
class Strategy:
    def estrategia(self):
        pass

class MeanSV(Strategy):
    def estrategia(self, d, l):
        n = len(l)
        mean = round(reduce(lambda x, y: x+y, l) / n, 4)
        sv = round(np.sqrt(sum(map(lambda x: (x-mean)**2, l))/(n-1)), 4) if(n-1)!=0 else 0

        d["media"] = mean
        d["Desviacion Tipica"] = sv

class MaxMin(Strategy):
    def estrategia(self, d, l):
        maximo = reduce(lambda x,y: x if x>y else y, l)
        minimo = reduce(lambda x,y: x if x<y else y, l)

        d["max"] = maximo
        d["min"] = minimo

class Quantile(Strategy):
    def estrategia(self, d, l):
        n = len(l)
        l_ordenado = sorted(l)
        median = round(list(map(lambda x: x[(n+1)//2 - 1] if n%2 == 1 else ((x[n//2 - 1] + x[(n//2 - 1)+1])/2), [l_ordenado]))[0], 4)
        q25 = round(list(map(lambda x: x[(n+1)//4 - 1], [l_ordenado]))[0], 4)
        q75 = round(list(map(lambda x: x[(3*(n+1))//4 - 1], [l_ordenado]))[0], 4)

        d["Q1"] = q25
        d["mediana"] = median
        d["Q3"] = q75

class Estadistico(Manejador):
    def __init__(self, successor = None):
        Manejador.__init__(self, successor = successor)
        self.estrategia = None

    def set_estrategia(self, estrategia:Strategy):
        self.estrategia = estrategia

    def realizar_operacion(self, op, d, l):
        if op == "estadisticos":
            self.estrategia.estrategia(d,l)

        elif self.successor:
            self.successor.realizar_operacion(op)

    def set_manejador(self, nuevo_manejador):
        self.successor = nuevo_manejador


class Umbral(Manejador):
    def realizar_operacion(self, **kwargs):
        op = kwargs["op"]

        if op == "umbral":
            umbral = kwargs["umbral"]
            l = kwargs["l"]
            gestor = kwargs["gestor"]

            overheat = list(filter(lambda x: x[1] > umbral, l))

            if len(overheat) == 0:
                gestor.supera_umbral = False
                #return False

            else:
                gestor.supera_umbral = overheat
                #return overheat

        elif self.successor:
            self.successor.realizar_operacion(**kwargs)

    def set_manejador(self, nuevo_manejador):
        self.successor = nuevo_manejador

class Sobrecrecimiento(Manejador):
    def realizar_operacion(self, **kwargs):
        op = kwargs["op"]

        if op == "sobrecrecimiento":
            gestor = kwargs["gestor"]
            l = kwargs["l"]

            if len(l) <= 1:
                gestor.sobrecrecimiento = False
                return 0 #para terminar el procesamiento
                #return False

            elif len(l) <= 6:
                l30s = l

            else:
                l30s = l[-6:]  #last 30 sec

            #Vamos a hacerlo por doble puntero xd
            i = 0
            j = 1

            while i < len(l30s):
                while j < len(l30s):
                    if l30s[j][1] - l30s[i][1] >= 10:

                        gestor.sobrecrecimiento = [l30s[i], l30s[j]] #Devuelve la primera pareja de temperaturas cuya diferencia supera los 10º
                        return 0

                    j = j + 1

                i = i + 1
                j = i + 1

            gestor.sobrecrecimiento = False

        elif self.successor:
            self.successor.realizar_operacion(**kwargs)

    def set_manejador(self, nuevo_manejador):
        self.successor = nuevo_manejador


# DESCARGAR EL FICHERO "kafkaProducer.py" Y SUBIRLO AQUÍ ANTES DE CONTINUAR!

Ejecutar el Producer en segundo plano:

In [None]:
!nohup python -m kafkaProducer temperaturas 5 &

nohup: appending output to 'nohup.out'


Para ver los mensajes que se van mandando al tópico:

In [None]:
!./kafka_2.13-3.6.2/bin/kafka-console-consumer.sh --topic temperaturas --from-beginning --bootstrap-server localhost:9092

[1715241377.8100538, 29.5065]
[1715241377.9282224, 15.1748]
[1715241382.9334152, 24.1934]
[1715241387.9394464, 16.3888]
Processed a total of 4 messages


Instanciar el gestor (consumer) y empezar a extraer datos del tópico:

In [None]:
gestor = SistemaGestor.get_instance("temperaturas")

In [None]:
gestor.set_umbral(30)
gestor.receive_data_and_process()

{'media': 34.4033, 'mediana': 34.4033, 'Desviacion Tipica': 0, 'max': 34.4033, 'min': 34.4033, 'Q1': 34.4033, 'Q3': 34.4033}
{'media': 32.9768, 'mediana': 32.9768, 'Desviacion Tipica': 2.0174, 'max': 34.4033, 'min': 31.5502, 'Q1': 34.4033, 'Q3': 34.4033}
{'media': 27.7713, 'mediana': 31.5502, 'Desviacion Tipica': 9.1283, 'max': 34.4033, 'min': 17.3604, 'Q1': 17.3604, 'Q3': 34.4033}
{'media': 30.2895, 'mediana': 32.9768, 'Desviacion Tipica': 8.9953, 'max': 37.8441, 'min': 17.3604, 'Q1': 17.3604, 'Q3': 34.4033}
{'media': 26.3514, 'mediana': 31.5502, 'Desviacion Tipica': 11.7571, 'max': 37.8441, 'min': 10.599, 'Q1': 10.599, 'Q3': 34.4033}
{'media': 27.0139, 'mediana': 30.9382, 'Desviacion Tipica': 10.6403, 'max': 37.8441, 'min': 10.599, 'Q1': 10.599, 'Q3': 34.4033}
{'media': 26.2191, 'mediana': 30.3262, 'Desviacion Tipica': 9.9383, 'max': 37.8441, 'min': 10.599, 'Q1': 17.3604, 'Q3': 34.4033}
{'media': 24.3286, 'mediana': 25.8882, 'Desviacion Tipica': 10.6419, 'max': 37.8441, 'min': 10.599

KeyboardInterrupt: 