In [1]:
%run "./setup/setup_data"

In [2]:
# Directories used by the streaming queries in this notebook.
output_path = "/tmp/pydata/Streaming/continuous_streaming/out/iot-stream/"
checkpoint_path = "/tmp/pydata/Streaming/continuous_streaming/out/iot-stream-checkpoint"
bad_records_path = "/tmp/pydata/Streaming/continuous_streaming/badRecordsPath/streaming-sensor/"

# Start every run from a clean slate: recursively delete and recreate each directory.
#
# The output directory is reset together with the checkpoint on purpose. The
# parquet file sink keeps its own log of committed batches in
# `<output_path>/_spark_metadata`; wiping only the checkpoint makes a re-run
# restart at batch 0, which the sink can treat as already committed, and leaves
# parquet files from earlier runs for the downstream reader to pick up.
for _stream_dir in (output_path, checkpoint_path, bad_records_path):
    dbutils.fs.rm(_stream_dir, True)  # True = recursive delete of the previous run's data
    dbutils.fs.mkdirs(_stream_dir)    # recreate the (now empty) directory

In [3]:
# Directory holding the sensor JSON files, and the first file inside it.
sensor_path = "/mnt/jules-pydata/Streaming/continuous_streaming/streaming_sensor/"
sensor_file_name = "".join((sensor_path, "streaming-sensor_file-1.json"))

# Peek at the beginning of the file (at most 233 bytes) to see the record layout.
dbutils.fs.head(sensor_file_name, 233)

In [4]:
#importando as bibliotecas 
from pyspark.sql.functions import *
from pyspark.sql.types import *

In [5]:
# Schemas - two schemas are defined for the data in this notebook.

# Original schema of the sensor records as they arrive in JSON format.
jsonSchema = StructType([
    StructField("timestamp", TimestampType()),  # event time
    StructField("deviceId", LongType()),
    StructField("deviceType", StringType()),
    StructField("signalStrength", DoubleType()),
])

In [6]:
# Extended schema used for the parquet output: the JSON fields plus two
# bookkeeping columns added while the data is ingested.
parquetSchema = StructType([
    StructField("timestamp", TimestampType()),       # event time
    StructField("deviceId", LongType()),
    StructField("deviceType", StringType()),
    StructField("signalStrength", DoubleType()),
    StructField("INPUT_FILE_NAME", StringType()),    # name of the file the sensor record was read from
    StructField("PROCESSED_TIME", TimestampType()),  # timestamp at which the record was processed
])

In [7]:
# Start of the pipeline: define the streaming source.
_json_reader = (
    spark.readStream
    .schema(jsonSchema)                          # schema of the incoming JSON records
    .option("badRecordsPath", bad_records_path)  # where unparsable ("bad") records are written
    .option("maxFilesPerTrigger", 1)             # one file per micro-batch, to slow the demo down
)

# Directory that is watched for new JSON files.
_raw_sensor_stream = _json_reader.json(sensor_path)

# Enrich every record with ingestion metadata and declare the watermark.
inputDF = (
    _raw_sensor_stream
    .withColumn("INPUT_FILE_NAME", input_file_name())   # file the record was read from
    .withColumn("PROCESSED_TIME", current_timestamp())  # time at which the record was processed
    .withWatermark("PROCESSED_TIME", "1 minute")        # 1-minute watermark on the processing time
)

In [9]:
# Sanity check: displays whether inputDF is a streaming DataFrame.
inputDF.isStreaming

In [10]:
# Define the sink: configure the streaming writer first, then start it.
_parquet_writer = (
    inputDF.writeStream
    .queryName("devices")                           # name of the streaming query
    .format("parquet")                              # output file format
    .outputMode("append")                           # only newly arrived rows are written
    .trigger(processingTime='5 seconds')            # run a micro-batch every 5 seconds
    .option("checkpointLocation", checkpoint_path)  # checkpoint used for fault tolerance
    .option("path", output_path)                    # directory that receives the parquet files
)

# Launch the streaming query; `query` is the handle to the running stream.
query = _parquet_writer.start()

