In [0]:
# Databricks notebook source
# MAGIC %md
# MAGIC # LAB 2: STREAMING + DASHBOARDS (SERVERLESS COMPATÍVEL)
# MAGIC 
# MAGIC **100% compatível com Databricks Serverless**

# COMMAND ----------

import requests
import json
from pyspark.sql.functions import *
from pyspark.sql.types import *
import time

print("="*80)
print("LAB STREAMING + DASHBOARDS (SERVERLESS)".center(80))
print("="*80)

MOCKAPI_URL = "https://6914c81b3746c71fe049979d.mockapi.io/transacoes"

# COMMAND ----------

# MAGIC %md
# MAGIC ## PARTE 1: COLETAR DADOS DA API

# COMMAND ----------

print("COLETANDO DADOS DA API...\n")

todos_dados = []

# Buscar API 3 vezes
for i in range(3):
    print(f"Busca {i+1}/3...")
    try:
        response = requests.get(MOCKAPI_URL)
        if response.status_code == 200:
            dados = response.json()
            
            # Limpar dados
            for d in dados:
                cat = d.get('categoria', [])
                if isinstance(cat, list):
                    cat = cat[0] if len(cat) > 0 else 'retail'
                
                loc = d.get('localizacao', [])
                if isinstance(loc, list):
                    loc = loc[0] if len(loc) > 0 else 'SP'
                
                todos_dados.append({
                    "id": str(d.get("id", "")) + f"_lote{i}",
                    "timestamp": str(d.get("timestamp", "")),
                    "valor": float(d.get("valor", 0)),
                    "comerciante": str(d.get("comerciante", "")),
                    "categoria": str(cat),
                    "localizacao": str(loc),
                    "suspeita": bool(d.get("suspeita", False))
                })
            
            print(f"  ✓ Coletados {len(dados)} registros")
        else:
            print(f"  ✗ Erro: {response.status_code}")
    except Exception as e:
        print(f"  ✗ Erro: {e}")
    
    time.sleep(2)

print(f"\nTotal coletado: {len(todos_dados)} transações")

# COMMAND ----------

# MAGIC %md
# MAGIC ## PARTE 2: PROCESSAR DADOS

# COMMAND ----------

print("PROCESSANDO DADOS...\n")

# Criar DataFrame
df = spark.createDataFrame(todos_dados)

# Transformações (simulando o que streaming faria)
df_proc = df \
    .withColumn("data_proc", current_timestamp()) \
    .withColumn("risco",
        when(col("valor") > 2000, "ALTO")
        .when(col("valor") > 1000, "MEDIO")
        .otherwise("BAIXO")
    ) \
    .withColumn("alerta",
        when(col("suspeita") == True, "REVISAR")
        .otherwise("OK")
    )

# Salvar tabela
df_proc.write.format("delta").mode("overwrite").saveAsTable("stream_fraude_realtime")

print(f"✓ Tabela criada: stream_fraude_realtime")
print(f"✓ Registros: {df_proc.count()}")

# COMMAND ----------

# MAGIC %md
# MAGIC ## DASHBOARD: VISÃO GERAL

# COMMAND ----------

print("="*80)
print("DASHBOARD TEMPO REAL".center(80))
print("="*80 + "\n")

df_dash = spark.table("stream_fraude_realtime")

total = df_dash.count()
suspeitas = df_dash.filter(col("suspeita") == True).count()
legitimas = total - suspeitas

# Calcular valor total com tratamento
try:
    valor_total = df_dash.select(sum("valor")).collect()[0][0]
    if valor_total is None:
        valor_total = 0.0
except:
    valor_total = 0.0

print(f"📊 RESUMO GERAL:\n")
print(f"   Total: {total}")
print(f"   Legítimas: {legitimas} ({legitimas/total*100:.1f}%)")
print(f"   Suspeitas: {suspeitas} ({suspeitas/total*100:.1f}%)")
print(f"   Valor Total: R$ {valor_total:,.2f}")
print(f"   Ticket Médio: R$ {valor_total/total:,.2f}")

