In [2]:
import grequests
from random import randint, sample
from lxml.html import fromstring

### Algorithm for generating random CNPJs

In [3]:
def validate_digit(mult_array, digits_array):
    """Compute one modulo-11 check digit from weights and digits.

    Args:
        mult_array: Sequence of integer weights.
        digits_array: Sequence of integer digits, paired positionally with
            the weights (extra items in the longer sequence are ignored).

    Returns:
        0 when the weighted sum modulo 11 is 0 or 1, otherwise
        11 minus that remainder.
    """
    weighted_total = 0
    for weight, digit in zip(mult_array, digits_array):
        weighted_total += weight * digit

    remainder = weighted_total % 11
    return 0 if remainder < 2 else 11 - remainder


def generate_cnpj():
    """Generate a random, syntactically valid 14-digit CNPJ string.

    The number is built from a random 8-digit root, the fixed branch
    suffix '0001', and two check digits computed by ``validate_digit``.

    Returns:
        str: 14 characters, all digits.
    """
    # Bounds must be ints: randint() rejects float arguments such as 1e8 on
    # Python 3.12+. The upper bound is inclusive, so it is capped at
    # 99_999_999 to keep the root at eight digits; a nine-digit root would
    # shift every digit against the check-digit weights.
    root = str(randint(1000, 10**8 - 1)).zfill(8)
    cnpj_12d = root + '0001'
    cnpj_int_l = [int(d) for d in cnpj_12d]

    first_dig = validate_digit([5,4,3,2,9,8,7,6,5,4,3,2], cnpj_int_l)
    # The second check digit also covers the first one.
    sec_dig = validate_digit([6,5,4,3,2,9,8,7,6,5,4,3,2], cnpj_int_l + [first_dig])
    return cnpj_12d + str(first_dig) + str(sec_dig)


In [4]:
# Write each random CNPJ to a text file. This file will be used to fetch data from the ReceitaWS API.
def write_to_txt(n, list_cnpjs):
    """Write every CNPJ to 'list_cnpjs.txt', one per line.

    The file is opened in write mode, so any previous content is replaced.

    Args:
        n: Not used by the function body; kept for existing callers.
        list_cnpjs: Iterable of CNPJ values to write.
    """
    with open('list_cnpjs.txt', 'w') as out_file:
        out_file.writelines(f'{cnpj}\n' for cnpj in list_cnpjs)

            
# Build 10000 random CNPJs and persist them to 'list_cnpjs.txt'.
list_cnpjs = [generate_cnpj() for _ in range(10000)]
write_to_txt(10000, list_cnpjs)

In [5]:
def read_txt():
    """Read 'list_cnpjs.txt' and return its lines as a list of strings.

    Leading and trailing whitespace (including the newline) is stripped
    from every line.
    """
    with open('list_cnpjs.txt', 'r') as in_file:
        return [raw_line.strip() for raw_line in in_file]
    
# Reload the CNPJ list from 'list_cnpjs.txt', replacing the in-memory list.
list_cnpjs = read_txt()

In [6]:
# Cell output: number of CNPJs loaded from the file.
len(list_cnpjs)

10000

### Free Proxies

In [27]:
import requests, json
from time import time

In [8]:
def fetch_proxies(timeout=10):
    """Scrape https://free-proxy-list.net/ and return the proxies found.

    Args:
        timeout: Seconds to wait for the site before giving up. Without a
            timeout ``requests.get`` can block indefinitely.

    Returns:
        set[str]: Proxies formatted as "ip:port". Rows whose fifth column
        contains "transparent" are excluded, as are rows with a missing IP
        or port cell.

    Raises:
        requests.exceptions.RequestException: If the page cannot be fetched
            (including when the timeout expires).
    """
    response = requests.get('https://free-proxy-list.net/', timeout=timeout)
    parser = fromstring(response.text)
    proxies = set()
    for row in parser.xpath('//tbody/tr')[:300]: # this site provides up to 300 free proxies
        # We do not want transparent proxies, since they do not work to bypass APIs
        if not row.xpath('.//td[5][not(contains(text(),"transparent"))]'):
            continue

        ip = row.xpath('.//td[1]/text()')
        port = row.xpath('.//td[2]/text()')
        # An empty cell yields an empty list; skip the row instead of
        # failing with IndexError.
        if not ip or not port:
            continue

        # IP + Port
        proxies.add(":".join([ip[0], port[0]]))
    return proxies

In [9]:
# Turn the scraped proxy set into a list so it can be sliced by index.
list_proxies = list(fetch_proxies())
# Cell output: number of proxies scraped.
len(list_proxies)

279

In [31]:
def exception_handler(request, exception):
    """Print a failure notice for a request that raised an exception.

    Args:
        request: The request that failed (not used in the message).
        exception: The exception that was raised; printed after the notice.
    """
    notice = "Request failed"
    print(notice, exception)

