## Eliminando arquivos, caso existam, das pastas que serão destinadas às tabelas Delta

In [0]:
%fs rm -r /tmp/delta/events
%fs rm -r /tmp/delta/checkpoint

## Listando os arquivos JSON que serão carregados no Delta Lake

In [0]:
%fs ls /databricks-datasets/structured-streaming/events/

path,name,size,modificationTime
dbfs:/databricks-datasets/structured-streaming/events/file-0.json,file-0.json,72530,1469673865000
dbfs:/databricks-datasets/structured-streaming/events/file-1.json,file-1.json,72961,1469673866000
dbfs:/databricks-datasets/structured-streaming/events/file-10.json,file-10.json,73025,1469673878000
dbfs:/databricks-datasets/structured-streaming/events/file-11.json,file-11.json,72999,1469673879000
dbfs:/databricks-datasets/structured-streaming/events/file-12.json,file-12.json,72987,1469673880000
dbfs:/databricks-datasets/structured-streaming/events/file-13.json,file-13.json,73006,1469673881000
dbfs:/databricks-datasets/structured-streaming/events/file-14.json,file-14.json,73003,1469673882000
dbfs:/databricks-datasets/structured-streaming/events/file-15.json,file-15.json,73007,1469673883000
dbfs:/databricks-datasets/structured-streaming/events/file-16.json,file-16.json,72978,1469673885000
dbfs:/databricks-datasets/structured-streaming/events/file-17.json,file-17.json,73008,1469673886000


## Exibindo o conteúdo de um arquivo JSON

In [0]:
%python
#Lendo um dos arquivos JSON
dataf3 = spark.read.json("/databricks-datasets/structured-streaming/events/file-1.json")
dataf3.show()

+------+----------+
|action|      time|
+------+----------+
| Close|1469506633|
| Close|1469506636|
|  Open|1469506642|
|  Open|1469506644|
|  Open|1469506646|
|  Open|1469506647|
|  Open|1469506648|
|  Open|1469506651|
| Close|1469506653|
|  Open|1469506653|
|  Open|1469506656|
|  Open|1469506659|
|  Open|1469506659|
| Close|1469506660|
| Close|1469506660|
|  Open|1469506662|
| Close|1469506668|
| Close|1469506669|
|  Open|1469506670|
|  Open|1469506670|
+------+----------+
only showing top 20 rows



## Criando um banco de dados separado e uma tabela Delta que receberá, via streaming, os dados dos arquivos JSON

In [0]:
%sql
-- Dedicated database for this streaming example.
CREATE DATABASE IF NOT EXISTS db_stream;
USE db_stream;

-- Recreate the table definition from scratch on every run. The table is
-- declared with an explicit LOCATION, so dropping it is expected to leave the
-- files in place; the cleanup cell at the top of the notebook removes them.
DROP TABLE IF EXISTS db_stream.tab_stream;

-- Delta table over the directory the streaming query writes to.
-- `time` is STRING to match the schema used by the streaming reader.
CREATE TABLE db_stream.tab_stream (
    action STRING,
    time   STRING
)
USING DELTA
LOCATION "/tmp/delta/events"

## Executando a carga na pasta do Delta Lake, onde serão armazenados os dados

In [0]:
%python
from pyspark.sql.functions import *
from pyspark.sql.types import *
# Streaming reads and append into delta table (Start !)
read_schema = StructType([
StructField("action", StringType(), False),
StructField("time", StringType(), True)
])
df2 = (spark.readStream
.option("maxFilesPerTrigger", "1")
.schema(read_schema)
.json("/databricks-datasets/structured-streaming/events/"))
(df2.writeStream
.format("delta")
.outputMode("append")
.option("checkpointLocation", "/tmp/delta/checkpoint")
.option("path", "/tmp/delta/events").start())

Out[3]: <pyspark.sql.streaming.StreamingQuery at 0x7f0ee40d1d30>

## Exibindo em tempo real os dados oriundos da tabela Delta

In [0]:
%sql
-- Number of events per action currently stored in the Delta table.
-- Re-run while the stream is active to watch the totals grow.
-- GROUP BY already yields one row per action, so DISTINCT is unnecessary;
-- ORDER BY keeps the displayed rows in a stable order between runs.
SELECT action, COUNT(*) AS total_events
FROM db_stream.tab_stream
GROUP BY action
ORDER BY action

action,count(1)
Open,33508
Close,32492


## Listando os históricos registrados na tabela Delta

In [0]:
%sql
-- Commit history of the Delta table: one STREAMING UPDATE entry per
-- micro-batch appended by the streaming query. Query the registered table
-- name (as the count query above does) rather than repeating its storage path.
DESCRIBE HISTORY db_stream.tab_stream

