<a href="https://colab.research.google.com/github/rebecadieb/ev-telemetry-simulator-spark-pipeline-colab-kafka-mongo/blob/main/EV_Telemetry_Simulator_%26_Spark_Pipeline_(Colab_Kafka_Mongo).ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# **Simulador de Telemetria de Frota de Carros Elétricos**

O código abaixo gera sinais (telemetria) de uma frota de veículos elétricos, permitindo testes e prototipação de pipelines de ingestão e análise de dados de telemetria.

Os dados gerados simulam condições quase realistas de veículos em movimento ou em carga.

---

## **Objetivos**

- Criar dados sintéticos de telemetria EV para testes de pipelines e modelos.
- Suportar múltiplos modos de saída: `stdout`, `arquivo JSON`, `HTTP POST`, `Kafka`.
- Permitir controle de parâmetros (nº de veículos, frequência, duração, semente aleatória).
- Gerar informações de mecânica, energia, localização e eventos de falha.

---

## **Estrutura do Output**

Cada registro é um JSON no seguinte formato:

```json
{
  "schema_version": "1.0.0",
  "vehicle_id": "EV-00001",
  "timestamp": "2025-08-24T18:05:30.123456+00:00",
  "metrics": {
    "soc_pct": 85.3,
    "pack_voltage_v": 380.5,
    "pack_current_a": -42.7,
    "power_kw": 16.4,
    "battery_temp_c": 29.1,
    "motor_temp_c": 47.2,
    "coolant_temp_c": 32.0,
    "tyre_pressure_kpa": [225.1, 224.7, 223.5, 226.3],
    "speed_kph": 72.4,
    "odometer_km": 50321.8,
    "latitude": -23.5505,
    "longitude": -46.6333,
    "heading_deg": 145.3,
    "ambient_temp_c": 27.5,
    "is_charging": false,
    "charge_power_kw": 0.0,
    "health_score": 0.95
  },
  "events": {
    "fault_code": null
  }
}


# 📖 Dicionário de Dados – Simulador de Telemetria EV

Cada linha gerada pelo simulador corresponde a um evento de telemetria de um veículo elétrico (EV).  
O formato de saída é **JSON** (pode ser convertido para CSV/Parquet).

## Raiz

| Campo             | Tipo      | Obrigatório | Formato / Unidade         | Descrição |
|-------------------|-----------|-------------|---------------------------|-----------|
| **schema_version** | string   | Sim         | Texto (ex.: `"1.0.0"`)    | Versão do schema da mensagem para garantir compatibilidade futura. |
| **vehicle_id**     | string   | Sim         | `EV-` + 5 dígitos | Identificador único do veículo na frota. |
| **timestamp**      | datetime | Sim         | ISO 8601 UTC (`YYYY-MM-DDThh:mm:ss.ssssss+00:00`) | Momento exato de geração do evento de telemetria. |
| **metrics**        | objeto   | Sim         | Estrutura JSON aninhada   | Grupo de métricas contínuas (sinais vitais e sensores do veículo). |
| **events**         | objeto   | Sim         | Estrutura JSON aninhada   | Grupo de eventos discretos (falhas e alertas do veículo). |


## Metrics + Events

| Campo              | Tipo           | Unidade / Formato      | Obrigatório | Domínio / Valores válidos | Descrição |
|--------------------|----------------|------------------------|-------------|---------------------------|-----------|
| **soc_pct**        | float          | %                      | Sim         | 0 – 100                   | State of Charge: nível de carga da bateria. |
| **pack_voltage_v** | float          | Volts (V)              | Sim         | 200 – 900                 | Tensão do pack de baterias. |
| **pack_current_a** | float          | Amperes (A)            | Sim         | -2000 – 2000              | Corrente elétrica do pack (negativa em carga, positiva em descarga). |
| **power_kw**       | float          | Quilowatts (kW)        | Sim         | -300 – 300                | Potência instantânea consumida ou fornecida. |
| **battery_temp_c** | float          | Graus Celsius (°C)     | Sim         | -30 – 90                  | Temperatura média do pack de baterias. |
| **motor_temp_c**   | float          | Graus Celsius (°C)     | Sim         | -30 – 150                 | Temperatura do motor elétrico. |
| **coolant_temp_c** | float          | Graus Celsius (°C)     | Sim         | -30 – 120                 | Temperatura do fluido de arrefecimento. |
| **tyre_pressure_kpa** | array[float] | Quilopascal (kPa)      | Sim         | 120 – 260                 | Pressão dos quatro pneus (ordem: dianteiro-esq., dianteiro-dir., traseiro-esq., traseiro-dir.). |
| **speed_kph**      | float          | km/h                   | Sim         | 0 – 250                   | Velocidade atual do veículo. |
| **odometer_km**    | float          | km                     | Sim         | 0 – 2.000.000             | Quilometragem acumulada do veículo (odômetro). |
| **latitude**       | float          | graus decimais         | Sim         | -90 – 90                  | Latitude aproximada (GPS). |
| **longitude**      | float          | graus decimais         | Sim         | -180 – 180                | Longitude aproximada (GPS). |
| **heading_deg**    | float          | graus (0–360)          | Sim         | 0 – 360                   | Direção/rumo de movimento em relação ao norte. |
| **ambient_temp_c** | float          | Graus Celsius (°C)     | Sim         | -50 – 80                  | Temperatura ambiente externa. |
| **is_charging**    | boolean        | true / false           | Sim         | `true` ou `false`         | Indica se o veículo está em processo de recarga. |
| **charge_power_kw**| float          | Quilowatts (kW)        | Sim         | 0 – 400                   | Potência de carga aplicada (0 se não estiver carregando). |
| **health_score**   | float          | Escala 0–1             | Sim         | 0 – 1                     | Índice de saúde da bateria (1=ótima, 0.5=degradada). |
| **fault_code**     | string / null  | Texto ou `null`        | Não         | `null` (sem falha) ou:<br>• `BMS_WARN_TEMP`<br>• `TIRE_PRESS_LOW`<br>• `COOLANT_LEVEL_LOW`<br>• `BRAKE_PAD_WEAR` | Código de falha ativa do veículo (eventos discretos). |

# Instalação e checagem do MongoDB

In [1]:
# libs Python
!pip -q install pymongo

# pastas
!mkdir -p /content/mongodb/data /content/mongodb/log

In [2]:
%%bash
set -euo pipefail
mkdir -p /content/mongodb/data /content/mongodb/log

urls=(
  "https://fastdl.mongodb.org/linux/mongodb-linux-x86_64-ubuntu2204-7.0.14.tgz"
  "https://fastdl.mongodb.org/linux/mongodb-linux-x86_64-ubuntu2204-7.0.13.tgz"
  "https://fastdl.mongodb.org/linux/mongodb-linux-x86_64-ubuntu2204-7.0.12.tgz"
  "https://fastdl.mongodb.org/linux/mongodb-linux-x86_64-ubuntu2204-6.0.18.tgz"
  "https://fastdl.mongodb.org/linux/mongodb-linux-x86_64-ubuntu2004-6.0.18.tgz"
)
ok=""
for u in "${urls[@]}"; do
  echo ">> tentando $u"
  if wget -q "$u" -O /content/mongodb.tgz; then
    tar -xzf /content/mongodb.tgz -C /content/
    ok="$(find /content -maxdepth 1 -type d -name 'mongodb-linux-*' | head -n1)"
    break
  fi
done
if [ -z "$ok" ]; then
  echo "Falhei em baixar binário do MongoDB." >&2; exit 1
fi

ln -sfn "$ok/bin/mongod" /content/mongodb/mongod
ln -sfn "$ok/bin/mongo"  /content/mongodb/mongo

nohup /content/mongodb/mongod \
  --dbpath /content/mongodb/data \
  --bind_ip 127.0.0.1 --port 27017 \
  --logpath /content/mongodb/log/mongod.log \
  --wiredTigerCacheSizeGB 0.25 >/dev/null 2>&1 &

sleep 2
pgrep -a mongod || { echo "mongod NÃO está rodando"; tail -n 50 /content/mongodb/log/mongod.log; exit 1; }
echo "✅ mongod ativo em 127.0.0.1:27017"

>> tentando https://fastdl.mongodb.org/linux/mongodb-linux-x86_64-ubuntu2204-7.0.14.tgz
19448 /content/mongodb/mongod --dbpath /content/mongodb/data --bind_ip 127.0.0.1 --port 27017 --logpath /content/mongodb/log/mongod.log --wiredTigerCacheSizeGB 0.25
✅ mongod ativo em 127.0.0.1:27017


In [3]:
from pymongo import MongoClient
cli = MongoClient("mongodb://127.0.0.1:27017", serverSelectionTimeoutMS=3000)
cli.admin.command("ping")
cli.close()
print("✅ Mongo OK no 127.0.0.1:27017")

✅ Mongo OK no 127.0.0.1:27017


In [4]:
!pip install confluent-kafka



# Classe para simulação

In [5]:
try:
    from pymongo import MongoClient
    _HAS_PYMONGO = True
except Exception:
    _HAS_PYMONGO = False

class MongoAlertStore:
    def __init__(self, uri: str = "mongodb://127.0.0.1:27017",
                 db: str = "telemetria",
                 collection: str = "ev_alerts",
                 ttl_days: int | None = 30):
        if not _HAS_PYMONGO:
            raise RuntimeError("pymongo não está instalado. Rode: !pip install pymongo")
        # conecta e valida
        self.client = MongoClient(uri, serverSelectionTimeoutMS=5000)
        self.client.admin.command("ping")
        self.col = self.client[db][collection]
        # índices úteis
        self.col.create_index([("vehicle_id", 1), ("ts_dt", -1)])
        if ttl_days is not None and ttl_days > 0:
            # TTL precisa de datetime 'naive' (UTC) num campo
            self.col.create_index("ts_dt", expireAfterSeconds=int(ttl_days * 86400))

    def send(self, rec: dict):
        m = rec["metrics"]; e = rec.get("events") or {}
        tsdt = datetime.fromisoformat(rec["timestamp"])
        # normaliza pra 'naive' UTC (compatível com TTL)
        if tsdt.tzinfo:
            tsdt = tsdt.astimezone(timezone.utc).replace(tzinfo=None)
        doc = {
            "ts": rec["timestamp"],
            "ts_dt": tsdt,
            "vehicle_id": rec["vehicle_id"],
            "soc_pct": m["soc_pct"],
            "power_kw": m["power_kw"],
            "latitude": m["latitude"],
            "longitude": m["longitude"],
            "fault_code": e.get("fault_code"),
            "payload": rec,  # guarda evento completo pra reprocesso
        }
        self.col.insert_one(doc)

    def close(self):
        try:
            self.client.close()
        except:
            pass

In [6]:
# Simulador de Frota de Carros Elétricos
# Saídas: stdout | file(.jsonl) | http | kafka

import json, math, random, time, os, sys, re

from datetime import datetime, timezone
from urllib.parse import urlparse
from urllib.request import Request, urlopen
from urllib.error import URLError, HTTPError
from google.colab import files
from typing import Dict, List, Tuple
from datetime import datetime, timezone
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.types import *
from pyspark.sql import functions as F
from pyspark.sql.window import Window


SCHEMA_VERSION = "1.0.0"

def _sample_soc_biased(r):
    # 97% perto de 100, 3% bem baixos
    if r.random() < 0.03:
        x = r.betavariate(1.5, 8.0)   # puxa para 0
    else:
        x = r.betavariate(7.0, 2.0)   # puxa para 1
    return clamp(100.0*x, 0.0, 100.0)

def iso_now():
    return datetime.now(timezone.utc).isoformat()

def clamp(v, lo, hi):
    return max(lo, min(hi, v))

class EV:
    def __init__(self, vid, base_lat=-23.5505, base_lon=-46.6333, seed=None):
        r = random.Random(seed if seed is not None else random.randrange(1<<30))
        self.r = r
        self.vehicle_id = f"EV-{vid:05d}"
        self.soc = _sample_soc_biased(r)
        self.pack_capacity_kwh = r.uniform(55, 90)    # kWh
        self.voltage_nominal = r.uniform(320, 400)    # V
        self.battery_temp = r.uniform(18, 30)         # °C
        self.motor_temp = r.uniform(20, 35)           # °C
        self.coolant_temp = r.uniform(18, 28)         # °C
        self.tyre_pressure_kpa = [r.uniform(210, 240) for _ in range(4)]
        self.speed_kph = r.uniform(0, 30)
        self.heading_deg = r.uniform(0, 360)
        self.lat = base_lat + r.uniform(-0.15, 0.15)
        self.lon = base_lon + r.uniform(-0.15, 0.15)
        self.odometer_km = r.uniform(10_000, 90_000)
        self.is_charging = False
        self.charge_power_kw = 0.0
        self.health_score = r.uniform(0.92, 0.99)
        self.aux_load_kw = r.uniform(0.3, 1.5)
        self.ambient_temp = r.uniform(18, 34)
        self.fault = None

    def _earth_move(self, dt_s):
        dist_km = (self.speed_kph * dt_s) / 3600.0
        heading_rad = math.radians(self.heading_deg)
        dlat = (dist_km * math.cos(heading_rad)) / 111.0
        dlon = (dist_km * math.sin(heading_rad)) / (111.0 * max(0.1, math.cos(math.radians(self.lat))))
        self.lat += dlat
        self.lon += dlon
        self.odometer_km += max(0.0, dist_km)

    def _update_dynamics(self, dt_s):
        r = self.r
        # velocidade / direção
        self.speed_kph = clamp(self.speed_kph + r.uniform(-1.5, 1.5) * dt_s, 0, 130)
        self.heading_deg = (self.heading_deg + r.uniform(-4, 4) * (self.speed_kph/130.0)) % 360
        # ambiente
        self.ambient_temp = clamp(self.ambient_temp + r.uniform(-0.02, 0.02) * dt_s, 10, 42)

        # potência (simples e consistente)
        v = self.speed_kph
        aero_kw = 0.00015 * (v ** 3)
        rolling_kw = 0.01 * v
        temp_penalty = 0.15 if (self.ambient_temp < 12 or self.ambient_temp > 35) else 0.0
        total_kw = aero_kw + rolling_kw + self.aux_load_kw + temp_penalty

        # carga ocasional
        if not self.is_charging and (self.soc < 20) and (r.random() < 0.03):
            self.is_charging = True
            self.charge_power_kw = r.uniform(30, 120)
            self.speed_kph = 0.0

        if self.is_charging:
            charged_kwh = (self.charge_power_kw * dt_s) / 3600.0
            self.soc = clamp(self.soc + (100 * charged_kwh / self.pack_capacity_kwh), 0, 100)
            self.battery_temp = clamp(self.battery_temp + r.uniform(0.02, 0.08) * dt_s, -10, 60)
            self.coolant_temp = clamp(self.coolant_temp + r.uniform(0.01, 0.05) * dt_s, -10, 55)
            if self.soc >= 80 or r.random() < 0.005:
                self.is_charging = False
                self.charge_power_kw = 0.0
        else:
            used_kwh = (total_kw * dt_s) / 3600.0
            self.soc = clamp(self.soc - (100 * used_kwh / self.pack_capacity_kwh), 0, 100)
            heat_gain = 0.0008 * (v ** 2) + (total_kw * 0.015)
            self.motor_temp = clamp(self.motor_temp + (heat_gain - 0.03) * dt_s, -10, 140)
            self.battery_temp = clamp(self.battery_temp + (0.5 * heat_gain - 0.02) * dt_s, -10, 70)
            self.coolant_temp = clamp(self.coolant_temp + (0.3 * heat_gain - 0.025) * dt_s, -10, 60)

        # pneus + falhas leves
        idx = r.randrange(4)
        self.tyre_pressure_kpa[idx] = clamp(self.tyre_pressure_kpa[idx] + r.uniform(-0.5, 0.5), 170, 260)
        if r.random() < 1e-4:
            leak = r.randrange(4)
            self.tyre_pressure_kpa[leak] = clamp(self.tyre_pressure_kpa[leak] - r.uniform(5, 15), 120, 260)

        if self.fault is None and r.random() < 5e-4:
            self.fault = r.choice(["BMS_WARN_TEMP","TIRE_PRESS_LOW","COOLANT_LEVEL_LOW","BRAKE_PAD_WEAR"])
        elif self.fault and r.random() < 1e-3:
            self.fault = None

        self._earth_move(dt_s)

    def _electrics(self):
        v = clamp(self.voltage_nominal + (self.soc - 50) * 0.5 + random.uniform(-2, 2), 280, 420)
        if self.is_charging:
            power_kw = self.charge_power_kw
            current_a = (power_kw * 1000) / max(1.0, v)
        else:
            speed = self.speed_kph
            aero_kw = 0.00015 * (speed ** 3)
            rolling_kw = 0.01 * speed
            temp_penalty = 0.15 if (self.ambient_temp < 12 or self.ambient_temp > 35) else 0.0
            power_kw = aero_kw + rolling_kw + self.aux_load_kw + temp_penalty
            current_a = (power_kw * 1000) / max(1.0, v)
            current_a *= 1.05
        return v, current_a, power_kw

    def step(self, dt_s):
        self._update_dynamics(dt_s)
        pack_v, pack_a, power_kw = self._electrics()
        penalty = 0.0
        if self.battery_temp > 45: penalty += 0.0003
        if self.soc < 10: penalty += 0.0002
        self.health_score = clamp(self.health_score - penalty, 0.5, 1.0)

        return {
            "schema_version": SCHEMA_VERSION,
            "vehicle_id": self.vehicle_id,
            "timestamp": iso_now(),
            "metrics": {
                "soc_pct": round(self.soc, 2),
                "pack_voltage_v": round(pack_v, 2),
                "pack_current_a": round(pack_a, 2),
                "power_kw": round(power_kw, 3),
                "battery_temp_c": round(self.battery_temp, 2),
                "motor_temp_c": round(self.motor_temp, 2),
                "coolant_temp_c": round(self.coolant_temp, 2),
                "tyre_pressure_kpa": [round(p, 1) for p in self.tyre_pressure_kpa],
                "speed_kph": round(self.speed_kph, 2),
                "odometer_km": round(self.odometer_km, 3),
                "latitude": round(self.lat, 6),
                "longitude": round(self.lon, 6),
                "heading_deg": round(self.heading_deg, 1),
                "ambient_temp_c": round(self.ambient_temp, 1),
                "is_charging": self.is_charging,
                "charge_power_kw": round(self.charge_power_kw, 2),
                "health_score": round(self.health_score, 3),
            },
            "events": {
                "fault_code": self.fault
            }
        }

def make_vehicles(n, seed=None, base_lat=-23.5505, base_lon=-46.6333):
    r = random.Random(seed)
    return [EV(i+1,
               base_lat=base_lat + r.uniform(-0.5, 0.5),
               base_lon=base_lon + r.uniform(-0.5, 0.5),
               seed=r.randrange(1<<30)) for i in range(n)]

# ------- Sinks -------

class StdoutSink:
    def send(self, record: dict):
        print(json.dumps(record, ensure_ascii=False), flush=True)
    def close(self): pass

class FileSink:
    def __init__(self, path="telemetria.jsonl"):
        # Expande ~ e normaliza
        p = os.path.expanduser(path)

        # Se for diretório (existente ou terminar com /), usa nome padrão
        if p.endswith(os.sep) or os.path.isdir(p):
            os.makedirs(p, exist_ok=True)
            p = os.path.join(p, "telemetria.jsonl")
        else:
            parent = os.path.dirname(p) or "."
            os.makedirs(parent, exist_ok=True)

        try:
            self.f = open(p, "a", encoding="utf-8", newline="\n")
        except OSError as e:
            raise RuntimeError(f"Não consegui abrir '{p}': {e.strerror}") from e

        self.path = p

    def send(self, record: dict):
        self.f.write(json.dumps(record, ensure_ascii=False) + "\n")

    def close(self):
        self.f.flush()
        self.f.close()

class HTTPSink:
    def __init__(self, url, bearer=None, timeout=5):
        self.url = url; self.bearer = bearer; self.timeout = timeout
    def send(self, record: dict):
        data = json.dumps(record, ensure_ascii=False).encode("utf-8")
        headers = {"Content-Type": "application/json"}
        if self.bearer: headers["Authorization"] = f"Bearer {self.bearer}"
        req = Request(self.url, data=data, headers=headers, method="POST")
        # retry leve
        for attempt in range(3):
            try:
                with urlopen(req, timeout=self.timeout) as resp:
                    if 200 <= resp.status < 300: return
                    raise HTTPError(self.url, resp.status, "HTTP error", resp.headers, None)
            except Exception as e:
                if attempt == 2:
                    print(f"[WARN] Falha ao enviar: {e}")
                time.sleep(0.3 * (attempt+1))
    def close(self): pass

# Kafka é opcional; só cria se a lib existir
def _maybe_make_kafka_sink(topic, bootstrap_servers, **auth):
    try:
        from confluent_kafka import Producer
    except Exception as e:
        raise RuntimeError("Kafka não tá disponível. Vc pode instalar 'confluent-kafka' na célula anterior.") from e

    conf = {"bootstrap.servers": bootstrap_servers, "acks": "all",
            "enable.idempotence": True, "linger.ms": 20, "compression.type": "zstd"}
    for k in ("security.protocol","sasl.mechanism","sasl.username","sasl.password"):
        if auth.get(k): conf[k] = auth[k]
    prod = Producer(conf)
    class KafkaSink:
        def __init__(self, producer, topic):
            self.p = producer; self.topic = topic
        def send(self, record):
            key = record["vehicle_id"].encode("utf-8")
            val = json.dumps(record, ensure_ascii=False, separators=(",",":")).encode("utf-8")
            self.p.produce(self.topic, key=key, value=val)
            self.p.poll(0)
        def close(self):
            self.p.flush(10)
    return KafkaSink(prod, topic)

# ------- "API" simples para Colab -------

def simular_frota(
    # --- simulação ---
    vehicles: int = 20,
    hz: float = 1.0,
    duration: int = 60,
    seed: int | None = None,

    # --- sink principal: 'stdout' | 'file' | 'http' | 'kafka' ---
    sink: str = "stdout",

    # file
    file_path: str = "telemetria.jsonl",

    # http
    http_url: str | None = None,
    http_bearer: str | None = None,

    # kafka (telemetria bruta)
    kafka_topic: str | None = None,
    kafka_bootstrap: str | None = None,
    kafka_security_protocol: str | None = None,   # ex: "SASL_SSL" | "PLAINTEXT"
    kafka_sasl_mechanism: str | None = None,      # ex: "PLAIN"
    kafka_sasl_username: str | None = None,
    kafka_sasl_password: str | None = None,

    # --- alertas ---
    alert_threshold: float = 20.0,                # SOC < threshold => alerta
    enable_kafka_alerts: bool = False,
    kafka_alert_topic: str | None = None,         # tópico só de alertas
    enable_mongo_alerts: bool = False,
    mongo_uri: str = "mongodb://127.0.0.1:27017", # no Colab local
    mongo_db: str = "telemetria",
    mongo_collection: str = "ev_alerts",
    mongo_ttl_days: int | None = 30,
):
    """
    Simula EVs emitindo telemetria 'hz' vezes/s por 'duration' s.
    Sink principal: 'stdout' | 'file' | 'http' | 'kafka'
    Alertas: quando SOC < alert_threshold, envia para Kafka (tópico de alertas) e/ou Mongo.
    """
    if hz <= 0:
        raise ValueError("hz deve ser > 0")
    if vehicles <= 0:
        raise ValueError("vehicles deve ser > 0")

    random.seed(seed)
    fleet = make_vehicles(vehicles, seed=seed)

    # --- Seleciona o sink principal (telemetria bruta) ---
    if      sink == "stdout":
        s = StdoutSink()
    elif    sink == "file":
        s = FileSink(file_path)
    elif    sink == "http":
        if not http_url:
            raise ValueError("Defina http_url para sink='http'")
        s = HTTPSink(http_url, bearer=http_bearer)
    elif    sink == "kafka":
        if not kafka_topic or not kafka_bootstrap:
            raise ValueError("Defina kafka_topic e kafka_bootstrap para sink='kafka'")
        s = _maybe_make_kafka_sink(
            topic=kafka_topic,
            bootstrap_servers=kafka_bootstrap,
            **{
                "security.protocol": kafka_security_protocol,
                "sasl.mechanism": kafka_sasl_mechanism,
                "sasl.username": kafka_sasl_username,
                "sasl.password": kafka_sasl_password,
            }
        )
    else:
        raise ValueError("sink inválido. Use: 'stdout' | 'file' | 'http' | 'kafka'")

    # --- Sinks de alerta (apenas quando SOC < threshold) ---
    alert_sinks = []
    mongo_store = None
    kafka_alert_sink = None

    if enable_mongo_alerts:
        # requer classe MongoAlertStore definida
        mongo_store = MongoAlertStore(
            uri=mongo_uri, db=mongo_db, collection=mongo_collection, ttl_days=mongo_ttl_days
        )
        alert_sinks.append(mongo_store)

    if enable_kafka_alerts:
        if not kafka_bootstrap:
            raise ValueError("Defina kafka_bootstrap para alertas Kafka")
        if not kafka_alert_topic and not kafka_topic:
            raise ValueError("Defina kafka_alert_topic (ou configure kafka_topic para usar o mesmo)")
        # usa kafka_alert_topic se dado; senão cai pro kafka_topic
        alert_topic = kafka_alert_topic or kafka_topic
        kafka_alert_sink = _maybe_make_kafka_sink(
            topic=alert_topic,
            bootstrap_servers=kafka_bootstrap,
            **{
                "security.protocol": kafka_security_protocol,
                "sasl.mechanism": kafka_sasl_mechanism,
                "sasl.username": kafka_sasl_username,
                "sasl.password": kafka_sasl_password,
            }
        )
        alert_sinks.append(kafka_alert_sink)

    # --- loop de emissão ---
    dt = 1.0 / hz
    start = time.time()
    try:
        while True:
            now = time.time()
            if duration is not None and (now - start) >= duration:
                break

            tick_start = time.time()
            for v in fleet:
                rec = v.step(dt)

                # telemetria bruta
                s.send(rec)

                # alertas (SOC baixo)
                if rec["metrics"]["soc_pct"] < alert_threshold:
                    for asink in alert_sinks:
                        asink.send(rec)

            # manter frequência aproximada
            elapsed = time.time() - tick_start
            sleep_time = max(0.0, dt - elapsed)
            if sleep_time > 0:
                time.sleep(sleep_time)
    finally:
        # fechar tudo com carinho
        try: s.close()
        except: pass
        try:
            if kafka_alert_sink: kafka_alert_sink.close()
        except: pass
        try:
            if mongo_store: mongo_store.close()
        except: pass


# **Simulando os dados**

Foi simulado o seguinte cenário:
- 10 veículos sendo monitorados por 5 minutos
- Coleta de sinais a cada 1 segundo (parâmetro `hz`)

É possível aumentar a quantidade de carros e tempo de simulação para um volume de dados maior mas para uma quantidade de veículos acima de 50 e tempo de simulação acima de 30 minutos a simulação consume muito tempo e não foi possível simular cenários mais complexos. Mas com tempo disponível e recurso computacional é possível simular qualquer cenário.



In [7]:
# simular_frota(vehicles=10, hz=1, duration=60, seed=42, sink="file", file_path="/content/simulation_output/telemetria.jsonl")

# Se quiser baixar o json...
#files.download("/content/simulation_output/telemetria.jsonl")

In [8]:
simular_frota(
    vehicles=20, hz=2, duration=60,
    sink="file", file_path="out/telemetria/ev.jsonl",
    alert_threshold=60,
    enable_mongo_alerts=True,
    mongo_uri="mongodb://127.0.0.1:27017",
    mongo_db="telemetria",
    mongo_collection="ev_alerts",
    mongo_ttl_days=30,   # ou None para não expirar
)

## Verificando alertas armazenados no MongoDB

In [9]:
from pymongo import MongoClient
cli = MongoClient("mongodb://127.0.0.1:27017")
col = cli["telemetria"]["ev_alerts"]
print("Total de alertas:", col.count_documents({}))
for d in col.find({}, {"_id":0,"ts":1,"vehicle_id":1,"soc_pct":1}).sort("ts_dt",-1).limit(5):
    print(d)
cli.close()

Total de alertas: 840
{'ts': '2025-08-24T23:10:49.963275+00:00', 'vehicle_id': 'EV-00019', 'soc_pct': 47.16}
{'ts': '2025-08-24T23:10:49.962432+00:00', 'vehicle_id': 'EV-00016', 'soc_pct': 5.29}
{'ts': '2025-08-24T23:10:49.961572+00:00', 'vehicle_id': 'EV-00014', 'soc_pct': 56.66}
{'ts': '2025-08-24T23:10:49.959631+00:00', 'vehicle_id': 'EV-00001', 'soc_pct': 54.29}
{'ts': '2025-08-24T23:10:49.463072+00:00', 'vehicle_id': 'EV-00019', 'soc_pct': 47.16}


# **Pipeline de Telemetria**

O código abaixo implementa um fluxo de ingestão, tratamento e curadoria de dados de telemetria de carros elétricos usando PySpark em modo batch.

# **Arquitetura Lógica**

O pipeline segue as seguintes fases:

1. **Ingestão** → leitura de dados brutos (CSV, JSON, Parquet).
2. **Padronização** → ajuste de tipos, normalização de colunas e timezone.
3. **Tratamento** → regras de qualidade, remoção de duplicidades e checagem de domínios.
4. **Enriquecimento** → criação de colunas derivadas e partições.
5. **Validação** → cálculos de métricas de qualidade e estatísticas básicas.
6. **Gravação** → escrita em **Parquet particionado (ano, mes)** com compressão Snappy.
7. **Metadados** → registro de dicionário de dados e log de execução.
8. **Catálogo (opcional)** → integração com Hive Metastore ou Glue.


# **Explicação das Etapas**

## 1. Ingestão
- **O que faz**: lê os arquivos brutos em JSON, CSV ou Parquet.  
- **Detalhes**:  
  - JSON → utiliza `RAW_SCHEMA` explícito (campos nested).  
  - CSV → lê schema flat e reconstrói a estrutura.  
  - Parquet → já otimizado, com `mergeSchema` habilitado.  

## 2. Padronização
- **O que faz**: normaliza colunas e converte timestamps.  
- **Ações principais**:  
  - Converte `timestamp` em `event_ts_utc` (ISO8601 → Spark timestamp).  
  - Achata colunas de `metrics` e `events` em colunas simples.  
  - Uniformiza encoding e tipos numéricos.  

## 3. Tratamento e Qualidade
- **O que faz**: aplica regras de qualidade sobre os dados.  
- **Validações incluídas**:  
  - **Nulidade obrigatória** → `vehicle_id`, `event_ts_utc`.  
  - **Duplicidades** → registros duplicados pela chave `(vehicle_id, event_ts_utc)` são eliminados.  
  - **Domínios válidos** → ex.: `soc_pct` (0–100), `latitude` (-90–90).  
  - **Outliers** → marcados via método IQR (Interquartile Range).  

## 4. Enriquecimento
- **O que faz**: adiciona colunas derivadas úteis para análises.  
- **Colunas criadas**:  
  - `ano`, `mes`, `dia`, `hora` → partições temporais.  
  - `is_low_soc` → flag de bateria baixa (< 20%).  
  - `battery_temp_band` → faixa de temperatura (low, normal, high).  
  - `vehicle_ts_key` → chave de negócio (`vehicle_id#timestamp`).  

