In [1]:
import os


def find_in_path(filename):
    for path in os.environ["PATH"].split(os.pathsep):
        full_path = os.path.join(path, filename)
        if os.path.isfile(full_path):
            return full_path
    return None

print("winutils.exe:", find_in_path("winutils.exe"))
print("hadoop.dll:", find_in_path("hadoop.dll"))

winutils.exe: C:\Users\Mo.DESKTOP-3491UQD\.spark\Hadoop\bin\winutils.exe
hadoop.dll: C:\Users\Mo.DESKTOP-3491UQD\.spark\Hadoop\bin\hadoop.dll


In [2]:

import sys

os.environ["PYSPARK_PYTHON"] = sys.executable
os.environ["PYSPARK_DRIVER_PYTHON"] = sys.executable
from open_finance_lakehouse.utils.spark_session import get_spark_session

get_spark_session.cache_clear()
spark = get_spark_session()

In [49]:
# 2. Ingestão BACEN API
import io
from datetime import datetime, timedelta

import pandas as pd
import requests

from open_finance_lakehouse.utils.spark_session import get_spark_session


def fetch_all_bacen_series(series_id, end_date=None):

    """
    Fetch all data from BACEN API for a given series_id, handling the 10-year window limit.
    Logs progress for each window.
    """
    if end_date is None:
        end_date = datetime.today()
    all_data = []
    step = 10  # years
    min_year = 1900  # BACEN data doesn't go before this
    finished = False
    window_count = 0
    while not finished:
        start_date = end_date.replace(year=max(min_year, end_date.year - step + 1))
        print(f"🔎 Fetching BACEN {series_id}: {start_date.strftime('%d/%m/%Y')} to {end_date.strftime('%d/%m/%Y')} (step={step})")
        url = (
            f"https://api.bcb.gov.br/dados/serie/bcdata.sgs.{series_id}/dados"
            f"?formato=json&dataInicial={start_date.strftime('%d/%m/%Y')}&dataFinal={end_date.strftime('%d/%m/%Y')}"
        )
        try:
            response = requests.get(url)
            response.raise_for_status()
            df = pd.read_json(io.StringIO(response.text))
            if df.empty:
                print("⚠️  No data returned for window. Reducing step.")
                if step == 1:
                    finished = True
                else:
                    step = max(1, step - 1)
                continue
            print(f"✅ Retrieved {len(df)} rows.")
            all_data.append(df)
            window_count += 1
            # Move window back
            end_date = start_date - timedelta(days=1)
        except Exception as e:
            print(f"❌ Error: {e}. Reducing step.")
            if step == 1:
                finished = True
            else:
                step = max(1, step - 1)
    print(f"🎉 Finished fetching BACEN series {series_id}. Total windows: {window_count}")
    if all_data:
        result = pd.concat(all_data, ignore_index=True)
        result.columns = ["data", "valor"]
        result["data"] = pd.to_datetime(result["data"], format="%d/%m/%Y")
        result["valor"] = pd.to_numeric(result["valor"], errors="coerce")
        result = result.sort_values("data").reset_index(drop=True)
        print(f"📦 Total rows fetched: {len(result)}")
        return result
    else:
        print("⚠️  No data fetched from BACEN API.")
        return pd.DataFrame(columns=["data", "valor"])

# Example usage:
bacen_series_id = 11  # SELIC
df_bacen = fetch_all_bacen_series(bacen_series_id)
print(len(df_bacen), df_bacen.shape, df_bacen.dtypes, df_bacen.head())

