### Ingestão e tratamento dos dados da dimensão fundos

#### 1. Camada Bronze – salvar o CSV original

In [25]:
import requests
import os

# Bronze layer: persist the CVM fund-registry CSV exactly as downloaded.

# URL of the fund registry CSV
url = "https://dados.cvm.gov.br/dados/FI/CAD/DADOS/cad_fi.csv"

# Destination file in the bronze folder
caminho_bronze = "/lakehouse/default/Files/bronze/cad_fi.csv"

# Download the CSV. The (connect, read) timeout keeps a stalled connection
# from hanging the notebook indefinitely.
res = requests.get(url, timeout=(10, 120))

# Abort on 4xx/5xx so an error page is never written out as cad_fi.csv.
res.raise_for_status()

# Create the target directory if it does not exist yet
os.makedirs(os.path.dirname(caminho_bronze), exist_ok=True)

# Write the raw bytes untouched; decoding is left to the reader of this file
with open(caminho_bronze, "wb") as f:
    f.write(res.content)

StatementMeta(, 5f56878a-8e51-4799-a12a-48fbeb99ad56, 27, Finished, Available, Finished)

#### 2. Camada Silver – limpeza e transformação

In [1]:
from pyspark.sql.functions import col, to_date, current_date
from pyspark.sql.types import DoubleType, IntegerType, DateType

StatementMeta(, 808d7226-bd02-459a-a3ed-b7a726eea9b6, 3, Finished, Available, Finished)

In [2]:
# Path of the bronze CSV as handed to Spark
caminho_bronze = "Files/bronze/cad_fi.csv"

# Load the file as a header-bearing, semicolon-separated, Latin-1 CSV
df_cadastro_raw = spark.read.csv(
    caminho_bronze,
    header=True,
    sep=";",
    encoding="latin1",
)

StatementMeta(, 808d7226-bd02-459a-a3ed-b7a726eea9b6, 4, Finished, Available, Finished)

In [4]:
# Show the raw DataFrame in the notebook output for a visual check
display(df_cadastro_raw)

StatementMeta(, 808d7226-bd02-459a-a3ed-b7a726eea9b6, 6, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 649e332b-b714-4d93-bb54-d3b500198e6e)

In [7]:
# Rename map: original column name -> new column name
renomear = dict(
    DENOM_SOCIAL="NOME_FUNDO",
    TP_FUNDO="CATEGORIA",
    DT_REG="DT_REGISTRO",
    DT_CONST="DT_CONSTITUICAO",
    # add more entries here if needed
)

StatementMeta(, 808d7226-bd02-459a-a3ed-b7a726eea9b6, 9, Finished, Available, Finished)

In [8]:
# Apply the renames one column at a time, rebinding the frame on each step
df_renomeado = df_cadastro_raw
for nome_antigo in renomear:
    df_renomeado = df_renomeado.withColumnRenamed(nome_antigo, renomear[nome_antigo])

StatementMeta(, 808d7226-bd02-459a-a3ed-b7a726eea9b6, 10, Finished, Available, Finished)

In [9]:
# Show the DataFrame after the column renames for a visual check
display(df_renomeado)

StatementMeta(, 808d7226-bd02-459a-a3ed-b7a726eea9b6, 11, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, b66e6e25-af0a-41cf-99d0-a96cef75f07f)

In [10]:
# Type conversion for the silver layer

# Columns cast to DATE (each is replaced in place, so column order is kept)
colunas_data = [
    "DT_REGISTRO",
    "DT_CONSTITUICAO",
    "DT_CANCEL",
    "DT_INI_SIT",
    "DT_INI_ATIV",
    "DT_FIM_EXERC",
    "DT_INI_EXERC",
    "DT_INI_CLASSE",
]

df_silver = df_renomeado
for nome_coluna in colunas_data:
    df_silver = df_silver.withColumn(nome_coluna, col(nome_coluna).cast(DateType()))

