🔧 CÉLULA 1 – Setup e Bibliotecas (reutilizável)

#### 📚 BIBLIOTECAS PARA VALIDAÇÃO
#### Projeto: Dataflow Sentinel – Validação de Dados

In [None]:
# Instalar o Pandera (se necessário)
!pip install pandera --quiet

[?25l   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/267.1 kB[0m [31m?[0m eta [36m-:--:--[0m[2K   [91m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m[91m╸[0m [32m266.2/267.1 kB[0m [31m8.0 MB/s[0m eta [36m0:00:01[0m[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m267.1/267.1 kB[0m [31m5.7 MB/s[0m eta [36m0:00:00[0m
[?25h

In [None]:
# =============================================================
# 📚 BIBLIOTECAS PARA VALIDAÇÃO
# Projeto: Dataflow Sentinel – Validação de Dados
# =============================================================

import pandas as pd
import logging
import os
import pandera.pandas as pa
from pandera.pandas import Column, DataFrameSchema, Check
from pandera import Column, DataFrameSchema, Check

# Configuração de logging
logging.basicConfig(level=logging.INFO,
                    format='[%(levelname)s] %(asctime)s - %(message)s',
                    datefmt='%H:%M:%S')
logger = logging.getLogger()

📁 CÉLULA 2 – Carregamento dos Dados

#### 📁 LEITURA DO CSV CARREGADO

In [None]:
# =============================================================
# 📁 LEITURA DO CSV CARREGADO
# =============================================================

csv_path = '/content/dataflow_sentinel_supplyops.csv'

if not os.path.exists(csv_path):
    logger.error(f"Arquivo CSV não encontrado: {csv_path}")
else:
    logger.info(f"Arquivo localizado: {csv_path}")

df = pd.read_csv(csv_path)
df.head()

Unnamed: 0,shipment_id,route_id,date,delay_minutes,cost_usd,temp_c,status
0,SHP10000,RT02,1998-01-08,12,36.23,7.4,delivered
1,SHP10001,RT03,1998-01-09,8,37.28,4.4,failed
2,SHP10002,RT04,1998-01-10,7,31.8,3.1,in_transit
3,SHP10003,RT04,1998-01-11,12,98.73,5.1,in_transit
4,SHP10004,RT02,1998-01-12,10,69.52,0.8,delivered


✅ CÉLULA 3 – Schema com Pandera

#### 🧩 DEFINIÇÃO DO SCHEMA DE VALIDAÇÃO

In [None]:
# =============================================================
# 🧩 DEFINIÇÃO DO SCHEMA DE VALIDAÇÃO
# =============================================================

schema = DataFrameSchema({
    "shipment_id": Column(pa.String, checks=[
        Check.str_length(min_value=6),
        Check(lambda s: s.str.startswith("SHP"))
    ], unique=True),

    "route_id": Column(pa.String, checks=Check.isin(["RT01", "RT02", "RT03", "RT04"])),

    "date": Column(pa.String),  # datetime será tratado depois

    "delay_minutes": Column(pa.Int, checks=[
        Check.ge(0),
        Check.le(240)
    ]),

    "cost_usd": Column(pa.Float, checks=[
        Check.ge(0),
        Check.le(1000)
    ]),

    "temp_c": Column(pa.Float, checks=[
        Check.ge(-30),
        Check.le(50)
    ]),

    "status": Column(pa.String, checks=Check.isin(["delivered", "in_transit", "failed"]))
})


🧪 CÉLULA 4 – Aplicar Validação

#### ✅ EXECUTAR VALIDAÇÃO COM O SCHEMA

In [None]:
# =============================================================
# ✅ EXECUTAR VALIDAÇÃO COM O SCHEMA
# =============================================================

try:
    validated_df = schema.validate(df, lazy=True)
    logger.info("✅ Dados validados com sucesso.")
except pa.errors.SchemaErrors as err:
    logger.error("❌ Erros na validação de dados.")
    display(err.failure_cases)


✅ CÉLULA FINAL – Função valida_dados(df) + Exportação

#### ✅ FUNÇÃO REUTILIZÁVEL DE VALIDAÇÃO DE DADOS

In [None]:
# =============================================================
# ✅ FUNÇÃO REUTILIZÁVEL DE VALIDAÇÃO DE DADOS
# =============================================================

def valida_dados(df: pd.DataFrame) -> pd.DataFrame:
    """
    Valida os dados de entrada conforme regras de negócio e integridade.
    Retorna um DataFrame validado ou gera exceções com os erros detectados.
    """
    schema = DataFrameSchema({
        "shipment_id": Column(pa.String, checks=[
            Check.str_length(min_value=6),
            Check(lambda s: s.str.startswith("SHP"))
        ], unique=True),

        "route_id": Column(pa.String, checks=Check.isin(["RT01", "RT02", "RT03", "RT04"])),

        "date": Column(pa.String),

        "delay_minutes": Column(pa.Int, checks=[
            Check.ge(0),
            Check.le(240)
        ]),

        "cost_usd": Column(pa.Float, checks=[
            Check.ge(0),
            Check.le(1000)
        ]),

        "temp_c": Column(pa.Float, checks=[
            Check.ge(-30),
            Check.le(50)
        ]),

        "status": Column(pa.String, checks=Check.isin(["delivered", "in_transit", "failed"]))
    })

    try:
        validated_df = schema.validate(df, lazy=True)
        logger.info("✅ Dados validados com sucesso.")
        return validated_df
    except pa.errors.SchemaErrors as err:
        logger.error("❌ Erros detectados na validação dos dados.")
        display(err.failure_cases)
        raise


💾 CÉLULA EXTRA – Aplicar a função + salvar versão validada

#### 💾 APLICAR VALIDAÇÃO E SALVAR ARQUIVO VALIDADO

In [None]:
# =============================================================
# 💾 APLICAR VALIDAÇÃO E SALVAR ARQUIVO VALIDADO
# =============================================================

validated_df = valida_dados(df)

# Caminho para salvar o dataset validado
output_path = "/content/dataflow_validado.parquet"

# Exportar como parquet (mais eficiente e pronto para pipeline)
validated_df.to_parquet(output_path, index=False)
logger.info(f"📦 Arquivo validado salvo em: {output_path}")


#### ✅ Etapa Concluída – Validação de Dados

Este notebook representa uma etapa **obrigatória** do pipeline:
- Verifica integridade dos dados recebidos
- Aplica regras de negócio e tipagem estrita
- Garante rastreabilidade antes de qualquer transformação

📤 Saída esperada:
- `dataflow_validado.parquet` – versão segura, consistente e auditável dos dados

🔁 Reutilização:
A função `valida_dados(df)` pode ser chamada em fluxos futuros (ETL, batch, produção).
