Autor: Jurandi A. França  
email: jurandi82@gmail.com  
Versão: 3.20240907  

# Imports

In [1]:
import requests
import re
from concurrent.futures import ThreadPoolExecutor, as_completed
from tqdm.notebook import tqdm, trange
from bs4 import BeautifulSoup as bs
import os
import sqlalchemy as sa
import zipfile
import dask.dataframe as dd

# Configuração

In [2]:
# Download link: base URL of the directory listing the files are fetched from.
# The last path segment selects the dataset release (here 2024-08).
urlbase = "https://dadosabertos.rfb.gov.br/CNPJ/dados_abertos_cnpj/2024-08/"

# Download

In [3]:
# Fetch the directory listing and read a date from it.
# Fail early on an HTTP error instead of hitting an obscure IndexError while
# parsing an error page, and name the parser explicitly so the result does not
# depend on which optional parsers happen to be installed.
resp = requests.get(urlbase, timeout=60)
resp.raise_for_status()
soup = bs(resp.content, "html.parser")

# First 10 characters of the third cell of the fourth table row
# (presumably the "last modified" date of the first listed file - TODO confirm
# against the listing layout). The value is used to name the SQLite file.
dt = soup.find_all("tr")[3].find_all('td')[2].get_text()[:10]
uri_sqlite ='sqlite:///base_receita_' + dt + '.db'

# Create the downloads folder if it does not exist yet
dirpath= "downloads/"
os.makedirs(dirpath, exist_ok=True)

In [4]:
# Helper to run many calls concurrently
def mapp(fn,data, workers=12):
    """Apply ``fn`` to every item of ``data`` using a thread pool.

    Args:
        fn: Callable taking a single item.
        data: Iterable of items to process.
        workers: Maximum number of threads in the pool.

    Returns:
        List with one result per item. The order is the one produced by
        ``as_completed`` and is not guaranteed to match ``data``.
    """
    with ThreadPoolExecutor(max_workers=workers) as pool:
        futures = [pool.submit(fn, item) for item in data]

    # Leaving the ``with`` block waits for all submitted calls, so every
    # future is already finished here; ``result()`` re-raises any exception.
    return [future.result() for future in as_completed(futures)]

In [5]:
def downloader(url, timeout=(10, 120)):
    """Download ``url`` into the ``dirpath`` folder and report the outcome.

    Args:
        url: Address of the file to download.
        timeout: ``(connect, read)`` timeout in seconds handed to
            ``requests.get`` so a stalled server cannot hang a worker forever.

    Returns:
        ``[status, url]``. ``status`` is the HTTP status code as a string;
        it is ``"200"`` only when the whole body was written to disk, and
        ``"0"`` when the request or the transfer failed with an exception.
        Callers can therefore retry every entry whose status is not ``"200"``.
    """
    status = "0"
    destino = None

    try:
        with requests.get(url, stream=True, timeout=timeout) as r:
            status = str(r.status_code)
            if r.status_code != 200:
                # Do not store an error page under the archive's name.
                return [status, url]

            total = int(r.headers.get('content-length', 0))

            # Prefer the server-provided name; strip optional quotes and any
            # directory part so the file always lands inside ``dirpath``.
            achados = re.findall(r'filename="?([^";]+)"?',
                                 r.headers.get("Content-Disposition", ""))
            fname = os.path.basename(achados[0].strip()) if achados else ""
            if not fname:
                fname = url.split("/")[-1]
            destino = dirpath + fname

            with open(destino, 'wb') as file, tqdm(
                    desc=fname, total=total, unit='iB', unit_scale=True,
                    unit_divisor=1024) as bar:
                for data in r.iter_content(chunk_size=64 * 1024):
                    size = file.write(data)
                    bar.update(size)

    except Exception as inst:
        print(inst)
        # Mark the download as failed and drop the partial file so a
        # truncated archive is never mistaken for a complete one.
        status = "0"
        if destino is not None and os.path.exists(destino):
            os.remove(destino)

    return [status, url]

