In [None]:
#%pip install -r requirements.txt

In [None]:
import os
from google.cloud import bigquery
from google.api_core.exceptions import NotFound

PROJECT_ID = os.environ["PROJECT_ID"]
BRONZE_DATASET = os.environ.get("BRONZE_DATASET")
BRONZE_TABLE = os.environ["BRONZE_TABLE"]
SILVER_DATASET = os.environ.get("SILVER_DATASET")
LOCATION = os.environ.get("BQ_LOCATION")

bq = bigquery.Client(project=PROJECT_ID)

In [None]:
#from dotenv import load_dotenv
#from pathlib import Path
#load_dotenv("../../.env")

In [None]:
def run_sql(sql: str):
    """Executa SQL no BigQuery e espera terminar."""
    job = bq.query(sql, location=LOCATION)
    return job.result()

In [None]:
def ensure_dataset(dataset_id: str):
    """Cria dataset se não existir."""
    ds_ref = bigquery.Dataset(f"{PROJECT_ID}.{dataset_id}")
    ds_ref.location = LOCATION
    try:
        bq.get_dataset(ds_ref)
        return
    except NotFound:
        bq.create_dataset(ds_ref)
        print(f"Dataset criado: {PROJECT_ID}.{dataset_id}")

In [None]:
def table_exists(table_fqn: str) -> bool:
    try:
        bq.get_table(table_fqn)
        return True
    except NotFound:
        return False

In [None]:
def assert_max_key(table_fqn: str, sk_col: str, max_value: int = 999999):
    sql = f"SELECT MAX({sk_col}) AS mx FROM `{table_fqn}`"
    rows = list(bq.query(sql, location=LOCATION).result())
    mx = rows[0].mx if rows and rows[0].mx is not None else 0
    if mx > max_value:
        raise ValueError(f"ERRO: {table_fqn}.{sk_col} excedeu {max_value}. Max atual = {mx}")
    print(f"OK: max({sk_col})={mx} <= {max_value}")

In [None]:
def upsert_dimension(
    target_table: str,
    src_sql: str,
    nk_col: str,
    sk_col: str,
    attr_cols: list[str],
    max_sk: int = 999999,
):
    """
    Cria (primeira carga) ou faz MERGE incremental em uma dimensão.
    - target_table: 'projeto.dataset.tabela'
    - src_sql: query que retorna DISTINCT com nk_col + attr_cols
    - nk_col: nome do natural key (string) no src_sql
    - sk_col: surrogate key numérico (1..999999)
    - attr_cols: colunas descritivas da dimensão (ex.: nome, uf, etc.)
    """
    # validações mínimas
    if nk_col not in attr_cols:
        pass  # nk fica separado, tudo certo
    if not attr_cols:
        raise ValueError("attr_cols não pode ser vazio")

    cols_select = ",\n  ".join(attr_cols)
    cols_insert = ", ".join([sk_col] + attr_cols + [nk_col])
    vals_insert = ", ".join([f"S.{sk_col}_new"] + [f"S.{c}" for c in attr_cols] + [f"S.{nk_col}"])
    set_update = ",\n    ".join([f"{c} = S.{c}" for c in attr_cols])

    create_sql = f"""
CREATE TABLE `{target_table}` AS
WITH src AS (
  {src_sql}
)
SELECT
  ROW_NUMBER() OVER (ORDER BY {nk_col}) AS {sk_col},
  {cols_select},
  {nk_col}
FROM src;
"""

    merge_sql = f"""
MERGE `{target_table}` T
USING (
  WITH src AS (
    {src_sql}
  ),
  base_max AS (
    SELECT COALESCE(MAX({sk_col}), 0) AS m
    FROM `{target_table}`
  )
  SELECT
    src.*,
    m + ROW_NUMBER() OVER (ORDER BY {nk_col}) AS {sk_col}_new
  FROM src, base_max
) S
ON T.{nk_col} = S.{nk_col}
WHEN MATCHED THEN
  UPDATE SET
    {set_update}
WHEN NOT MATCHED THEN
  INSERT ({cols_insert})
  VALUES ({vals_insert});
"""

    # cria dataset do gold se precisar
    ds = target_table.split(".")[1]
    ensure_dataset(ds)

    if not table_exists(target_table):
        print("Criando dimensão (primeira carga):", target_table)
        run_sql(create_sql)
    else:
        print("Fazendo MERGE incremental:", target_table)
        run_sql(merge_sql)

    # garante 6 dígitos (até 999999)
    assert_max_key(target_table, sk_col, max_value=max_sk)

    print("OK:", target_table)

In [None]:
DIM_UF = f"{PROJECT_ID}.{SILVER_DATASET}.dm_uf"