In [11]:
%fs ls /tmp/pydata/Streaming/continuous_streaming/out/iot-stream/

path,name,size
dbfs:/tmp/pydata/Streaming/continuous_streaming/out/iot-stream/_spark_metadata/,_spark_metadata/,0
dbfs:/tmp/pydata/Streaming/continuous_streaming/out/iot-stream/part-00000-027bf884-7f80-40bf-aab4-108d89c2f7d5-c000.snappy.parquet,part-00000-027bf884-7f80-40bf-aab4-108d89c2f7d5-c000.snappy.parquet,4661
dbfs:/tmp/pydata/Streaming/continuous_streaming/out/iot-stream/part-00000-02d82f55-0387-41a8-984c-6840d928b12e-c000.snappy.parquet,part-00000-02d82f55-0387-41a8-984c-6840d928b12e-c000.snappy.parquet,4661
dbfs:/tmp/pydata/Streaming/continuous_streaming/out/iot-stream/part-00000-03c1a0d8-cd39-49f2-a2cb-12ee25a7af17-c000.snappy.parquet,part-00000-03c1a0d8-cd39-49f2-a2cb-12ee25a7af17-c000.snappy.parquet,4655
dbfs:/tmp/pydata/Streaming/continuous_streaming/out/iot-stream/part-00000-040aef45-e295-4c28-872c-42c00661aa27-c000.snappy.parquet,part-00000-040aef45-e295-4c28-872c-42c00661aa27-c000.snappy.parquet,4733
dbfs:/tmp/pydata/Streaming/continuous_streaming/out/iot-stream/part-00000-04342aba-8932-4e76-a538-66f3cd5da0ae-c000.snappy.parquet,part-00000-04342aba-8932-4e76-a538-66f3cd5da0ae-c000.snappy.parquet,4683
dbfs:/tmp/pydata/Streaming/continuous_streaming/out/iot-stream/part-00000-0a26bb04-8b04-4e9c-a0ab-aa1d7c17c6e3-c000.snappy.parquet,part-00000-0a26bb04-8b04-4e9c-a0ab-aa1d7c17c6e3-c000.snappy.parquet,4645
dbfs:/tmp/pydata/Streaming/continuous_streaming/out/iot-stream/part-00000-0b897019-68d2-445c-ba32-3024ba3dd083-c000.snappy.parquet,part-00000-0b897019-68d2-445c-ba32-3024ba3dd083-c000.snappy.parquet,4665
dbfs:/tmp/pydata/Streaming/continuous_streaming/out/iot-stream/part-00000-0c06c5db-abbb-4af3-94ae-0dd9232b8e72-c000.snappy.parquet,part-00000-0c06c5db-abbb-4af3-94ae-0dd9232b8e72-c000.snappy.parquet,4656
dbfs:/tmp/pydata/Streaming/continuous_streaming/out/iot-stream/part-00000-0ef6dddd-aba5-4b95-bf4d-5e31a1ef683c-c000.snappy.parquet,part-00000-0ef6dddd-aba5-4b95-bf4d-5e31a1ef683c-c000.snappy.parquet,4649


In [12]:
# Register the streaming input as a temporary view so it can be queried from %sql cells.
# NOTE(review): despite its name, "parquet_sensors" is backed by inputDF (the JSON
# source stream), not by the parquet output - confirm the name is intended.
inputDF.createOrReplaceTempView("parquet_sensors")

In [14]:
%sql
-- Readings reported by sensors of type D or type A
SELECT *
FROM parquet_sensors
WHERE deviceType IN ('SensorTypeD', 'SensorTypeA')