# Net-asset value becomes a double; DT_CARGA stamps the load date
df_silver = (
    df_silver
    .withColumn("VL_PATRIM_LIQ", col("VL_PATRIM_LIQ").cast(DoubleType()))
    .withColumn("DT_CARGA", current_date())
)

StatementMeta(, 808d7226-bd02-459a-a3ed-b7a726eea9b6, 12, Finished, Available, Finished)

In [11]:
# Print df_silver's column names and types to confirm the casts took effect
df_silver.printSchema()

StatementMeta(, 808d7226-bd02-459a-a3ed-b7a726eea9b6, 13, Finished, Available, Finished)

root
 |-- CATEGORIA: string (nullable = true)
 |-- CNPJ_FUNDO: string (nullable = true)
 |-- NOME_FUNDO: string (nullable = true)
 |-- DT_REGISTRO: date (nullable = true)
 |-- DT_CONSTITUICAO: date (nullable = true)
 |-- CD_CVM: string (nullable = true)
 |-- DT_CANCEL: date (nullable = true)
 |-- SIT: string (nullable = true)
 |-- DT_INI_SIT: date (nullable = true)
 |-- DT_INI_ATIV: date (nullable = true)
 |-- DT_INI_EXERC: date (nullable = true)
 |-- DT_FIM_EXERC: date (nullable = true)
 |-- CLASSE: string (nullable = true)
 |-- DT_INI_CLASSE: date (nullable = true)
 |-- RENTAB_FUNDO: string (nullable = true)
 |-- CONDOM: string (nullable = true)
 |-- FUNDO_COTAS: string (nullable = true)
 |-- FUNDO_EXCLUSIVO: string (nullable = true)
 |-- TRIB_LPRAZO: string (nullable = true)
 |-- PUBLICO_ALVO: string (nullable = true)
 |-- ENTID_INVEST: string (nullable = true)
 |-- TAXA_PERFM: string (nullable = true)
 |-- INF_TAXA_PERFM: string (nullable = true)
 |-- TAXA_ADM: string (nullable = t

In [12]:
# Persist the silver layer as Parquet, replacing any previous output
(
    df_silver.write
    .format("parquet")
    .mode("overwrite")
    .save("Files/silver/cad_fi_limpo")
)

StatementMeta(, 808d7226-bd02-459a-a3ed-b7a726eea9b6, 14, Finished, Available, Finished)

#### 3. Camada Gold – modelagem para Dim_Fundo

In [13]:
from pyspark.sql.functions import col

# Columns that make up the fund dimension
colunas_dim = [
    "CNPJ_FUNDO",
    "NOME_FUNDO",
    "CATEGORIA",
    "GESTOR",
    "DT_INI_EXERC",
    "DT_FIM_EXERC",
]

# Keep one row per CNPJ_FUNDO.
# NOTE(review): dropDuplicates on a key subset does not appear to guarantee
# which of several rows sharing a CNPJ is kept — confirm whether an explicit
# ordering (e.g. most recent record) is needed.
dim_fundo = df_silver.select(*colunas_dim).dropDuplicates(["CNPJ_FUNDO"])

StatementMeta(, 808d7226-bd02-459a-a3ed-b7a726eea9b6, 15, Finished, Available, Finished)

In [14]:
# Show the fund dimension in the notebook output for a visual check
display(dim_fundo)

StatementMeta(, 808d7226-bd02-459a-a3ed-b7a726eea9b6, 16, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 30a055d6-11d8-4df4-8bdc-9b6887511769)

In [16]:
# Persist the dimension to the gold folder as Delta, replacing data and schema
(
    dim_fundo.write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .save("Files/gold/dim_fundo")
)

StatementMeta(, 808d7226-bd02-459a-a3ed-b7a726eea9b6, 18, Finished, Available, Finished)

In [18]:
# Register the dimension as the table "Dim_Fundo", replacing data and schema
(
    dim_fundo.write
    .option("overwriteSchema", "true")
    .mode("overwrite")
    .saveAsTable("Dim_Fundo")
)

StatementMeta(, 808d7226-bd02-459a-a3ed-b7a726eea9b6, 20, Finished, Available, Finished)