def baixar(l):
    """Ask for confirmation, then download every URL in ``l`` concurrently.

    Returns the string "Cancelado" unless the user answers "S"
    (case-insensitive); otherwise returns ``mapp(downloader, l)``.
    """
    if input("Tem certeza? S/N").upper() != "S":
        return "Cancelado"
    return mapp(downloader, l)

In [6]:
# Keep only the links whose href contains a "." after its first character,
# then prefix each one with the base URL to get absolute download addresses.
filelist = (tag.attrs['href'] for tag in soup.find_all('a')
            if tag.attrs['href'].find(".") > 0)
urllist = [urlbase + name for name in filelist]

In [8]:
# Ask for confirmation and download every URL in urllist. res is the list of
# [status, url] pairs returned by the downloads, or "Cancelado" if declined.
res = baixar(urllist)

Empresas7.zip:   0%|          | 0.00/94.8M [00:00<?, ?iB/s]

Empresas2.zip:   0%|          | 0.00/75.4M [00:00<?, ?iB/s]

Empresas3.zip:   0%|          | 0.00/81.2M [00:00<?, ?iB/s]

Empresas5.zip:   0%|          | 0.00/93.1M [00:00<?, ?iB/s]

Empresas8.zip:   0%|          | 0.00/95.0M [00:00<?, ?iB/s]

Empresas6.zip:   0%|          | 0.00/90.3M [00:00<?, ?iB/s]

Cnaes.zip:   0%|          | 0.00/21.6k [00:00<?, ?iB/s]

Empresas0.zip:   0%|          | 0.00/341M [00:00<?, ?iB/s]

Empresas4.zip:   0%|          | 0.00/86.2M [00:00<?, ?iB/s]

Estabelecimentos1.zip:   0%|          | 0.00/323M [00:00<?, ?iB/s]

Empresas1.zip:   0%|          | 0.00/74.2M [00:00<?, ?iB/s]

Estabelecimentos0.zip:   0%|          | 0.00/1.29G [00:00<?, ?iB/s]

Empresas9.zip:   0%|          | 0.00/90.8M [00:00<?, ?iB/s]

Estabelecimentos2.zip:   0%|          | 0.00/322M [00:00<?, ?iB/s]

Estabelecimentos3.zip:   0%|          | 0.00/321M [00:00<?, ?iB/s]

Estabelecimentos4.zip:   0%|          | 0.00/343M [00:00<?, ?iB/s]

Estabelecimentos5.zip:   0%|          | 0.00/330M [00:00<?, ?iB/s]

Estabelecimentos6.zip:   0%|          | 0.00/322M [00:00<?, ?iB/s]

Estabelecimentos7.zip:   0%|          | 0.00/320M [00:00<?, ?iB/s]

Estabelecimentos8.zip:   0%|          | 0.00/335M [00:00<?, ?iB/s]

Estabelecimentos9.zip:   0%|          | 0.00/339M [00:00<?, ?iB/s]

Motivos.zip:   0%|          | 0.00/1.11k [00:00<?, ?iB/s]

Municipios.zip:   0%|          | 0.00/42.4k [00:00<?, ?iB/s]

Naturezas.zip:   0%|          | 0.00/1.49k [00:00<?, ?iB/s]

Paises.zip:   0%|          | 0.00/2.68k [00:00<?, ?iB/s]

Qualificacoes.zip:   0%|          | 0.00/980 [00:00<?, ?iB/s]

Simples.zip:   0%|          | 0.00/223M [00:00<?, ?iB/s]

Socios0.zip:   0%|          | 0.00/147M [00:00<?, ?iB/s]

Socios1.zip:   0%|          | 0.00/47.1M [00:00<?, ?iB/s]

Socios2.zip:   0%|          | 0.00/46.8M [00:00<?, ?iB/s]

Socios3.zip:   0%|          | 0.00/46.9M [00:00<?, ?iB/s]

Socios4.zip:   0%|          | 0.00/46.9M [00:00<?, ?iB/s]

Socios5.zip:   0%|          | 0.00/46.8M [00:00<?, ?iB/s]