## 5. Validação
- **O que faz**: calcula métricas de qualidade dos dados processados.  
- **Métricas registradas**:  
  - Total de linhas processadas.  
  - Quantidade de registros limpos vs. em quarentena.  
  - % de registros válidos.  
  - KPIs específicos (ex.: nº de registros com `soc_pct < 20`).  

## 6. Gravação
- **O que faz**: persiste os dados tratados em Parquet particionado.  
- **Configurações**:  
  - Particionado por `ano` e `mes`.  
  - Compressão **Snappy**.  
  - Modo de escrita configurável:  
    - `append` (default)  
    - `overwrite` (reprocesso completo)  
    - `overwrite_partitions` (dinâmico, substitui apenas partições tocadas).  

## 7. Metadados
- **Arquivos gerados** em `_metadata/`:  
  - `dicionario_dados.json` → descreve cada coluna (nome, tipo, nulabilidade).  
  - `exec_log.json` → log de execução contendo:  
    - Caminhos de entrada/saída.  
    - Horário de início/fim do job.  
    - Métricas de qualidade.  
    - Versão do Spark usada.  

## 8. Catálogo
- **O que faz**: registra o dataset curado no Hive Metastore ou AWS Glue.  

# Criando o pipeline

In [22]:
# ================================
# Pipeline da telemetria dos carros elétricos
# ================================
# Requisitos:
#   - Spark 3.x
#   - Conectores adequados para S3, ADLS, GCS se usar nuvem.
#   - Para schema evolution simples em Parquet: mergeSchema (leitura) + unionByName(allowMissingColumns=True)
#
# Saídas:
#   - Parquet particionado por ano/mes (compressão Snappy)
#   - dicionario_dados.json (schema final) em /content/pipeline_output/_metadata
#   - exec_log.json (métricas/linhagem simples) em /content/pipeline_output/_metadata