🔎 Fetching BACEN 11: 02/05/2016 to 02/05/2025 (step=10)
✅ Retrieved 2258 rows.
🔎 Fetching BACEN 11: 01/05/2007 to 01/05/2016 (step=10)
✅ Retrieved 2262 rows.
🔎 Fetching BACEN 11: 30/04/1998 to 30/04/2007 (step=10)
✅ Retrieved 2260 rows.
🔎 Fetching BACEN 11: 29/04/1989 to 29/04/1998 (step=10)
✅ Retrieved 2246 rows.
🔎 Fetching BACEN 11: 28/04/1980 to 28/04/1989 (step=10)
✅ Retrieved 726 rows.
🔎 Fetching BACEN 11: 27/04/1971 to 27/04/1980 (step=10)
❌ Error: 404 Client Error: Not Found for url: https://api.bcb.gov.br/dados/serie/bcdata.sgs.11/dados?formato=json&dataInicial=27/04/1971&dataFinal=27/04/1980. Reducing step.
🔎 Fetching BACEN 11: 27/04/1972 to 27/04/1980 (step=9)
❌ Error: 404 Client Error: Not Found for url: https://api.bcb.gov.br/dados/serie/bcdata.sgs.11/dados?formato=json&dataInicial=27/04/1972&dataFinal=27/04/1980. Reducing step.
🔎 Fetching BACEN 11: 27/04/1973 to 27/04/1980 (step=8)
❌ Error: 404 Client Error: Not Found for url: https://api.bcb.gov.br/dados/serie/bcdata.sgs.

In [None]:
import tempfile
import zipfile
from collections import Counter
from datetime import datetime  #noqa
from difflib import get_close_matches
from functools import reduce
from pathlib import Path

import requests
from pyspark.sql import DataFrame
from pyspark.sql import functions as F


