In [None]:
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from pyspark.sql.utils import AnalysisException
from pyspark.sql.types import StructType, StructField, StringType, BinaryType, IntegerType, DoubleType, TimestampType, DateType, LongType
from delta.tables import DeltaTable
from pyspark.sql.utils import AnalysisException
from pyspark.storagelevel import StorageLevel
from typing import Union, Optional
from pyspark.sql.functions import input_file_name
from pyspark.sql.window import Window

# --- Credenciais AWS ---
accessKeyId = ""
secretAccessKey = ""

# --- Sessão Spark ---
def create_spark_session() -> SparkSession:
    spark = (
        SparkSession
        .builder
        .appName("Bronze Zone Streaming")
        .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
        .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
        .enableHiveSupport()
        .getOrCreate()
    )
    
    spark.sparkContext.setLogLevel("WARN")

    conf = spark.sparkContext._jsc.hadoopConfiguration()
    conf.set("spark.hadoop.fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.TemporaryAWSCredentialsProvider")
    conf.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    conf.set("fs.s3a.fast.upload", "true")
    conf.set("fs.s3a.bucket.all.committer.magic.enabled", "true")
    conf.set("fs.s3a.directory.marker.retention", "keep")
    conf.set("spark.driver.extraClassPath", "/usr/local/spark/jars/*")
    conf.set("spark.driver.memory", "8g")
    conf.set("spark.executor.memory", "16g")
    conf.set("fs.s3a.access.key", accessKeyId)
    conf.set("fs.s3a.secret.key", secretAccessKey)

    return spark

spark = create_spark_session()

In [4]:
schema_usuarios = StructType([
    StructField("id", LongType(), True),
    StructField("nome", StringType(), True),
    StructField("email", StringType(), True),
    StructField("timestamp", StringType(), True)
])

schema_musicas = StructType([
    StructField("id", LongType(), True),
    StructField("Artist", StringType(), True),
    StructField("Title", StringType(), True),
    StructField("timestamp", StringType(), True)
])

schema_streamings = StructType([
    StructField("id", LongType(), True),
    StructField("nome", StringType(), True),
    StructField("musica", StringType(), True),
    StructField("timestamp", StringType(), True)
])

In [5]:
landing_path = f"s3a://dev-lab-02-us-east-2-landing/spotify/"
bronze_path = f"s3a://dev-lab-02-us-east-2-bronze/spotify/"
checkpoint_path = f"s3a://dev-lab-02-us-east-2-bronze/checkpoints/spotify/"

# Explicação

## `readStream` — Leitura de Dados em "Tempo Real" no Spark

### 📌 O que é?

`readStream` é o método do PySpark para **ler dados em tempo real (streaming)**. Ele permite que sua aplicação Spark reaja automaticamente a **novos arquivos ou mensagens** que chegam a um diretório, Kafka, socket, entre outros.

---

### ✅ Quando usar?

* Quando sua aplicação precisa **processar dados continuamente** conforme eles chegam.
* Ideal para ingestão de dados para **pipelines de streaming**: landing → bronze → silver.
* Casos comuns:

  * Novos arquivos JSON chegando em uma pasta no S3.
  * Mensagens de um tópico Kafka.
  * Leitura contínua de logs, sensores ou eventos.

---

### 🔧 Exemplo básico com arquivos JSON

```python
from pyspark.sql.functions import input_file_name

df_stream = spark.readStream \
    .format("json") \
    .schema(schema) \
    .option("multiline", "true") \
    .load("s3a://meu-bucket/landing/usuarios/") \
    .withColumn("origem_arquivo", input_file_name())
```

---

### 🔎 Parâmetros úteis para `readStream`

| Parâmetro                     | Descrição                                                                    |
| ----------------------------- | ---------------------------------------------------------------------------- |
| `.format("...")`              | Fonte de dados: `json`, `csv`, `parquet`, `kafka`, `socket`, etc.            |
| `.schema(schema)`             | Define o schema dos dados esperados (obrigatório para arquivos estruturados) |
| `.option("multiline", "...")` | Se for `json` ou `csv`, define se os objetos estão em múltiplas linhas       |
| `.load(path)`                 | Caminho onde os dados estão chegando continuamente                           |
| `.withColumn(...)`            | Pode ser usado para adicionar colunas como `origem_arquivo`, `data`, etc.    |

---

### ⚠️ Atenção ao uso com arquivos

* O Spark **não reprocessa arquivos antigos** por padrão. Ele considera apenas **novos arquivos**.
* O diretório precisa ser **imutável**: evite sobrescrever arquivos no mesmo caminho.
* O schema precisa ser definido, pois o Spark não infere schema dinamicamente em streaming.