Socios6.zip:   0%|          | 0.00/47.1M [00:00<?, ?iB/s]

Socios7.zip:   0%|          | 0.00/47.0M [00:00<?, ?iB/s]

Socios8.zip:   0%|          | 0.00/46.8M [00:00<?, ?iB/s]

Socios9.zip:   0%|          | 0.00/47.0M [00:00<?, ?iB/s]

In [17]:
# Check whether any download failed; failed URLs can then be retried.
if isinstance(res, str):
    # baixar() returned its "Cancelado" marker instead of a result list, so
    # there are no [status, url] pairs to inspect (indexing into the string
    # would raise IndexError).
    erros = []
    print(res)
else:
    erros = [url for status, url in res if status != '200']
    print(erros)
# Uncomment to retry only the failed URLs:
#res = res if len(erros) == 0 else baixar(erros)

[]


# Upload

## Funções

In [7]:
# Snapshot of the downloads folder taken now; files extracted later are not in it.
dirlist= os.listdir(dirpath)
engine = sa.create_engine(uri_sqlite)

def sql_exec(q):
    """Execute a single SQL statement against ``engine``.

    The statement runs inside ``engine.begin()``, so it is committed on
    success and the connection is always returned to the pool, instead of
    being left open until garbage collection.

    Args:
        q: SQL text of one statement.
    """
    with engine.begin() as conn:
        conn.execute(sa.text(q))

In [8]:
def upload(local, table, dcol, query=""):
    """Recreate ``table`` and fill it with the CSV data found at ``local``.

    Args:
        local: Path (or glob pattern) of the headerless, ';'-separated,
            latin1-encoded file(s) to read.
        table: Name of the destination table; it is dropped first.
        dcol: Mapping of column name to dtype, in file column order.
        query: Optional DDL executed after the drop (typically the
            CREATE TABLE). When empty, the table is created by the first
            ``to_sql`` call.
    """
    column_names = list(dcol.keys())

    # Always start from an empty table.
    sql_exec(f'DROP TABLE IF EXISTS "{table}";')

    if query != "":
        sql_exec(query)

    # Paths containing '.zip' are read with blocksize=None; anything else
    # uses dask's 'default' block size.
    is_zip = local.find('.zip') >= 0
    block = None if is_zip else 'default'
    frame = dd.read_csv(local, header=None, encoding='latin1', sep=";",
                        decimal=",", names=column_names, dtype=dcol,
                        blocksize=block)

    # Materialise and append one partition at a time.
    for part in trange(frame.npartitions, desc=table):
        chunk = frame.get_partition(part).compute()
        chunk.to_sql(name=table, con=engine, if_exists='append',
                     chunksize=1000, index=False)

    # Index creation on the first column is currently disabled:
    # sql_exec(
    #     "CREATE INDEX IF NOT EXISTS idx_" + table + " ON \"" + table + "\" (\"" + df.columns[0] + "\")"
    #     )

In [9]:
def extract(flist,filtro):
    """Extract every zip archive in ``flist`` whose name starts with ``filtro``.

    Archives are read from, and extracted into, the ``dirpath`` folder.

    Args:
        flist: File names (relative to ``dirpath``) to choose from.
        filtro: Prefix a file name must start with to be extracted.

    Returns:
        List of ``dirpath + member name`` for every file contained in the
        selected archives, suitable for passing to ``remove`` afterwards.
    """
    filenames = []

    for filename in flist:
        if not filename.startswith(filtro):
            continue
        # The context manager closes the archive handle even if extraction fails.
        with zipfile.ZipFile(dirpath + filename) as archive:
            archive.extractall(path=dirpath)
            # Record every extracted file (not just the first member) so the
            # caller can clean all of them up; directory entries are skipped.
            filenames.extend(dirpath + member.filename
                             for member in archive.infolist()
                             if not member.is_dir())

    return filenames

def remove(flist):
    """Delete every file path listed in ``flist``."""
    for path in flist:
        os.unlink(path)

## Começo do Upload