def fetch_all_cvm_months(start_year=2015, start_month=1):
    """
    Fetch all CVM FI daily reports going back month by month until no data is found.
    Returns a list of Spark DataFrames (one per month), going backward in time.
    Retries up to 3 times for up to 4 consecutive missing datasets before giving up.
    Logs all rows with null CNPJ to a file.
    Accepts any file type: zip, csv, parquet, etc.
    """
    dfs = []
    today = datetime.today()
    year = today.year
    month = today.month
    null_log_path = Path("null_cnpj_log.txt")
    log_file = open(null_log_path, "a", encoding="utf-8")

    consecutive_missing = 0
    max_consecutive_missing = 50

    file_types = [
        (".zip", "zip"),
        (".csv", "csv"),
        (".parquet", "parquet"),
        (".txt", "csv"),  # Sometimes txt is used for CSVs
    ]

    link_historico=  "https://dados.cvm.gov.br/dados/FI/DOC/INF_DIARIO/DADOS/HIST/"

    while (year > start_year) or (year == start_year and month >= start_month):
        cvm_base_url = "https://dados.cvm.gov.br/dados/FI/DOC/INF_DIARIO/DADOS/"
        file_found = False
        for ext, fmt in file_types:
            cvm_file_name = f"inf_diario_fi_{year}{str(month).zfill(2)}{ext}"
            cvm_url = f"{cvm_base_url}{cvm_file_name}"
            for attempt in range(1, 4):  # up to 3 attempts
                try:
                    response = requests.get(cvm_url)
                    print(f"🔍 Attempt {attempt} for {cvm_file_name}: {response.status_code}")
                    if response.status_code != requests.codes.ok:
                        print(f"⚠️  No valid file for {year}-{month:02d} ({cvm_file_name}) (attempt {attempt}).")
                        continue

                    # Save to temp file
                    with tempfile.NamedTemporaryFile(delete=False, suffix=ext) as tmp:
                        tmp.write(response.content)
                        tmp_path = tmp.name

                    # Read file according to type
                    if fmt == "zip":
                        if not response.content.startswith(b'PK'):
                            print(f"⚠️  Not a valid zip file for {cvm_file_name}.")
                            os.unlink(tmp_path)
                            continue
                        with zipfile.ZipFile(tmp_path, 'r') as zip_ref:
                            csv_name = [n for n in zip_ref.namelist() if n.lower().endswith('.csv') or n.lower().endswith('.txt')]
                            if not csv_name:
                                print(f"⚠️  No CSV/TXT found in zip for {cvm_file_name}.")
                                os.unlink(tmp_path)
                                continue
                            csv_name = csv_name[0]
                            zip_ref.extract(csv_name, os.path.dirname(tmp_path))
                            file_to_read = os.path.join(os.path.dirname(tmp_path), csv_name)
                        read_fmt = "csv"
                        cleanup_files = [tmp_path, file_to_read]
                    elif fmt == "csv":
                        file_to_read = tmp_path
                        read_fmt = "csv"
                        cleanup_files = [tmp_path]
                        csv_name = cvm_file_name
                    elif fmt == "parquet":
                        file_to_read = tmp_path
                        read_fmt = "parquet"
                        cleanup_files = [tmp_path]
                        csv_name = cvm_file_name
                    else:
                        os.unlink(tmp_path)
                        continue

                    # Read with Spark
                    if read_fmt == "csv":
                        df = spark.read.csv(
                            file_to_read,
                            header=True,
                            sep=";",
                            inferSchema=True,
                            encoding="ISO-8859-1"
                        )
                    elif read_fmt == "parquet":
                        df = spark.read.parquet(file_to_read)
                    else:
                        print(f"⚠️  Unknown file format for {cvm_file_name}.")
                        for f in cleanup_files:
                            os.unlink(f)
                        continue

                    row_count = df.count()
                    if row_count == 0:
                        print(f"⚠️  No data found for {year}-{month:02d}.")
                        for f in cleanup_files:
                            os.unlink(f)
                        continue
                    df.cache()
                    cnpj_col = None
                    for col in df.columns:
                        if "cnpj" in col.lower():
                            cnpj_col = col
                            print(f"🔍 Found CNPJ column: {cnpj_col}")
                            break
                    if cnpj_col:
                        null_cnpj_rows = df.filter(F.col(cnpj_col).isNull())
                        null_count = null_cnpj_rows.count()
                        print(f"🔍 Found {null_count} null CNPJ rows in {csv_name}.")
                        if null_count > 0:
                            print(f"⚠️  {null_count} rows with null {cnpj_col} in {csv_name}. Logging to {null_log_path}")
                            rows = null_cnpj_rows.toPandas().to_dict(orient="records")
                            for row in rows:
                                log_file.write(f"{csv_name}: {row}\n")
                    else:
                        print("⚠️  No CNPJ column found for null check.")
                    dfs.append(df)
                    print(f"✅ Loaded {csv_name} ({year}-{month:02d}) with {row_count} rows")
                    for f in cleanup_files:
                        os.unlink(f)
                    file_found = True
                    break  # exit retry loop
                except Exception as e:
                    print(f"⚠️  Failed to fetch or load {cvm_url} (attempt {attempt}): {e}")
            if file_found:
                break  # found a file for this month, move to next month

        if not file_found:
            consecutive_missing += 1
            print(f"❌ Failed to fetch any file for {year}-{month:02d} after 3 attempts per type. Consecutive missing: {consecutive_missing}")
            if consecutive_missing >= max_consecutive_missing:
                print(f"🛑 Giving up after {max_consecutive_missing} consecutive missing datasets.")
                break
        else:
            consecutive_missing = 0  # reset on success

        # Move to previous month
        if month == 1:
            month = 12
            year -= 1
        else:
            month -= 1

    log_file.close()
    return dfs

# Example usage:
dfs = fetch_all_cvm_months(start_year=2015, start_month=1)
if dfs:
    # Fuzzy schema normalization
    # Find the most common set of column names
    colname_lists = [tuple(sorted(df.columns)) for df in dfs]
    most_common_cols, _ = Counter(colname_lists).most_common(1)[0]
    most_common_cols = list(most_common_cols)

    def fuzzy_normalize_df(df, target_cols, cutoff=0.8):
        col_map = {}
        for target in target_cols:
            match = get_close_matches(target, df.columns, n=1, cutoff=cutoff)
            if match:
                col_map[match[0]] = target
        # Rename columns that match
        for src, tgt in col_map.items():
            if src != tgt:
                df = df.withColumnRenamed(src, tgt)
        # Add missing columns as nulls
        for col in target_cols:
            if col not in df.columns:
                df = df.withColumn(col, F.lit(None))
        # Select only target columns in order
        df = df.select([col for col in target_cols])
        return df

    dfs_normalized = [fuzzy_normalize_df(df, most_common_cols) for df in dfs]
    cvm_spark_df = reduce(DataFrame.unionByName, reversed(dfs_normalized))