---

## 🧠 Diferença entre `read` vs `readStream`

| Característica        | `read` (batch)          | `readStream` (streaming)                       |
| --------------------- | ----------------------- | ---------------------------------------------- |
| Tipo de leitura       | Dados fixos (estáticos) | Dados em tempo real (dinâmicos)                |
| Quando é executado?   | Apenas uma vez          | Continuamente, enquanto o stream estiver ativo |
| Suporte a formatos    | Todos                   | Limitado: JSON, CSV, Parquet, Delta, Kafka     |
| Necessita checkpoint? | ❌ Não                   | ✅ Sim, para tolerância a falhas                |
| Schema dinâmico       | ✅ Sim                   | ❌ Não — precisa ser definido                   |

---

### 📚 Curiosidade: Spark Structured Streaming é *micro-batch*

Apesar de parecer tempo real, o Spark Structured Streaming opera internamente em **micro-batches**, ou seja, ele agrupa os dados em pequenos lotes com base no tempo (por padrão, a cada 500ms). Esse modelo equilibra performance e tolerância a falhas.

## `writeStream` com `.format("json" | "delta")`

### 📌 O que é?

O método `.writeStream` em PySpark é usado para gravar os dados de um **DataFrame em tempo real** em um destino como arquivos JSON, Delta, Kafka, etc.

### ✅ Quando usar?

* Quando você deseja **gravar continuamente** dados sem transformação complexa por micro-lote.
* Ideal para modos de saída simples como `append`, `complete` ou `update`.

---

### 🔧 Exemplo básico

```python
query = df_stream.writeStream \
    .format("json") \
    .option("path", "s3a://meu-bucket/saida/") \
    .option("checkpointLocation", "s3a://meu-bucket/checkpoint/") \
    .outputMode("append") \
    .start()
```

---

### 🔎 Parâmetros úteis para `.writeStream`

| Parâmetro                            | Descrição                                                                    |
| ------------------------------------ | ---------------------------------------------------------------------------- |
| `.format("...")`                     | Define o formato de saída: `json`, `parquet`, `delta`, `console`, etc.       |
| `.option("path", ...)`               | Caminho onde os dados serão salvos                                           |
| `.option("checkpointLocation", ...)` | Local para armazenar estado e falhas                                         |
| `.outputMode("...")`                 | Define o modo de saída: `append`, `complete`, `update`                       |
| `.trigger(...)`                      | Define o intervalo de execução (ex: `.trigger(processingTime="10 seconds")`) |
| `.start()`                           | Inicia o stream                                                              |
| `.awaitTermination()`                | Mantém o processo em execução                                                |

---

### ⚠️ Limitações

* Não permite **lógica personalizada por batch** (ex: upserts).
* O modo `append` só adiciona dados novos — não faz merge nem update.

## 📝 README 2: `writeStream` com `.foreachBatch(...)`

### 📌 O que é?

O método `.foreachBatch` permite **executar código customizado por micro-lote**. Isso é ideal para aplicar **transformações, joins, merges (upsert), validações**, etc.

### ✅ Quando usar?

* Quando você precisa de **lógica avançada** como:

  * Escrita em Delta com `MERGE` (upsert).
  * Enriquecimento de dados.
  * Gravação condicional ou múltiplos destinos.

---

### 🔧 Exemplo básico com UPSERT em Delta

```python
from delta.tables import DeltaTable

def upsert_to_delta(micro_batch_df, batch_id):
    delta_table = DeltaTable.forPath(spark, "s3a://bucket/saida/")
    delta_table.alias("t").merge(
        micro_batch_df.alias("s"),
        "t.id = s.id"
    ).whenMatchedUpdateAll() \
     .whenNotMatchedInsertAll() \
     .execute()

query = df_stream.writeStream \
    .foreachBatch(upsert_to_delta) \
    .option("checkpointLocation", "s3a://bucket/checkpoint/") \
    .start()
```

---

### 🔎 Parâmetros úteis para `.foreachBatch`

| Parâmetro                            | Descrição                                         |
| ------------------------------------ | ------------------------------------------------- |
| `.foreachBatch(func)`                | Função que recebe `(df, batch_id)` por micro-lote |
| `.option("checkpointLocation", ...)` | Local onde o Spark salva o estado do stream       |
| `.trigger(...)`                      | Define o intervalo de micro-batches               |
| `.start()`                           | Inicia o stream                                   |
| `.awaitTermination()`                | Mantém a aplicação ativa                          |

---

### ✅ Vantagens

* Permite **gravação com lógica condicional** (ex: MERGE).
* Pode escrever em qualquer destino: Delta, JDBC, MongoDB, etc.
* Ideal para pipelines de ingestão **bronze → silver**.