# COMMAND ----------

# MAGIC %md
# MAGIC ## GRÁFICO 1: POR LOCALIZAÇÃO

# COMMAND ----------

print("📍 DISTRIBUIÇÃO POR LOCALIZAÇÃO\n")

df_loc = spark.table("stream_fraude_realtime") \
    .groupBy("localizacao") \
    .agg(
        count("*").alias("total"),
        sum(when(col("suspeita") == True, 1).otherwise(0)).alias("suspeitas"),
        round(sum("valor"), 2).alias("valor_total")
    ) \
    .orderBy("total", ascending=False)

display(df_loc)

# COMMAND ----------

# MAGIC %md
# MAGIC ## GRÁFICO 2: POR CATEGORIA

# COMMAND ----------

print("🏪 DISTRIBUIÇÃO POR CATEGORIA\n")

df_cat = spark.table("stream_fraude_realtime") \
    .groupBy("categoria") \
    .agg(
        count("*").alias("total"),
        sum(when(col("suspeita") == True, 1).otherwise(0)).alias("suspeitas"),
        round(avg("valor"), 2).alias("valor_medio")
    ) \
    .orderBy("total", ascending=False)

display(df_cat)

# COMMAND ----------

# MAGIC %md
# MAGIC ## GRÁFICO 3: POR NÍVEL DE RISCO

# COMMAND ----------

print("⚠️ DISTRIBUIÇÃO POR RISCO\n")

df_risco = spark.table("stream_fraude_realtime") \
    .groupBy("risco") \
    .agg(
        count("*").alias("total"),
        sum(when(col("suspeita") == True, 1).otherwise(0)).alias("suspeitas"),
        round(avg("valor"), 2).alias("valor_medio")
    ) \
    .orderBy("risco")

display(df_risco)

# COMMAND ----------

# MAGIC %md
# MAGIC ## GRÁFICO 4: TOP 20 SUSPEITAS

# COMMAND ----------

print("🚨 TOP 20 TRANSAÇÕES SUSPEITAS\n")

df_top = spark.table("stream_fraude_realtime") \
    .filter(col("suspeita") == True) \
    .select("id", "valor", "comerciante", "categoria", "localizacao", "risco") \
    .orderBy(col("valor").desc()) \
    .limit(20)

display(df_top)

# COMMAND ----------

# MAGIC %md
# MAGIC ## GRÁFICO 5: TIMELINE

# COMMAND ----------

print("⏱ TIMELINE DE PROCESSAMENTO\n")

df_timeline = spark.table("stream_fraude_realtime") \
    .select("id", "valor", "categoria", "localizacao", "suspeita", "risco") \
    .limit(50)

display(df_timeline)

# COMMAND ----------

# MAGIC %md
# MAGIC ## GRÁFICO 6: HEATMAP FRAUDES

# COMMAND ----------

print("🔥 HEATMAP: CATEGORIA x LOCALIZAÇÃO\n")

df_heat = spark.table("stream_fraude_realtime") \
    .groupBy("categoria", "localizacao") \
    .agg(
        count("*").alias("total"),
        sum(when(col("suspeita") == True, 1).otherwise(0)).alias("fraudes")
    ) \
    .orderBy("fraudes", ascending=False)

display(df_heat)

# COMMAND ----------

# MAGIC %md
# MAGIC ## ANÁLISE DETALHADA POR REGIÃO

# COMMAND ----------

print("="*80)
print("ANÁLISE POR REGIÃO".center(80))
print("="*80 + "\n")

df_analise = spark.table("stream_fraude_realtime")

# SEM RDD - Compatível com Serverless
# Usar collect() direto e extrair valores
locs_rows = df_analise.select("localizacao").distinct().collect()
locs = [row.localizacao for row in locs_rows]