else:
    print("⚠️ No CVM data was fetched. The DataFrame list is empty.")

🔍 Attempt 1 for inf_diario_fi_202505.zip: 200
🔍 Found CNPJ column: CNPJ_FUNDO_CLASSE
🔍 Found 0 null CNPJ rows in inf_diario_fi_202505.csv.
✅ Loaded inf_diario_fi_202505.csv (2025-05) with 90 rows
🔍 Attempt 1 for inf_diario_fi_202504.zip: 200
🔍 Found CNPJ column: CNPJ_FUNDO_CLASSE
🔍 Found 0 null CNPJ rows in inf_diario_fi_202504.csv.
✅ Loaded inf_diario_fi_202504.csv (2025-04) with 503066 rows
🔍 Attempt 1 for inf_diario_fi_202503.zip: 200
🔍 Found CNPJ column: CNPJ_FUNDO_CLASSE
🔍 Found 0 null CNPJ rows in inf_diario_fi_202503.csv.
✅ Loaded inf_diario_fi_202503.csv (2025-03) with 487444 rows
🔍 Attempt 1 for inf_diario_fi_202502.zip: 200
🔍 Found CNPJ column: CNPJ_FUNDO_CLASSE
🔍 Found 0 null CNPJ rows in inf_diario_fi_202502.csv.
✅ Loaded inf_diario_fi_202502.csv (2025-02) with 511300 rows
🔍 Attempt 1 for inf_diario_fi_202501.zip: 200
🔍 Found CNPJ column: CNPJ_FUNDO_CLASSE
🔍 Found 0 null CNPJ rows in inf_diario_fi_202501.csv.
✅ Loaded inf_diario_fi_202501.csv (2025-01) with 560370 rows
🔍 At

In [57]:
print(
    f"Rows: {cvm_spark_df.count()}",
    f"Columns: {len(cvm_spark_df.columns)}",
    f"Schema: {cvm_spark_df.dtypes}",
    sep="\n"
)
cvm_spark_df.show(5, truncate=False)

Rows: 26075230
Columns: 9
Schema: [('CAPTC_DIA', 'double'), ('CNPJ_FUNDO', 'string'), ('DT_COMPTC', 'date'), ('NR_COTST', 'int'), ('RESG_DIA', 'double'), ('TP_FUNDO', 'string'), ('VL_PATRIM_LIQ', 'double'), ('VL_QUOTA', 'double'), ('VL_TOTAL', 'double')]
+---------+------------------+----------+--------+--------+--------+-------------+----------+----------+
|CAPTC_DIA|CNPJ_FUNDO        |DT_COMPTC |NR_COTST|RESG_DIA|TP_FUNDO|VL_PATRIM_LIQ|VL_QUOTA  |VL_TOTAL  |
+---------+------------------+----------+--------+--------+--------+-------------+----------+----------+
|0.0      |00.017.024/0001-53|2021-01-04|1       |0.0     |FI      |1095773.57   |27.5033358|1097664.87|
|0.0      |00.017.024/0001-53|2021-01-05|1       |0.0     |FI      |1095778.31   |27.5034547|1097742.61|
|0.0      |00.017.024/0001-53|2021-01-06|1       |0.0     |FI      |1095768.02   |27.5031964|1097837.04|
|0.0      |00.017.024/0001-53|2021-01-07|1       |0.0     |FI      |1095774.2    |27.5033516|1097939.54|
|0.0      

In [58]:
import os

import boto3
import requests
from botocore.exceptions import ClientError
from dotenv import load_dotenv

load_dotenv()
# Configurações do MinIO
MINIO_ENDPOINT = "http://localhost:9000"
ACCESS_KEY = os.getenv("MINIO_USER")
SECRET_KEY = os.getenv("MINIO_PASSWORD")
print(os.getenv("AIRFLOW_PROJ_DIR"))
BUCKET_NAME = "lakehouse"

# Conectar ao MinIO (S3-compatible)
s3 = boto3.client(
    "s3",
    endpoint_url=MINIO_ENDPOINT,
    aws_access_key_id=ACCESS_KEY,
    aws_secret_access_key=SECRET_KEY,
)