def generate_requests(cnpjs, proxies):
    """Build one unsent grequests GET per (cnpj, proxy) pair.

    CNPJs and proxies are paired positionally; pairing stops at the shorter
    of the two sequences. Each request targets the receitaws CNPJ endpoint,
    routes both http and https traffic through its proxy, and uses a
    10-second timeout.

    Returns:
        list: The prepared (not yet sent) grequests request objects.
    """
    ## Create the list for parallel connections through free proxies
    pending = []
    for cnpj, proxy in zip(cnpjs, proxies):
        endpoint = f'https://www.receitaws.com.br/v1/cnpj/{cnpj}'
        routing = {
            "http": f'http://{proxy}',
            "https": f'http://{proxy}'
        }
        pending.append(grequests.get(endpoint, proxies=routing, timeout=10))
    return pending


In [32]:
def select_proxies(st, step, list_proxies):
    """Return ``step`` proxies starting at index ``st``, wrapping around.

    The pool is treated as circular: when the window runs past the end it
    continues from the beginning, and a start index beyond the pool size is
    wrapped as well. If ``step`` exceeds the pool size, proxies repeat so
    that the result always has ``step`` items.

    Args:
        st: Start index into the pool.
        step: Number of proxies to return.
        list_proxies: Indexable pool of proxies.

    Returns:
        list: ``step`` proxies, or an empty list when the pool is empty or
        ``step`` is not positive.
    """
    total = len(list_proxies)
    if total == 0:
        # Nothing to choose from; also avoids a modulo-by-zero.
        return []

    # Modular indexing handles every wrap-around case uniformly, including
    # the case where (st + step) lands exactly on a multiple of the pool size.
    return [list_proxies[(st + offset) % total] for offset in range(step)]
    

In [33]:
def save_json(r, cnpj):
    """Store a response body as JSON in './jsons/<cnpj>.json'.

    The body is decoded as UTF-8 and parsed, so the file contains the JSON
    document itself rather than a doubly-encoded string literal. If the body
    is not valid JSON, the raw text is stored as a JSON string so the file
    still parses.

    Args:
        r: Response-like object exposing the body as bytes in ``r.content``.
        cnpj: Identifier used for the output file name.
    """
    import os  # local import: only needed to ensure the output directory exists

    text = r.content.decode("utf-8")
    try:
        payload = json.loads(text)
    except ValueError:
        # Not JSON (e.g. an HTML error page relayed by a proxy).
        payload = text

    os.makedirs('./jsons', exist_ok=True)
    with open(f'./jsons/{cnpj}.json', 'w') as f:
        json.dump(payload, f)

def check_response(resp, chosen_cnpjs, list_cnpjs):
    """Persist successful responses and drop their CNPJs from the work list.

    Responses and CNPJs are paired positionally. A pair is handled only when
    the response is truthy and its status code is 200: the body is saved via
    ``save_json`` and the CNPJ is removed from ``list_cnpjs`` in place.

    Args:
        resp: Responses (falsy entries are skipped).
        chosen_cnpjs: CNPJs that were requested, in the same order as ``resp``.
        list_cnpjs: Mutable list of pending CNPJs; modified in place.
    """
    for response, cnpj in zip(resp, chosen_cnpjs):
        if not response:
            continue
        if response.status_code != 200:
            continue
        save_json(response, cnpj)
        list_cnpjs.remove(cnpj)
        

In [None]:
# Working sets for this run: an initial proxy pool and the CNPJs to fetch.
proxies = list_proxies[:100]
cnpjs = list_cnpjs[:50]

step = 20   # requests sent per batch
st = 0      # cursor into the proxy pool
count = 0   # batches completed

# Keep sending batches while at least one full batch of CNPJs is pending.
while len(cnpjs) >= step:

    # Refresh the proxy pool on the first batch and every third batch after.
    if count % 3 == 0:
        proxies = list(fetch_proxies())

    if not proxies:
        # Without proxies no request can be routed; stop instead of failing
        # on a modulo-by-zero below.
        print("No proxies available; stopping")
        break

    start_time = time()
    chosen_cnpjs = sample(cnpjs, step)

    # The pool may have changed size after a refresh, so wrap the cursor
    # against the pool actually in use (not the stale global list_proxies).
    st %= len(proxies)
    chosen_proxies = select_proxies(st, step, proxies)
    st = (st + step) % len(proxies)

    reqs = generate_requests(chosen_cnpjs, chosen_proxies)
    # Report failed requests through the handler defined for that purpose;
    # failed entries still come back as None in resp.
    resp = grequests.map(reqs, exception_handler=exception_handler)

    # Successful CNPJs are saved and removed from cnpjs in place.
    check_response(resp, chosen_cnpjs, cnpjs)

    count += 1
    print(len(cnpjs), time()-start_time, count)