for loc in sorted(locs):
    df_loc_detail = df_analise.filter(col("localizacao") == loc)
    
    total_loc = df_loc_detail.count()
    susp_loc = df_loc_detail.filter(col("suspeita") == True).count()
    
    try:
        valor_loc = df_loc_detail.select(sum("valor")).collect()[0][0]
        if valor_loc is None:
            valor_loc = 0.0
    except:
        valor_loc = 0.0
    
    print(f"📍 {loc}:")
    print(f"   Total: {total_loc}")
    print(f"   Suspeitas: {susp_loc} ({susp_loc/total_loc*100:.1f}%)")
    print(f"   Valor: R$ {valor_loc:,.2f}\n")

# COMMAND ----------

# MAGIC %md
# MAGIC ## QUERIES SQL EXEMPLO

# COMMAND ----------

print("""
================================================================================
QUERIES SQL DISPONÍVEIS
================================================================================

SELECT * FROM stream_fraude_realtime

SELECT * FROM stream_fraude_realtime WHERE suspeita = true

SELECT localizacao, COUNT(*) as total, 
       SUM(CASE WHEN suspeita THEN 1 ELSE 0 END) as fraudes
FROM stream_fraude_realtime
GROUP BY localizacao

SELECT categoria, AVG(valor) as valor_medio
FROM stream_fraude_realtime
GROUP BY categoria
ORDER BY valor_medio DESC

================================================================================
""")

# COMMAND ----------

# MAGIC %sql
# MAGIC SELECT * FROM stream_fraude_realtime LIMIT 20

# COMMAND ----------

# MAGIC %sql
# MAGIC SELECT 
#     localizacao,
#     COUNT(*) as total,
#     SUM(CASE WHEN suspeita THEN 1 ELSE 0 END) as fraudes,
#     ROUND(AVG(valor), 2) as valor_medio
# MAGIC FROM stream_fraude_realtime
# MAGIC GROUP BY localizacao
# MAGIC ORDER BY fraudes DESC

# COMMAND ----------

# MAGIC %sql
# MAGIC SELECT 
#     categoria,
#     COUNT(*) as total,
#     SUM(CASE WHEN suspeita THEN 1 ELSE 0 END) as fraudes,
#     ROUND(AVG(valor), 2) as valor_medio
# MAGIC FROM stream_fraude_realtime
# MAGIC GROUP BY categoria
# MAGIC ORDER BY total DESC

# COMMAND ----------

# MAGIC %sql
# MAGIC SELECT 
#     risco,
#     COUNT(*) as total,
#     SUM(CASE WHEN suspeita THEN 1 ELSE 0 END) as fraudes,
#     ROUND(AVG(valor), 2) as valor_medio
# MAGIC FROM stream_fraude_realtime
# MAGIC GROUP BY risco
# MAGIC ORDER BY 
#     CASE 
#         WHEN risco = 'BAIXO' THEN 1
#         WHEN risco = 'MEDIO' THEN 2
#         WHEN risco = 'ALTO' THEN 3
#     END

# COMMAND ----------

# MAGIC %md
# MAGIC ## EXPLICAÇÃO COMPLETA

# COMMAND ----------