# Checar e criar bucket
try:
    s3.head_bucket(Bucket=BUCKET_NAME)
    print(f"✅ Bucket '{BUCKET_NAME}' já existe.")
except ClientError as e:
    error_code = int(e.response["Error"]["Code"])
    if error_code == requests.codes.not_found:
        print(f"🔧 Criando bucket '{BUCKET_NAME}'...")
        s3.create_bucket(Bucket=BUCKET_NAME)
        print("✅ Bucket criado com sucesso.")
    else:
        raise


./airflow
✅ Bucket 'lakehouse' já existe.


In [59]:
# 3. Salvar como Delta (Bronze) no MinIO
df_bacen_spark = spark.createDataFrame(df_bacen)

In [61]:
df_bacen_spark.write.format("delta").mode("overwrite").save("s3a://lakehouse/bronze/bacen_selic/")
cvm_spark_df.write.format("delta").mode("overwrite").save("s3a://lakehouse/bronze/cvm_if_di/")

In [62]:
# 4. Transformações da camada Silver
from pyspark.sql import functions as F

silver_bacen = (
    df_bacen_spark
    .withColumn("ano", F.year("data"))
    .withColumn("mes", F.month("data"))
    .withColumn("dia", F.dayofmonth("data"))
)

silver_cvm = (
    cvm_spark_df.withColumn("cap_liquida_dia", F.col("CAPTC_DIA") - F.col("RESG_DIA"))
        .withColumn("ano", F.year("DT_COMPTC"))
        .withColumn("mes", F.month("DT_COMPTC"))
        .withColumn("dia", F.dayofmonth("DT_COMPTC"))
        .filter(F.col("VL_QUOTA") > 0)
        .filter(F.col("VL_PATRIM_LIQ") > 0)
)




silver_bacen.write.format("delta").mode("overwrite").save("s3a://lakehouse/silver/bacen_selic/")
silver_cvm.write.format("delta").mode("overwrite").save("s3a://lakehouse/silver/cvm_if_di/")


In [63]:
silver_cvm.show(20, truncate=False)

+---------+------------------+----------+--------+--------+--------+-------------+----------+----------+---------------+----+---+---+
|CAPTC_DIA|CNPJ_FUNDO        |DT_COMPTC |NR_COTST|RESG_DIA|TP_FUNDO|VL_PATRIM_LIQ|VL_QUOTA  |VL_TOTAL  |cap_liquida_dia|ano |mes|dia|
+---------+------------------+----------+--------+--------+--------+-------------+----------+----------+---------------+----+---+---+
|0.0      |00.017.024/0001-53|2021-01-04|1       |0.0     |FI      |1095773.57   |27.5033358|1097664.87|0.0            |2021|1  |4  |
|0.0      |00.017.024/0001-53|2021-01-05|1       |0.0     |FI      |1095778.31   |27.5034547|1097742.61|0.0            |2021|1  |5  |
|0.0      |00.017.024/0001-53|2021-01-06|1       |0.0     |FI      |1095768.02   |27.5031964|1097837.04|0.0            |2021|1  |6  |
|0.0      |00.017.024/0001-53|2021-01-07|1       |0.0     |FI      |1095774.2    |27.5033516|1097939.54|0.0            |2021|1  |7  |
|0.0      |00.017.024/0001-53|2021-01-08|1       |0.0     |FI 

In [79]:
# Count nulls in CNPJ_FUNDO
null_count = silver_cvm.filter(F.col("CNPJ_FUNDO").isNull()).count()
total_count = silver_cvm.count()
print(f"Nulls in CNPJ_FUNDO: {null_count} of {total_count} ({null_count/total_count:.2%})")

# Show some rows with null CNPJ_FUNDO
silver_cvm.filter(
    (F.col("CNPJ_FUNDO").isNull()) & (F.col("NR_COTST") > 1)
).show(30, truncate=False)