SRC_DM_UF = f"""
WITH base AS (
  SELECT DISTINCT
    COALESCE(NULLIF(uf,''), 'UNK') AS nk_uf,
    COALESCE(NULLIF(uf,''), 'UNK') AS sg_uf
  FROM `{BRONZE_DATASET}.{BRONZE_TABLE}`
),
map_uf AS (
  SELECT * FROM UNNEST([
    STRUCT('AC' AS sg_uf, 'Acre' AS nm_uf),
    STRUCT('AL', 'Alagoas'),
    STRUCT('AP', 'Amapa'),
    STRUCT('AM', 'Amazonas'),
    STRUCT('BA', 'Bahia'),
    STRUCT('CE', 'Ceara'),
    STRUCT('DF', 'Distrito Federal'),
    STRUCT('ES', 'Espirito Santo'),
    STRUCT('GO', 'Goias'),
    STRUCT('MA', 'Maranhao'),
    STRUCT('MT', 'Mato Grosso'),
    STRUCT('MS', 'Mato Grosso do Sul'),
    STRUCT('MG', 'Minas Gerais'),
    STRUCT('PA', 'Para'),
    STRUCT('PB', 'Paraiba'),
    STRUCT('PR', 'Parana'),
    STRUCT('PE', 'Pernambuco'),
    STRUCT('PI', 'Piaui'),
    STRUCT('RJ', 'Rio de Janeiro'),
    STRUCT('RN', 'Rio Grande do Norte'),
    STRUCT('RS', 'Rio Grande do Sul'),
    STRUCT('RO', 'Rondonia'),
    STRUCT('RR', 'Roraima'),
    STRUCT('SC', 'Santa Catarina'),
    STRUCT('SP', 'Sao Paulo'),
    STRUCT('SE', 'Sergipe'),
    STRUCT('TO', 'Tocantins'),
    STRUCT('UNK', 'Desconhecida')
  ])
)
SELECT
  b.nk_uf,
  b.sg_uf,
  COALESCE(m.nm_uf, 'Desconhecida') AS nm_uf
FROM base b
LEFT JOIN map_uf m
  ON m.sg_uf = b.sg_uf
"""

upsert_dimension(
    target_table=DIM_UF,
    src_sql=SRC_DM_UF,
    nk_col="nk_uf",
    sk_col="cd_uf",
    attr_cols=["sg_uf", "nm_uf"],
)


In [None]:
DIM_MUNICIPIO = f"{PROJECT_ID}.{SILVER_DATASET}.dm_municipio"

SRC_DM_MUNICIPIO = f"""
WITH base AS (
  SELECT DISTINCT
    CONCAT(COALESCE(instituicao_municipio,''), '|', COALESCE(uf,'')) AS nk_municipio,
    instituicao_municipio AS dc_municipio,
    uf AS sg_uf
  FROM `{BRONZE_DATASET}.{BRONZE_TABLE}`
),
ufs AS (
  SELECT nk_uf, cd_uf
  FROM `{DIM_UF}`
)
SELECT
  b.nk_municipio,
  b.dc_municipio,
  u.cd_uf
FROM base b
LEFT JOIN ufs u ON u.nk_uf = COALESCE(b.sg_uf,'')
"""

upsert_dimension(
    target_table=DIM_MUNICIPIO,
    src_sql=SRC_DM_MUNICIPIO,
    nk_col="nk_municipio",
    sk_col="cd_municipio",
    attr_cols=["dc_municipio", "cd_uf"],
)


In [None]:
DIM_INSTITUICAO = f"{PROJECT_ID}.{SILVER_DATASET}.dm_instituicao"

SRC_DIM_INSTITUICAO = f"""
WITH inst AS (
  SELECT DISTINCT
    instituicao_cnpj AS nr_cnpj,
    instituicao_nome AS dc_instituicao,
    CONCAT(COALESCE(instituicao_municipio,''), '|', COALESCE(uf,'')) AS nk_municipio
  FROM `{BRONZE_DATASET}.{BRONZE_TABLE}`
),
mun AS (
  SELECT nk_municipio, cd_municipio
  FROM `{DIM_MUNICIPIO}`
)
SELECT
  CONCAT(
    COALESCE(i.nr_cnpj,''), '|',
    COALESCE(i.dc_instituicao,''), '|',
    COALESCE(CAST(m.cd_municipio AS STRING),'')
  ) AS nk_instituicao,
  i.nr_cnpj,
  i.dc_instituicao,
  m.cd_municipio
FROM inst i
LEFT JOIN mun m
  ON m.nk_municipio = i.nk_municipio
"""

upsert_dimension(
    target_table=DIM_INSTITUICAO,
    src_sql=SRC_DIM_INSTITUICAO,
    nk_col="nk_instituicao",
    sk_col="cd_instituicao",
    attr_cols=["nr_cnpj", "dc_instituicao", "cd_municipio"],
    max_sk=999999,
)


