## Predicting the temperature of an industrial machine's sensors in real time

This project demonstrates how to use Spark Structured Streaming together with Apache Kafka.
It assumes Apache Kafka is already running on the local machine, with a broker at `localhost:9092` and a topic named `meu_projeto`.

In [2]:
from platform import python_version
print(python_version())

3.9.13


In [3]:
# Locate the local Spark installation and make PySpark importable
import findspark
findspark.init()

In [5]:
# Import required modules
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, DoubleType
from pyspark.sql.functions import col, from_json

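We need to include the connector that integrates Spark Structured Streaming with Apache Kafka. The variable must be set before the Spark session is created, because the package is resolved when the JVM starts.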

In [16]:
# Kafka connector: the artifact version should match the installed Spark version and Scala build
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0 pyspark-shell'
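The connector is published as `spark-sql-kafka-0-10_<scala>:<spark>`, so both parts of the coordinate have to agree with the local installation. A minimal sketch of a hypothetical helper, `kafka_submit_args` (not part of PySpark), that assembles the `PYSPARK_SUBMIT_ARGS` value from those two versions:

```python
import os


def kafka_submit_args(spark_version: str, scala_version: str = "2.12") -> str:
    """Build a PYSPARK_SUBMIT_ARGS value that pulls in the Kafka connector.

    Hypothetical helper: the artifact name encodes the Scala build and the
    Spark version, and both must match the local Spark installation.
    """
    package = f"org.apache.spark:spark-sql-kafka-0-10_{scala_version}:{spark_version}"
    # The value must end with 'pyspark-shell' for PySpark to launch the JVM.
    return f"--packages {package} pyspark-shell"


# Same value the notebook sets by hand
os.environ["PYSPARK_SUBMIT_ARGS"] = kafka_submit_args("3.3.0")
print(os.environ["PYSPARK_SUBMIT_ARGS"])
```

The Spark version could also be read from `pyspark.__version__` instead of being hard-coded; either way, the variable only takes effect if it is set before the first `SparkSession` is built.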

## Creating the Spark Session

In [17]:
# Create the Spark session
spark = SparkSession.builder.appName('Dados em Tempo Real com Spark e Kafka').getOrCreate()

23/03/23 16:40:43 WARN Utils: Your hostname, ingo-Vostro-3583 resolves to a loopback address: 127.0.1.1; using 192.168.1.10 instead (on interface wlo1)
23/03/23 16:40:43 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
:: loading settings :: url = jar:file:/home/ingo/anaconda3/lib/python3.9/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/ingo/.ivy2/cache
The jars for the packages stored in: /home/ingo/.ivy2/jars
org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-f4dbad97-a872-4909-babc-2432c85c8868;1.0
	confs: [default]
	found org.apache.spark#spark-sql-kafka-0-10_2.12;3.3.0 in central
	found org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.3.0 in central
	found org.apache.kafka#kafka-clients;2.8.1 in central
	found org.lz4#lz4-java;1.8.0 in central
	found org.xerial.snappy#snappy-java;1.1.8.4 in central
	found org.slf4j#slf4j-api;1.7.32 in central
	found org.apache.hadoop#hadoop-client-runtime;3.3.2 in central
	found org.spark-project.spark#unused;1.0.0 in central
	found org.apache.hadoop#hadoop-client-api;3.3.2 in central
	found commons-logging#commons-logging;1.1.3 in central
	found com.google.code.findbugs#jsr305;3.0.0 in central
	found org.apache.commons#commons-pool2;2.11.1 in central
downloading

23/03/23 16:40:50 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


## Reading from Kafka with Spark Structured Streaming

In [20]:
# Subscribe to the topic that carries the data stream we want to pull
df = spark \
    .readStream \
    .format('kafka') \
    .option('kafka.bootstrap.servers', 'localhost:9092') \
    .option('subscribe', 'meu_projeto') \
    .load()

## Defining the Data Source Schema

In [23]:
# Schema of the nested data we want to extract for analysis (temperature)
esquema_dados_temp = StructType([StructField('leitura',
                                              StructType([StructField('temperatura', DoubleType(), True)]), True)])

In [24]:
# Overall schema of each message in the stream
esquema_dados = StructType([
    StructField('id_sensor', StringType(), True),
    StructField('id_equipamento', StringType(), True),
    StructField('sensor', StringType(), True),
    StructField('data_evento', StringType(), True),
    StructField('padrao', esquema_dados_temp, True)
])
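The schema implies that each Kafka message is a JSON object whose temperature sits two levels deep, under `padrao.leitura.temperatura`. The notebook never shows a raw message, so the values below are invented; this pure-Python sketch only illustrates the expected shape and a rough analogue of the parse-and-select steps that follow, including the fact that bad input yields nulls rather than errors.

```python
import json

# Hypothetical message shaped after esquema_dados; the values are invented.
raw = json.dumps({
    "id_sensor": "S001",
    "id_equipamento": "E01",
    "sensor": "sensor7",
    "data_evento": "2023-03-23 16:40:00",
    "padrao": {"leitura": {"temperatura": 80.5}},
})


def get_path(obj, *keys):
    """Walk nested dicts; return None as soon as a level is missing."""
    for key in keys:
        if not isinstance(obj, dict):
            return None
        obj = obj.get(key)
    return obj


def flatten(message: str) -> dict:
    """Rough analogue of from_json(...) followed by selecting padrao.leitura.temperatura."""
    try:
        data = json.loads(message)
    except json.JSONDecodeError:
        # Spark does not fail on malformed JSON either: the selected fields end up null
        data = {}
    return {
        "temperatura": get_path(data, "padrao", "leitura", "temperatura"),
        "sensor": get_path(data, "sensor"),
    }


print(flatten(raw))  # → {'temperatura': 80.5, 'sensor': 'sensor7'}
```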

## Parsing the Data Source

In [25]:
# Kafka delivers each record's value as binary; cast it to a string
df_conversao = df.selectExpr('CAST(value AS STRING)')

In [26]:
# Parse the JSON string into DataFrame columns using the schema
df_conversao = df_conversao.withColumn('jsonData', from_json(col('value'), esquema_dados)).select('jsonData.*')

In [27]:
df_conversao.printSchema()

root
 |-- id_sensor: string (nullable = true)
 |-- id_equipamento: string (nullable = true)
 |-- sensor: string (nullable = true)
 |-- data_evento: string (nullable = true)
 |-- padrao: struct (nullable = true)
 |    |-- leitura: struct (nullable = true)
 |    |    |-- temperatura: double (nullable = true)



## Preparing the DataFrame
Here we reduce the DataFrame to the format the analysis needs: one temperature reading and its sensor per row.

In [28]:
# Flatten the nested temperature field and keep only the columns we need
df_conversao_temp_sensor = df_conversao.select(col('padrao.leitura.temperatura').alias('temperatura'),
                                               col('sensor'))

In [30]:
df_conversao_temp_sensor.printSchema()

root
 |-- temperatura: double (nullable = true)
 |-- sensor: string (nullable = true)



In [31]:
# A streaming DataFrame cannot be displayed directly: actions such as head() raise an AnalysisException, so results must be read through writeStream
# df_conversao_temp_sensor.head()

## Real-Time Data Analysis

In [33]:
# This object holds our analysis: the mean temperature per sensor
df_media_temp_sensor = df_conversao_temp_sensor.groupBy('sensor').mean('temperatura')
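Because the source is unbounded, the mean cannot be recomputed from scratch on every trigger; the physical plan printed by `query.explain()` in this notebook keeps partial aggregates in a state store (`StateStoreRestore` and `StateStoreSave` around `merge_avg`). A simplified pure-Python sketch of the idea, assuming the state per sensor is a running (sum, count) pair merged with each micro-batch; `RunningMean` is a toy class, not Spark code:

```python
from collections import defaultdict


class RunningMean:
    """Toy model of a stateful streaming average keyed by sensor."""

    def __init__(self):
        # sensor -> [sum of temperatures, number of readings]
        self.state = defaultdict(lambda: [0.0, 0])

    def process_batch(self, rows):
        """Merge one micro-batch of (sensor, temperature) rows into the state."""
        for sensor, temp in rows:
            if temp is None:  # avg skips nulls, so they never touch the state
                continue
            acc = self.state[sensor]
            acc[0] += temp
            acc[1] += 1
        return self.result()

    def result(self):
        """Current mean for every sensor seen so far."""
        return {sensor: total / n for sensor, (total, n) in self.state.items()}


m = RunningMean()
m.process_batch([("sensor1", 10.0), ("sensor1", 20.0), ("sensor2", 5.0)])
print(m.process_batch([("sensor1", 30.0)]))  # → {'sensor1': 20.0, 'sensor2': 5.0}
```

Keeping (sum, count) instead of the mean itself is what makes batches mergeable: the combined average is just total sum over total count.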

In [35]:
df_media_temp_sensor.printSchema()

root
 |-- sensor: string (nullable = true)
 |-- avg(temperatura): double (nullable = true)



In [36]:
# Rename the aggregate column to simplify the analysis
df_media_temp_sensor = df_media_temp_sensor.select(col('sensor').alias('sensor'),
                                                   col('avg(temperatura)').alias('media_temp'))

In [37]:
df_media_temp_sensor.printSchema()

root
 |-- sensor: string (nullable = true)
 |-- media_temp: double (nullable = true)



Below we start the stream for real-time data analysis, printing the results to the console.

In [39]:
# Start the streaming query with the console sink
query = df_media_temp_sensor.writeStream.outputMode('complete').format('console').start()

23/03/23 17:25:29 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-2ebb6837-c5d6-47de-9550-7b4fb7e718b3. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
23/03/23 17:25:29 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.



-------------------------------------------
Batch: 0
-------------------------------------------
+------+----------+
|sensor|media_temp|
+------+----------+
+------+----------+




-------------------------------------------
Batch: 1
-------------------------------------------
+--------+------------------+
|  sensor|        media_temp|
+--------+------------------+
| sensor7| 80.17142857142856|
|sensor34| 85.97999999999999|
|sensor41| 64.72500000000001|
|sensor50|             59.15|
|sensor38|59.800000000000004|
|sensor31|             37.65|
| sensor1| 39.27142857142858|
|sensor10| 60.87500000000001|
|sensor30| 68.83333333333333|
|sensor25|42.385714285714286|
| sensor4| 73.50000000000001|
| sensor5|              70.4|
|sensor20|48.800000000000004|
|sensor44|              38.1|
|sensor19|57.833333333333336|
| sensor8| 51.03333333333333|
|sensor14| 51.34285714285714|
|sensor24|             14.15|
|sensor43|             56.05|
|sensor47|              56.0|
+--------+------------------+
only showing top 20 rows




-------------------------------------------
Batch: 2
-------------------------------------------
+--------+------------------+
|  sensor|        media_temp|
+--------+------------------+
| sensor7| 80.38762886597935|
|sensor34| 84.63955555555556|
|sensor41| 64.29948979591838|
|sensor50| 58.32546296296296|
|sensor31|37.733689839572186|
|sensor38|57.922815533980604|
| sensor1| 38.27512690355329|
|sensor10|  62.8454081632653|
|sensor30|  71.8108695652174|
|sensor25|43.041752577319606|
| sensor4| 73.27192118226601|
| sensor5| 71.77276995305165|
|sensor20| 49.94117647058822|
|sensor44| 40.09814814814817|
|sensor19|              58.5|
| sensor8| 51.97599999999999|
|sensor14|49.149404761904755|
|sensor24| 16.58366336633663|
|sensor43| 54.79739583333335|
|sensor47| 53.75657894736842|
+--------+------------------+
only showing top 20 rows




-------------------------------------------
Batch: 3
-------------------------------------------
+--------+------------------+
|  sensor|        media_temp|
+--------+------------------+
| sensor7| 80.43010204081631|
|sensor34| 84.61629955947137|
|sensor41| 64.26915422885573|
|sensor50|            58.285|
|sensor31| 37.74338624338624|
|sensor38| 57.87924528301888|
| sensor1| 38.32388059701492|
|sensor10| 62.85721393034825|
|sensor30| 71.81058201058201|
|sensor25| 43.01776649746195|
| sensor4| 73.33047619047619|
| sensor5| 71.76930232558139|
|sensor20| 49.91650943396225|
|sensor44| 40.10230414746546|
|sensor19|              58.5|
| sensor8|51.980676328502405|
|sensor14| 49.13274853801169|
|sensor24|16.556796116504852|
|sensor43| 54.81709844559587|
|sensor47| 53.77359307359308|
+--------+------------------+
only showing top 20 rows




-------------------------------------------
Batch: 4
-------------------------------------------
+--------+------------------+
|  sensor|        media_temp|
+--------+------------------+
| sensor7| 80.66605263157895|
|sensor34|  84.4991208791209|
|sensor41| 64.40315533980585|
|sensor50| 58.17645569620253|
|sensor31|  37.8650872817955|
|sensor38| 57.86391752577321|
| sensor1| 38.28641025641026|
|sensor10|62.786330935251804|
|sensor30|  71.7271428571429|
|sensor25| 42.92268041237115|
| sensor4| 73.27254901960787|
| sensor5|  71.7095238095238|
|sensor20|49.652655889145485|
|sensor44|40.166259168704165|
|sensor19|  58.7860349127182|
| sensor8| 51.87468030690536|
|sensor14| 48.98467336683417|
|sensor24|16.644575471698108|
|sensor43| 54.61506493506494|
|sensor47| 53.56560975609756|
+--------+------------------+
only showing top 20 rows



Send new messages to Kafka to see the analysis update here in real time. Click the Stop button in the top menu to interrupt the cell at any time.

In [40]:
# Block this cell until the query terminates or the cell is interrupted; the query itself has been running since start()
query.awaitTermination()


-------------------------------------------
Batch: 5
-------------------------------------------
+--------+------------------+
|  sensor|        media_temp|
+--------+------------------+
| sensor7| 80.67994791666666|
|sensor34| 84.48665207877464|
|sensor41| 64.40315533980585|
|sensor50| 58.15839598997494|
|sensor31| 37.88325123152709|
|sensor38| 57.86403061224491|
| sensor1|38.291857506361325|
|sensor10| 62.77995226730311|
|sensor30| 71.70649717514127|
|sensor25| 42.91074168797955|
| sensor4| 73.27170731707321|
| sensor5| 71.71592039800994|
|sensor20| 49.64919540229884|
|sensor44|40.153640776699035|
|sensor19|58.774129353233825|
| sensor8|51.866071428571416|
|sensor14|          49.00125|
|sensor24| 16.63294117647058|
|sensor43| 54.60719794344474|
|sensor47| 53.56496350364963|
+--------+------------------+
only showing top 20 rows




-------------------------------------------
Batch: 6
-------------------------------------------
+--------+------------------+
|  sensor|        media_temp|
+--------+------------------+
| sensor7|  80.6403418803419|
|sensor34| 84.53696969696969|
|sensor41| 64.49641109298534|
|sensor50| 58.24337349397591|
|sensor31| 37.86589018302829|
|sensor38|57.847254575707176|
| sensor1| 38.32307692307692|
|sensor10|62.739770867430444|
|sensor30| 71.64095063985377|
|sensor25| 42.90232945091516|
| sensor4|  73.4057096247961|
| sensor5| 71.63752012882446|
|sensor20| 49.51806853582554|
|sensor44| 40.12380952380952|
|sensor19| 58.82838063439065|
| sensor8| 51.97439862542956|
|sensor14| 48.97466666666667|
|sensor24|16.671617161716167|
|sensor43| 54.53056994818654|
|sensor47| 53.55258764607679|
+--------+------------------+
only showing top 20 rows



ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "/home/ingo/anaconda3/lib/python3.9/site-packages/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "/home/ingo/anaconda3/lib/python3.9/site-packages/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
  File "/home/ingo/anaconda3/lib/python3.9/socket.py", line 704, in readinto
    return self._sock.recv_into(b)
KeyboardInterrupt


KeyboardInterrupt: 

In [41]:
query.status

{'message': 'Waiting for data to arrive',
 'isDataAvailable': False,
 'isTriggerActive': False}

In [42]:
query.lastProgress

{'id': 'f61f3fa5-d11e-4176-9432-224e25134e6b',
 'runId': '40e2a1de-3d5a-4b81-8a42-d951601d569e',
 'name': None,
 'timestamp': '2023-03-23T20:41:58.689Z',
 'batchId': 7,
 'numInputRows': 0,
 'inputRowsPerSecond': 0.0,
 'processedRowsPerSecond': 0.0,
 'durationMs': {'latestOffset': 1, 'triggerExecution': 2},
 'stateOperators': [{'operatorName': 'stateStoreSave',
   'numRowsTotal': 50,
   'numRowsUpdated': 0,
   'allUpdatesTimeMs': 2623,
   'numRowsRemoved': 0,
   'allRemovalsTimeMs': 0,
   'commitTimeMs': 26508,
   'memoryUsedBytes': 101936,
   'numRowsDroppedByWatermark': 0,
   'numShufflePartitions': 200,
   'numStateStoreInstances': 200,
   'customMetrics': {'loadedMapCacheHitCount': 2400,
    'loadedMapCacheMissCount': 0,
    'stateOnCurrentVersionSizeBytes': 30568}}],
 'sources': [{'description': 'KafkaV2[Subscribe[meu_projeto]]',
   'startOffset': {'meu_projeto': {'0': 40000}},
   'endOffset': {'meu_projeto': {'0': 40000}},
   'latestOffset': {'meu_projeto': {'0': 40000}},
   'numI
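`lastProgress` returns a plain Python dict (or `None` before the first trigger completes), so it can be checked programmatically. A minimal sketch, assuming only the fields visible in the output above; `summarize_progress` is a hypothetical helper that reports how many rows the last trigger read and whether the Kafka offsets moved:

```python
def summarize_progress(progress):
    """Condense a StreamingQuery.lastProgress dict into a few health indicators."""
    if not progress:  # lastProgress is None until a trigger has completed
        return {"batchId": None, "numInputRows": 0, "offsetsAdvanced": False}
    advanced = any(
        src.get("startOffset") != src.get("endOffset")
        for src in progress.get("sources", [])
    )
    return {
        "batchId": progress.get("batchId"),
        "numInputRows": progress.get("numInputRows", 0),
        "offsetsAdvanced": advanced,
    }


# Subset of the lastProgress output shown above
example = {
    "batchId": 7,
    "numInputRows": 0,
    "sources": [{
        "description": "KafkaV2[Subscribe[meu_projeto]]",
        "startOffset": {"meu_projeto": {"0": 40000}},
        "endOffset": {"meu_projeto": {"0": 40000}},
    }],
}
print(summarize_progress(example))  # → {'batchId': 7, 'numInputRows': 0, 'offsetsAdvanced': False}
```

Equal start and end offsets with zero input rows match the `'Waiting for data to arrive'` message reported by `query.status`.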

In [43]:
query.explain()

== Physical Plan ==
WriteToDataSourceV2 org.apache.spark.sql.execution.streaming.sources.MicroBatchWrite@1379fee4, org.apache.spark.sql.execution.datasources.v2.DataSourceV2Strategy$$Lambda$2383/0x000000010116d440@2cbbdf09
+- *(4) HashAggregate(keys=[sensor#49], functions=[avg(temperatura#57)])
   +- StateStoreSave [sensor#49], state info [ checkpoint = file:/tmp/temporary-2ebb6837-c5d6-47de-9550-7b4fb7e718b3/state, runId = 40e2a1de-3d5a-4b81-8a42-d951601d569e, opId = 0, ver = 6, numPartitions = 200], Complete, 0, 2
      +- *(3) HashAggregate(keys=[sensor#49], functions=[merge_avg(temperatura#57)])
         +- StateStoreRestore [sensor#49], state info [ checkpoint = file:/tmp/temporary-2ebb6837-c5d6-47de-9550-7b4fb7e718b3/state, runId = 40e2a1de-3d5a-4b81-8a42-d951601d569e, opId = 0, ver = 6, numPartitions = 200], 2
            +- *(2) HashAggregate(keys=[sensor#49], functions=[merge_avg(temperatura#57)])
               +- Exchange hashpartitioning(sensor#49, 200), ENSURE_REQUIREMENTS

## Real-Time Data Analysis with SQL

In [44]:
# Start the streaming query with the memory sink (creates an in-memory table named after queryName)
query_memoria = df_media_temp_sensor \
    .writeStream \
    .queryName('ingo') \
    .outputMode('complete') \
    .format('memory') \
    .start()

23/03/23 17:45:42 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-d8dd10e3-ff80-4d09-ad9c-4c688e9101bd. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
23/03/23 17:45:42 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.



In [46]:
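# Active streams: the console query started earlier is still running next to query_memoria, which is why console batches keep appearing in the output below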
spark.streams.active

[<pyspark.sql.streaming.StreamingQuery at 0x7f20104139d0>,
 <pyspark.sql.streaming.StreamingQuery at 0x7f20103183d0>]
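The memory sink exposes the result table under the name given to `queryName` (`ingo`), and each `spark.sql` call reads its current snapshot. To try the filter logic without a cluster, here is a stand-in using Python's built-in `sqlite3` rather than Spark SQL, applying the same query to a few averages copied from Batch 1 of the console output:

```python
import sqlite3

# A few (sensor, media_temp) rows copied from Batch 1 of the console output
rows = [
    ("sensor7", 80.17142857142856),
    ("sensor34", 85.97999999999999),
    ("sensor41", 64.72500000000001),
    ("sensor5", 70.4),
]

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE ingo (sensor TEXT, media_temp REAL)")
conn.executemany("INSERT INTO ingo VALUES (?, ?)", rows)

# Same filter as the notebook; ORDER BY only makes the output deterministic
result = conn.execute(
    "SELECT sensor, ROUND(media_temp, 2) AS media FROM ingo "
    "WHERE media_temp > 65 ORDER BY sensor"
).fetchall()
conn.close()

print(result)
```

Unlike this static table, the Spark table changes between calls as new micro-batches complete, which is why the notebook polls it in a loop.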

In [48]:
# Query the in-memory table 'ingo' ten times, 3 seconds apart, keeping sensors whose mean is above 65; then stop the memory query
from time import sleep

for _ in range(10):
    spark.sql('select sensor, round(media_temp, 2) as media from ingo where media_temp > 65').show()
    sleep(3)

query_memoria.stop()



+------+-----+
|sensor|media|
+------+-----+
+------+-----+




-------------------------------------------
Batch: 10
-------------------------------------------
+--------+------------------+
|  sensor|        media_temp|
+--------+------------------+
| sensor7| 80.64810863239575|
|sensor34|  84.6014869888476|
|sensor41| 64.59472636815923|
|sensor50|58.377334732423925|
|sensor31|37.855588526211676|
|sensor38|57.800000000000026|
| sensor1|38.283682008368196|
|sensor10| 62.78532663316584|
|sensor30|  71.6750738916256|
|sensor25| 42.73330049261086|
| sensor4| 73.39194312796208|
| sensor5| 71.67099080694584|
|sensor20|49.467289719626166|
|sensor44| 40.15437185929648|
|sensor19|  58.6866204162537|
| sensor8|51.938270377733595|
|sensor14| 48.92823061630219|
|sensor24| 16.63876739562624|
|sensor43|54.412439261418854|
|sensor47|53.309783728115335|
+--------+------------------+
only showing top 20 rows

+------+-----+
|sensor|media|
+------+-----+
+------+-----+





+------+-----+
|sensor|media|
+------+-----+
+------+-----+




-------------------------------------------
Batch: 11
-------------------------------------------
+--------+------------------+
|  sensor|        media_temp|
+--------+------------------+
| sensor7| 80.66400322841002|
|sensor34| 84.56037588097104|
|sensor41| 64.55382674516402|
|sensor50| 58.40918635170603|
|sensor31| 37.83206296603149|
|sensor38| 57.80048622366291|
| sensor1| 38.33978132884777|
|sensor10| 62.76018518518519|
|sensor30| 71.66691419141914|
|sensor25| 42.76210526315791|
| sensor4| 73.33364854215918|
| sensor5| 71.70967741935483|
|sensor20|49.417809298660366|
|sensor44|40.148471986417654|
|sensor19|58.634428923582576|
| sensor8| 52.00951178451178|
|sensor14|48.893708053691284|
|sensor24|16.649295774647882|
|sensor43| 54.44992012779553|
|sensor47| 53.38359966358283|
+--------+------------------+
only showing top 20 rows

+------+-----+
|sensor|media|
+------+-----+
+------+-----+

+------+-----+
|sensor|media|
+------+-----+
+------+-----+

+------+-----+
|sensor|media|
+---