In [1]:
from platform import python_version
print("Versão usada neste notebook:", python_version())

Versão usada neste notebook: 3.9.12


In [2]:
#Importando o findspark
import findspark
findspark.init()

In [3]:
#Importações necessárias

import pyspark
from pyspark.streaming import StreamingContext
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, DoubleType
from pyspark.sql.functions import col, from_json

O Spark Streaming possui conexão com diversos frameworks e SGBDs, para o nosso projeto precisamos de um conector para o Apache Kafka.

In [4]:
# Usando o conector:
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.4.0 pyspark-shell'

## Criando uma Sessão Spark

In [5]:
spark_session = SparkSession.builder.appName('Spark_Streaming').getOrCreate()

23/04/19 08:38:48 WARN Utils: Your hostname, mor-Inspiron-3501 resolves to a loopback address: 127.0.1.1; using 192.168.15.175 instead (on interface wlp0s20f3)
23/04/19 08:38:48 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
:: loading settings :: url = jar:file:/opt/spark/jars/ivy-2.5.0.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/mor/.ivy2/cache
The jars for the packages stored in: /home/mor/.ivy2/jars
org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-b674702c-d2d5-48dd-a1bb-7f1f6b8f0011;1.0
	confs: [default]
	found org.apache.spark#spark-sql-kafka-0-10_2.12;3.4.0 in central
	found org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.4.0 in central
	found org.apache.kafka#kafka-clients;3.3.2 in central
	found org.lz4#lz4-java;1.8.0 in central
	found org.xerial.snappy#snappy-java;1.1.9.1 in central
	found org.slf4j#slf4j-api;2.0.6 in central
	found org.apache.hadoop#hadoop-client-runtime;3.3.4 in central
	found org.apache.hadoop#hadoop-client-api;3.3.4 in central
	found commons-logging#commons-logging;1.1.3 in central
	found com.google.code.findbugs#jsr305;3.0.0 in central
	found org.apache.commons#commons-pool2;2.11.1 in central
downloading https://repo1.maven.org/maven2/org/apache/spark/spark-sql

23/04/19 08:39:29 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/04/19 08:39:30 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
23/04/19 08:39:30 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.
23/04/19 08:39:30 WARN Utils: Service 'SparkUI' could not bind on port 4042. Attempting port 4043.
23/04/19 08:39:30 WARN Utils: Service 'SparkUI' could not bind on port 4043. Attempting port 4044.
23/04/19 08:39:30 WARN Utils: Service 'SparkUI' could not bind on port 4044. Attempting port 4045.
23/04/19 08:39:30 WARN Utils: Service 'SparkUI' could not bind on port 4045. Attempting port 4046.
23/04/19 08:39:30 WARN Utils: Service 'SparkUI' could not bind on port 4046. Attempting port 4047.


## Leitura do Kafka Spark Structured Stream

O que fazemos aqui é criar uma subscrição no tópico que tem o streaming de dados dos quais desejamos "puxar" esses dados
Uma subscrição no tópico serve para receber tudo o que o tópico receber.

In [8]:
# Aqui estabelecemos a conexão entre o Spark Streaming e o Apache Kafka

df = spark_session \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "localhost:9092") \
  .option("subscribe", "dsamp6") \
  .load()

## Definição do Schema da Fonte de Dados

In [9]:
# Definimos o schema dos dados que desejamos capturar para análise (neste caso a temperatura de cada sensor)

schema_dados_temp = StructType([StructField('leitura',
                                           StructType([StructField('temperatura', DoubleType(), True)]),True)])

In [11]:
# Agora definimos um schema global dos dados
schema_dados_global = StructType([
    StructField('id_sensor', StringType(), True),
    StructField('id_equipamento', StringType(), True),
    StructField('sensor', StringType(), True),
    StructField('data_evento', StringType(), True),
    StructField('padrao', schema_dados_temp, True),
    
])

## Parse da Fonte de Dados

Neste ponto precisamos informar o Spark como ele deverá formatar os dados.

In [12]:
# Capturamos cada linha de dado (cada valor) como string
# Vale lembrar que o df é um conexão com o kafka (readStream)

df_conversao = df.selectExpr('CAST(value AS STRING)') # Cada linha do arquivo de dados é um value e estamos convertendo 
                                                    # cada linha para string.