from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, ArrayType, BooleanType
from pyspark.sql import functions as F
from pyspark.sql.window import Window
import json, math, random, time, os, sys, re
from datetime import datetime, timezone
from urllib.parse import urlparse
from urllib.request import Request, urlopen
from urllib.error import URLError, HTTPError
from google.colab import files
from typing import Dict, List, Tuple
from datetime import datetime, timezone


# -------------------------
# Configuração Spark
# -------------------------
def build_spark(app_name="EV_Telemetry_Batch"):
    spark = (
        SparkSession.builder
        .appName(app_name)
        # Parquet + compressão
        .config("spark.sql.parquet.compression.codec", "snappy")
        # Evolução de schema na leitura (mesclar schemas de arquivos)
        .config("spark.sql.parquet.mergeSchema", "true")
        # Overwrite dinâmico de partições
        .config("spark.sql.sources.partitionOverwriteMode", "dynamic")
        # Shuffle e AQE para jobs maiores (opcional)
        .config("spark.sql.adaptive.enabled", "true")
        .getOrCreate()
    )
    return spark

# -------------------------
# Descoberta: Schema de entrada
# -------------------------
# Schema explícito (aninhado) compatível com o simulador JSON/Parquet
RAW_SCHEMA = StructType([
    StructField("schema_version", StringType(), True),
    StructField("vehicle_id", StringType(), True),
    StructField("timestamp", StringType(), True),
    StructField("metrics", StructType([
        StructField("soc_pct", DoubleType(), True),
        StructField("pack_voltage_v", DoubleType(), True),
        StructField("pack_current_a", DoubleType(), True),
        StructField("power_kw", DoubleType(), True),
        StructField("battery_temp_c", DoubleType(), True),
        StructField("motor_temp_c", DoubleType(), True),
        StructField("coolant_temp_c", DoubleType(), True),
        StructField("tyre_pressure_kpa", ArrayType(DoubleType()), True),
        StructField("speed_kph", DoubleType(), True),
        StructField("odometer_km", DoubleType(), True),
        StructField("latitude", DoubleType(), True),
        StructField("longitude", DoubleType(), True),
        StructField("heading_deg", DoubleType(), True),
        StructField("ambient_temp_c", DoubleType(), True),
        StructField("is_charging", BooleanType(), True),
        StructField("charge_power_kw", DoubleType(), True),
        StructField("health_score", DoubleType(), True),
    ]), True),
    StructField("events", StructType([
        StructField("fault_code", StringType(), True),
    ]), True),
])