In [29]:
# Load the MUNICIPIO lookup table straight from its zip archive.
table = 'MUNICIPIO'
local = f"{dirpath}Municipios.zip"
dcol = dict(CD_MUNICIPIO="int", MUNICIPIO="str")
query = """
CREATE TABLE "MUNICIPIO" (
	"CD_MUNICIPIO" INTEGER PRIMARY KEY, 
	"MUNICIPIO" TEXT NOT NULL
);
"""
upload(local, table, dcol, query=query)

MUNICIPIO:   0%|          | 0/1 [00:00<?, ?it/s]

In [30]:
# Load the CNAE lookup table straight from its zip archive.
table = 'CNAE'
local = f"{dirpath}Cnaes.zip"
dcol = dict(CNAE="int", NM_CNAE="str")
query = """
CREATE TABLE "CNAE" (
	"CNAE" INTEGER PRIMARY KEY, 
	"NM_CNAE" TEXT NOT NULL
);
"""
upload(local, table, dcol, query=query)

CNAE:   0%|          | 0/1 [00:00<?, ?it/s]

In [31]:
# Load the PAIS lookup table straight from its zip archive.
table = 'PAIS'
local = f"{dirpath}Paises.zip"
dcol = dict(CD_PAIS="int", PAIS="str")
query = """
CREATE TABLE "PAIS" (
	"CD_PAIS" INTEGER PRIMARY KEY,
	"PAIS" TEXT NOT NULL
);
"""
upload(local, table, dcol, query=query)

PAIS:   0%|          | 0/1 [00:00<?, ?it/s]

In [36]:
# Load the NATJU lookup table straight from its zip archive.
table = 'NATJU'
local = f"{dirpath}Naturezas.zip"
dcol = dict(CD_NAT_JURIDICA="int", NAT_JURIDICA="str")
query = """
CREATE TABLE "NATJU" (
	"CD_NAT_JURIDICA" INTEGER PRIMARY KEY,
	"NAT_JURIDICA" TEXT NOT NULL
);
"""
upload(local, table, dcol, query=query)

NATJU:   0%|          | 0/1 [00:00<?, ?it/s]

In [37]:
# Load the MOTIVOSIT lookup table straight from its zip archive.
table = 'MOTIVOSIT'
local = f"{dirpath}Motivos.zip"
dcol = dict(CD_MOTIVO_SIT_CADASTRO="int", MOTIVO_SIT_CADASTRO="str")
query = """
CREATE TABLE "MOTIVOSIT" (
	"CD_MOTIVO_SIT_CADASTRO" INTEGER PRIMARY KEY, 
	"MOTIVO_SIT_CADASTRO" TEXT NOT NULL
);
"""
upload(local, table, dcol, query=query)

MOTIVOSIT:   0%|          | 0/1 [00:00<?, ?it/s]

In [38]:
# Load the QUALS lookup table straight from its zip archive.
table = 'QUALS'
local = f"{dirpath}Qualificacoes.zip"
dcol = dict(CD_QUALS="int", NM_QUALS="str")
query = """
CREATE TABLE "QUALS" (
	"CD_QUALS" INTEGER PRIMARY KEY, 
	"NM_QUALS" TEXT NOT NULL
);
"""
upload(local, table, dcol, query=query)

QUALS:   0%|          | 0/1 [00:00<?, ?it/s]

parei aqui em 10/09/2024

In [12]:
# Unpack every downloaded archive whose name starts with "Empre".
lista = extract(dirlist, "Empre")

# Load all extracted *.EMPRECSV files into the EMPRESA table.
table = 'EMPRESA'
local = f"{dirpath}*.EMPRECSV"
dcol = dict(
    CNPJ_BASICO="int",
    RAZAO_SOCIAL="str",
    NAT_JURIDICA="int",
    QUAL_RESP="int",
    CAPITAL_SOCIAL="float",
    PORTE_EMPRESA="Int64",
    ENTE_FED_RESP="str",
)
query = """
CREATE TABLE "EMPRESA" (
	"CNPJ_BASICO" INTEGER PRIMARY KEY, 
	"RAZAO_SOCIAL" TEXT NOT NULL ON CONFLICT IGNORE, 
	"NAT_JURIDICA" INTEGER, 
	"QUAL_RESP" INTEGER, 
	"CAPITAL_SOCIAL" FLOAT, 
	"PORTE_EMPRESA" INT NULL, 
	"ENTE_FED_RESP" TEXT,
	FOREIGN KEY ("NAT_JURIDICA") REFERENCES "NATJU"("CD_NAT_JURIDICA"),
	FOREIGN KEY ("QUAL_RESP") REFERENCES "QUALS"("CD_QUALS")
);
"""
upload(local, table, dcol, query=query)

