In [None]:
import os
import re
import subprocess
import zipfile
from concurrent.futures import ThreadPoolExecutor, as_completed

import dask.dataframe as dd
import requests
import sqlalchemy as sa
from tqdm import tqdm
from tqdm import trange, tqdm

In [None]:
# Helper that runs a function over many inputs in parallel.
def mapp(fn, data, workers=8):
    """Apply ``fn`` to every item of ``data`` in a thread pool and print each result.

    Results are printed as soon as each task finishes, so the print order
    follows completion order rather than the order of ``data``.

    Args:
        fn: Callable taking a single item of ``data``.
        data: Iterable of items; one task is submitted per item.
        workers: Maximum number of worker threads.

    Returns:
        None. Results are only printed.

    Raises:
        Whatever ``fn`` raises: ``task.result()`` re-raises it here.
    """
    with ThreadPoolExecutor(max_workers=workers) as tex:
        tasks = [tex.submit(fn, i) for i in data]
        # Consume the results inside the ``with`` block: leaving the block
        # first waits for every task, so nothing would be printed until the
        # whole batch had finished.
        for task in as_completed(tasks):
            print(task.result())

In [None]:
def downloader(url, dirpath=None, timeout=60):
    """Download ``url`` into ``dirpath`` while showing a tqdm progress bar.

    The file name is taken from the ``Content-Disposition`` header when the
    server sends one, otherwise from the last path segment of ``url``.

    Args:
        url: Address of the file to download.
        dirpath: Destination directory. When ``None``, the module-level
            ``dirpath`` setting is used if it exists, else ``"downloads/"``.
        timeout: Value passed to ``requests.get(timeout=...)``.

    Returns:
        ``"<status code> <url>"`` once a response was received (nothing is
        written to disk for an error status), or ``"ERRO <url> (<reason>)"``
        when the request or the write failed. The function never raises, so a
        single failed download does not abort a batch of parallel downloads.
    """
    if dirpath is None:
        # ``dirpath`` is a parameter here, so the notebook-level setting has
        # to be looked up explicitly in the module namespace.
        dirpath = globals().get("dirpath") or "downloads/"
    try:
        os.makedirs(dirpath, exist_ok=True)
        # The context manager releases the streamed connection on every path.
        with requests.get(url, stream=True, timeout=timeout) as r:
            if not r.ok:
                # Do not save an HTTP error page as if it were the data file.
                return str(r.status_code) + " " + url
            total = int(r.headers.get('content-length', 0))
            fname = ""
            if "Content-Disposition" in r.headers:
                # Stop at a quote or ';' so quotes and trailing parameters
                # do not end up in the file name.
                found = re.findall(r'filename="?([^";]+)', r.headers["Content-Disposition"])
                if found:
                    # basename() keeps a server-supplied path inside dirpath.
                    fname = os.path.basename(found[0].strip())
            if not fname:
                fname = url.split("/")[-1]
            with open(os.path.join(dirpath, fname), 'wb') as file, tqdm(desc=fname, total=total, unit='iB', unit_scale=True, unit_divisor=1024) as bar:
                for data in r.iter_content(chunk_size=1024):
                    size = file.write(data)
                    bar.update(size)
            return str(r.status_code) + " " + url
    except Exception as exc:
        # Report the failure as a result string instead of raising, so the
        # remaining downloads of the batch keep going.
        return "ERRO " + url + " (" + str(exc) + ")"

In [None]:
def upload(engine, local, table, colunas, create_index=False):
    """Recreate ``table`` and load the CSV file(s) matching ``local`` into it.

    The files are read with dask as header-less, ``;``-separated, latin1 text
    with every column typed as ``str``, then appended to the table one dask
    partition at a time.

    Args:
        engine: SQLAlchemy engine of the target database.
        local: Path or glob pattern handed to ``dask.dataframe.read_csv``.
        table: Name of the table; it is dropped first if it already exists.
        colunas: Column names, in file order.
        create_index: When true, create an index named ``idx_<table>`` on the
            first column after loading. Off by default.

    Returns:
        None.
    """
    # Quote identifiers with the dialect's own rules instead of hand-written
    # double quotes.
    quote = engine.dialect.identifier_preparer.quote
    # engine.begin() commits on success, and sa.text() is needed because
    # SQLAlchemy 2.x refuses to execute plain SQL strings.
    with engine.begin() as c:
        c.execute(sa.text("DROP TABLE IF EXISTS " + quote(table)))
    df = dd.read_csv(local, header=None, encoding='latin1', sep=";", names=colunas, dtype=str)
    # Materialise one partition at a time rather than the whole frame at once.
    for n in trange(df.npartitions, desc=table):
        df.get_partition(n).compute().to_sql(name=table, con=engine, if_exists='append', chunksize=1000, index=False)
    if create_index:
        with engine.begin() as c:
            c.execute(sa.text(
                "CREATE INDEX IF NOT EXISTS " + quote("idx_" + table)
                + " ON " + quote(table) + " (" + quote(df.columns[0]) + ")"
            ))

In [None]:
# Base address of the published files and the names to fetch from it.
urlbase = "https://dadosabertos.rfb.gov.br/CNPJ/"
# The Empresas, Estabelecimentos and Socios data sets are split into ten
# archives each, numbered 0 to 9; the other entries are single files.
filelist = (
    ["Cnaes.zip", "cnpj-metadados.pdf", "Paises.zip", "Qualificacoes.zip", "Simples.zip"]
    + ["Empresas%d.zip" % part for part in range(10)]
    + ["Estabelecimentos%d.zip" % part for part in range(10)]
    + ["Motivos.zip", "Municipios.zip", "Naturezas.zip"]
    + ["Socios%d.zip" % part for part in range(10)]
)
# Full download address of every file, in the same order as filelist.
urllist = [urlbase + name for name in filelist]