# Para CSV (flat) – caso o(s) CSV(s) já venha(m) "achatados"
CSV_FLAT_SCHEMA = StructType([
    StructField("schema_version", StringType(), True),
    StructField("vehicle_id", StringType(), True),
    StructField("timestamp", StringType(), True),
    StructField("soc_pct", DoubleType(), True),
    StructField("pack_voltage_v", DoubleType(), True),
    StructField("pack_current_a", DoubleType(), True),
    StructField("power_kw", DoubleType(), True),
    StructField("battery_temp_c", DoubleType(), True),
    StructField("motor_temp_c", DoubleType(), True),
    StructField("coolant_temp_c", DoubleType(), True),
    StructField("tyre_pressure_kpa", StringType(), True), # CSV pode trazer como string "[]"
    StructField("speed_kph", DoubleType(), True),
    StructField("odometer_km", DoubleType(), True),
    StructField("latitude", DoubleType(), True),
    StructField("longitude", DoubleType(), True),
    StructField("heading_deg", DoubleType(), True),
    StructField("ambient_temp_c", DoubleType(), True),
    StructField("is_charging", StringType(), True),       # "true"/"false"
    StructField("charge_power_kw", DoubleType(), True),
    StructField("health_score", DoubleType(), True),
    StructField("fault_code", StringType(), True),
])