Nulls in CNPJ_FUNDO: 0 of 9137937 (0.00%)
+---------+----------+---------+--------+--------+--------+-------------+--------+--------+---------------+---+---+---+
|CAPTC_DIA|CNPJ_FUNDO|DT_COMPTC|NR_COTST|RESG_DIA|TP_FUNDO|VL_PATRIM_LIQ|VL_QUOTA|VL_TOTAL|cap_liquida_dia|ano|mes|dia|
+---------+----------+---------+--------+--------+--------+-------------+--------+--------+---------------+---+---+---+
+---------+----------+---------+--------+--------+--------+-------------+--------+--------+---------------+---+---+---+



In [69]:
import great_expectations as gx
from great_expectations.core.expectation_configuration import ExpectationConfiguration

context = gx.data_context.DataContext("gx")

# (Re-)register the asset with the current DataFrame
spark_ds = context.get_datasource("spark_datasource")
if "silver_bacen_selic" in [asset.name for asset in spark_ds.assets]:
    spark_ds.delete_asset("silver_bacen_selic")
spark_ds.add_dataframe_asset(name="silver_bacen_selic", dataframe=df_bacen_spark)

# Overwrite the expectation suite with only valid expectations (NO validation run)
suite_name = "silver_bacen_selic_suite"
if suite_name in [suite.expectation_suite_name for suite in context.list_expectation_suites()]:
    context.delete_expectation_suite(suite_name)
suite = context.add_expectation_suite(suite_name)

# Add expectations directly to the suite (no validator, no data access)
suite.expectations = [
    ExpectationConfiguration(
        expectation_type="expect_column_to_exist",
        kwargs={"column": "data"}
    ),
    ExpectationConfiguration(
        expectation_type="expect_column_to_exist",
        kwargs={"column": "valor"}
    ),
    ExpectationConfiguration(
        expectation_type="expect_column_values_to_not_be_null",
        kwargs={"column": "data"}
    ),
    ExpectationConfiguration(
        expectation_type="expect_column_values_to_not_be_null",
        kwargs={"column": "valor"}
    ),
    ExpectationConfiguration(
        expectation_type="expect_column_values_to_be_of_type",
        kwargs={"column": "data", "type_": "TimestampType"}
    ),
    ExpectationConfiguration(
        expectation_type="expect_column_values_to_be_of_type",
        kwargs={"column": "valor", "type_": "DoubleType"}
    ),
    ExpectationConfiguration(
        expectation_type="expect_column_values_to_be_between",
        kwargs={"column": "valor", "min_value": 0}
    ),
    ExpectationConfiguration(
        expectation_type="expect_table_row_count_to_be_between",
        kwargs={"min_value": 1}
    ),
]

context.save_expectation_suite(suite)
print("✅ Clean expectation suite created and saved (no validation run, no validator used).")

✅ Clean expectation suite created and saved (no validation run, no validator used).


In [70]:
import great_expectations as gx
from great_expectations.core.expectation_configuration import ExpectationConfiguration

context = gx.data_context.DataContext("gx")

# (Re-)register the asset with the current DataFrame
spark_ds = context.get_datasource("spark_datasource")
if "silver_cvm_if_di" in [asset.name for asset in spark_ds.assets]:
    spark_ds.delete_asset("silver_cvm_if_di")
spark_ds.add_dataframe_asset(name="silver_cvm_if_di", dataframe=silver_cvm)

# Overwrite the expectation suite with only valid expectations (NO validation run)
suite_name = "silver_cvm_if_di_suite"
if suite_name in [suite.expectation_suite_name for suite in context.list_expectation_suites()]:
    context.delete_expectation_suite(suite_name)
suite = context.add_expectation_suite(suite_name)