# Delete the extracted files recorded in lista.
remove(lista)

EMPRESA:   0%|          | 0/67 [00:00<?, ?it/s]

In [14]:
# Unpack every downloaded archive whose name starts with "Simples".
lista = extract(dirlist, "Simples")

# Load all extracted *.SIMPLES.* files into the SIMPLES table.
table = 'SIMPLES'
local = f"{dirpath}*.SIMPLES.*"
dcol = dict(
    CNPJ_BASICO="int",
    OP_SIMPLES="str",
    DT_OP_SIMPLES="int",
    DT_EXC_SIMPLES="int",
    OP_MEI="str",
    DT_OP_MEI="int",
    DT_EXC_MEI="int",
)
query = """
CREATE TABLE "SIMPLES" (
	"CNPJ_BASICO" INTEGER PRIMARY KEY, 
	"OP_SIMPLES" TEXT, 
	"DT_OP_SIMPLES" INTEGER, 
	"DT_EXC_SIMPLES" INTEGER, 
	"OP_MEI" TEXT, 
	"DT_OP_MEI" INTEGER, 
	"DT_EXC_MEI" INTEGER,
 	FOREIGN KEY ("CNPJ_BASICO") REFERENCES "EMPRESA"("CNPJ_BASICO")
);
"""
upload(local, table, dcol, query=query)

# Delete the extracted files recorded in lista.
remove(lista)

SIMPLES:   0%|          | 0/39 [00:00<?, ?it/s]

In [15]:
# Unpack every downloaded archive whose name starts with "Estabelecimentos".
lista= extract(dirlist,"Estabelecimentos")

# Load all extracted *.ESTABELE files into the ESTABELE table.
local=dirpath+'*.ESTABELE'
dcol={
    "CNPJ_BASICO":"int",
    "CNPJ_ORDEM":"int",
    "CNPJ_DV":"int",
    "CD_MATRIZ_FILIAL":"int",
    "NM_FANTASIA":"str",
    "CD_SIT_CADASTRO":"int",
    "DT_SIT_CADASTRO":"int",
    "MOTIVO_SIT_CADASTRO":"int",
    "NM_CIDADE_EXT":"str",
    "CD_PAIS":"Int64",
    "DT_INI":"int",
    "CNAE_PRINCIPAL":"int",
    "CNAE_SECUNDARIO":"str",
    "TIP_LOGRADOURO":"str",
    "LOGRADOURO":"str",
    "NUMERO":"str",
    "COMPLEMENTO":"str",
    "BAIRRO":"str",
    "CEP":"str",
    "UF":"str",
    "MUNICIPIO":"int",
    "DDD_1":"str",
    "TEL_1":"str",
    "DDD_2":"str",
    "TEL_2":"str",
    "DDD_FAX":"str",
    "FAX":"str",
    "EMAIL":"str",
    "SIT_ESP":"str",
    "DT_SIT_ESP":"Int64"}
