In [1]:
!pip install boto3

Collecting boto3
  Downloading boto3-1.40.5-py3-none-any.whl.metadata (6.7 kB)
Collecting botocore<1.41.0,>=1.40.5 (from boto3)
  Downloading botocore-1.40.5-py3-none-any.whl.metadata (5.7 kB)
Collecting jmespath<2.0.0,>=0.7.1 (from boto3)
  Downloading jmespath-1.0.1-py3-none-any.whl.metadata (7.6 kB)
Collecting s3transfer<0.14.0,>=0.13.0 (from boto3)
  Downloading s3transfer-0.13.1-py3-none-any.whl.metadata (1.7 kB)
Downloading boto3-1.40.5-py3-none-any.whl (140 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m140.1/140.1 kB[0m [31m10.8 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading botocore-1.40.5-py3-none-any.whl (14.0 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m14.0/14.0 MB[0m [31m24.6 MB/s[0m eta [36m0:00:00[0m00:01[0m00:01[0m
[?25hDownloading jmespath-1.0.1-py3-none-any.whl (20 kB)
Downloading s3transfer-0.13.1-py3-none-any.whl (85 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m85.3/85.3 kB[0m [31m8.7 MB/s[0

In [77]:
from pyspark.sql import functions as f
from pyspark.sql.window import Window as w
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, DateType
import boto3

# 1 - configurando


In [5]:

spark = SparkSession.builder \
    .appName("Teste PySpark") \
    .master("spark://spark-master:7077") \
    .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.3.4",) \
    .config("spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version", "1")\
    .getOrCreate()

hadoop_conf = spark._jsc.hadoopConfiguration()
hadoop_conf.set("fs.s3a.endpoint", "http://localstack:4566")
hadoop_conf.set("fs.s3a.access.key", "test")
hadoop_conf.set("fs.s3a.secret.key", "test")
hadoop_conf.set("fs.s3a.path.style.access", "true")
hadoop_conf.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
hadoop_conf.set("fs.s3a.committer.name", "directory")


# 2 - lendo arquivos do bucket e unificando

In [106]:


columns = ["TP_FUNDO_CLASSE", "CNPJ_FUNDO_CLASSE", "DT_COMPTC", "VL_QUOTA","VL_PATRIM_LIQ",'CAPTC_DIA','RESG_DIA','NR_COTST']
map_col = {
    "TP_FUNDO":'TP_FUNDO_CLASSE',
    "CNPJ_FUNDO":"CNPJ_FUNDO_CLASSE"
    }

def padronizar_colunas(df, mapa):
    for col in df.columns:
        if col in mapa:
            df = df.withColumnRenamed(col, mapa[col])
    return df

try:
    df = spark.read \
        .option("header", "true") \
        .option("encoding", "latin1") \
        .option("sep", ";") \
        .option("inferSchema", "false")\
        .csv("s3a://s3-cvm-fii/raw/*.csv")\
        .select(*colunas_desejadas)
    df = padronizar_colunas(df, mapa_colunas)
    print('ok \u2705')
    
except Exception as e:
    print(f'\u270c{e}')

ok ✅


# 3 - tratando 

In [62]:
df.count()

22519897

In [63]:
df.select([
    f.count(f.when(f.col(c).isNull(), c)).alias(c)
    for c in df.columns
]).show()


+---------------+-----------------+---------+--------+-------------+---------+--------+--------+
|TP_FUNDO_CLASSE|CNPJ_FUNDO_CLASSE|DT_COMPTC|VL_QUOTA|VL_PATRIM_LIQ|CAPTC_DIA|RESG_DIA|NR_COTST|
+---------------+-----------------+---------+--------+-------------+---------+--------+--------+
|              0|                0|        0|       0|            0|        0|       0|11691182|
+---------------+-----------------+---------+--------+-------------+---------+--------+--------+



In [64]:
df.limit(10).show()

+---------------+------------------+----------+---------------+-------------+---------+--------+--------+
|TP_FUNDO_CLASSE| CNPJ_FUNDO_CLASSE| DT_COMPTC|       VL_QUOTA|VL_PATRIM_LIQ|CAPTC_DIA|RESG_DIA|NR_COTST|
+---------------+------------------+----------+---------------+-------------+---------+--------+--------+
|  CLASSES - FIF|00.017.024/0001-53|2025-07-01|39.213324800000|   1146691.57|     0.00|    0.00|       1|
|  CLASSES - FIF|00.017.024/0001-53|2025-07-02|39.232973700000|   1147266.15|     0.00|    0.00|       1|
|  CLASSES - FIF|00.017.024/0001-53|2025-07-03|39.252822600000|   1147846.58|     0.00|    0.00|       1|
|  CLASSES - FIF|00.017.024/0001-53|2025-07-04|39.271309800000|   1148387.19|     0.00|    0.00|       1|
|  CLASSES - FIF|00.017.024/0001-53|2025-07-07|39.289262100000|   1145782.62|     0.00| 3129.54|       1|
|  CLASSES - FIF|00.017.024/0001-53|2025-07-08|39.307480200000|   1146313.91|     0.00|    0.00|       1|
|  CLASSES - FIF|00.017.024/0001-53|2025-07-09

In [65]:
df.printSchema()

root
 |-- TP_FUNDO_CLASSE: string (nullable = true)
 |-- CNPJ_FUNDO_CLASSE: string (nullable = true)
 |-- DT_COMPTC: string (nullable = true)
 |-- VL_QUOTA: string (nullable = true)
 |-- VL_PATRIM_LIQ: string (nullable = true)
 |-- CAPTC_DIA: string (nullable = true)
 |-- RESG_DIA: string (nullable = true)
 |-- NR_COTST: string (nullable = true)



In [115]:
df_test = df.limit(100)

In [111]:
df = (
    df
    .withColumn('CNPJ_FUNDO_CLASSE',f.regexp_replace(f.col('CNPJ_FUNDO_CLASSE'), r'[./-]', ''))
    .withColumn('DT_COMPTC',f.col('DT_COMPTC').cast(DateType()))
    .withColumn('VL_QUOTA',f.round(f.col('VL_QUOTA').cast(DoubleType()),2))
    .withColumn('VL_PATRIM_LIQ',f.round(f.col('VL_PATRIM_LIQ').cast(DoubleType()),2))
    .withColumn('CAPTC_DIA',f.round(f.col('CAPTC_DIA').cast(DoubleType()),2))
    .withColumn('RESG_DIA',f.round(f.col('RESG_DIA').cast(DoubleType()),2))
    .withColumn('NR_COTST', f.col('NR_COTST').cast(IntegerType()))
    .withColumn('ano',f.year(f.col('DT_COMPTC')))
    .withColumn('id_fund_date',f.concat(f.col('CNPJ_FUNDO_CLASSE'),f.date_format(f.col('DT_COMPTC'), 'yyyyMMdd')))
    .drop('TP_FUNDO_CLASSE')
    .select(
        f.col('id_fund_date'),
        f.col('CNPJ_FUNDO_CLASSE').alias('cnpj_fundo'),
        f.col('RESG_DIA').alias('valor_resgates'),
        f.col('CAPTC_DIA').alias('valor_aplicacoes'),
        f.col('VL_QUOTA').alias('cota'),
        f.col('VL_PATRIM_LIQ').alias('pl_fundo'),
        f.col('DT_COMPTC').alias('data_referencia'),
        f.col('NR_COTST').alias('qtd_cotistas'),
        f.col('ano'),
        f.current_date().alias('dt_ingest')
    )
).drop_duplicates('id_fund_date')

df = .whritrow_number

In [112]:
df.count()

22519897

In [None]:
df.select([
    f.count(f.when(f.col(c).isNull(), 1)).alias(c)
    for c in df.columns
]).show()

In [76]:
df_test.printSchema()

root
 |-- id_fund_date: string (nullable = true)
 |-- cnpj_fundo: string (nullable = true)
 |-- valor_resgates: double (nullable = true)
 |-- valor_aplicacoes: double (nullable = true)
 |-- cota: double (nullable = true)
 |-- pl_fundo: double (nullable = true)
 |-- data_referencia: date (nullable = true)
 |-- qtd_cotistas: integer (nullable = true)
 |-- ano: integer (nullable = true)
 |-- dt_ingest: date (nullable = false)



# 4 - Calculo metricas 

In [116]:
df_metricas_ = df_test.select('id_fund_date','cnpj_fundo','valor_resgates','valor_aplicacoes','cota','pl_fundo','data_referencia','ano')

In [117]:
df_metricas_.show(truncate=22)

+----------------------+--------------+--------------+----------------+-----+----------+---------------+----+
|          id_fund_date|    cnpj_fundo|valor_resgates|valor_aplicacoes| cota|  pl_fundo|data_referencia| ano|
+----------------------+--------------+--------------+----------------+-----+----------+---------------+----+
|0001702400015320250701|00017024000153|           0.0|             0.0|39.21|1146691.57|     2025-07-01|2025|
|0001702400015320250702|00017024000153|           0.0|             0.0|39.23|1147266.15|     2025-07-02|2025|
|0001702400015320250703|00017024000153|           0.0|             0.0|39.25|1147846.58|     2025-07-03|2025|
|0001702400015320250704|00017024000153|           0.0|             0.0|39.27|1148387.19|     2025-07-04|2025|
|0001702400015320250707|00017024000153|       3129.54|             0.0|39.29|1145782.62|     2025-07-07|2025|
|0001702400015320250708|00017024000153|           0.0|             0.0|39.31|1146313.91|     2025-07-08|2025|
|000170240

In [118]:
janela_fundo = w.partitionBy(f.col('cnpj_fundo')).orderBy('data_referencia')

df_metricas_ = (
    df_metricas_
    .withColumn('cota_anterior',f.lag('cota').over(janela_fundo))
    .withColumn('pct_rentabilidade_diaria', f.round(((f.col('cota') / f.col('cota_anterior'))-1)*100,4))
    .withColumn('net', f.col('valor_aplicacoes') - f.col('valor_resgates'))
    .withColumn('pl_anterior', f.lag('pl_fundo').over(janela_fundo))
    .withColumn('pnl', f.round(f.col('pl_fundo') - f.col('pl_anterior') - f.col('net'),4))
    .withColumn('dt_ingest',f.current_date())
    .drop('cota_anterior','pl_anterior')
    )

In [119]:
df_metricas_.show()

+--------------------+--------------+--------------+----------------+-----+----------+---------------+----+------------------------+--------+------+----------+
|        id_fund_date|    cnpj_fundo|valor_resgates|valor_aplicacoes| cota|  pl_fundo|data_referencia| ano|pct_rentabilidade_diaria|     net|   pnl| dt_ingest|
+--------------------+--------------+--------------+----------------+-----+----------+---------------+----+------------------------+--------+------+----------+
|00017024000153202...|00017024000153|           0.0|             0.0|39.21|1146691.57|     2025-07-01|2025|                    NULL|     0.0|  NULL|2025-08-09|
|00017024000153202...|00017024000153|           0.0|             0.0|39.23|1147266.15|     2025-07-02|2025|                   0.051|     0.0|574.58|2025-08-09|
|00017024000153202...|00017024000153|           0.0|             0.0|39.25|1147846.58|     2025-07-03|2025|                   0.051|     0.0|580.43|2025-08-09|
|00017024000153202...|00017024000153|   

In [124]:
df.select("id_fund_date").distinct().count()


Py4JJavaError: An error occurred while calling o3307.count.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 159.0 failed 4 times, most recent failure: Lost task 1.3 in stage 159.0 (TID 1506) (172.19.0.5 executor 21): ExecutorLostFailure (executor 21 exited caused by one of the running tasks) Reason: Command exited with code 137
Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2844)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2780)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2779)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2779)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1242)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1242)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1242)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3048)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2982)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2971)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)


In [55]:
df_test.select([
    f.count(f.when(f.col(c).isNull(), 1)).alias(c)
    for c in df_test.columns
]).show()

+----------+--------------+----------------+----+--------+---------------+------------+---+---------+
|cnpj_fundo|valor_resgates|valor_aplicacoes|cota|pl_fundo|data_referencia|qtd_cotistas|ano|dt_ingest|
+----------+--------------+----------------+----+--------+---------------+------------+---+---------+
|         0|             0|               0|   0|       0|              0|           0|  0|        0|
+----------+--------------+----------------+----+--------+---------------+------------+---+---------+



In [2]:

df = (df
    .withColumn('CNPJ_FUNDO_CLASSE',f.regexp_replace(f.col('CNPJ_FUNDO_CLASSE'), r'[./-]', ''))
    .filter(f.col('TP_FUNDO_CLASSE')=='FI')
    .withColumn('ano',f.year(f.col('DT_COMPTC')))
    .select(
        f.col('CNPJ_FUNDO_CLASSE').alias('cnpj_fundo'),
        f.col('NR_COTST').alias('qtd_cotistas'),
        f.col('RESG_DIA').alias('valor_resgates'),
        f.col('CAPTC_DIA').alias('valor_aplicacoes'),
        f.col('VL_QUOTA').alias('cota'),
        f.col('VL_TOTAL').alias('valor_carteira'),
        f.col('VL_PATRIM_LIQ').alias('pl_fundo'),
        f.col('DT_COMPTC').alias('data_referencia'),
        f.col('ano'),
        f.current_date().alias('dt_ingest'))
)

df_25 = df.filter(f.col("ano") == 2025)


ok ✅
✅ [DADOS SALVOS COM SUCESSO EM s3a://s3-cvm-fii/s3a://s3-cvm-fii/stage-test2/]


# 5 - upload

In [None]:

try:
    df_25.write.mode("overwrite").parquet("s3a://s3-cvm-fii/stage-test2/")
    print("\u2705 [DADOS SALVOS COM SUCESSO EM s3a://s3-cvm-fii/s3a://s3-cvm-fii/stage-test2/]")
except Exception as e:
    print(e)

In [8]:
print('\u2705')

✅


# 6 - CONSULTANDO NO STAGE e LENDO PARQUET

In [16]:
s3 = boto3.client(
    "s3",
    endpoint_url="http://localstack:4566",
    aws_access_key_id="test",
    aws_secret_access_key="test",
    region_name="us-east-1"
)

bucket_name = "s3-cvm-fii"
prefix = "stage-test2/"

response = s3.list_objects_v2(Bucket=bucket_name, Prefix=prefix)

files = [f"s3a://{bucket_name}/{obj['Key']}" for obj in response.get("Contents", []) if obj['Key'].endswith('.csv')]

for path in files:
    print(path)
print('ok \u2705')


ok ✅


In [21]:

try:
    df_parquet = spark.read.parquet("s3a://s3-cvm-fii/stage-test2/*.parquet")
    print('ok \u2705')
except Exception as e:
    print(f'\u240c{e}')

ok ✅


In [22]:
df_parquet.show()

+--------------+------------+--------------+----------------+----------+--------------+----------+---------------+----+----------+
|    cnpj_fundo|qtd_cotistas|valor_resgates|valor_aplicacoes|      cota|valor_carteira|  pl_fundo|data_referencia| ano| dt_ingest|
+--------------+------------+--------------+----------------+----------+--------------+----------+---------------+----+----------+
|00017024000153|        NULL|           0.0|             0.0|37.2288138|           1.0|1151690.99|     2025-01-02|2025|2025-08-03|
|00017024000153|        NULL|           0.0|             0.0| 37.247108|           1.0|1152256.93|     2025-01-03|2025|2025-08-03|
|00017024000153|        NULL|           0.0|             0.0|37.2637482|           1.0| 1152771.7|     2025-01-06|2025|2025-08-03|
|00017024000153|        NULL|           0.0|             0.0|37.2827358|           1.0|1153359.09|     2025-01-07|2025|2025-08-03|
|00017024000153|        NULL|       3539.35|             0.0|37.2986844|           

In [23]:
df_parquet.columns

['cnpj_fundo',
 'qtd_cotistas',
 'valor_resgates',
 'valor_aplicacoes',
 'cota',
 'valor_carteira',
 'pl_fundo',
 'data_referencia',
 'ano',
 'dt_ingest']

In [24]:
df_parquet.printSchema()

root
 |-- cnpj_fundo: string (nullable = true)
 |-- qtd_cotistas: integer (nullable = true)
 |-- valor_resgates: double (nullable = true)
 |-- valor_aplicacoes: double (nullable = true)
 |-- cota: double (nullable = true)
 |-- valor_carteira: double (nullable = true)
 |-- pl_fundo: double (nullable = true)
 |-- data_referencia: date (nullable = true)
 |-- ano: integer (nullable = true)
 |-- dt_ingest: date (nullable = true)



In [None]:
def test(stage_tables:list=None):
    paths = {
    "metricas": "/stage/metricas/",
    "fundos": "/stage/fundos/"
    }

    if stage_tables == None:
        for s in list(paths.keys()):
            path_stage = paths[s]
            print(path_stage)

    else:
        for s in stage_tables:
            path_stage = paths[s]
            print(path_stage)


In [13]:
test(['fundos'])

/stage/fundos/


In [7]:
lista_tabelas = ['metricas','fundos']

In [8]:
lista_tabelas[0]

'metricas'

In [1]:
dfs = {
    "fundos": 1,
    "metricas": 2
    }


In [6]:

for table_name, df in dfs.items():
    print(df)

1
2


In [None]:
# df_ranked =(df
#              .withColumn('rank', row_number().over(w.partitionBy('id_fundo').orderBy('data')))
#              .withColumn(
#                         "var_95",
#                         when(col("rank") > 252, lit("calcular_aqui")) 
#                         .otherwise(lit(None)) )
# )

# calcule o var

In [None]:


# ## variação cota dia
# df_fi = (
#     df_fi.withColumn('cota_dia_anterior',
#                          f.lag(f.col('cota')).over(W.partitionBy(f.col('cnpj_fundo'))
#                                                      .orderBy(f.col('data_referencia'))))
#         .withColumn("variacao_cota_dia",
#             f.when(
#                    (f.col("cota_dia_anterior").isNotNull()) & (f.col("cota_dia_anterior") != 0),
#                     f.round(((f.col("cota") - f.col("cota_dia_anterior")) / f.col("cota_dia_anterior")) * 100,4)))
#         .withColumn("ano", f.year(f.col("data_referencia")))
#         .withColumn("mes",f.month(f.col("data_referencia")))
#         .withColumn("net",
#                    f.col("valor_aplicacoes") - f.col("valor_resgates"))
#         .withColumn("pl_d1",
#                     f.lag(f.col("pl_fundo")).over(W.partitionBy(f.col("cnpj_fundo"))
#                                                     .orderBy(f.col("data_referencia")))
#                     )
#         .withColumn('pnl',f.col('pl_fundo') - f.col('pl_d1') - f.col('net'))
#         .withColumn("dt_ingest", f.current_date())
#         .select(
#              'cnpj_fundo',
#              'pl_fundo',
#              'cota',
#              'qtd_cotistas',
#              'valor_aplicacoes',
#              'valor_resgates',
#              'net',
#              'pnl',
#              'valor_carteira',
#              'data_referencia',
#              'variacao_cota_dia',
#              'data_referencia',
#              'mes',
#              'ano',
#             'dt_ingest')
             
# ).orderBy('data_referencia')

# BACKUP

In [None]:
df = (df
    .filter(f.col('TP_FUNDO_CLASSE')=='FI')
    .filter(f.abs(f.col('VL_PATRIM_LIQ')) < 1e24)
    .filter(f.abs(f.col('VL_QUOTA')) < 1e12)
    .drop('VL_TOTAL')
    .withColumn('CNPJ_FUNDO_CLASSE',f.regexp_replace(f.col('CNPJ_FUNDO_CLASSE'), r'[./-]', ''))
    .withColumn('ano',f.year(f.col('DT_COMPTC')))
    .withColumn('pl_fundo', f.round(f.col('VL_PATRIM_LIQ'), 4))
    .withColumn('cota', f.round(f.col('VL_QUOTA'), 4))
    .withColumn('valor_resgates', f.round(f.col('RESG_DIA'), 4))
    .withColumn('valor_aplicacoes', f.round(f.col('CAPTC_DIA'), 4))
    .select(
        f.col('CNPJ_FUNDO_CLASSE').alias('cnpj_fundo'),
        f.col('NR_COTST').alias('qtd_cotistas'),
        f.col('valor_resgates'),
        f.col('valor_aplicacoes'),
        f.col('cota'),
        f.col('pl_fundo'),
        f.col('DT_COMPTC').alias('data_referencia'),
        f.col('ano'),
        f.current_date().alias('dt_ingest'))
)