In [None]:
DIM_FORNECEDOR = f"{PROJECT_ID}.{SILVER_DATASET}.dm_fornecedor"

SRC_DIM_FORNECEDOR = f"""
SELECT DISTINCT
  CONCAT(COALESCE(fornecedor_cnpj,''), '|', COALESCE(fornecedor_nome,'')) AS nk_fornecedor,
  fornecedor_cnpj AS nr_cnpj,
  fornecedor_nome AS dc_fornecedor
FROM `{BRONZE_DATASET}.{BRONZE_TABLE}`
"""

upsert_dimension(
    target_table=DIM_FORNECEDOR,
    src_sql=SRC_DIM_FORNECEDOR,
    nk_col="nk_fornecedor",
    sk_col="cd_fornecedor",
    attr_cols=["nr_cnpj", "dc_fornecedor"],
    max_sk=999999
)

In [None]:
DIM_PRODUTO     = f"{PROJECT_ID}.{SILVER_DATASET}.dm_produto"

SRC_DM_PRODUTO = f"""
SELECT DISTINCT
  CONCAT(
    COALESCE(codigo_br,''), '|',
    COALESCE(catmat_descricao,''), '|',
    COALESCE(unidade_fornecimento,''), '|',
    COALESCE(generico,''), '|',
    COALESCE(anvisa,''), '|',
    COALESCE(CAST(capacidade AS STRING),''), '|',
    COALESCE(unidade_medida,''), '|',
    COALESCE(unidade_fornecimento_capacidade,'')
  ) AS nk_produto,
  codigo_br AS cd_codigo_br,
  catmat_descricao AS dc_produto,
  unidade_fornecimento AS dc_unidade_fornecimento,
  generico AS fl_generico,
  anvisa AS fl_anvisa,
  capacidade AS vl_capacidade,
  unidade_medida AS dc_unidade_medida,
  unidade_fornecimento_capacidade AS dc_unid_fornec_capacidade
FROM `{BRONZE_DATASET}.{BRONZE_TABLE}`
"""

upsert_dimension(
    target_table=DIM_PRODUTO,
    src_sql=SRC_DM_PRODUTO,
    nk_col="nk_produto",
    sk_col="cd_produto",
    attr_cols=[
        "cd_codigo_br",
        "dc_produto",
        "dc_unidade_fornecimento",
        "fl_generico",
        "fl_anvisa",
        "vl_capacidade",
        "dc_unidade_medida",
        "dc_unid_fornec_capacidade",
    ],
)

In [None]:
DIM_FABRICANTE  = f"{PROJECT_ID}.{SILVER_DATASET}.dm_fabricante"

SRC_DM_FABRICANTE = f"""
SELECT DISTINCT
  CONCAT(COALESCE(fabricante_cnpj,''), '|', COALESCE(fabricante_nome,'')) AS nk_fabricante,
  fabricante_cnpj AS nr_cnpj,
  fabricante_nome AS dc_fabricante
FROM `{BRONZE_DATASET}.{BRONZE_TABLE}`
"""

upsert_dimension(
    target_table=DIM_FABRICANTE,
    src_sql=SRC_DM_FABRICANTE,
    nk_col="nk_fabricante",
    sk_col="cd_fabricante",
    attr_cols=["nr_cnpj", "dc_fabricante"],
)

In [None]:
DIM_MODALIDADE = f"{PROJECT_ID}.{SILVER_DATASET}.dm_modalidade_compra"

SRC_DM_MODALIDADE = f"""
SELECT DISTINCT
  COALESCE(modalidade_compra,'') AS nk_modalidade_compra,
  modalidade_compra AS dc_modalidade_compra
FROM `{BRONZE_DATASET}.{BRONZE_TABLE}`
"""

upsert_dimension(
    target_table=DIM_MODALIDADE,
    src_sql=SRC_DM_MODALIDADE,
    nk_col="nk_modalidade_compra",
    sk_col="cd_modalidade_compra",
    attr_cols=["dc_modalidade_compra"],
)

In [None]:
DIM_TIPO_COMPRA = f"{PROJECT_ID}.{SILVER_DATASET}.dm_tipo_compra"

SRC_DM_TIPO_COMPRA = f"""
SELECT DISTINCT
  COALESCE(tipo_compra,'') AS nk_tipo_compra,
  tipo_compra AS dc_tipo_compra
FROM `{BRONZE_DATASET}.{BRONZE_TABLE}`
"""

upsert_dimension(
    target_table=DIM_TIPO_COMPRA,
    src_sql=SRC_DM_TIPO_COMPRA,
    nk_col="nk_tipo_compra",
    sk_col="cd_tipo_compra",
    attr_cols=["dc_tipo_compra"],
)