# -------------------------
# Auxiliares
# -------------------------
def infer_format_from_path(path: str) -> str:
    path = path.lower()
    if path.endswith(".json") or path.endswith(".jsonl"): return "json"
    if path.endswith(".csv"): return "csv"
    if path.endswith(".parquet"): return "parquet"
    # se for diretório com múltiplos arquivos, confie no parâmetro do usuário
    return None

def parse_boolean_col(col):
    # converte strings "true"/"false"/"1"/"0" para boolean
    return (
        F.when(F.col(col).cast("boolean").isNotNull(), F.col(col).cast("boolean"))
         .when(F.lower(F.col(col)).isin("true","t","1","yes","y"), F.lit(True))
         .when(F.lower(F.col(col)).isin("false","f","0","no","n"), F.lit(False))
         .otherwise(F.lit(None))
    )

# -------------------------
# 1) Ingestão
# -------------------------
def read_raw(spark: SparkSession, input_path: str, input_format: str = None) -> DataFrame:
    fmt = input_format or infer_format_from_path(input_path) or "json"

    if fmt == "json":
        df = (spark.read
              .format("json")
              .schema(RAW_SCHEMA)
              .option("multiLine", "false")
              .load(input_path))
    elif fmt == "parquet":
        # leitura com mergeSchema habilitado (builder já tem mergeSchema=true)
        df = spark.read.schema(RAW_SCHEMA).parquet(input_path)
    elif fmt == "csv":
        # CSV flat (sem nested). Ajuste separador se necessário.
        df = (spark.read
              .format("csv")
              .option("header", "true")
              .option("mode", "PERMISSIVE")
              .schema(CSV_FLAT_SCHEMA)
              .load(input_path))
        # Reconstrói estrutura nested como no JSON
        df = (
            df.withColumn("tyre_pressure_kpa",
                          F.when(F.col("tyre_pressure_kpa").isNull(), F.array().cast("array<double>"))
                           .otherwise(F.from_json("tyre_pressure_kpa", ArrayType(DoubleType()))))
              .withColumn("is_charging", parse_boolean_col("is_charging"))
              .withColumn("metrics", F.struct(
                    "soc_pct","pack_voltage_v","pack_current_a","power_kw",
                    "battery_temp_c","motor_temp_c","coolant_temp_c",
                    "tyre_pressure_kpa","speed_kph","odometer_km",
                    "latitude","longitude","heading_deg","ambient_temp_c",
                    "is_charging","charge_power_kw","health_score"
              ))
              .withColumn("events", F.struct("fault_code"))
              .select("schema_version","vehicle_id","timestamp_parsed","metrics","events")
        )
    else:
        raise ValueError(f"Formato não suportado: {fmt}")

    return df

