In [0]:
!pip install faker


In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import lit, current_timestamp
from pyspark.sql.types import *
import uuid
import random
from faker import Faker
from datetime import datetime, timedelta
import pandas as pd

# Setup do Faker
faker = Faker()

# Caminho de saída para bronze (pode ser DBFS ou montado)
caminho_bronze = "/Volumes/comunicacao/comunicacao/bronze/stream_dados.parquet"

# Define o schema correspondente ao seu CSV
schema = StructType([
    StructField("Nome", StringType(), True),
    StructField("Endereço", StringType(), True),
    StructField("IP", StringType(), True),
    StructField("Hora da conexão", StringType(), True),
    StructField("Dispositivo de acesso", StringType(), True),
    StructField("Velocidade de conexão", StringType(), True),
    StructField("Status de conexão", StringType(), True),
    StructField("Rescued data", StringType(), True),
    StructField("Rastreamento", StringType(), True),
    StructField("Sources", StringType(), True),
    StructField("Fonte", StringType(), True),
    StructField("Data", StringType(), True),
    StructField("Arquivo em gestão", StringType(), True),
    StructField("Status", StringType(), True)
])

# Função para gerar um lote de dados (como Pandas DataFrame)
def gerar_lote(qtd=10):
    registros = []
    for _ in range(qtd):
        inicio = faker.date_time_between(start_date='-7d', end_date='now')
        fim = inicio + timedelta(minutes=random.randint(10, 60))

        registros.append({
            "Nome": faker.name(),
            "Endereço": faker.address().replace('\n', ', '),
            "IP": faker.ipv4(),
            "Hora da conexão": f"{inicio.strftime('%Y-%m-%d %H:%M:%S')} - {fim.strftime('%Y-%m-%d %H:%M:%S')}",
            "Dispositivo de acesso": random.choice(["Celular", "Computador", "Tablet"]),
            "Velocidade de conexão": f"{round(random.uniform(5.0, 500.0), 2)} Mbps",
            "Status de conexão": random.choice(["Conectado", "Desconectado"]),
            "Rescued data": str(uuid.uuid4()),
            "Rastreamento": str(uuid.uuid4()),
            "Sources": random.choice(["Web Gateway", "Firewall", "Router Log"]),
            "Fonte": faker.domain_name(),
            "Data": datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S'),
            "Arquivo em gestão": random.choice(["save.log", "top.log", "difference.log"]),
            "Status": random.choice(["True", "False"])
        })
    return registros

# Cria um DataFrame estático e transforma em streaming simulado
def gerar_streaming():
    dados_iniciais = gerar_lote(100)
    df_inicial = spark.createDataFrame(pd.DataFrame(dados_iniciais), schema=schema)
    df_stream = df_inicial.writeStream \
        .format("memory") \
        .queryName("simulacao_memoria") \
        .outputMode("append") \
        .start()
    return df_stream

# Função para criar dados e salvar em parquet com foreachBatch
def gerar_continuamente(intervalo_segundos=5):
    from pyspark.sql.functions import lit
    from pyspark.sql.streaming import DataStreamWriter

    def gerar_microbatch(_):
        dados = gerar_lote(10)
        df_batch = spark.createDataFrame(pd.DataFrame(dados), schema=schema)
        df_batch.write.mode("append").parquet(caminho_bronze)

    from threading import Timer
    def ciclo():
        gerar_microbatch(None)
        Timer(intervalo_segundos, ciclo).start()

    ciclo()

# ---- EXECUÇÃO ----
gerar_continuamente(intervalo_segundos=5)  # Gera dados a cada 5 segundos