### ⚠️ Cuidado

* Mais complexo que o `writeStream` padrão.
* A função `foreachBatch` roda em modo **batch dentro do streaming**, então **tem que ser eficiente**.

---

## 📚 Resumo Comparativo

| Critério               | `.writeStream` padrão   | `.foreachBatch`                            |
| ---------------------- | ----------------------- | ------------------------------------------ |
| Lógica por micro-batch | ❌ Não                   | ✅ Sim                                      |
| Suporte a UPSERT/MERGE | ❌ Não                   | ✅ Sim (com Delta Lake)                     |
| Destino suportado      | Limitado aos suportados | Qualquer destino (desde que código exista) |
| Complexidade           | 🔹 Baixa                | 🔸 Média/Alta                              |
| Tolerância a falhas    | ✅ Via checkpoint        | ✅ Via checkpoint                           |


# Código

In [11]:
def process_stream_json(landing_path, bronze_path, checkpoint_path, categoria, schema):
    df_stream = spark.readStream \
        .format("json") \
        .schema(schema) \
        .option("multiline","True") \
        .load(f"{landing_path}{categoria}") \
        .withColumn("origem_arquivo", input_file_name())

    query = df_stream.writeStream \
        .format("json") \
        .option("path", f"{bronze_path}{categoria}") \
        .option("checkpointLocation", f"{checkpoint_path}{categoria}") \
        .outputMode("append") \
        .start()

    return query

In [6]:
def upsert_to_delta(micro_batch_df, batch_id, output_path):
    if micro_batch_df.rdd.isEmpty():
        return

    
    window_spec = Window.partitionBy("id").orderBy(F.col("timestamp").desc())
    deduplicated_df = micro_batch_df.withColumn("rn", F.row_number().over(window_spec)) \
                                    .filter(F.col("rn") == 1) \
                                    .drop("rn")

    if DeltaTable.isDeltaTable(spark, output_path):
        delta_table = DeltaTable.forPath(spark, output_path)
        delta_table.alias("t").merge(
            deduplicated_df.alias("s"),
            "t.id = s.id"
        ).whenMatchedUpdateAll() \
         .whenNotMatchedInsertAll() \
         .execute()
    else:
        deduplicated_df.write.format("delta").mode("overwrite").save(output_path)

    if DeltaTable.isDeltaTable(spark, output_path):
        delta_table = DeltaTable.forPath(spark, output_path)
        delta_table.alias("t").merge(
            micro_batch_df.alias("s"),
            "t.id = s.id"
        ).whenMatchedUpdateAll() \
         .whenNotMatchedInsertAll() \
         .execute()
    else:
        micro_batch_df.write.format("delta").mode("overwrite").save(output_path)

def process_stream(landing_path, bronze_path, checkpoint_path, categoria, schema):
    input_path = f"{landing_path}{categoria}"
    output_path = f"{bronze_path}{categoria}"
    chk_path = f"{checkpoint_path}{categoria}"

    df_stream = spark.readStream \
        .format("json") \
        .schema(schema) \
        .option("multiline", "True") \
        .load(input_path) \
        .withColumn("origem_arquivo", input_file_name())

    query = df_stream.writeStream \
        .foreachBatch(lambda df, batch_id: upsert_to_delta(df, batch_id, output_path)) \
        .option("checkpointLocation", chk_path) \
        .start()

    return query


In [None]:
queries = [
    process_stream(landing_path, bronze_path, checkpoint_path,"usuarios", schema_usuarios),
    process_stream(landing_path, bronze_path, checkpoint_path,"musicas", schema_musicas),
    process_stream(landing_path, bronze_path, checkpoint_path,"streamings", schema_streamings)
]

for query in queries:
    query.awaitTermination()

In [17]:
df = spark.read.option("multiline","True").json(f"{landing_path}usuarios")
df.printSchema()

root
 |-- email: string (nullable = true)
 |-- id: long (nullable = true)
 |-- nome: string (nullable = true)
 |-- timestamp: string (nullable = true)



In [18]:
df = spark.read.option("multiline","True").json(f"{landing_path}musicas")
df.printSchema()

root
 |-- Artist: string (nullable = true)
 |-- Title: string (nullable = true)
 |-- id: long (nullable = true)
 |-- timestamp: string (nullable = true)



In [19]:
df = spark.read.option("multiline","True").json(f"{landing_path}streamings")
df.printSchema()

root
 |-- id: long (nullable = true)
 |-- musica: string (nullable = true)
 |-- nome: string (nullable = true)
 |-- timestamp: string (nullable = true)

