In [3]:
import pandas as pd
import numpy as np

Write a python code that can get all the rarity ranks of the tokens in these collections. You can use Opensea API, RaritySniper API, Looksrare platform, or any other platform/ source/ API to get these.

#### load data

In [100]:
data = pd.read_csv("data/Collections_Assignment.csv")

load opensea API key

In [None]:
import yaml

with open('config/config.yaml') as f:
    config = yaml.safe_load(f)

API_KEY = config['API_KEY']



### fetching collection stats using opensea API and collection slug - unique to a NFT Collection

In [102]:
import requests
import requests
collection_slug = "robofrens"

url = f"https://api.opensea.io/api/v2/collections/{collection_slug}/stats"

headers = {"accept": "application/json", "X-API-KEY": API_KEY}

response = requests.get(url, headers=headers)

resp = response.json()


In [105]:
data.columns = ['contract_address', 'collection_slug']

In [106]:
collection_slugs = list(data['collection_slug'].drop_duplicates().to_numpy())
len(collection_slugs)

739

In [None]:
import requests

url = "https://api.opensea.io/api/v2/traits/collection_slug"

headers = {"accept": "application/json"}

response = requests.get(url, headers=headers)

print(response.text)

In [31]:
import requests
import ast
from tqdm import tqdm
from concurrent.futures import ThreadPoolExecutor, as_completed
import time

collection_stats = []

def fetch_collection_stats(cs):
    url = f"https://api.opensea.io/api/v2/collections/{cs}/stats"
    headers = {"accept": "application/json", "X-API-KEY": API_KEY}

    response = requests.get(url, headers=headers)

    if response.status_code == 200:
        return ast.literal_eval(response.text)
    else:
        return None

# Use ThreadPoolExecutor to make parallel API calls
with ThreadPoolExecutor(max_workers=5) as executor:  # You can adjust max_workers as needed
    futures = [executor.submit(fetch_collection_stats, cs) for cs in collection_slugs]

    # tqdm to display progress bar
    for future in tqdm(as_completed(futures), total=len(futures), desc="Fetching Collection Stats"):
        result = future.result()
        collection_stats.append(result)
        time.sleep(1)  # Add a delay between requests to avoid rate limiting




Fetching Collection Stats: 100%|██████████| 739/739 [28:34<00:00,  2.32s/it]    


In [32]:
len(collection_stats)

739

In [37]:
collection_stats_df = pd.DataFrame({'collection_slug': collection_slugs, 'collection_stats': collection_stats})

In [38]:
collection_stats_df.shape

(739, 2)

In [60]:
collection_stats_df.shape

(739, 2)

In [61]:
collection_stats_df.to_pickle("data/collection_stats_df.pkl")

In [39]:
data_collection_stats = data.merge(collection_stats_df, on = 'collection_slug')
data_collection_stats.shape

(863, 3)

In [118]:
data_collection_stats.columns = ['contract_address', 'collection_slug', 'collection_stats']

In [122]:
data_collection_stats = data_collection_stats[['collection_slug', 'contract_address', 'collection_stats']]
data_collection_stats.to_pickle("data/data_collection_stats.pkl")

#### below - details on nfts corresponding to a collection can be found, however max no of nfts that can be fetched from a collection is 200, so to fetch the next set of 200 nfts we use the next address

In [123]:
import requests

url = "https://api.opensea.io/api/v2/collection/robofrens/nfts?limit=200"

payload = {}
headers = {
  'accept': 'application/json',
  'X-API-KEY': API_KEY,
}

response = requests.request("GET", url, headers=headers, data=payload)

resp = response.json()


In [124]:
len(resp['nfts'])

200

fetching the next 200 nfts using next cursor value

In [126]:
import requests

url = "https://api.opensea.io/api/v2/collection/robofrens/nfts?limit=200&next=LXBrPTMwNzE5NzIxMg=="

payload = {}
headers = {
  'accept': 'application/json',
  'X-API-KEY': API_KEY
}

response = requests.request("GET", url, headers=headers, data=payload)

resp2 = response.json()


In [127]:
resp['nfts'][0]['name']

'robo fren #776'

In [128]:
resp2['nfts'][0]['name']

'robo fren #576'

as there is no direct way to get the total no of nfts within a collection, without iterating through next parameter, until its complete,  using opensea, 

I will be using raritysniper that directly gives the totalsupply or nft count corresponding to a collection

Here, the 'totalSupply' key for the API response - gives the total no of NFTs, and directly gives the associated rarity ranking based on the trait scores using its algorithm

In [139]:
import requests

url = "https://api.raritysniper.com/public/collection/bearsontheblock"

payload = {}
headers = {
  'Referer': 'https://raritysniper.com/'
}

response = requests.request("GET", url, headers=headers, data=payload)

resp = response.json()

collection_size = resp['totalSupply']

In [5]:
payload = {}
headers = {
  'Referer': 'https://raritysniper.com/'
}

def size_of_collection(collection_slug):
    url = f"https://api.raritysniper.com/public/collection/{collection_slug}"


    response = requests.request("GET", url, headers=headers, data=payload)

    resp = response.json()
    
    try:

        collection_size = resp['totalSupply']
    except Exception:
        collection_size = 0

    return collection_size


In [137]:
import requests

url = "https://api.raritysniper.com/public/collection/bearsontheblock/id/0"