In [None]:
def create_or_replace_table_as(
    target_table: str,
    select_sql: str,
    partition_by: str | None = None,
    cluster_by: list[str] | None = None
):
    parts = []
    if partition_by:
        parts.append(f"PARTITION BY {partition_by}")
    if cluster_by:
        parts.append("CLUSTER BY " + ", ".join(cluster_by))

    opts = "\n".join(parts)
    sql = f"""
CREATE OR REPLACE TABLE `{target_table}`
{opts}
AS
{select_sql}
"""
    run_sql(sql)
    print("OK:", target_table)

In [None]:
FACT_BPS = f"{PROJECT_ID}.{SILVER_DATASET}.fato_bps"

FACT_SELECT = f"""
WITH base AS (
  SELECT
    compra_date AS dt_periodo,
    compra_ts,
    insercao_ts,
    qtd_itens_comprados AS qt_unidade,
    preco_unitario,
    preco_total,
    source_year,
    ingest_date,
    load_ts_utc,

    -- campos brutos que vamos usar pra montar NK nova de instituição
    instituicao_cnpj,
    instituicao_nome,

    -- NK do município (precisa bater com a dm_municipio)
    CONCAT(
      COALESCE(instituicao_municipio,''), '|',
      COALESCE(uf,'')
    ) AS nk_municipio,

    -- NKs (produto/fornecedor/fabricante/modalidade/tipo) permanecem iguais
    CONCAT(
      COALESCE(codigo_br,''), '|',
      COALESCE(catmat_descricao,''), '|',
      COALESCE(unidade_fornecimento,''), '|',
      COALESCE(generico,''), '|',
      COALESCE(anvisa,''), '|',
      COALESCE(CAST(capacidade AS STRING),''), '|',
      COALESCE(unidade_medida,''), '|',
      COALESCE(unidade_fornecimento_capacidade,'')
    ) AS nk_produto,

    CONCAT(COALESCE(fornecedor_cnpj,''), '|', COALESCE(fornecedor_nome,'')) AS nk_fornecedor,
    CONCAT(COALESCE(fabricante_cnpj,''), '|', COALESCE(fabricante_nome,'')) AS nk_fabricante,

    COALESCE(modalidade_compra,'') AS nk_modalidade_compra,
    COALESCE(tipo_compra,'') AS nk_tipo_compra

  FROM `{BRONZE_DATASET}.{BRONZE_TABLE}`
  WHERE compra_date IS NOT NULL
),

mun AS (
  SELECT
    nk_municipio,
    cd_municipio
  FROM `{DIM_MUNICIPIO}`
),

base2 AS (
  SELECT
    b.*,
    m.cd_municipio,

    -- NK NOVA da instituição: (cnpj|nome|cd_municipio)
    CONCAT(
      COALESCE(b.instituicao_cnpj,''), '|',
      COALESCE(b.instituicao_nome,''), '|',
      COALESCE(CAST(m.cd_municipio AS STRING),'0')
    ) AS nk_instituicao

  FROM base b
  LEFT JOIN mun m
    ON m.nk_municipio = b.nk_municipio
)

SELECT
  base2.dt_periodo,

  di.cd_instituicao,
  dp.cd_produto,
  df.cd_fornecedor,
  fab.cd_fabricante,
  dm.cd_modalidade_compra,
  dt.cd_tipo_compra,

  -- métricas
  base2.qt_unidade,
  base2.preco_unitario,
  base2.preco_total

  -- auditoria/linhagem (se quiser reativar)
  --, base2.compra_ts
  --, base2.insercao_ts
  --, base2.source_year
  --, base2.ingest_date
  --, base2.load_ts_utc

FROM base2
LEFT JOIN `{DIM_INSTITUICAO}` di ON di.nk_instituicao = base2.nk_instituicao
LEFT JOIN `{DIM_PRODUTO}`     dp ON dp.nk_produto     = base2.nk_produto
LEFT JOIN `{DIM_FORNECEDOR}`  df ON df.nk_fornecedor  = base2.nk_fornecedor
LEFT JOIN `{DIM_FABRICANTE}`  fab ON fab.nk_fabricante = base2.nk_fabricante
LEFT JOIN `{DIM_MODALIDADE}`  dm ON dm.nk_modalidade_compra = base2.nk_modalidade_compra
LEFT JOIN `{DIM_TIPO_COMPRA}` dt ON dt.nk_tipo_compra = base2.nk_tipo_compra
"""


In [None]:
create_or_replace_table_as(
    target_table=FACT_BPS,
    select_sql=FACT_SELECT,
    partition_by="dt_periodo",
    cluster_by=["cd_produto", "cd_instituicao"]
)