In [None]:
# Settings
dirpath = "downloads/"
# Create the download folder up front so that listing it also works on a
# fresh checkout, where the folder does not exist yet.
os.makedirs(dirpath, exist_ok=True)
# Snapshot of the folder contents at the time this cell runs.
dirlist = os.listdir(dirpath)

# To target PostgreSQL instead, build a 'postgresql+psycopg2://...' URI from
# environment variables rather than hard-coding the user and password here.
uri_sqlite = 'sqlite:///base_receita202211.db'
engine = sa.create_engine(uri_sqlite)
engine.connect().close()  # fail fast if the database cannot be opened

In [None]:
# Fetch every file of urllist in parallel; each result line is printed by mapp.
mapp(fn=downloader, data=urllist)

In [None]:
# Check every downloaded zip archive, try to repair damaged ones, and extract them.
# The folder is listed here, at run time, so that files downloaded after the
# settings cell ran are included.
for filename in tqdm(os.listdir(dirpath)):
    # str.find() returns an index (0 is falsy, -1 is truthy), so it cannot be
    # used as a yes/no test; check the suffix instead.
    if not filename.lower().endswith(".zip"):
        continue
    zip_path = os.path.join(dirpath, filename)
    if not zipfile.is_zipfile(zip_path):
        try:
            # Argument list without a shell, so unusual characters in a file
            # name cannot be interpreted as extra commands. stdin is closed so
            # the tool cannot block waiting for interactive input.
            # NOTE(review): some Info-ZIP releases appear to require
            # "--out FILE" together with -FF -- confirm for the target system.
            subprocess.run(["zip", "-FF", zip_path], stdin=subprocess.DEVNULL, check=False)
        except OSError as exc:
            print("Falha ao executar zip: " + str(exc))
        print("Erro localizado em " + filename)
        if not zipfile.is_zipfile(zip_path):
            # Still unreadable: skip it instead of aborting the whole loop.
            print("Arquivo ignorado (zip invalido): " + filename)
            continue
    # The context manager closes the archive handle after extraction.
    with zipfile.ZipFile(zip_path) as archive:
        archive.extractall(path=dirpath)

In [None]:
# One entry per table to load: (table name, file pattern inside dirpath, column names).
# The entries are processed in the order listed.
table_specs = [
    ('MUNICIPIO', '*.MUNICCSV', ["CD_MUNICIPIO", "MUNICIPIO"]),
    ('CNAE', '*.CNAECSV', ["CNAE", "NM_CNAE"]),
    ('PAIS', '*.PAISCSV', ["CD_PAIS", "PAIS"]),
    ('NATJU', '*.NATJUCSV', ["CD_NAT_JURIDICA", "NAT_JURIDICA"]),
    ('MOTIVOSIT', '*.MOTICSV', ["CD_MOTIVO_SIT_CADASTRO", "MOTIVO_SIT_CADASTRO"]),
    ('QUALS', '*.QUALSCSV', ["CD_QUALS", "NM_QUALS"]),
    # simples
    ('SIMPLES', '*.SIMPLES.CSV.*', [
        "CNPJ_BASICO", "OP_SIMPLES", "DT_OP_SIMPLES", "DT_EXC_SIMPLES",
        "OP_MEI", "DT_OP_MEI", "DT_EXC_MEI",
    ]),
    # empresa
    ('EMPRESA', '*.EMPRECSV', [
        "CNPJ_BASICO", "RAZAO_SOCIAL", "NAT_JURIDICA", "QUAL_RESP",
        "CAPITAL_SOCIAL", "PORTE_EMPRESA", "ENTE_FED_RESP",
    ]),
    # socios
    ('SOCIO', '*.SOCIOCSV', [
        "CNPJ_BASICO", "ID_TIPO_SOCIO", "NOME_OU_RAZAO_SOCIAL", "CNPJ_CPF", "QUALIF_SOCIO",
        "DT_ENTRADA", "CD_PAIS", "REPR_LEGAL", "NM_REPR", "CD_QUALIF_REPR", "FAIXA_ETARIA",
    ]),
    # estabelecimentos
    ('ESTABELE', '*.ESTABELE', [
        "CNPJ_BASICO", "CNPJ_ORDEM", "CNPJ_DV", "CD_MATRIZ_FILIAL", "NM_FANTASIA",
        "CD_SIT_CADASTRO", "DT_SIT_CADASTRO", "MOTIVO_SIT_CADASTRO", "NM_CIDADE_EXT", "CD_PAIS",
        "DT_INI", "CNAE_PRINCIPAL", "CNAE_SECUNDARIO",
        "TIP_LOGRADOURO", "LOGRADOURO", "NUMERO", "COMPLEMENTO", "BAIRRO", "CEP", "UF", "MUNICIPIO",
        "DDD_1", "TEL_1", "DDD_2", "TEL_2", "DDD_FAX", "FAX", "EMAIL", "SIT_ESP", "DT_SIT_ESP",
    ]),
]

# Load each table in turn from the files matching its pattern.
for table, pattern, colunas in table_specs:
    local = dirpath + pattern
    upload(engine, local, table, colunas)

In [None]:
# Delete the files left in the download folder.
# Done with os.remove rather than the Windows-only `del` shell command, so the
# cell runs on any operating system. Sub-directories are left untouched.
for leftover in os.listdir(dirpath):
    leftover_path = os.path.join(dirpath, leftover)
    if os.path.isfile(leftover_path):
        os.remove(leftover_path)