payload = {}
headers = {
  'authority': 'api.raritysniper.com',
  'accept': '*/*',
  'accept-language': 'en-GB,en-US;q=0.9,en;q=0.8',
  'origin': 'https://raritysniper.com',
  'referer': 'https://raritysniper.com/'
}

response = requests.request("GET", url, headers=headers, data=payload)

resp = response.json()


based on the totalSuppy aka total nft count - we can fetch nft wise ranks and traits - i.e ranging between 0 to totalSupply - 1,

below returns trait values and rarity rank computer by rarity sniper's algorithm based on NFT's traits

In [141]:
collection_slug = "bearsontheblock"

In [142]:
collection_size

9495

based on robots.txt of raritysniper - we can infer that it allows crawling of all parts of the website

User-agent: *

Allow: /

Sitemap: https://raritysniper.com/sitemap

Sitemap: https://raritysniper.com/news/sitemap.xml

Sitemap: https://raritysniper.com/news/sitemap-news.xml

In [146]:
import logging
from tqdm import tqdm

In [148]:
nfts_list = []
nfts_df = pd.DataFrame()

In [None]:
url = f"https://api.raritysniper.com/public/collection/cryptopunks/id/0"

In [19]:
import pandas as pd
import logging
import requests
from tqdm import tqdm

data = pd.read_pickle("data/data_collection_stats.pkl")

data.head()

Unnamed: 0,collection_slug,contract_address,collection_stats
0,gotcha-gatcha,0x00dca92b7fbd0c01f3508756718807836ec82156,
1,rare-ghost-club,0x00fa82ea9be4e24ec6d7ed86ef93bfe85b9a3e68,"{'total': {'volume': 0.18, 'sales': 2, 'averag..."
2,robofrens,0x01f61f3c7f27893b30e8abdafd4a84ca8bd24b96,"{'total': {'volume': 848.3959432001659, 'sales..."
3,bearsontheblock,0x02aa731631c6d7f8241d74f906f5b51724ab98f8,"{'total': {'volume': 11.23, 'sales': 25, 'aver..."
4,antvasion,0x03315ec191c834cbcf88068e24408033f0e3bf4a,"{'total': {'volume': 26.24196, 'sales': 526, '..."


In [20]:
collection_slugs = list(data['collection_slug'].drop_duplicates().to_numpy())

In [21]:
len(collection_slugs)

739

In [26]:
urls = []
for collection_slug in tqdm(collection_slugs):
    collection_size = size_of_collection(collection_slug)
    logging.info(f"collecting NFT traits for {collection_slug=},  {collection_size=}")
    
    try:
        for i in range(collection_size):
            
            url = f"https://api.raritysniper.com/public/collection/{collection_slug}/id/{i}"
            urls.append((i, collection_slug, url))
    except Exception:
        pass

100%|██████████| 739/739 [19:40<00:00,  1.60s/it]  


In [29]:
url_df = pd.DataFrame(urls)
url_df.to_pickle("data/url_df.pkl")

In [30]:
url_df.shape

(4152150, 3)

In [34]:
4152150/32

129754.6875

In [35]:
urls[0]

(0,
 'gotcha-gatcha',
 'https://api.raritysniper.com/public/collection/gotcha-gatcha/id/0')

In [44]:
import asyncio
import aiohttp
import multiprocessing
from tqdm import tqdm
import pandas as pd
import requests

url_df.to_pickle("data/url_df.pkl")

count = 0

payload = {}
headers = {
        'authority': 'api.raritysniper.com',
        'accept': 'application/json',
        'origin': 'https://raritysniper.com'
        }

nfts_list = []

async def fetch_data(session, url, collection_slug, i, progress_bar):
    global count  # Add this line to declare count as a global variable
    response = await session.get(url, headers=headers, data=payload)
    resp = await response.json()
    subset_dict = {key: resp[key] for key in ['rank', 'rarityScore', 'image', 'traits']}
    subset_dict['collection_slug'] = collection_slug
    subset_dict['nft_id'] = i
    nfts_list.append(subset_dict)

    global count  # Declare count as a global variable
    count += 1
    
    progress_bar.update(1)  # Update the progress bar
   
    if count % 100 == 0:
        nfts_df = pd.DataFrame(nfts_list)
        nfts_df.to_pickle("data/nfts_df.pkl")



max_concurrent_requests = 32



async def bound_fetch(sem, func):
    # Wrapper function to limit the number of concurrent calls
    async with sem:
        return await func

async def main():
    async with aiohttp.ClientSession() as session:
        # Limit the number of concurrent requests using asyncio.semaphore
        with tqdm(total=len(urls)) as progress_bar:
            tasks = [fetch_data(session, url, cs, i, progress_bar) for i, cs, url in urls]
            semaphore = asyncio.Semaphore(max_concurrent_requests)
            tasks = [bound_fetch(semaphore, fetch_data(session, url, collection_slug, i, progress_bar)) for i, collection_slug, url in urls]
            results = await asyncio.gather(*tasks, return_exceptions=True)
            print(f"{len(results)=}")
            pd.DataFrame(results).to_pickle("data/results.pkl")


In [42]:
import multiprocessing
multiprocessing.cpu_count() * 4

32

In [None]:
import nest_asyncio
import asyncio
nest_asyncio.apply()

asyncio.run(main())

In [157]:
data.shape

(863, 2)

In [159]:
len(collection_slugs)

739