table='ESTABELE'
# DDL fixes relative to the previous version of this cell:
#  - CNAE_PRINCIPAL is INTEGER, matching its dtype above and CNAE.CNAE;
#  - the MOTIVOSIT foreign key points at the column that actually exists;
#  - the missing comma between the last two FOREIGN KEY clauses was added.
query = """
CREATE TABLE "ESTABELE" (
	"CNPJ_BASICO" INTEGER, 
	"CNPJ_ORDEM" INTEGER, 
	"CNPJ_DV" INTEGER, 
	"CD_MATRIZ_FILIAL" INTEGER, 
	"NM_FANTASIA" TEXT, 
	"CD_SIT_CADASTRO" INTEGER, 
	"DT_SIT_CADASTRO" INTEGER, 
	"MOTIVO_SIT_CADASTRO" INTEGER, 
	"NM_CIDADE_EXT" TEXT, 
	"CD_PAIS" INT NULL, 
	"DT_INI" INTEGER, 
	"CNAE_PRINCIPAL" INTEGER, 
	"CNAE_SECUNDARIO" TEXT, 
	"TIP_LOGRADOURO" TEXT, 
	"LOGRADOURO" TEXT, 
	"NUMERO" TEXT, 
	"COMPLEMENTO" TEXT, 
	"BAIRRO" TEXT, 
	"CEP" TEXT, 
	"UF" TEXT, 
	"MUNICIPIO" INTEGER, 
	"DDD_1" TEXT, 
	"TEL_1" TEXT, 
	"DDD_2" TEXT, 
	"TEL_2" TEXT, 
	"DDD_FAX" TEXT, 
	"FAX" TEXT, 
	"EMAIL" TEXT, 
	"SIT_ESP" TEXT, 
	"DT_SIT_ESP" INT NULL,
	PRIMARY KEY ("CNPJ_BASICO", "CNPJ_ORDEM"),
	FOREIGN KEY ("CNPJ_BASICO") REFERENCES "EMPRESA"("CNPJ_BASICO"),
	FOREIGN KEY ("MOTIVO_SIT_CADASTRO") REFERENCES "MOTIVOSIT"("CD_MOTIVO_SIT_CADASTRO"),
	FOREIGN KEY ("CD_PAIS") REFERENCES "PAIS"("CD_PAIS"),
	FOREIGN KEY ("CNAE_PRINCIPAL") REFERENCES "CNAE"("CNAE"),
	FOREIGN KEY ("MUNICIPIO") REFERENCES "MUNICIPIO"("CD_MUNICIPIO")
);
"""
# Pass the DDL so the table is created with the declared keys; without it the
# query above was dead code and the schema was inferred by to_sql.
# NOTE(review): this enforces the (CNPJ_BASICO, CNPJ_ORDEM) primary key during
# the load - confirm the source data has no duplicates for that pair.
upload(local, table, dcol, query)

# Delete the extracted files recorded in lista.
remove(lista)

ESTABELE:   0%|          | 0/216 [00:00<?, ?it/s]

In [16]:
# Unpack every downloaded archive whose name starts with "Socios".
lista= extract(dirlist,"Socios")

# Load all extracted *.SOCIOCSV files into the SOCIO table.
local=dirpath+'*.SOCIOCSV'
dcol={
    "CNPJ_BASICO":"int",
    "ID_TIPO_SOCIO":"int",
    "NOME_OU_RAZAO_SOCIAL":"str",
    "CNPJ_CPF":"str",
    "QUALIF_SOCIO":"int",
    "DT_ENTRADA":"int",
    "CD_PAIS":"Int64",
    "REPR_LEGAL":"str",
    "NM_REPR":"str",
    "CD_QUALIF_REPR":"int",
    "FAIXA_ETARIA":"int"
    }
table='SOCIO'
query = """
CREATE TABLE "SOCIO" (
	"CNPJ_BASICO" INTEGER, 
	"ID_TIPO_SOCIO" INTEGER, 
	"NOME_OU_RAZAO_SOCIAL" TEXT, 
	"CNPJ_CPF" TEXT, 
	"QUALIF_SOCIO" INTEGER, 
	"DT_ENTRADA" INTEGER, 
	"CD_PAIS" INTEGER NULL, 
	"REPR_LEGAL" TEXT, 
	"NM_REPR" TEXT, 
	"CD_QUALIF_REPR" INTEGER, 
	"FAIXA_ETARIA" INTEGER,
	FOREIGN KEY ("CNPJ_BASICO") REFERENCES "EMPRESA"("CNPJ_BASICO"),
	FOREIGN KEY ("QUALIF_SOCIO") REFERENCES "QUALS"("CD_QUALS"),
	FOREIGN KEY ("CD_QUALIF_REPR") REFERENCES "QUALS"("CD_QUALS"),
	FOREIGN KEY ("CD_PAIS") REFERENCES "PAIS"("CD_PAIS")
);
"""
# Pass the DDL so the table is created with the declared schema; without it
# the query above was dead code and the schema was inferred by to_sql.
upload(local, table, dcol, query)