timestamp,deviceId,deviceType,signalStrength,INPUT_FILE_NAME,PROCESSED_TIME
2016-08-03T01:32:20.454+0000,44,SensorTypeD,0.6627327869584749,dbfs:/mnt/jules-pydata/Streaming/continuous_streaming/streaming_sensor/streaming-sensor_file-1.json,2019-11-07T23:28:43.004+0000
2016-08-03T01:32:11.230+0000,36,SensorTypeD,0.8869519587040092,dbfs:/mnt/jules-pydata/Streaming/continuous_streaming/streaming_sensor/streaming-sensor_file-1.json,2019-11-07T23:28:43.004+0000
2016-08-03T01:32:29.490+0000,6,SensorTypeA,0.621775542316748,dbfs:/mnt/jules-pydata/Streaming/continuous_streaming/streaming_sensor/streaming-sensor_file-1.json,2019-11-07T23:28:43.004+0000
2016-08-03T01:32:26.601+0000,31,SensorTypeA,0.7217142790760191,dbfs:/mnt/jules-pydata/Streaming/continuous_streaming/streaming_sensor/streaming-sensor_file-1.json,2019-11-07T23:28:43.004+0000
2016-08-03T01:32:14.999+0000,2,SensorTypeD,0.9463019453554667,dbfs:/mnt/jules-pydata/Streaming/continuous_streaming/streaming_sensor/streaming-sensor_file-1.json,2019-11-07T23:28:43.004+0000
2016-08-03T01:32:19.582+0000,31,SensorTypeA,0.4177194090879053,dbfs:/mnt/jules-pydata/Streaming/continuous_streaming/streaming_sensor/streaming-sensor_file-1.json,2019-11-07T23:28:43.004+0000
2016-08-03T01:32:14.446+0000,61,SensorTypeA,0.8093833461353955,dbfs:/mnt/jules-pydata/Streaming/continuous_streaming/streaming_sensor/streaming-sensor_file-1.json,2019-11-07T23:28:43.004+0000
2016-08-03T01:32:12.688+0000,93,SensorTypeA,0.6009808303441028,dbfs:/mnt/jules-pydata/Streaming/continuous_streaming/streaming_sensor/streaming-sensor_file-1.json,2019-11-07T23:28:43.004+0000
2016-08-03T01:32:13.518+0000,45,SensorTypeD,0.705299532046275,dbfs:/mnt/jules-pydata/Streaming/continuous_streaming/streaming_sensor/streaming-sensor_file-1.json,2019-11-07T23:28:43.004+0000
2016-08-03T01:32:15.719+0000,68,SensorTypeD,0.3990282791790734,dbfs:/mnt/jules-pydata/Streaming/continuous_streaming/streaming_sensor/streaming-sensor_file-1.json,2019-11-07T23:28:43.004+0000


In [15]:
# Use a single partition for shuffles.
spark.conf.set("spark.sql.shuffle.partitions", "1")

# Second stream: read back the parquet files produced by the sink above.
_parquet_reader = (
    spark.readStream
    .format("parquet")                # format of the files being read
    .schema(parquetSchema)            # extended schema (sensor fields + ingestion metadata)
    .option("maxFilesPerTrigger", 1)  # one file per micro-batch, to slow the demo down
)

# NOTE(review): the watermark is declared on PROCESSED_TIME, while the windowed
# aggregation further down groups on "timestamp" - confirm this is intended.
devices = (
    _parquet_reader
    .load(output_path)                            # directory the files are read from
    .withWatermark("PROCESSED_TIME", "1 minute")  # 1-minute watermark on the processing time
)

# Temporary view so the stream can be queried with SQL.
devices.createOrReplaceTempView("sensors")

In [17]:
%sql
-- Total number of sensor readings seen so far
SELECT count(*)
FROM sensors

count(1)
10000


In [19]:
%sql
-- Number of readings and signal-strength statistics per device type
SELECT
  count(*),
  deviceType,
  min(signalStrength),
  max(signalStrength),
  avg(signalStrength)
FROM sensors
GROUP BY deviceType
ORDER BY deviceType ASC

count(1),deviceType,min(signalStrength),max(signalStrength),avg(signalStrength)
2505,SensorTypeA,0.00035972269192152684,0.9995903077803152,0.5039735263517676
2465,SensorTypeB,0.0003522297455231804,0.9993696998766688,0.4977416083853139
2556,SensorTypeC,0.000665677326306735,0.999441839444791,0.5064546450953427
2474,SensorTypeD,0.0002412764460101302,0.9997615682074666,0.5048238617609497


**Conta a quantidade de leituras em cada uma das janelas de tempo (intervalo de 5 segundos).**


Por exemplo:

