In [0]:
%python
from src.common.utils import load_json
from src.config.settings import METADATA_BASE_PATH, LAYER_PATHS
from pyspark.sql.utils import AnalysisException


In [0]:
%python
dbutils.widgets.text("table_name", "")

table_name = dbutils.widgets.get("table_name") or None


In [0]:
def validate_table_exists(path: str):
    try:
        spark.read.format("delta").load(path)
    except AnalysisException:
        raise Exception(f"Tabela não encontrada: {path}")


def validate_not_empty(df, table_name: str):
    if df.limit(1).count() == 0:
        raise Exception(f"Tabela {table_name} está vazia")


def validate_delta_format(path: str):
    if not spark._jsparkSession.sessionState().catalog().tableExists(path):
        # fallback seguro
        try:
            spark.read.format("delta").load(path)
        except Exception:
            raise Exception(f"Tabela não está em formato Delta: {path}")


def validate_required_columns(df, required_columns: list, table_name: str):
    missing = [c for c in required_columns if c not in df.columns]
    if missing:
        raise Exception(
            f"Tabela {table_name} está sem colunas obrigatórias: {missing}"
        )


In [0]:
metadata_path = f"{METADATA_BASE_PATH}/bronze"

files_df = spark.read.format("binaryFile") \
    .load(f"{metadata_path}/*.json")

json_files = [row.path for row in files_df.select("path").collect()]

errors = []

for file_path in json_files:
    metadata = load_json(file_path)
    table = metadata["table_name"]

    if table_name and table != table_name:
        continue

    print(f"Validando tabela bronze: {table}")

    table_path = f"{LAYER_PATHS['bronze']}/{table}"

    try:
        df = spark.read.format("delta").load(table_path)

        validate_not_empty(df, table)
        validate_required_columns(
            df,
            metadata.get("required_columns", []),
            table
        )

        print(f"{table} OK")

    except Exception as e:
        print(str(e))
        errors.append(str(e))


In [0]:
if errors:
    raise Exception(
        "Validação da camada bronze falhou:\n" + "\n".join(errors)
    )

print("Validação da camada bronze concluída com sucesso")
