# Download

In [4]:
import requests
import re
from concurrent.futures import ThreadPoolExecutor, as_completed
from tqdm.notebook import tqdm, trange
from bs4 import BeautifulSoup as bs
import os
import sqlalchemy as sa
import zipfile
import dask.dataframe as dd

In [5]:
# configuacoes
urlbase = "https://dadosabertos.rfb.gov.br/CNPJ/"
uri_sqlite ='sqlite:///base_receita20230315.db' #'sqlite:///base_receitaYYYYMMDD.db'
dirpath= "downloads/" # vai criar a pasta se ela não existir
if not os.path.exists(dirpath):
    os.makedirs(dirpath)

In [6]:
def mapp(fn,data, workers=8):
  tasks=[]
  out = []
  with ThreadPoolExecutor(max_workers=workers) as tex:
    tasks=[tex.submit(fn,i) for i in data]
  for task in as_completed(tasks):
    out.append(task.result())
  return out

In [7]:
def downloader(url):
    try:
        r = requests.get(url, stream=True)
        total = int(r.headers.get('content-length', 0))
        if "Content-Disposition" in r.headers.keys():
            fname = re.findall("filename=(.+)", r.headers["Content-Disposition"])[0]
        else:
            fname = url.split("/")[-1]
        with open(dirpath+fname, 'wb') as file, tqdm(desc=fname, total=total, unit='iB', unit_scale=True, unit_divisor=1024) as bar:
            for data in r.iter_content(chunk_size=1024):
                size = file.write(data)
                bar.update(size)
    except Exception as inst:
        print(inst)          # __str__ allows args to be printed directly,    
    finally:
        return [str(r.status_code), url]

In [8]:
filelist = list( filter( lambda s: s.find(".")>0,
                        [tag.attrs['href'] for tag in bs(requests.get(urlbase).content).find_all('a')]))

In [9]:
urllist = [urlbase+f for f in filelist]
urllist = list(filter(lambda s:s.find("Socio")>0 ,urllist))

In [7]:
res = mapp(downloader,urllist) if input("Tem certeza? S/N").upper() == "S" else "Cancelado"
# se der erro em algum basta usar o downloader
# downloader(urllist[0])

Socios3.zip:   0%|          | 0.00/46.8M [00:00<?, ?iB/s]

Socios6.zip:   0%|          | 0.00/46.8M [00:00<?, ?iB/s]

Socios2.zip:   0%|          | 0.00/46.9M [00:00<?, ?iB/s]

Socios4.zip:   0%|          | 0.00/46.9M [00:00<?, ?iB/s]

Socios1.zip:   0%|          | 0.00/47.0M [00:00<?, ?iB/s]

Socios0.zip:   0%|          | 0.00/102M [00:00<?, ?iB/s]

Socios5.zip:   0%|          | 0.00/46.9M [00:00<?, ?iB/s]

Socios7.zip:   0%|          | 0.00/46.8M [00:00<?, ?iB/s]

Socios8.zip:   0%|          | 0.00/46.8M [00:00<?, ?iB/s]

Socios9.zip:   0%|          | 0.00/46.8M [00:00<?, ?iB/s]

# Upload

## Configurações e Funções

In [10]:
dirlist= os.listdir(dirpath)
engine = sa.create_engine(uri_sqlite)
engine.connect().close() #test

In [11]:
def upload(local, table, dcol):
    colunas=list(dcol.keys())
    with engine.connect() as c:
        c.execute(sa.text("DROP TABLE IF EXISTS \"" + table + "\""))
        block='default' if local.find('.zip') < 0 else None
        df = dd.read_csv(local, header=None, encoding='latin1', sep=";", decimal=",", names=colunas, dtype=dcol, blocksize=block)
    for n in trange(df.npartitions, desc=table):
        df.get_partition(n).compute().to_sql( name=table, con=engine, if_exists= 'append', chunksize=1000, index=False)
    with engine.connect() as c:
         c.execute(sa.text("CREATE INDEX IF NOT EXISTS idx_" + table + " ON \"" + table + "\" (\"" + df.columns[0] + "\")"))

In [12]:
def extract(flist,filtro):
    filenames=[]
    files=list(filter(lambda x: not x.find(filtro), dirlist))
    for filename in files:
            zip=zipfile.ZipFile(dirpath+filename)
            filenames.append(dirpath+zip.filelist[0].filename)
            zip.extractall(path=dirpath)
    return filenames

def remove(flist):
    for f in flist:
            os.remove(f)

## Começo do Upload

In [31]:
lista= extract(dirlist,"Socios")
#socios
dcol={"CNPJ_BASICO":"int","ID_TIPO_SOCIO":"int","NOME_OU_RAZAO_SOCIAL":"str","CNPJ_CPF":"str","QUALIF_SOCIO":"int",
         "DT_ENTRADA":"int","CD_PAIS":"Int64","REPR_LEGAL":"str","NM_REPR":"str","CD_QUALIF_REPR":"int","FAIXA_ETARIA":"int"}
local=dirpath+'*.SOCIOCSV'
table='SOCIO'
upload(local, table, dcol)
remove(lista)

SOCIO:   0%|          | 0/33 [00:00<?, ?it/s]

In [22]:
if input("Tem certeza? S/N").upper() == "S":
    remove([dirpath+f for f in filter(lambda x: x.find(".zip") > 0,dirlist)])
else:
    print("Cancelado")

Cancelado


## Criando a View para o Grapho

In [14]:
# query = '''SELECT 
#     name
# FROM 
#     sqlite_schema
# WHERE 
#     type ='table' AND 
#     name NOT LIKE 'sqlite_%' AND
#     name='SOCIO';'''
# with engine.connect() as c:
#     res=c.execute(sa.text(query)).fetchall()

In [19]:
query = '''
CREATE view if not exists grapho as
SELECT
	SUBSTR("0000000" || CNPJ_BASICO,-8) as A,
	(case
		when CNPJ_CPF LIKE '***%' then (NOME_OU_RAZAO_SOCIAL || " - " || CNPJ_CPF)
		else SUBSTR(CNPJ_CPF, 1, 8) end) as B
FROM SOCIO
union all
SELECT
	SUBSTR("0000000" || CNPJ_BASICO,-8) as A,
	(case
		when CNPJ_CPF LIKE '***%' then (NM_REPR || " - " || REPR_LEGAL)
		else SUBSTR(CNPJ_CPF, 1, 8) end) as B
FROM SOCIO
WHERE NM_REPR is not null;
'''
engine.connect().execute(sa.text(query)).close()