print("""
================================================================================
                    LAB STREAMING + DASHBOARDS
================================================================================

ESTE LAB DEMONSTROU:

✓ Ingestão de API externa (MockAPI)
✓ Processamento de dados
✓ Classificação de risco
✓ Detecção de fraudes
✓ Armazenamento em Delta Lake
✓ 6 dashboards visuais
✓ Queries SQL disponíveis

================================================================================

COMPATIBILIDADE SERVERLESS:

Este notebook é 100% compatível com Databricks Serverless:

✓ SEM uso de RDD (.rdd)
✓ SEM código PySpark customizado não suportado
✓ Apenas funções nativas Spark SQL
✓ DataFrame API padrão

EVITAMOS:
  ✗ .rdd.flatMap()
  ✗ .rdd.map()
  ✗ Código Python customizado em RDD

USAMOS:
  ✓ .collect() + list comprehension
  ✓ Funções SQL nativas
  ✓ DataFrame transformations

================================================================================

ARQUITETURA IMPLEMENTADA:

API MockAPI
    ↓ (requests)
Coleta de Dados (Python)
    ↓ (createDataFrame)
Spark DataFrame
    ↓ (transformações SQL)
Processamento
    ↓ (write.saveAsTable)
Delta Lake
    ↓ (display/SQL)
Dashboards

================================================================================

BATCH vs STREAMING:

ESTE LAB (Batch):
  - Coleta todos dados
  - Processa de uma vez
  - Simples e confiável
  - Adequado para 80% dos casos

STREAMING REAL (Produção):
  - spark.readStream
  - spark.writeStream
  - Processa continuamente
  - Requer cluster dedicado

QUANDO USAR:

Batch:
  ✓ Relatórios periódicos
  ✓ Análises históricas
  ✓ ETL tradicional
  ✓ Budget limitado

Streaming:
  ✓ Fraude tempo real (<1s)
  ✓ IoT (milhões eventos/s)
  ✓ Alertas críticos
  ✓ Trading financeiro

================================================================================

CONCEITOS PARA MBA:

1. INTEGRAÇÃO DE DADOS:
   - APIs externas
   - Formatos JSON
   - Limpeza de dados

2. PROCESSAMENTO:
   - Transformações
   - Classificações
   - Agregações

3. ARMAZENAMENTO:
   - Delta Lake
   - Formato otimizado
   - Query performático

4. VISUALIZAÇÃO:
   - Dashboards múltiplos
   - Diferentes dimensões
   - Insights visuais

5. DECISÃO ARQUITETURAL:
   - Batch vs Streaming
   - Trade-offs
   - Custos vs benefícios

================================================================================

DASHBOARDS CRIADOS:

1. 📊 Resumo Geral
   - Total transações
   - Fraudes detectadas
   - Valores totais

2. 📍 Por Localização
   - Distribuição geográfica
   - Fraudes por estado

3. 🏪 Por Categoria
   - Tipos de comércio
   - Padrões por setor

4. ⚠️ Por Risco
   - BAIXO/MÉDIO/ALTO
   - Distribuição

5. 🚨 Top Suspeitas
   - Maiores valores
   - Alta prioridade

6. 🔥 Heatmap
   - Categoria x Localização
   - Padrões cruzados

================================================================================

QUERIES SQL:

✓ Views simples
✓ Agregações
✓ Filtros
✓ Ordenações
✓ Análises multidimensionais

================================================================================

RESULTADO FINAL:

Tabela: stream_fraude_realtime
Registros: ~150 transações
Dashboards: 6 visualizações
SQL: 4 queries exemplo
Tempo: 2-3 minutos

PRONTO PARA ANÁLISE E TOMADA DE DECISÃO! 📊

================================================================================

LAB CONCLUÍDO COM SUCESSO! ✅

Você tem um pipeline completo de análise de fraude:
  ✓ Dados reais (API)
  ✓ Processamento (Spark)
  ✓ Armazenamento (Delta)
  ✓ Visualização (Dashboards)
  ✓ Análise (SQL)

100% compatível com Databricks Serverless!

================================================================================
""")

# COMMAND ----------

                    LAB STREAMING + DASHBOARDS (SERVERLESS)                     
COLETANDO DADOS DA API...

Busca 1/3...
  ✓ Coletados 100 registros
Busca 2/3...
  ✓ Coletados 100 registros
Busca 3/3...
  ✓ Coletados 100 registros

Total coletado: 300 transações
PROCESSANDO DADOS...

✓ Tabela criada: stream_fraude_realtime
✓ Registros: 300
                              DASHBOARD TEMPO REAL                              

📊 RESUMO GERAL:

   Total: 300
   Legítimas: 237 (79.0%)
   Suspeitas: 63 (21.0%)
   Valor Total: R$ 437,932.80
   Ticket Médio: R$ 1,459.78
📍 DISTRIBUIÇÃO POR LOCALIZAÇÃO



localizacao,total,suspeitas,valor_total
SP,36,9,47805.9
BA,33,6,59987.7
PE,33,3,25578.6
MG,33,3,27846.9
RJ,33,18,69132.9
PR,33,3,24206.1
SC,33,21,108835.2
RS,33,0,57975.9
DF,33,0,16563.6