# Delete the extracted files recorded in lista.
remove(lista)



SOCIO:   0%|          | 0/37 [00:00<?, ?it/s]

In [17]:
# Indexes for the SOCIO table. IF NOT EXISTS keeps this cell re-runnable
# when the indexes were already created by a previous execution.
sql_exec(""" CREATE INDEX IF NOT EXISTS idx_SOCIO ON "SOCIO" ("CNPJ_BASICO"); """)
sql_exec(""" CREATE INDEX IF NOT EXISTS idx_SOCIO_NOME_CPF on SOCIO (NOME_OU_RAZAO_SOCIAL,CNPJ_CPF); """)

In [18]:
# Collect the .zip files seen in dirlist and delete them after confirmation.
ziplist = [dirpath + name for name in dirlist if name.find(".zip") > 0]
remove(ziplist) if input("Tem certeza? S/N").upper() == "S" else "Cancelado"

# Criar Views

In [19]:
# Create the SOCIO view.
# Drop any previous definition first: the database file persists between
# runs, so a bare CREATE VIEW fails on re-execution, and dropping also makes
# sure a corrected definition replaces an older one.
sql_exec('DROP VIEW IF EXISTS VW_SOCIO;')

query = '''
CREATE VIEW VW_SOCIO as
SELECT 
	s.CNPJ_BASICO,
	e.RAZAO_SOCIAL,
	(CASE ID_TIPO_SOCIO
		WHEN 1 THEN 'PJ'
		WHEN 2 THEN 'PF'
		WHEN 3 THEN 'EX'
	END) AS TIPO_SOCIO,
	s.NOME_OU_RAZAO_SOCIAL,
	s.CNPJ_CPF,
	q.NM_QUALS AS NM_QUALIF_SOCIO,
	s.DT_ENTRADA,
	p.PAIS,
	s.REPR_LEGAL AS CPF_REPR_LEGAL,
	s.NM_REPR,
	qs.NM_QUALS  AS NM_QUALIF_REPR,
	s.FAIXA_ETARIA 
FROM SOCIO s 
LEFT JOIN EMPRESA e 
	ON s.CNPJ_BASICO = e.CNPJ_BASICO
LEFT JOIN QUALS q 
	ON s.QUALIF_SOCIO = q.CD_QUALS
LEFT JOIN QUALS qs 
	ON s.CD_QUALIF_REPR = qs.CD_QUALS
LEFT JOIN PAIS p 
	ON s.CD_PAIS = p.CD_PAIS ;
'''
# The qs alias feeds NM_QUALIF_REPR, so it is joined on the representative's
# qualification code (CD_QUALIF_REPR) rather than the partner's.
sql_exec(query)

In [20]:
# Create the EMPRESA view.
# Drop any previous definition first: the database file persists between
# runs, so a bare CREATE VIEW fails when the notebook is executed again.
sql_exec('DROP VIEW IF EXISTS VW_EMPRESA;')

query = '''
CREATE view VW_EMPRESA as
SELECT 
	e.CNPJ_BASICO,
	e.RAZAO_SOCIAL,
	n.NAT_JURIDICA,
	q.NM_QUALS AS NM_QUAL_RESP,
	e.CAPITAL_SOCIAL,
	(CASE e.PORTE_EMPRESA
		WHEN 0 THEN 'N/A'
		WHEN 1 THEN 'ME'
		WHEN 3 THEN 'EPP'
		WHEN 5 THEN 'DEMAIS'
	end) AS PORTE_EMPRESA ,
	e.ENTE_FED_RESP 
FROM EMPRESA e 
LEFT JOIN NATJU n 
	ON e.NAT_JURIDICA = n.CD_NAT_JURIDICA
LEFT JOIN QUALS q 
	ON e.QUAL_RESP = q.CD_QUALS;
'''
sql_exec(query)