# Add expectations directly to the suite (no validator, no data access)
suite.expectations = [
    ExpectationConfiguration(
        expectation_type="expect_column_to_exist",
        kwargs={"column": "DT_COMPTC"}
    ),
    ExpectationConfiguration(
        expectation_type="expect_column_to_exist",
        kwargs={"column": "CNPJ_FUNDO"}
    ),
    ExpectationConfiguration(
        expectation_type="expect_column_to_exist",
        kwargs={"column": "VL_QUOTA"}
    ),
    ExpectationConfiguration(
        expectation_type="expect_column_to_exist",
        kwargs={"column": "VL_PATRIM_LIQ"}
    ),
    ExpectationConfiguration(
        expectation_type="expect_column_to_exist",
        kwargs={"column": "cap_liquida_dia"}
    ),
    ExpectationConfiguration(
        expectation_type="expect_column_values_to_not_be_null",
        kwargs={"column": "DT_COMPTC"}
    ),
    ExpectationConfiguration(
        expectation_type="expect_column_values_to_not_be_null",
        kwargs={"column": "CNPJ_FUNDO"}
    ),
    ExpectationConfiguration(
        expectation_type="expect_column_values_to_not_be_null",
        kwargs={"column": "VL_QUOTA"}
    ),
    ExpectationConfiguration(
        expectation_type="expect_column_values_to_not_be_null",
        kwargs={"column": "VL_PATRIM_LIQ"}
    ),
    ExpectationConfiguration(
        expectation_type="expect_column_values_to_be_of_type",
        kwargs={"column": "DT_COMPTC", "type_": "DateType"}
    ),
    ExpectationConfiguration(
        expectation_type="expect_column_values_to_be_of_type",
        kwargs={"column": "VL_QUOTA", "type_": "DoubleType"}
    ),
    ExpectationConfiguration(
        expectation_type="expect_column_values_to_be_of_type",
        kwargs={"column": "VL_PATRIM_LIQ", "type_": "DoubleType"}
    ),
    ExpectationConfiguration(
        expectation_type="expect_column_values_to_be_between",
        kwargs={"column": "VL_QUOTA", "min_value": 0}
    ),
    ExpectationConfiguration(
        expectation_type="expect_column_values_to_be_between",
        kwargs={"column": "VL_PATRIM_LIQ", "min_value": 0}
    ),
    ExpectationConfiguration(
        expectation_type="expect_table_row_count_to_be_between",
        kwargs={"min_value": 1}
    ),
]

context.save_expectation_suite(suite)
print("✅ Clean expectation suite for silver_cvm created and saved (no validation run, no validator used).")

✅ Clean expectation suite for silver_cvm created and saved (no validation run, no validator used).


In [71]:
import great_expectations as gx
from great_expectations.checkpoint import SimpleCheckpoint

context = gx.data_context.DataContext("gx")

# Ensure Spark datasource exists
if "spark_datasource" not in context.datasources:
    context.sources.add_spark(name="spark_datasource")

# (Re-)register the assets with the current DataFrames
spark_ds = context.get_datasource("spark_datasource")
asset_names = [asset.name for asset in spark_ds.assets]
if "silver_bacen_selic" in asset_names:
    spark_ds.delete_asset("silver_bacen_selic")
if "silver_cvm_if_di" in asset_names:
    spark_ds.delete_asset("silver_cvm_if_di")
spark_ds.add_dataframe_asset(name="silver_bacen_selic", dataframe=df_bacen_spark)
spark_ds.add_dataframe_asset(name="silver_cvm_if_di", dataframe=silver_cvm)

# Ensure expectation suites exist
suite_names = [suite.expectation_suite_name for suite in context.list_expectation_suites()]
if "silver_bacen_selic_suite" not in suite_names:
    context.add_expectation_suite("silver_bacen_selic_suite")
if "silver_cvm_if_di_suite" not in suite_names:
    context.add_expectation_suite("silver_cvm_if_di_suite")

# Build batch requests
batch_request_bacen = {
    "datasource_name": "spark_datasource",
    "data_asset_name": "silver_bacen_selic",
}
batch_request_cvm = {
    "datasource_name": "spark_datasource",
    "data_asset_name": "silver_cvm_if_di",
}

# Run the checkpoint for both assets
checkpoint = SimpleCheckpoint(
    name="silver_combined_checkpoint",
    data_context=context,
    validations=[
        {
            "batch_request": batch_request_bacen,
            "expectation_suite_name": "silver_bacen_selic_suite",
        },
        {
            "batch_request": batch_request_cvm,
            "expectation_suite_name": "silver_cvm_if_di_suite",
        }
    ]
)