# -------------------------
# 2) Padronização
# -------------------------
def standardize(df: DataFrame, tz_source="UTC", tz_target="UTC") -> DataFrame:
    # timestamp ISO8601 → timestamp (UTC)
    # se precisar converter TZ diferente, aplique from_utc_timestamp/to_utc_timestamp conforme origem
    df = df.withColumn("timestamp_parsed", F.to_timestamp(F.col("timestamp")))

    # Normaliza colunas (achatar metrics/events)
    df = df.select(
        "schema_version","vehicle_id","timestamp_parsed",
        F.col("metrics.soc_pct").alias("soc_pct"),
        F.col("metrics.pack_voltage_v").alias("pack_voltage_v"),
        F.col("metrics.pack_current_a").alias("pack_current_a"),
        F.col("metrics.power_kw").alias("power_kw"),
        F.col("metrics.battery_temp_c").alias("battery_temp_c"),
        F.col("metrics.motor_temp_c").alias("motor_temp_c"),
        F.col("metrics.coolant_temp_c").alias("coolant_temp_c"),
        F.col("metrics.tyre_pressure_kpa").alias("tyre_pressure_kpa"),
        F.col("metrics.speed_kph").alias("speed_kph"),
        F.col("metrics.odometer_km").alias("odometer_km"),
        F.col("metrics.latitude").alias("latitude"),
        F.col("metrics.longitude").alias("longitude"),
        F.col("metrics.heading_deg").alias("heading_deg"),
        F.col("metrics.ambient_temp_c").alias("ambient_temp_c"),
        F.col("metrics.is_charging").alias("is_charging"),
        F.col("metrics.charge_power_kw").alias("charge_power_kw"),
        F.col("metrics.health_score").alias("health_score"),
        F.col("events.fault_code").alias("fault_code"),
    )

    # timezone: já em UTC; se viesse em outro fuso ia ser convertido aqui
    df = df.withColumnRenamed("timestamp_parsed", "event_ts_utc")

    return df

# -------------------------
# 3) Tratamento
# -------------------------
MANDATORY = ["vehicle_id","event_ts_utc"]

# Fonte dos domínio baseado nos domínios do simulador
# Algins intervalos foram criados propositalmente para serem identificados nesta etapas
DOMAINS = {
    "soc_pct": (0, 100),
    "battery_temp_c": (18, 28),
    "motor_temp_c": (20, 35),
    "coolant_temp_c": (18, 25),
    "speed_kph": (0.0, 25),
    "latitude": (-90.0, 90.0), # Aqui fica difícil gerar um intervalo já que no simulador tudo é gerado a partir de uma distribuição uniforme
    "longitude": (-180.0, 180.0), # Aqui fica difícil gerar um intervalo já que no simulador tudo é gerado a partir de uma distribuição uniforme
    "heading_deg": (0.0, 360.0),
    "health_score": (0.0, 1.0),
    "pack_voltage_v": (200.0, 900.0),
    "pack_current_a": (-2000.0, 2000.0),
    "power_kw": (-300.0, 300.0),
    "charge_power_kw": (0.0, 400.0),
    "odometer_km": (0.0, 2_000_000.0),
    "ambient_temp_c": (0, 30),
}

def with_quality_flags(df: DataFrame) -> Tuple[DataFrame, DataFrame]:
    # Nulidade obrigatória
    for c in MANDATORY:
        df = df.withColumn(f"q_not_null__{c}", F.col(c).isNotNull())

    # Tipagem: tenta cast "seguro" (não deve alterar pois já padronizado)
    # Domínio: flag ok_range_<col>
    for c, (lo, hi) in DOMAINS.items():
        if c in df.columns:
            df = df.withColumn(f"q_range__{c}", (F.col(c) >= F.lit(lo)) & (F.col(c) <= F.lit(hi)))

    # Duplicidade (chave lógica vehicle_id+event_ts_utc)
    w = Window.partitionBy("vehicle_id","event_ts_utc").orderBy(F.col("event_ts_utc"))
    df = df.withColumn("rownum_key", F.row_number().over(w))
    df = df.withColumn("q_is_duplicate", F.col("rownum_key") > 1)

    # Outliers simples por amplitude interquartil em algumas métricas (opcional)
    # Existem outros métodos de detecção de outliers (Regressão, Isolation Forest, Z-Score, etc)
    # Coloquei o mais simples só como exemplo
    numeric_cols = [c for c,t in df.dtypes if t in ("double","float","int","bigint") and c in ["soc_pct","speed_kph","battery_temp_c","motor_temp_c","power_kw"]]
    for c in numeric_cols:
        # calcula IQR por job (não por veículo) – para simplificar
        stats = df.select(
            F.expr(f"percentile({c}, array(0.25, 0.75))").alias("q")
        ).first()
        if stats and stats.q and len(stats.q) == 2:
            q1, q3 = stats.q
            iqr = q3 - q1
            lo = q1 - 1.5*iqr
            hi = q3 + 1.5*iqr
            df = df.withColumn(f"q_iqr__{c}", (F.col(c) >= F.lit(lo)) & (F.col(c) <= F.lit(hi)))
        else:
            df = df.withColumn(f"q_iqr__{c}", F.lit(True))

    # Quarentena de registros ruins (nulos obrigatórios, fora de domínio, duplicados)
    quality_cols = [c for c in df.columns if c.startswith("q_")]
    any_bad = ~F.array_min(F.array([F.col(c).cast("boolean") for c in quality_cols if not c.startswith("q_is_duplicate")]))
    quarantine = df.filter(any_bad | F.col("q_is_duplicate"))

    # Dados limpos: remove duplicados mantendo a 1ª ocorrência e filtra por domínios obrigatórios
    clean = df.filter(~F.col("q_is_duplicate"))
    for c in MANDATORY:
        clean = clean.filter(F.col(c).isNotNull())
    for c,(lo,hi) in DOMAINS.items():
        if c in clean.columns:
            clean = clean.filter((F.col(c) >= lo) & (F.col(c) <= hi))

    clean = clean.drop("rownum_key")

    return clean, quarantine

# -------------------------
# 4) Enriquecimento
# -------------------------
def enrich(df: DataFrame) -> DataFrame:
    # Deriva data/hora e partições
    df = (df
          .withColumn("event_date", F.to_date("event_ts_utc"))
          .withColumn("ano", F.year("event_ts_utc"))
          .withColumn("mes", F.month("event_ts_utc"))
          .withColumn("dia", F.dayofmonth("event_ts_utc"))
          .withColumn("hora", F.hour("event_ts_utc"))
          # flags úteis
          .withColumn("is_low_soc", F.col("soc_pct") < 60)
          .withColumn("battery_temp_band",
                      F.when(F.col("battery_temp_c") < 10, F.lit("low"))
                       .when(F.col("battery_temp_c") <= 45, F.lit("normal"))
                       .otherwise(F.lit("high")))
          # chave de negócio (se quiser chave inteira)
          .withColumn("vehicle_ts_key", F.concat_ws("#", F.col("vehicle_id"), F.col("event_ts_utc").cast("string")))
    )
    return df

# -------------------------
# 5) Validação
# -------------------------
def compute_quality_metrics(clean: DataFrame, quarantine: DataFrame) -> Dict:
    total = clean.count() + quarantine.count()
    m = {
        "total_input_rows": total,
        "clean_rows": clean.count(),
        "quarantine_rows": quarantine.count(),
        "pct_clean": float((clean.count() / total) * 100.0) if total else 0.0,
        "pct_quarantine": float((quarantine.count() / total) * 100.0) if total else 0.0,
    }
    # Exemplo de métricas por domínio
    if "is_low_soc" in clean.columns:
        low_soc = clean.filter("is_low_soc = true").count()
        m["low_soc_rows"] = low_soc
    return m

# -------------------------
# 6) Gravação
# -------------------------
def write_parquet(
    df_new: DataFrame,
    output_path: str,
    mode: str = "append",  # opções: append, overwrite, overwrite_partitions
    partition_cols: List[str] = ["ano","mes"],
):
    writer = df_new.write.mode("append").option("compression","snappy")
    if mode == "overwrite":
        writer = df_new.write.mode("overwrite").option("compression","snappy")
    elif mode == "overwrite_partitions":
        # dynamic partition overwrite já habilitado via conf
        writer = df_new.write.mode("overwrite").option("compression","snappy")

    (writer
     .partitionBy(*partition_cols)
     .parquet(output_path)
    )

