In [0]:
#fazendo imports que precisa para rodar esse notebook
from pyspark.sql.types import StructType, StringType
from pyspark.sql.functions import *
import requests

In [0]:
#limpa a pasta onde irá pôr os arquivos .json 
dbutils.fs.rm("/mnt/tmp", recurse = True)

Out[2]: True

In [0]:
#Essa é uma API criada no mockapi.io, onde iremos pegar os dados para fazer o Streaming
api_url = "https://67c62439351c081993fc80d9.mockapi.io/v1"

# Caminho para armazenar os dados no Databricks (DBFS)
data_path = "/mnt/tmp/mockapi_data"

# Criar diretório, se não existir
dbutils.fs.mkdirs(data_path)

def fetch_and_save_data():
    """Busca os dados da API e salva em JSON no DBFS"""
    response = requests.get(api_url)
    
    if response.status_code == 200:#valida se a resposta do servidor é igual à "200"
        data = response.json()  # Converte para JSON
        if data:  # Evita salvar arquivos vazios
            df = spark.createDataFrame(data)#criando data frame
            df.write.mode("append").json(data_path)#escrevendo o arquivo no modo "append"(sempre add um arquivo mesmo que repetido) com o formato do arquivo em .json
            print(f"Dados salvos em {data_path}")#printa onde foi salvo os dados
    else:
        print(f"Erro ao buscar API: {response.status_code}")#mostra o status_code do erro

for i in range(10, 0, -1):
    print(f"Contagem: {i}")
    fetch_and_save_data()

print("Fim da contagem!!")

Contagem: 10
Dados salvos em /mnt/tmp/mockapi_data
Contagem: 9
Dados salvos em /mnt/tmp/mockapi_data
Contagem: 8
Dados salvos em /mnt/tmp/mockapi_data
Contagem: 7
Dados salvos em /mnt/tmp/mockapi_data
Contagem: 6
Dados salvos em /mnt/tmp/mockapi_data
Contagem: 5
Dados salvos em /mnt/tmp/mockapi_data
Contagem: 4
Dados salvos em /mnt/tmp/mockapi_data
Contagem: 3
Dados salvos em /mnt/tmp/mockapi_data
Contagem: 2
Dados salvos em /mnt/tmp/mockapi_data
Contagem: 1
Dados salvos em /mnt/tmp/mockapi_data
Fim da contagem!!


In [0]:
# Definindo o Schema do DataSource, ou seja, do dado que chega pela API.
#Adicionando a coliuna id e createdAt do tipo string
json_schema = StructType() \
    .add("id", StringType()) \
    .add("createdAt", StringType())

#definindo o schema e o local de salvamento no formato .json
df_streaming_input = spark.readStream.schema(json_schema).json(data_path)


# Realiza um agrupamento dos eventos (Open e Close) por minuto.
df_streaming_counts = (
    df_streaming_input.groupBy(#está usando o valor da variável "df_streaming_input" para fazer um agrupamento
                              df_streaming_input.id,#agrupando por "id"
                              window(df_streaming_input.createdAt, "1 minute")#agrupando por "createdAt" em uma janela de 1 minuto
                              )
                              .count()#faz um count de cada item
                       )

In [0]:
#Aqui criaremos uma query interativa, conforme a ingestão de um novo dataset ocorre, essa query é "atualizada".
query = (
  df_streaming_counts 
    .writeStream
    .format("memory")        # Armazenando as informações em memória.
    .queryName("contagem")   # Nome da tabela criada em memória.
    .outputMode("update")    # Modo Update de Output.
    .start()                 #da inicio na query
)

In [0]:
%sql
/*seleciona todas as colunas para ver quantas tem*/
select * from contagem

id,window,count
5,"List(2025-03-05T12:04:00.000+0000, 2025-03-05T12:05:00.000+0000)",10
49,"List(2025-03-04T21:36:00.000+0000, 2025-03-04T21:37:00.000+0000)",10
18,"List(2025-03-05T07:36:00.000+0000, 2025-03-05T07:37:00.000+0000)",10
13,"List(2025-03-04T23:14:00.000+0000, 2025-03-04T23:15:00.000+0000)",10
27,"List(2025-03-04T20:45:00.000+0000, 2025-03-04T20:46:00.000+0000)",10
44,"List(2025-03-05T16:20:00.000+0000, 2025-03-05T16:21:00.000+0000)",10
19,"List(2025-03-05T00:27:00.000+0000, 2025-03-05T00:28:00.000+0000)",10
43,"List(2025-03-05T11:40:00.000+0000, 2025-03-05T11:41:00.000+0000)",10
21,"List(2025-03-05T07:13:00.000+0000, 2025-03-05T07:14:00.000+0000)",10
20,"List(2025-03-05T12:30:00.000+0000, 2025-03-05T12:31:00.000+0000)",10


In [0]:
%sql 
/* Consultando os indicadores utilizando SQL, repare que ao longo do tempo, executando diversas vezes esta query faz com que a tabela apresente resultados diferentes ao longo do processo de ingestão. */

/*esse select vai trazer o id, timeEnd(com a data final), timeStart(com a data de start) e o count da tabela "contagem" em memoria */
select id, date_format(window.start, "MMM-dd HH:mm") as timeStart , date_format(window.end, "MMM-dd HH:mm") as timeEnd, count from contagem order by id, timeStart

id,timeStart,timeEnd,count
1,Mar-05 13:23,Mar-05 13:24,10
10,Mar-04 22:05,Mar-04 22:06,10
11,Mar-04 23:23,Mar-04 23:24,10
12,Mar-05 09:24,Mar-05 09:25,10
13,Mar-04 23:14,Mar-04 23:15,10
14,Mar-04 19:56,Mar-04 19:57,10
15,Mar-05 00:35,Mar-05 00:36,10
16,Mar-05 13:17,Mar-05 13:18,10
17,Mar-05 04:04,Mar-05 04:05,10
18,Mar-05 07:36,Mar-05 07:37,10
