In [None]:
!pip install aiohttp

In [None]:
import asyncio
import aiohttp
import requests
import time
import json
import os
from itertools import islice

In [None]:
def chunked_iterable(iterable, chunk_size):
    it = iter(iterable)
    while True:
        chunk = list(islice(it, chunk_size))
        if not chunk:
            break
        yield chunk

In [None]:
class timeit_context(object):
    def __init__(self):
        self.initial = None

    def __enter__(self):
        self.initial = time.time()

    def __exit__(self, type_arg, value, traceback):
        print('Total time elapsed {} sec'.format(time.time() - self.initial))

In [None]:
async def get_collection_attributes_async(base_url, nft_id_list):
    attributes = [] # to store NFT data
    max_workers = 250
    
    tcp_connection = aiohttp.TCPConnector(limit=max_workers)
    async with aiohttp.ClientSession(connector=tcp_connection) as session:
        for nft_id_chunk in chunked_iterable(nft_id_list, max_workers):
            results = await get_nft_attributes_of_chunk(base_url, nft_id_chunk, session)
            attributes.extend(results)
    
    await tcp_connection.close()
    return attributes

In [None]:
def get_collection_attributes_sync(base_url, nft_id_list):
    timeout = 10
    header = {'User-Agent': 'Mozilla/5.0 (X11; Linux x86_64) '
                            'AppleWebKit/537.11 (KHTML, like Gecko) '
                            'Chrome/23.0.1271.64 Safari/537.11',
              'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
              'Accept-Charset': 'ISO-8859-1,utf-8;q=0.7,*;q=0.3',
              'Accept-Encoding': 'none',
              'Accept-Language': 'en-US,en;q=0.8',
              'Connection': 'keep-alive'}
    
    attributes = []
    for nft_id in nft_id_list:
        response = requests.get(url=f'{base_url}/{nft_id}', headers=header, timeout=timeout)
        attributes.append(json.loads(response.text))
    return response

In [None]:
async def get_nft_attributes_of_chunk(base_url, nft_id_list, session):
    tasks = []
    
    for nft_id in nft_id_list:
        if 'ipfs://' in base_url:
            task = asyncio.ensure_future(get_nft_attributes_from_infura(base_url, nft_id, session=session))
        else:
            task = asyncio.ensure_future(get_nft_attributes(base_url, nft_id, session=session))
        tasks.append(task)
    
    return await asyncio.gather(*tasks, return_exceptions=True)

In [None]:
async def get_nft_attributes(base_url, nft_id, session):
    """ I/O Bound HTTP Get method for single NFT """
    timeout = 10
    nft_url = f"{base_url}/{nft_id}"
    try:
        async with session.get(nft_url, timeout=timeout) as response:
            result = await response.text()
            return json.loads(result)
    except Exception as e:
        print(
            f"Fetcha could not get the data for NFT. Error message: {e}. URL : {nft_url}")
        return {}

In [19]:
async def get_nft_attributes_from_infura(base_url, nft_id, session):
    """
    IO-bound call for requesting NFT data with HTTP Post method through Infura gateway.
    """
    timeout = 20
    
    project_id = os.getenv('INFURA_PROJECT_ID')
    project_secret = os.getenv('INFURA_PROJECT_SECRET')
    
    collection_id = base_url.split('ipfs://')[1]
    nft_url = f'https://ipfs.infura.io:5001/api/v0/cat?arg={collection_id}/{nft_id}'

    try:
        async with session.post(nft_url, auth=aiohttp.BasicAuth(project_id, project_secret),
                                timeout=timeout) as response:
            result = await response.text()
            json_result =  json.loads(result)
            json_result['name'] = json_result.get('name', nft_id)
            return json_result
    except Exception as e:
        print(
            f"Fetcha could not get the data from INFURA with POST for NFT. Error message: {e}. URL : {nft_url}")
        return {}

In [21]:
parameters = {
    'bayc' : { 
        'base_url' : 'ipfs://QmeSjSinHpPnmXmspMjwiXyN6zS4E9zccariGR3jxcaWtq',
        'id_start' : 0,
        'id_end'   : 10000,
    },
    'moonbirds': {
        'base_url' : 'https://live---metadata-5covpqijaa-uc.a.run.app/metadata/',
        'id_start' : 0,
        'id_end': 10000
    }
}

async def store_nft_attributes(nft_collection):
    params = parameters[nft_collection]
    base_url = params['base_url']
    id_start, id_end = params['id_start'], params['id_end']
    with timeit_context():
        attributes = await get_collection_attributes_async(base_url, range(id_start, id_end))
        
    with open(f'{nft_collection}.json', 'w') as fp:
        json.dump(attributes, fp)
        print('Done writing JSON data into .json file')


await store_nft_attributes('bayc')

Total time elapsed 47.09220099449158 sec
Done writing JSON data into .json file


In [None]:
with timeit_context():
    attributes = get_collection_attributes_sync(token_base_url, range(nft_id_start, nft_id_end))