# Evolução de schema simples: unir novo com existente por nome (permitindo colunas faltantes)
def merge_with_existing_if_any(spark: SparkSession, df_new: DataFrame, output_path: str) -> DataFrame:
    try:
        df_old = spark.read.parquet(output_path)
        # union por nome com colunas ausentes
        df_merged = df_old.unionByName(df_new, allowMissingColumns=True)
        # removemos duplicados chave (se houver) – custo extra, opcional
        if {"vehicle_id","event_ts_utc"} <= set(df_merged.columns):
            df_merged = df_merged.dropDuplicates(["vehicle_id","event_ts_utc"])
        return df_merged
    except Exception:
        # primeira carga
        return df_new

# -------------------------
# 7) Metadados
# -------------------------
def schema_to_dict(df: DataFrame) -> List[Dict]:
    out = []
    for f in df.schema.fields:
        out.append({
            "name": f.name,
            "type": f.dataType.simpleString(),
            "nullable": f.nullable,
            "metadata": dict(f.metadata) if f.metadata else {},
        })
    return out

def write_metadata_files(spark: SparkSession, output_path: str, dict_schema: List[Dict], exec_log: Dict):
    meta_path = output_path.rstrip("/") + "/_metadata"
    # dicionário de dados
    (spark.createDataFrame([json.dumps(dict_schema)], "string")
          .toDF("json")
          .write.mode("overwrite").text(meta_path + "/dicionario_dados.json"))
    # log de execução
    (spark.createDataFrame([json.dumps(exec_log)], "string")
          .toDF("json")
          .write.mode("overwrite").text(meta_path + "/exec_log.json"))

# -------------------------
# 8) Catálogo (opcoisnal)
# -------------------------
def register_in_catalog(spark: SparkSession, table_name: str, location: str):
    spark.sql(f"CREATE TABLE IF NOT EXISTS {table_name} USING parquet LOCATION '{location}'")

# -------------------------
# Orquestração do Job
# -------------------------

from datetime import datetime, timezone, date as _date

try:
    from pymongo import MongoClient
    _HAS_PYMONGO = True
except Exception:
    _HAS_PYMONGO = False

class MongoMetricsStore:
    def __init__(self, uri: str, db: str = "telemetria",
                 coll_job: str = "pipeline_runs",
                 ttl_days: int | None = None):
        if not _HAS_PYMONGO:
            raise RuntimeError("pymongo não instalado. Rode: !pip install pymongo")
        self.client = MongoClient(uri, serverSelectionTimeoutMS=5000)
        self.client.admin.command("ping")
        self.db = self.client[db]
        self.col_job = self.db[coll_job]
        # índices úteis
        self.col_job.create_index([("job_name", 1), ("job_start_utc", -1)])
        if ttl_days and ttl_days > 0:
            self.col_job.create_index("job_end_dt", expireAfterSeconds=int(ttl_days*86400))

    def write_job_metrics(self, exec_log: dict, metrics: dict, extra: dict | None = None):
        # cria campo datetime 'naive' em UTC pra TTL
        try:
            dt = datetime.fromisoformat(exec_log["job_end_utc"])
            if dt.tzinfo: dt = dt.astimezone(timezone.utc).replace(tzinfo=None)
        except Exception:
            dt = None
        doc = {"job_end_dt": dt, "metrics": metrics, **exec_log}
        if extra: doc["extra"] = extra
        self.col_job.insert_one(doc)

    def close(self):
        try: self.client.close()
        except: pass


def compute_daily_vehicle_metrics(df: DataFrame) -> DataFrame:
    # resumo por veículo/data para dashboard
    return (df.groupBy("vehicle_id", "event_date")
              .agg(
                  F.count("*").alias("rows"),
                  F.min("soc_pct").alias("soc_min"),
                  F.expr("percentile_approx(soc_pct, 0.5)").alias("soc_p50"),
                  F.max("soc_pct").alias("soc_max"),
                  F.sum(F.when(F.col("is_low_soc"), 1).otherwise(0)).alias("low_soc_cnt"),
                  F.avg("speed_kph").alias("speed_avg")
              ))

def insert_dataframe_to_mongo(df_agg: DataFrame, uri: str,
                              db: str = "telemetria",
                              collection: str = "ev_metrics_daily",
                              batch_size: int = 1000):
    if not _HAS_PYMONGO:
        raise RuntimeError("pymongo não instalado. Rode: !pip install pymongo")
    client = MongoClient(uri, serverSelectionTimeoutMS=5000)
    client.admin.command("ping")
    col = client[db][collection]
    # índice para consultas
    col.create_index([("vehicle_id", 1), ("event_date", -1)])

    buf = []
    def _flush():
        if buf:
            col.insert_many(buf)
            buf.clear()

    for row in df_agg.toLocalIterator():  # stream pra não estourar memória
        d = row.asDict(recursive=True)
        # event_date vem como datetime.date -> salva ISO string (simples p/ queries)
        if isinstance(d.get("event_date"), _date):
            d["event_date"] = d["event_date"].isoformat()
        buf.append(d)
        if len(buf) >= batch_size:
            _flush()
    _flush()
    client.close()

def run_job(
    input_path: str,
    output_path: str,
    input_format: str = None,     # "json"|"csv"|"parquet"|None(auto por extensão)
    mode: str = "append",         # append|overwrite|overwrite_partitions
    register_table: str = None,   # ex: "ev.blah_telemetry"
    tz_source: str = "UTC",
    tz_target: str = "UTC",
    # ==== Salvar métricas no MongoDB ====
    save_metrics_to_mongo: bool = False,
    mongo_uri: str = "mongodb://127.0.0.1:27017",
    mongo_db: str = "telemetria",
    mongo_coll_job: str = "pipeline_runs",
    mongo_coll_daily: str = "ev_metrics_daily",
    mongo_ttl_days: int | None = None,
):
    spark = build_spark()
    job_start = datetime.now(timezone.utc).isoformat()

    # 1) ingestão
    raw = read_raw(spark, input_path, input_format)

    # 2) padronização
    std = standardize(raw, tz_source=tz_source, tz_target=tz_target)

    # 3) tratamento
    clean, quarantine = with_quality_flags(std)

    # 4) enriquecimento
    enriched = enrich(clean)

    # quarentena opcional
    quarantine_path = output_path.rstrip("/") + "/_quarantine"
    if quarantine.count() > 0:
        (quarantine
         .withColumn("ano", F.year("event_ts_utc"))
         .withColumn("mes", F.month("event_ts_utc"))
         .write.mode("append").partitionBy("ano","mes")
         .parquet(quarantine_path))

    # 5) métricas do job
    metrics = compute_quality_metrics(clean=enriched, quarantine=quarantine)

    # 6) gravação parquet (schema evolution simples)
    df_final = merge_with_existing_if_any(spark, enriched, output_path)
    write_parquet(df_final, output_path=output_path,
                  mode=("overwrite" if mode=="overwrite" else ("overwrite_partitions" if mode=="overwrite_partitions" else "append")),
                  partition_cols=["ano","mes"])
    df_final.printSchema()

    # 7) catálogo (opcional)
    if register_table:
        register_in_catalog(spark, table_name=register_table, location=output_path)

    # Metadados em disco
    exec_log = {
        "job_name": "EV_Telemetry_Batch",
        "job_start_utc": job_start,
        "job_end_utc": datetime.now(timezone.utc).isoformat(),
        "input_path": input_path,
        "output_path": output_path,
        "read_format": input_format or infer_format_from_path(input_path) or "json",
        "mode": mode,
        "records_clean": metrics.get("clean_rows", 0),
        "records_quarantine": metrics.get("quarantine_rows", 0),
        "pct_clean": metrics.get("pct_clean", 0.0),
        "pct_quarantine": metrics.get("pct_quarantine", 0.0),
        "low_soc_rows": metrics.get("low_soc_rows", 0),
        "spark_version": spark.version,
    }
    dict_schema = schema_to_dict(df_final)
    write_metadata_files(spark, output_path, dict_schema, exec_log)

    # ==== Gravar métricas no MongoDB ====
    if save_metrics_to_mongo:
        store = MongoMetricsStore(
            uri=mongo_uri, db=mongo_db, coll_job=mongo_coll_job, ttl_days=mongo_ttl_days
        )
        store.write_job_metrics(exec_log=exec_log, metrics=metrics)

        # daily rollup por veículo/data
        daily = compute_daily_vehicle_metrics(enriched)
        insert_dataframe_to_mongo(daily, uri=mongo_uri, db=mongo_db, collection=mongo_coll_daily)

        store.close()

    return {"metrics": metrics, "exec_log": exec_log, "schema": dict_schema}

