In [1]:
import os
import sys
import time
import signal
import requests

In [2]:
# Constants
# Root OpenSea API URL
API_URL = "https://api.opensea.io"

# Number of assets per page
PAGE_SIZE = 50

In [3]:
# Configuration
# If True, do not print info about individual assets
QUIET = False

# Directory to save downloaded collections to
OUTPUT_DIR = "collections"

In [4]:
def download_collection(collection):
    assets = None
    page = 0

    #\TODO create collection directory
    while True:
        if page * PAGE_SIZE > 10000:
            # API restriction - can only use offsets up to 10000
            break

        req_url = API_URL + "/assets"
        req_params = {
            "collection": collection,
            "offset": page * PAGE_SIZE,
            "limit": PAGE_SIZE,
            "order_direction": "asc"
        }

        # Get assets in collection
        resp = requests.get(req_url, params=req_params)
        if resp.status_code != 200:
            print(f"Error {resp.status_code} on page {page} of collection '{collection}'")
            break

        # Convert API response to JSON
        try:
            resp = resp.json()
        except Exception as e:
            print(e)
            break

        assets = resp["assets"]
        if assets == []:
            if page == 0:
                print(f"Collection '{collection}' does not exist")
            break

        # Verify that the output directory exists
        if not os.path.isdir(OUTPUT_DIR + "/" + collection):
            os.mkdir(OUTPUT_DIR + "/" + collection)

        # Download all assets on this page
        for asset in assets:
            download_asset(collection, asset)

        page += 1
        time.sleep(5)
        print("Waiting .....")

In [5]:
def download_asset(collection, asset):
    global usd_total

    # Name of this asset
    asset_name = asset["name"]

    if asset_name is None:
        asset_name = asset["token_id"]

    # URL of asset content
    asset_url = ""

    if asset["animation_url"] is not None:
        asset_url = asset["animation_url"]
    elif asset["image_url"] is not None:
        asset_url = asset["image_url"]

    if asset_url == "":
        return

    # Download asset content
    req = requests.get(asset_url, stream=True)

    # Output file extension
    asset_ext = ""
    ctype = req.headers["Content-Type"]
    if "image" in ctype or "video" or "audio" in ctype:
        asset_ext = ctype.split("/")[1]
    else:
        print(f"Unrecognized Content-Type: {ctype}")
        asset_ext = "bin"

    # Output file path
    output_file = f"{OUTPUT_DIR}/{collection}/{asset_name}.{asset_ext}"

    if os.path.exists(output_file):
        # File already exists - don't re-download it
        return

    with open(output_file, "wb") as f:
        for chunk in req.iter_content(chunk_size=1024):
            if chunk:
                f.write(chunk)

    if not QUIET:
        print(f"Downloading: {asset_name}")

In [6]:
def parse_flag(flag):
    flag = flag.split("=")
    prop = flag[0].lower()

    if prop == "quiet":
        global QUIET
        QUIET = True
    elif prop == "output-dir":
        global OUTPUT_DIR
        OUTPUT_DIR = flag[1]
    else:
        print(f"Unrecognized flag '{prop}'")
        exit()

In [7]:
def finish():
    print(f"\nFinish Download!\n")

In [8]:
def sig_handler(num, frame):
    finish()

In [9]:
if __name__ == "__main__":
    v = "cryptopunksmom"

    # Create root NFT collections directory if necessary
    if not os.path.isdir(OUTPUT_DIR):
        os.mkdir(OUTPUT_DIR)

    download_collection(v)

    finish()

Collection 'cryptopunksmom' does not exist

Finish Download!