In [21]:
# Create the ESTABELE view.
# Drop any previous definition first: the database file persists between
# runs, so a bare CREATE VIEW fails when the notebook is executed again.
sql_exec('DROP VIEW IF EXISTS VW_ESTABELE;')

query = '''
CREATE view VW_ESTABELE AS
SELECT
	e.CNPJ_BASICO,
	e.CNPJ_ORDEM,
	e.CNPJ_DV,
	(CASE e.CD_MATRIZ_FILIAL
		WHEN 1 THEN 'MATRIZ'
		WHEN 2 THEN 'FILIAL'
	END) AS MATRIZ_FILIAL , 
	e.NM_FANTASIA,
	(CASE e.CD_SIT_CADASTRO
		WHEN 1 THEN 'NULA'
		WHEN 2 THEN 'ATIVA'
		WHEN 3 THEN 'SUSPENSA'
		WHEN 4 THEN 'INAPTA'
		WHEN 8 THEN 'BAIXADA'
	end) AS SIT_CADASTRO,
	e.DT_SIT_CADASTRO,
	m.MOTIVO_SIT_CADASTRO AS MOTIVO_SIT_CADASTRO,
	e.NM_CIDADE_EXT,
	p.PAIS,
	e.DT_INI,
	c.NM_CNAE, 
	e.CNAE_PRINCIPAL,
	e.CNAE_SECUNDARIO,
	e.TIP_LOGRADOURO,
	e.LOGRADOURO,
	e.NUMERO,
	e.COMPLEMENTO,
	e.BAIRRO,
	e.CEP,
	e.UF,
	m2.MUNICIPIO,
	e.DDD_1,
	e.TEL_1,
	e.DDD_2,
	e.TEL_2,
	e.DDD_FAX,
	e.FAX,
	e.EMAIL,
	e.SIT_ESP,
	e.DT_SIT_ESP 
FROM ESTABELE e 
LEFT JOIN MOTIVOSIT m 
	ON e.MOTIVO_SIT_CADASTRO = m.CD_MOTIVO_SIT_CADASTRO 
LEFT JOIN PAIS p 
	ON e.CD_PAIS = p.CD_PAIS 
LEFT JOIN CNAE c 
	ON e.CNAE_PRINCIPAL = c.CNAE 
LEFT JOIN MUNICIPIO m2 
	ON e.MUNICIPIO = m2.CD_MUNICIPIO ;
'''
sql_exec(query)

In [22]:
# Create a view with the edges (A -> B) of the ownership graph.
# A is CNPJ_BASICO left-padded with zeros to 8 characters. B identifies the
# partner (first SELECT) or the legal representative (second SELECT).
# String literals use single quotes: double-quoted strings are only accepted
# by SQLite as a legacy quirk and are otherwise parsed as identifiers.
query = '''
CREATE view if not exists grapho as
SELECT
	SUBSTR('0000000' || CNPJ_BASICO,-8) as A,
	(case
		when CNPJ_CPF LIKE '***%' then (COALESCE(NOME_OU_RAZAO_SOCIAL,'') || ' - ' || CNPJ_CPF)
		when CNPJ_CPF is null then NOME_OU_RAZAO_SOCIAL
		else SUBSTR(CNPJ_CPF, 1, 8) end) as B
FROM SOCIO
union all
SELECT
	SUBSTR('0000000' || CNPJ_BASICO,-8) as A,
	(case
		when REPR_LEGAL LIKE '***%' then (NM_REPR || ' - ' || REPR_LEGAL)
		else NM_REPR end) as B
FROM SOCIO
WHERE NM_REPR is not null;
'''
sql_exec(query)

In [23]:
# Run SQLite's VACUUM statement on the database through sql_exec.
sql_exec("vacuum;")

## Testes