In [None]:
import glob
import os

import pandas as pd
import dask.dataframe as dd
import dask.multiprocessing
import dask.threaded
from dask.diagnostics import ProgressBar
ProgressBar().register()

from timeit import default_timer as timer
start = timer()

In [None]:
# considerando que as as bases de 2005 até 2020 são fixas
# vamos criar um único dataframe com esse período
df = dd.read_csv('csv/2005_2020/inf_diario_fi_*.csv', sep=";")
df

In [None]:
# Faz o tratamento das colunas
df = dd.read_csv('csv/inf_diario_fi_*.csv', sep=";")

# transforma o campo DT_COMPTC
df['CNPJ_FUNDO'] = df['CNPJ_FUNDO'].str.replace('.', '')
df['CNPJ_FUNDO'] = df['CNPJ_FUNDO'].str.replace('/', '')
df['CNPJ_FUNDO'] = df['CNPJ_FUNDO'].str.replace('-', '')
df['CNPJ_FUNDO'] = df['CNPJ_FUNDO'].str.zfill(14)

# cria o campo CO_PRD
df['CO_PRD'] = df['CNPJ_FUNDO']

# cria e formata o campo DT_REF, com a data de referência
print('Formatando campo DT_REF')
df.assign(DT_COMPTC=dd.to_datetime(df['DT_COMPTC'], format='%Y-%m-%d', errors='coerce'))
df['DT_REF'] = df['DT_COMPTC']
df.tail()

In [4]:
# faz o sort do df
df = df.sort_values(by=['CO_PRD', 'DT_REF'])
df.tail()

[########################################] | 100% Completed | 16.18 s
[########################################] | 100% Completed | 10.31 s


Unnamed: 0,TP_FUNDO,CNPJ_FUNDO,DT_COMPTC,VL_TOTAL,VL_QUOTA,VL_PATRIM_LIQ,CAPTC_DIA,RESG_DIA,NR_COTST,CO_PRD,DT_REF
411582,FI,97929213000134,2024-01-17,86999223.09,11.602655,83540214.19,0.0,0.0,2,97929213000134,2024-01-17
411583,FI,97929213000134,2024-01-18,86999223.09,11.602655,83540214.19,0.0,0.0,2,97929213000134,2024-01-18
411584,FI,97929213000134,2024-01-19,86999223.09,11.602655,83540214.19,0.0,0.0,2,97929213000134,2024-01-19
411585,FI,97929213000134,2024-01-22,86536084.41,11.537926,82933867.36,0.0,0.0,2,97929213000134,2024-01-22
411586,FI,97929213000134,2024-01-23,86741878.92,11.56653,83280108.87,0.0,0.0,2,97929213000134,2024-01-23


In [5]:
# Convert to a Pandas DataFrame because dask was being slow with the select logic below
print('Converte dataframe dask para pandas')
df = df.compute() 

# cria uma nova coluna com o percentual de resgate para o dia
print('Calculando campo PC_RESG')
df['PC_RESG'] = df['RESG_DIA'] / df.groupby(['CO_PRD'])['VL_PATRIM_LIQ'].shift(1)

# Convert back to a Dask dataframe because we want that juicy parallelism
print('Converte dataframe pandas para dask')
df = dd.from_pandas(df, npartitions=4)

df.tail()

Converte dataframe dask para pandas
[########################################] | 100% Completed | 12.21 s
Calculando campo PC_RESG
Converte dataframe pandas para dask
[########################################] | 100% Completed | 104.25 ms


Unnamed: 0,TP_FUNDO,CNPJ_FUNDO,DT_COMPTC,VL_TOTAL,VL_QUOTA,VL_PATRIM_LIQ,CAPTC_DIA,RESG_DIA,NR_COTST,CO_PRD,DT_REF,PC_RESG
578707,FI,97929213000134,2023-08-25,84412931.79,11.107729,84411205.2,0.0,0.0,2,97929213000134,2023-08-25,0.0
578708,FI,97929213000134,2023-08-28,84459070.59,11.113726,84456776.59,0.0,0.0,2,97929213000134,2023-08-28,0.0
578709,FI,97929213000134,2023-08-29,84695430.03,11.144754,84692568.4,0.0,0.0,2,97929213000134,2023-08-29,0.0
578710,FI,97929213000134,2023-08-30,84718705.72,11.147742,84715275.32,0.0,0.0,2,97929213000134,2023-08-30,0.0
578711,FI,97929213000134,2023-08-31,84559196.05,11.126677,84555196.78,0.0,0.0,2,97929213000134,2023-08-31,0.0


In [6]:
df['TP_FUNDO'] = None
df = df[['TP_FUNDO', 'CNPJ_FUNDO', 'DT_COMPTC', 'VL_TOTAL', 'VL_QUOTA',
       'VL_PATRIM_LIQ', 'CAPTC_DIA', 'RESG_DIA', 'NR_COTST']]

# salva num único csv
df.to_csv('2005_2020.dask', sep=";", index=False)
print('csv de saída salvo com sucesso')

end = timer()
print(end - start)

[########################################] | 100% Completed | 39.14 ss
csv de saída salvo com sucesso
85.84465033400011