[(00:00 - 00:05), (00:05 - 00:10), (00:10 - 00:15)]

Cada evento é contabilizado na janela cujo intervalo contém o seu timestamp.

In [21]:
# Count the readings of each device inside tumbling 5-second windows of the
# event time ("timestamp").
_readings_per_window = (
    devices
    .groupBy(window("timestamp", "5 seconds"), "deviceId")  # one group per (window, device)
    .count()                                                # number of records in each group
)

# Temporary view so the counts can be queried with SQL.
_readings_per_window.createOrReplaceTempView("sensor_counts")

In [22]:
%sql
-- Windows in which a device reported fewer than 5 readings, most recent window first
SELECT *
FROM sensor_counts
WHERE count < 5
ORDER BY window.start DESC

window,deviceId,count
"List(2016-08-03T03:11:30.000+0000, 2016-08-03T03:11:35.000+0000)",23,1
"List(2016-08-03T03:11:30.000+0000, 2016-08-03T03:11:35.000+0000)",2,1
"List(2016-08-03T03:11:30.000+0000, 2016-08-03T03:11:35.000+0000)",82,1
"List(2016-08-03T03:11:30.000+0000, 2016-08-03T03:11:35.000+0000)",78,2
"List(2016-08-03T03:11:30.000+0000, 2016-08-03T03:11:35.000+0000)",98,1
"List(2016-08-03T03:11:25.000+0000, 2016-08-03T03:11:30.000+0000)",6,1
"List(2016-08-03T03:11:25.000+0000, 2016-08-03T03:11:30.000+0000)",86,2
"List(2016-08-03T03:11:25.000+0000, 2016-08-03T03:11:30.000+0000)",12,1
"List(2016-08-03T03:11:25.000+0000, 2016-08-03T03:11:30.000+0000)",89,1
"List(2016-08-03T03:11:25.000+0000, 2016-08-03T03:11:30.000+0000)",31,2


Enviar Alertas Quando Um Comportamento do Sensor Está "Errado"

In [24]:
# Start from the per-window counts registered as the "sensor_counts" view.
_sensor_counts = spark.table("sensor_counts")

# Keep the (window, device) pairs with fewer than 5 readings and flatten the
# window struct into its start/end columns.
lost_sensor_signals = (
    _sensor_counts
    .where(col("count") < 5)
    .select("window.start", "window.end", "deviceId", "count")
)

# Render the DataFrame.
display(lost_sensor_signals)

start,end,deviceId,count
2016-08-03T02:13:25.000+0000,2016-08-03T02:13:30.000+0000,81,1
2016-08-03T02:35:25.000+0000,2016-08-03T02:35:30.000+0000,93,1
2016-08-03T02:54:10.000+0000,2016-08-03T02:54:15.000+0000,41,1
2016-08-03T01:32:25.000+0000,2016-08-03T01:32:30.000+0000,52,1
2016-08-03T03:08:25.000+0000,2016-08-03T03:08:30.000+0000,11,1
2016-08-03T02:03:10.000+0000,2016-08-03T02:03:15.000+0000,42,1
2016-08-03T02:46:10.000+0000,2016-08-03T02:46:15.000+0000,87,2
2016-08-03T01:59:20.000+0000,2016-08-03T01:59:25.000+0000,79,1
2016-08-03T02:51:20.000+0000,2016-08-03T02:51:25.000+0000,91,1
2016-08-03T02:25:10.000+0000,2016-08-03T02:25:15.000+0000,69,2


Aplicando Técnicas de ML

In [26]:
# Columns exposed to the ML section, registered as the "sensor_ML" temporary view.
_ml_columns = ("deviceId", "deviceType", "signalStrength", "PROCESSED_TIME")
devices.select(*_ml_columns).createOrReplaceTempView("sensor_ML")

In [27]:
from pyspark.ml.regression import LinearRegression

In [28]:
%sql
-- Show the columns of the sensor_ML temporary view

DESCRIBE sensor_ML

In [29]:
%sql
-- ML input rows for sensors of type D
SELECT *
FROM sensor_ML
WHERE deviceType = 'SensorTypeD'