result = checkpoint.run()
print("✅ Validation Success (bacen & cvm):", result["success"])

Calculating Metrics:   0%|          | 0/22 [00:00<?, ?it/s]

Calculating Metrics:   0%|          | 0/40 [00:00<?, ?it/s]

✅ Validation Success (bacen & cvm): False


In [72]:
for run_result in result["run_results"].values():
    validation_result = run_result["validation_result"]
    for res in validation_result["results"]:
        if not res["success"]:
            print(f"❌ Failed: {res['expectation_config']['expectation_type']} on column {res['expectation_config']['kwargs'].get('column')}")
            print(f"    Details: {res['result']}")

❌ Failed: expect_column_values_to_not_be_null on column CNPJ_FUNDO
    Details: {'element_count': 25957177, 'unexpected_count': 9020632, 'unexpected_percent': 34.75197630312418, 'partial_unexpected_list': [None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None], 'partial_unexpected_counts': [{'value': None, 'count': 20}]}


In [74]:
from pyspark.sql import functions as F


def run_gold_pipeline():

    # Load silver datasets
    # silver_cvm = spark.read.format("delta").load("s3a://lakehouse/silver/cvm/")
    silver_bacen = spark.read.format("delta").load("s3a://lakehouse/silver/bacen_selic/")

    # KPIs CVM Fundos
    gold_cvm = (
        silver_cvm.groupBy("CNPJ_FUNDO")
        .agg(
            F.min("DT_COMPTC").alias("data_inicio"),
            F.max("DT_COMPTC").alias("data_fim"),
            F.first("VL_QUOTA", ignorenulls=True).alias("vl_quota_inicio"),
            F.last("VL_QUOTA", ignorenulls=True).alias("vl_quota_fim"),
            F.sum("cap_liquida_dia").alias("cap_liquida_total"),
            F.first("NR_COTST", ignorenulls=True).alias("cotistas_inicio"),
            F.last("NR_COTST", ignorenulls=True).alias("cotistas_fim")
        )
        .withColumn("rentabilidade_pct", ((F.col("vl_quota_fim") - F.col("vl_quota_inicio")) / F.col("vl_quota_inicio")) * 100)
        .withColumn("crescimento_cotistas", F.col("cotistas_fim") - F.col("cotistas_inicio"))
    )

    # KPIs BACEN indicadores
    gold_bacen = (
        silver_bacen.agg(
            F.min("data").alias("inicio"),
            F.max("data").alias("fim"),
            F.min("valor").alias("min"),
            F.max("valor").alias("max"),
            F.mean("valor").alias("media")
        )
    )


    gold_bacen.write.format("delta").mode("overwrite").save("s3a://lakehouse/gold/bacen_kpis/")
    gold_cvm.write.format("delta").mode("overwrite").save("s3a://lakehouse/gold/cvm_kpis/")
    return gold_bacen, gold_cvm
gold_bacen, gold_cvm = run_gold_pipeline()
gold_bacen.show(20,truncate=False)
gold_cvm.orderBy("data_inicio", ascending=True).show(20,truncate=False)

+-------------------+-------------------+---+-----+------------------+
|inicio             |fim                |min|max  |media             |
+-------------------+-------------------+---+-----+------------------+
|1986-06-04 00:00:00|2025-04-30 00:00:00|0.0|3.626|0.2555755175348646|
+-------------------+-------------------+---+-----+------------------+

+------------------+-----------+----------+---------------+-------------+---------------------+---------------+------------+------------------+--------------------+
|CNPJ_FUNDO        |data_inicio|data_fim  |vl_quota_inicio|vl_quota_fim |cap_liquida_total    |cotistas_inicio|cotistas_fim|rentabilidade_pct |crescimento_cotistas|
+------------------+-----------+----------+---------------+-------------+---------------------+---------------+------------+------------------+--------------------+
|00.360.293/0001-18|2021-01-04 |2023-11-30|30.501901      |27.87017     |-1.8322185264E8      |12826          |2984        |-8.628088459142264|-9842 