# Criando sessão Spark

In [23]:
spark = SparkSession.builder \
    .appName("EV_Telemetry_Batch") \
    .config("spark.sql.parquet.compression.codec", "snappy") \
    .config("spark.sql.sources.partitionOverwriteMode","dynamic") \
    .getOrCreate()

# Rodando o pipeline

In [27]:
res = run_job(
    input_path="/content/simulation_output/telemetria.jsonl",
    input_format="json",
    output_path="/content/pipeline_output",
    mode="append",
    save_metrics_to_mongo=True,
    mongo_uri="mongodb://127.0.0.1:27017",
    mongo_db="telemetria",
    mongo_coll_job="pipeline_runs",
    mongo_coll_daily="ev_metrics_daily",
    mongo_ttl_days=30,
)

print(res["metrics"])

root
 |-- schema_version: string (nullable = true)
 |-- vehicle_id: string (nullable = true)
 |-- event_ts_utc: timestamp (nullable = true)
 |-- soc_pct: double (nullable = true)
 |-- pack_voltage_v: double (nullable = true)
 |-- pack_current_a: double (nullable = true)
 |-- power_kw: double (nullable = true)
 |-- battery_temp_c: double (nullable = true)
 |-- motor_temp_c: double (nullable = true)
 |-- coolant_temp_c: double (nullable = true)
 |-- tyre_pressure_kpa: array (nullable = true)
 |    |-- element: double (containsNull = true)
 |-- speed_kph: double (nullable = true)
 |-- odometer_km: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- heading_deg: double (nullable = true)
 |-- ambient_temp_c: double (nullable = true)
 |-- is_charging: boolean (nullable = true)
 |-- charge_power_kw: double (nullable = true)
 |-- health_score: double (nullable = true)
 |-- fault_code: string (nullable = true)
 |-- q_not_null__vehicle_i

Conferindo métricas salvas no MongoDB

In [28]:
from pymongo import MongoClient
cli = MongoClient("mongodb://127.0.0.1:27017")

print("== pipeline_runs ==")
for d in cli["telemetria"]["pipeline_runs"].find({}, {"_id":0,"job_name":1,"job_start_utc":1,"records_clean":1,"low_soc_rows":1}).sort("job_start_utc",-1).limit(3):
    print(d)

print("\n== ev_metrics_daily ==")
for d in cli["telemetria"]["ev_metrics_daily"].find({}, {"_id":0,"event_date":1,"vehicle_id":1,"rows":1,"soc_min":1,"soc_p50":1,"soc_max":1,"low_soc_cnt":1}).sort([("event_date",-1)]).limit(5):
    print(d)
cli.close()

== pipeline_runs ==
{'job_name': 'EV_Telemetry_Batch', 'job_start_utc': '2025-08-24T23:24:20.063960+00:00', 'records_clean': 120, 'low_soc_rows': 0}

== ev_metrics_daily ==
{'vehicle_id': 'EV-00009', 'event_date': '2025-08-24', 'rows': 60, 'soc_min': 67.29, 'soc_p50': 67.31, 'soc_max': 67.32, 'low_soc_cnt': 0}
{'vehicle_id': 'EV-00003', 'event_date': '2025-08-24', 'rows': 43, 'soc_min': 87.53, 'soc_p50': 87.53, 'soc_max': 87.54, 'low_soc_cnt': 0}
{'vehicle_id': 'EV-00008', 'event_date': '2025-08-24', 'rows': 17, 'soc_min': 91.29, 'soc_p50': 91.29, 'soc_max': 91.3, 'low_soc_cnt': 0}


### Salvando arquivo parquet com dados processados

In [19]:
spark = (SparkSession.builder
         .appName("read-parquet")
         .config("spark.sql.parquet.mergeSchema","true")
         .getOrCreate())

df = (spark.read
      .option("recursiveFileLookup", "true")
      .option("pathGlobFilter", "*.parquet")
      .parquet("/content/pipeline_output"))   # raiz com tudo dentro

# Visualizando resultados

In [20]:
df.printSchema()

root
 |-- schema_version: string (nullable = true)
 |-- vehicle_id: string (nullable = true)
 |-- event_ts_utc: timestamp (nullable = true)
 |-- soc_pct: double (nullable = true)
 |-- pack_voltage_v: double (nullable = true)
 |-- pack_current_a: double (nullable = true)
 |-- power_kw: double (nullable = true)
 |-- battery_temp_c: double (nullable = true)
 |-- motor_temp_c: double (nullable = true)
 |-- coolant_temp_c: double (nullable = true)
 |-- tyre_pressure_kpa: array (nullable = true)
 |    |-- element: double (containsNull = true)
 |-- speed_kph: double (nullable = true)
 |-- odometer_km: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- heading_deg: double (nullable = true)
 |-- ambient_temp_c: double (nullable = true)
 |-- is_charging: boolean (nullable = true)
 |-- charge_power_kw: double (nullable = true)
 |-- health_score: double (nullable = true)
 |-- fault_code: string (nullable = true)
 |-- q_not_null__vehicle_i

In [30]:
from pymongo import MongoClient

URI = "mongodb://127.0.0.1:27017"   # ajuste se usou outra porta
DB  = "telemetria"                  # o que usamos no pipeline

cli = MongoClient(URI, serverSelectionTimeoutMS=3000)
cli.admin.command("ping")
print("✅ conectado")

# 1) ver bancos e coleções
print("\n== Bancos ==")
print(cli.list_database_names())

print("\n== Coleções em", DB, "==")
db = cli[DB]
print(db.list_collection_names())

# 2) contar e amostrar registros das coleções mais prováveis
for coll_name in ["pipeline_runs","ev_metrics_daily","ev_alerts"]:
    col = db[coll_name]
    try:
        total = col.count_documents({})
        print(f"\n[{coll_name}] total docs:", total)
        # últimos 3 (tenta ordenar por campos comuns)
        cursor = col.find({}, {"_id":0}).sort([("job_start_utc",-1), ("event_date",-1)]).limit(3)
        for d in cursor:
            print(d)
        # 3) ver índices (inclui TTL se existir)
        print("Índices:")
        for idx in col.list_indexes():
            ttl = f" (TTL={idx.get('expireAfterSeconds')}s)" if 'expireAfterSeconds' in idx else ""
            print(" -", idx["name"], dict(idx["key"]), ttl)
    except Exception as e:
        print(f"[{coll_name}] (pode não existir) -> {e}")

cli.close()

✅ conectado

== Bancos ==
['admin', 'config', 'local', 'telemetria']

== Coleções em telemetria ==
['ev_alerts', 'pipeline_runs', 'ev_metrics_daily']

[pipeline_runs] total docs: 1
{'job_end_dt': datetime.datetime(2025, 8, 24, 23, 24, 28, 457000), 'metrics': {'total_input_rows': 600, 'clean_rows': 120, 'quarantine_rows': 480, 'pct_clean': 20.0, 'pct_quarantine': 80.0, 'low_soc_rows': 0}, 'job_name': 'EV_Telemetry_Batch', 'job_start_utc': '2025-08-24T23:24:20.063960+00:00', 'job_end_utc': '2025-08-24T23:24:28.457158+00:00', 'input_path': '/content/simulation_output/telemetria.jsonl', 'output_path': '/content/pipeline_output', 'read_format': 'json', 'mode': 'append', 'records_clean': 120, 'records_quarantine': 480, 'pct_clean': 20.0, 'pct_quarantine': 80.0, 'low_soc_rows': 0, 'spark_version': '3.5.1'}
Índices:
 - _id_ {'_id': 1} 
 - job_name_1_job_start_utc_-1 {'job_name': 1, 'job_start_utc': -1} 
 - job_end_dt_1 {'job_end_dt': 1}  (TTL=2592000s)

[ev_metrics_daily] total docs: 3
{'vehic