In [13]:
# Parse do formato JSON em dataframe

#Pegamos os dados e o nomeamos de jsonData, convertemos então de json para o nosso schema de dados definido acima.
#Depois selecionamos todos os dados.

df_conversao = df_conversao.withColumn('jsonData', from_json(col('value'), schema_dados_global)).select('jsonData.*')

In [14]:
df_conversao.printSchema()

root
 |-- id_sensor: string (nullable = true)
 |-- id_equipamento: string (nullable = true)
 |-- sensor: string (nullable = true)
 |-- data_evento: string (nullable = true)
 |-- padrao: struct (nullable = true)
 |    |-- leitura: struct (nullable = true)
 |    |    |-- temperatura: double (nullable = true)



## Preparando do Dataframe

In [16]:
# Não nos interessa todas as colunas mostradas no schema acima.
# O precisamos será a coluna de sensor e de temperatura, para calcularmos a média por sensor.

df_conversao_temp_sensor = df_conversao.select(col('padrao.leitura.temperatura').alias('temperatura'),
                                              col('sensor'))

In [17]:
df_conversao_temp_sensor.printSchema()

root
 |-- temperatura: double (nullable = true)
 |-- sensor: string (nullable = true)



## Análise de Dados em Tempo Real

In [18]:
# Aqui temos o objeto que irá conter nossa análise, o cálculo da média das temperaturas por sensor

df_media_temp_sensor = df_conversao_temp_sensor.groupBy('sensor').mean('temperatura')

In [19]:
df_media_temp_sensor.printSchema()

root
 |-- sensor: string (nullable = true)
 |-- avg(temperatura): double (nullable = true)



In [20]:
# Renomeamos as colunas para simplificar nossa análise

df_media_temp_sensor = df_media_temp_sensor.select(col('sensor').alias('sensor'),
                                                  col('avg(temperatura)').alias('media_temp'))


In [21]:
df_media_temp_sensor.printSchema()

root
 |-- sensor: string (nullable = true)
 |-- media_temp: double (nullable = true)



In [22]:
# Abrimos o streaming para análise de dados em tempo real, imprimindo o resultado no console.

# Objeto que inicia a consulta ao streaming com formato de console

query = df_media_temp_sensor.writeStream.outputMode('complete').format('console').start()

23/04/20 08:54:34 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-b034a5da-ca5b-4493-8b20-255e316b7b4d. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
23/04/20 08:54:34 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


                                                                                

-------------------------------------------
Batch: 0
-------------------------------------------
+------+----------+
|sensor|media_temp|
+------+----------+
+------+----------+



                                                                                

-------------------------------------------
Batch: 1
-------------------------------------------
+--------+------------------+
|  sensor|        media_temp|
+--------+------------------+
| sensor7|              79.0|
|sensor34|              80.6|
|sensor41|62.849999999999994|
|sensor50|60.366666666666674|
|sensor31| 36.94285714285714|
|sensor38|             57.15|
| sensor1|40.925000000000004|
|sensor30|              72.2|
|sensor10|             63.98|
|sensor25|45.214285714285715|
| sensor4| 73.51666666666667|
| sensor5| 71.53333333333333|
|sensor20|51.260000000000005|
|sensor44|              43.8|
| sensor8|             52.14|
|sensor14|              48.0|
|sensor24|15.666666666666666|
|sensor43| 55.75714285714285|
|sensor47|              52.9|
|sensor26| 50.82000000000001|
+--------+------------------+
only showing top 20 rows



                                                                                

-------------------------------------------
Batch: 2
-------------------------------------------
+--------+------------------+
|  sensor|        media_temp|
+--------+------------------+
| sensor7| 80.65613577023493|
|sensor34|  84.3587012987013|
|sensor41| 64.55243619489562|
|sensor50| 58.22434554973823|
|sensor31| 37.58141809290952|
|sensor38| 57.75738498789346|
| sensor1| 38.24480198019803|
|sensor30| 71.58542199488494|
|sensor10| 62.67192513368979|
|sensor25| 42.45537190082647|
| sensor4|  72.8814621409922|
| sensor5| 72.02843137254901|
|sensor20|49.387439613526524|
|sensor44| 40.02533936651584|
|sensor19| 58.73832487309649|
| sensor8| 52.12580645161293|
|sensor14| 48.72765432098761|
|sensor24|16.738000000000007|
|sensor43|54.291542288557224|
|sensor47| 53.70594059405943|
+--------+------------------+
only showing top 20 rows