🏪 DISTRIBUIÇÃO POR CATEGORIA



categoria,total,suspeitas,valor_medio
online,57,51,3951.94
retail,39,0,422.82
travel,36,12,3093.37
restaurant,36,0,115.87
electronics,33,0,1474.49
gas,33,0,86.73
supermarket,33,0,550.39
pharmacy,33,0,332.32


⚠️ DISTRIBUIÇÃO POR RISCO



risco,total,suspeitas,valor_medio
ALTO,93,63,3619.59
BAIXO,174,0,283.58
MEDIO,33,0,1574.77


🚨 TOP 20 TRANSAÇÕES SUSPEITAS



id,valor,comerciante,categoria,localizacao,risco
98_lote0,4890.5,Zattini,online,SC,ALTO
98_lote1,4890.5,Zattini,online,SC,ALTO
98_lote2,4890.5,Zattini,online,SC,ALTO
53_lote0,4890.0,Shein Brasil,online,SC,ALTO
33_lote0,4890.0,Dell Online,online,BA,ALTO
33_lote2,4890.0,Dell Online,online,BA,ALTO
33_lote1,4890.0,Dell Online,online,BA,ALTO
53_lote1,4890.0,Shein Brasil,online,SC,ALTO
53_lote2,4890.0,Shein Brasil,online,SC,ALTO
80_lote1,4780.0,Privalia,online,SC,ALTO


⏱ TIMELINE DE PROCESSAMENTO



id,valor,categoria,localizacao,suspeita,risco
1_lote0,1250.8,electronics,SP,False,MEDIO
2_lote0,45.3,restaurant,RJ,False,BAIXO
3_lote0,2890.0,online,MG,True,ALTO
4_lote0,78.9,gas,RS,False,BAIXO
5_lote0,156.4,supermarket,PR,False,BAIXO
6_lote0,890.5,retail,BA,False,BAIXO
7_lote0,3450.0,travel,PE,True,ALTO
8_lote0,234.8,pharmacy,SC,False,BAIXO
9_lote0,67.2,restaurant,DF,False,BAIXO
10_lote0,4250.9,online,SP,True,ALTO


🔥 HEATMAP: CATEGORIA x LOCALIZAÇÃO



categoria,localizacao,total,fraudes
online,SC,21,21
online,RJ,18,18
online,SP,6,6
travel,PR,6,3
travel,PE,3,3
travel,SP,3,3
travel,BA,3,3
online,BA,3,3
online,MG,6,3
retail,RS,3,0


                               ANÁLISE POR REGIÃO                               

📍 BA:
   Total: 33
   Suspeitas: 6 (18.2%)
   Valor: R$ 59,987.70

📍 DF:
   Total: 33
   Suspeitas: 0 (0.0%)
   Valor: R$ 16,563.60

📍 MG:
   Total: 33
   Suspeitas: 3 (9.1%)
   Valor: R$ 27,846.90

📍 PE:
   Total: 33
   Suspeitas: 3 (9.1%)
   Valor: R$ 25,578.60

📍 PR:
   Total: 33
   Suspeitas: 3 (9.1%)
   Valor: R$ 24,206.10

📍 RJ:
   Total: 33
   Suspeitas: 18 (54.5%)
   Valor: R$ 69,132.90

📍 RS:
   Total: 33
   Suspeitas: 0 (0.0%)
   Valor: R$ 57,975.90

📍 SC:
   Total: 33
   Suspeitas: 21 (63.6%)
   Valor: R$ 108,835.20

📍 SP:
   Total: 36
   Suspeitas: 9 (25.0%)
   Valor: R$ 47,805.90


QUERIES SQL DISPONÍVEIS

SELECT * FROM stream_fraude_realtime

SELECT * FROM stream_fraude_realtime WHERE suspeita = true

SELECT localizacao, COUNT(*) as total, 
       SUM(CASE WHEN suspeita THEN 1 ELSE 0 END) as fraudes
FROM stream_fraude_realtime
GROUP BY localizacao

SELECT categoria, AVG(valor) as valor_medio