version,timestamp,userId,userName,operation,operationParameters,job,notebook,clusterId,readVersion,isolationLevel,isBlindAppend,operationMetrics,userMetadata,engineInfo
46,2022-11-14T14:34:35.000+0000,6031105194152143,patrick_diorio@hotmail.com,STREAMING UPDATE,"Map(outputMode -> Append, queryId -> 23591a0a-74ca-408c-be83-065d5fb4745c, epochId -> 45)",,List(1102759827964470),1114-104148-clvtqm98,45.0,WriteSerializable,True,"Map(numRemovedFiles -> 0, numOutputRows -> 2000, numOutputBytes -> 11711, numAddedFiles -> 1)",,Databricks-Runtime/10.4.x-scala2.12
45,2022-11-14T14:34:30.000+0000,6031105194152143,patrick_diorio@hotmail.com,STREAMING UPDATE,"Map(outputMode -> Append, queryId -> 23591a0a-74ca-408c-be83-065d5fb4745c, epochId -> 44)",,List(1102759827964470),1114-104148-clvtqm98,44.0,WriteSerializable,True,"Map(numRemovedFiles -> 0, numOutputRows -> 2000, numOutputBytes -> 11731, numAddedFiles -> 1)",,Databricks-Runtime/10.4.x-scala2.12
44,2022-11-14T14:34:26.000+0000,6031105194152143,patrick_diorio@hotmail.com,STREAMING UPDATE,"Map(outputMode -> Append, queryId -> 23591a0a-74ca-408c-be83-065d5fb4745c, epochId -> 43)",,List(1102759827964470),1114-104148-clvtqm98,43.0,WriteSerializable,True,"Map(numRemovedFiles -> 0, numOutputRows -> 2000, numOutputBytes -> 11445, numAddedFiles -> 1)",,Databricks-Runtime/10.4.x-scala2.12
43,2022-11-14T14:34:21.000+0000,6031105194152143,patrick_diorio@hotmail.com,STREAMING UPDATE,"Map(outputMode -> Append, queryId -> 23591a0a-74ca-408c-be83-065d5fb4745c, epochId -> 42)",,List(1102759827964470),1114-104148-clvtqm98,42.0,WriteSerializable,True,"Map(numRemovedFiles -> 0, numOutputRows -> 2000, numOutputBytes -> 11634, numAddedFiles -> 1)",,Databricks-Runtime/10.4.x-scala2.12
42,2022-11-14T14:34:17.000+0000,6031105194152143,patrick_diorio@hotmail.com,STREAMING UPDATE,"Map(outputMode -> Append, queryId -> 23591a0a-74ca-408c-be83-065d5fb4745c, epochId -> 41)",,List(1102759827964470),1114-104148-clvtqm98,41.0,WriteSerializable,True,"Map(numRemovedFiles -> 0, numOutputRows -> 2000, numOutputBytes -> 11823, numAddedFiles -> 1)",,Databricks-Runtime/10.4.x-scala2.12
41,2022-11-14T14:34:12.000+0000,6031105194152143,patrick_diorio@hotmail.com,STREAMING UPDATE,"Map(outputMode -> Append, queryId -> 23591a0a-74ca-408c-be83-065d5fb4745c, epochId -> 40)",,List(1102759827964470),1114-104148-clvtqm98,40.0,WriteSerializable,True,"Map(numRemovedFiles -> 0, numOutputRows -> 2000, numOutputBytes -> 11538, numAddedFiles -> 1)",,Databricks-Runtime/10.4.x-scala2.12
40,2022-11-14T14:34:08.000+0000,6031105194152143,patrick_diorio@hotmail.com,STREAMING UPDATE,"Map(outputMode -> Append, queryId -> 23591a0a-74ca-408c-be83-065d5fb4745c, epochId -> 39)",,List(1102759827964470),1114-104148-clvtqm98,39.0,WriteSerializable,True,"Map(numRemovedFiles -> 0, numOutputRows -> 2000, numOutputBytes -> 11753, numAddedFiles -> 1)",,Databricks-Runtime/10.4.x-scala2.12
39,2022-11-14T14:34:02.000+0000,6031105194152143,patrick_diorio@hotmail.com,STREAMING UPDATE,"Map(outputMode -> Append, queryId -> 23591a0a-74ca-408c-be83-065d5fb4745c, epochId -> 38)",,List(1102759827964470),1114-104148-clvtqm98,38.0,WriteSerializable,True,"Map(numRemovedFiles -> 0, numOutputRows -> 2000, numOutputBytes -> 11766, numAddedFiles -> 1)",,Databricks-Runtime/10.4.x-scala2.12
38,2022-11-14T14:33:56.000+0000,6031105194152143,patrick_diorio@hotmail.com,STREAMING UPDATE,"Map(outputMode -> Append, queryId -> 23591a0a-74ca-408c-be83-065d5fb4745c, epochId -> 37)",,List(1102759827964470),1114-104148-clvtqm98,37.0,WriteSerializable,True,"Map(numRemovedFiles -> 0, numOutputRows -> 2000, numOutputBytes -> 11806, numAddedFiles -> 1)",,Databricks-Runtime/10.4.x-scala2.12
37,2022-11-14T14:33:51.000+0000,6031105194152143,patrick_diorio@hotmail.com,STREAMING UPDATE,"Map(outputMode -> Append, queryId -> 23591a0a-74ca-408c-be83-065d5fb4745c, epochId -> 36)",,List(1102759827964470),1114-104148-clvtqm98,36.0,WriteSerializable,True,"Map(numRemovedFiles -> 0, numOutputRows -> 2000, numOutputBytes -> 11763, numAddedFiles -> 1)",,Databricks-Runtime/10.4.x-scala2.12