In [23]:
# Executamos a query do streaming e evitamos que o processo seja encerrado
query.awaitTermination()

ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "/opt/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "/opt/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
  File "/home/mor/anaconda3/lib/python3.9/socket.py", line 704, in readinto
    return self._sock.recv_into(b)
KeyboardInterrupt


KeyboardInterrupt: 

In [24]:
query.status

{'message': 'Getting offsets from KafkaV2[Subscribe[dsamp6]]',
 'isDataAvailable': False,
 'isTriggerActive': True}

In [25]:
query.lastProgress

{'id': '79b41f04-096a-4334-a343-02882c1a9c4c',
 'runId': '31d4d68b-7ca3-4f67-8de8-29d57658d9a7',
 'name': None,
 'timestamp': '2023-04-20T12:05:32.897Z',
 'batchId': 3,
 'numInputRows': 0,
 'inputRowsPerSecond': 0.0,
 'processedRowsPerSecond': 0.0,
 'durationMs': {'latestOffset': 1, 'triggerExecution': 1},
 'stateOperators': [{'operatorName': 'stateStoreSave',
   'numRowsTotal': 50,
   'numRowsUpdated': 0,
   'allUpdatesTimeMs': 1100,
   'numRowsRemoved': 0,
   'allRemovalsTimeMs': 0,
   'commitTimeMs': 13021,
   'memoryUsedBytes': 101600,
   'numRowsDroppedByWatermark': 0,
   'numShufflePartitions': 200,
   'numStateStoreInstances': 200,
   'customMetrics': {'loadedMapCacheHitCount': 800,
    'loadedMapCacheMissCount': 0,
    'stateOnCurrentVersionSizeBytes': 30568}}],
 'sources': [{'description': 'KafkaV2[Subscribe[dsamp6]]',
   'startOffset': {'dsamp6': {'0': 40000}},
   'endOffset': {'dsamp6': {'0': 40000}},
   'latestOffset': {'dsamp6': {'0': 40000}},
   'numInputRows': 0,
   'inp

In [27]:
query.explain()

== Physical Plan ==
WriteToDataSourceV2 org.apache.spark.sql.execution.streaming.sources.MicroBatchWrite@28a9bcee, org.apache.spark.sql.execution.datasources.v2.DataSourceV2Strategy$$Lambda$2405/0x00000001011a0040@68fabe64
+- *(4) HashAggregate(keys=[sensor#37], functions=[avg(temperatura#58)])
   +- StateStoreSave [sensor#37], state info [ checkpoint = file:/tmp/temporary-b034a5da-ca5b-4493-8b20-255e316b7b4d/state, runId = 31d4d68b-7ca3-4f67-8de8-29d57658d9a7, opId = 0, ver = 2, numPartitions = 200], Complete, 0, 2
      +- *(3) HashAggregate(keys=[sensor#37], functions=[merge_avg(temperatura#58)])
         +- StateStoreRestore [sensor#37], state info [ checkpoint = file:/tmp/temporary-b034a5da-ca5b-4493-8b20-255e316b7b4d/state, runId = 31d4d68b-7ca3-4f67-8de8-29d57658d9a7, opId = 0, ver = 2, numPartitions = 200], 2
            +- *(2) HashAggregate(keys=[sensor#37], functions=[merge_avg(temperatura#58)])
               +- Exchange hashpartitioning(sensor#37, 200), ENSURE_REQUIREMENTS

## Análise de Dados em Tempo Real

In [28]:
# Objeto que inicia a consulta ao streaming com formato de memória (cria tabela temporária)
# Esta tabela temporária é criada na memória

query_memoria = df_media_temp_sensor \
    .writeStream \
    .queryName("memo") \
    .outputMode("complete") \
    .format("memory") \
    .start()

23/04/20 09:10:31 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-f5a21bb8-da14-42dd-a05d-476883b936de. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
23/04/20 09:10:31 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


                                                                                

In [30]:
# Streams ativados
spark_session.streams.active

[<pyspark.sql.streaming.StreamingQuery at 0x7fc5714c7880>,
 <pyspark.sql.streaming.StreamingQuery at 0x7fc571500c10>]

In [32]:
# Vamos manter a query executando por algum tempo e aplicando SQL aos dados em tempo real

from time import sleep

for i in range(30):
    
    spark_session.sql("select sensor, round(media_temp, 2) as media from memo where media_temp > 65")
    sleep(3)
    
query_memoria.stop()


                                                                                

-------------------------------------------
Batch: 5
-------------------------------------------
+--------+------------------+
|  sensor|        media_temp|
+--------+------------------+
| sensor7|  80.7078205128205|
|sensor34|  84.3751282051282|
|sensor41| 64.42126642771805|
|sensor50| 58.38560606060607|
|sensor31| 37.62222222222221|
|sensor38| 57.88756345177665|
| sensor1| 38.34196319018406|
|sensor30| 71.62153236459712|
|sensor10|62.702002670226925|
|sensor25| 42.64750957854407|
| sensor4| 73.00585034013606|
| sensor5| 72.01032806804375|
|sensor20| 49.43928571428568|
|sensor44|39.950301568154416|
|sensor19| 58.72225000000004|
| sensor8| 52.10872817955112|
|sensor14| 48.98612440191384|
|sensor24| 16.68031980319803|
|sensor43| 54.42487922705317|
|sensor47|  53.5885101010101|
+--------+------------------+
only showing top 20 rows



                                                                                

-------------------------------------------
Batch: 6
-------------------------------------------
+--------+------------------+
|  sensor|        media_temp|
+--------+------------------+
| sensor7| 80.68432708688245|
|sensor34| 84.38411614005123|
|sensor41|  64.4394611727417|
|sensor50| 58.37806943268418|
|sensor31| 37.62614107883816|
|sensor38| 57.92165820642979|
| sensor1|38.312758620689664|
|sensor30| 71.69095563139933|
|sensor10| 62.71521926053305|
|sensor25| 42.72114236999148|
| sensor4| 73.05489177489177|
| sensor5| 71.97064595257564|
|sensor20|49.505354200988435|
|sensor44|39.848497156783104|
|sensor19|58.842004971002524|
| sensor8| 52.13907172995782|
|sensor14| 48.97538213998388|
|sensor24| 16.67905004240882|
|sensor43|  54.3588972431078|
|sensor47| 53.50851979345956|
+--------+------------------+
only showing top 20 rows



                                                                                

-------------------------------------------
Batch: 7
-------------------------------------------
+--------+------------------+
|  sensor|        media_temp|
+--------+------------------+
| sensor7| 80.68095238095238|
|sensor34| 84.38549488054606|
|sensor41| 64.44264240506331|
|sensor50|58.381802864363955|
|sensor31|37.630737365368674|
|sensor38| 57.92516891891893|
| sensor1|38.314703353396396|
|sensor30| 71.69558198810536|
|sensor10| 62.71400343642606|
|sensor25| 42.72189097103919|
| sensor4|  73.0492666091458|
| sensor5| 71.97765089722675|
|sensor20| 49.51549180327866|
|sensor44|39.850770478507705|
|sensor19| 58.83887510339127|
| sensor8| 52.14352941176472|
|sensor14| 48.97443729903534|
|sensor24|16.676949152542374|
|sensor43|  54.3586666666667|
|sensor47| 53.50935622317597|
+--------+------------------+
only showing top 20 rows



                                                                                

-------------------------------------------
Batch: 8
-------------------------------------------
+--------+------------------+
|  sensor|        media_temp|
+--------+------------------+
| sensor7|  80.7106329113924|
|sensor34| 84.43331202046035|
|sensor41| 64.50567765567767|
|sensor50|58.435455124124765|
|sensor31|37.703567035670346|
|sensor38|57.920769230769224|
| sensor1| 38.31147335423198|
|sensor30| 71.67444089456869|
|sensor10| 62.71627612412915|
|sensor25|42.744812499999995|
| sensor4| 73.06052295918367|
| sensor5|  71.9385089340727|
|sensor20| 49.52092739475285|
|sensor44| 39.86604361370716|
|sensor19| 58.85559655596558|
| sensor8| 52.07428393524284|
|sensor14| 49.02474226804122|
|sensor24|16.711719253058597|
|sensor43|54.432601880877776|
|sensor47|53.452858958068624|
+--------+------------------+
only showing top 20 rows



## Fim