In [17]:
import json
import os
import time
import urllib
import urllib.request
from urllib.parse import urljoin, urlparse

def write(json_data: dict, file_name: str):
    """Serialize ``json_data`` to a JSON string and store it in ``file_name``.

    The file is opened in text mode ``'w'``, so existing content is replaced.
    """
    with open(file_name, 'w') as out_file:
        payload = json.dumps(json_data)
        out_file.write(payload)

def file_name_for_url(url: str) -> str:
    """Return the text after the last '/' of ``url`` rebuilt from its path component.

    The URL is re-joined with only its parsed path, which drops a trailing
    query string or fragment before the last segment is extracted.
    """
    path_component = urlparse(url).path
    rebuilt_url = urljoin(url, path_component)
    _head, _sep, last_segment = rebuilt_url.rpartition('/')
    return last_segment

def file_extension_for_url(url: str) -> str:
    """Return the extension (including the leading dot) of the file name in ``url``.

    An empty string is returned when ``os.path.splitext`` finds no extension.
    """
    _stem, extension = os.path.splitext(file_name_for_url(url))
    return extension

def cache_image_file_name(id: str, url: str) -> str:
    """Build the cached image file name: ``id`` followed by the extension found in ``url``."""
    extension = file_extension_for_url(url)
    return id + extension

# Padding appended to '\r' progress messages so a shorter message overwrites a longer one.
space = '         '
# Pause (seconds) between two consecutive requests.
req_interval = 1.25
# Longer pause (seconds) used after a request has failed or was rate limited: five minutes.
req_interval_long = 5 * 60

In [14]:
# Download list of all currencies
import time

# Coin list endpoint; include_platform=true is requested because the next
# cell filters on each entry's 'platforms' mapping.
url = 'https://api.coingecko.com/api/v3/coins/list?include_platform=true'
# The context manager closes the HTTP response once the body has been read.
with urllib.request.urlopen(url) as response:
    currencies = json.loads(response.read())
# Persist the raw list so this step can be inspected without downloading again.
write(currencies, 'tmp_step_01.json')
print('Downloaded list of ', len(currencies))
# Stay below the API rate limit before the next cell issues requests.
time.sleep(req_interval)

Downloaded list of  12968


In [15]:
# Include only native or ethereum ERC20s

def include(currency: dict) -> bool:
    """Decide whether ``currency`` is kept by the filter.

    A currency is kept when its 'platforms' mapping is empty, or when that
    mapping has a non-empty 'ethereum' entry. Everything else is rejected.
    """
    platforms = currency['platforms']
    if len(platforms) == 0:
        return True
    return 'ethereum' in platforms and len(platforms['ethereum']) != 0

# Keep only the currencies accepted by include().
currencies = [currency for currency in currencies if include(currency)]

# Rename the 'id' key to 'coinGeckoId'.
# The 'id' guard keeps this cell re-runnable: on a second run the key has
# already been moved, and an unconditional currency['id'] would raise KeyError.
for currency in currencies:
    if 'id' in currency:
        currency['coinGeckoId'] = currency.pop('id')

write(currencies, 'tmp_step_02.json')
print('Filtered out unsupported, remaining count', len(currencies))

Filtered out unsupported, remaining count 6311


In [16]:
# Get image URL, rank, homepage link, Twitter handle and description for each currency
import time
import numpy as np
import requests

base_url = 'https://api.coingecko.com/api/v3/coins/'
query_params = '?localization=false&tickers=false&market_data=false&community_data=false&developer_data=false&sparkline=false'
# Total number of tries per currency before the error is allowed to stop the cell.
max_fetch_attempts = 5


def fetch_coin_info(url: str, idx: int) -> dict:
    """Fetch ``url`` and return the decoded JSON body, backing off on failure.

    Args:
        url: Address of the resource to download.
        idx: Position of the currency being processed; only used in the
            progress message.

    Returns:
        The parsed JSON document.

    Raises:
        Exception: the error of the last attempt, once ``max_fetch_attempts``
            tries have all failed.
    """
    attempts_left = max_fetch_attempts
    while True:
        try:
            # The context manager closes the HTTP response after reading it.
            with urllib.request.urlopen(url) as response:
                return json.loads(response.read())
        except Exception:
            # `Exception` (not a bare except) so Ctrl-C still interrupts the cell.
            attempts_left -= 1
            if attempts_left <= 0:
                raise
            print('\r', 'Sleeping for', req_interval_long, 's', idx, space, end='')
            time.sleep(req_interval_long)


for idx, currency in enumerate(currencies):
    url = base_url + currency['coinGeckoId'] + query_params
    info = fetch_coin_info(url, idx)

    image = info.get('image') or {}
    if 'large' in image:
        currency['imageUrl'] = image['large']

    if 'coingecko_rank' in info:
        currency['rank'] = info['coingecko_rank']

    links = info.get('links') or {}
    homepages = links.get('homepage') or []
    if len(homepages) != 0:
        currency['link'] = homepages[0]

    # The previous test `'twitter_screen_name' in links is not None` was a
    # chained comparison and never looked at the value itself.
    twitter = links.get('twitter_screen_name')
    if twitter is not None:
        currency['twitter'] = twitter

    # .get() avoids a KeyError when there is no 'en' entry.
    description = (info.get('description') or {}).get('en')
    if description:
        currency['description'] = description

    # Checkpoint after every currency so an aborted run keeps what was fetched.
    write(currencies, 'tmp_step_03.json')
    print('\r', 'Downloaded metadata', idx, end='')
    time.sleep(req_interval)

 Sleeping for 61s 1370          

HTTPError: HTTP Error 429: Too Many Requests

In [None]:
# Download images if needed
from urllib.parse import urljoin, urlparse
import os
import platform
import requests
import shutil

# Start from the headers requests sends by default and append the platform
# description to its User-Agent value.
headers = requests.utils.default_headers()
default_agent = headers['User-Agent']
headers['User-Agent'] = ''.join((default_agent, ' (', platform.platform(), ')'))
# Collected by the download loop below and reported as 'Failed count'.
failed = []

def download_image(currency: dict) -> int:
    """Download the image of ``currency`` into ``images/`` unless already cached.

    Args:
        currency: Entry holding 'coinGeckoId' and, optionally, 'imageUrl'.

    Returns:
        200 when there is nothing to download (no image URL, or the file is
        already on disk) or when the download succeeded; otherwise the HTTP
        status code of the failed request.
    """
    # 'imageUrl' is only set when the metadata contained an image, so it may be absent.
    image_url = currency.get('imageUrl')
    if not image_url:
        return 200

    image_name = cache_image_file_name(currency['coinGeckoId'], image_url)
    image_path = 'images/' + image_name
    if os.path.exists(image_path):
        return 200

    time.sleep(req_interval)
    # Request the currency's own image URL instead of whatever the global
    # `url` happened to hold from an earlier cell.
    with requests.get(image_url, headers=headers, stream=True) as res:
        if res.status_code == 200:
            os.makedirs('images', exist_ok=True)
            # Ask urllib3 to undo any Content-Encoding before the raw copy.
            res.raw.decode_content = True
            with open(image_path, 'wb') as f:
                shutil.copyfileobj(res.raw, f)
            print('\r', 'Downloaded image', image_name, space, end='')
        return res.status_code

for idx, currency in enumerate(currencies):
    status_code = download_image(currency)
    if status_code == 429:
        # Rate limited: wait, then try the same image once more instead of skipping it.
        print('\r', 'Sleeping', currency['coinGeckoId'], idx, space, end='')
        time.sleep(req_interval_long)
        status_code = download_image(currency)
    if status_code != 200:
        # Record the failure so the summary below reflects what actually happened.
        failed.append(currency['coinGeckoId'])

print('Failed count', len(failed))

In [None]:
# Resize images
from PIL import Image
import os

# Target edge length in pixels; images are resized to image_length x image_length.
image_length = 32 * 3
# List the directory once so the counter and the loop work on the same files.
image_names = os.listdir('images/')
count = len(image_names)
failed_resize_images = list()

for idx, image_name in enumerate(image_names):
    path = 'images/' + image_name
    try:
        image = Image.open(path)
    except Exception:
        # The file cannot be opened as an image: drop it from the cache.
        # `Exception` (not a bare except) so Ctrl-C never deletes a file.
        os.remove(path)
        print('\r', 'Removing image', image_name, idx, '/', count, space, end='')
        continue

    # Close the source file before overwriting it with the resized copy.
    with image:
        if image.size == (image_length, image_length):
            resized = None
        else:
            resized = image.resize((image_length, image_length), reducing_gap=3.0)

    if resized is not None:
        try:
            resized.save(path)
        except Exception:
            failed_resize_images.append(path)
    print('\r', 'Resizing image', image_name, idx, '/', count, space, end='')

print('')
for failed_path in failed_resize_images:
    print(failed_path)
print('Failed images count', len(failed_resize_images))

In [None]:
# Get colors from images


In [None]:
# Download market data

In [None]:
# Iterate over the cached images (work in progress: no processing is done yet)
from PIL import Image
from PIL import ImageColor
import os

# Placeholder loop: only the relative path of each cached image is computed.
image_dir = 'images/'
for image_name in os.listdir(image_dir):
    path = image_dir + image_name


# for idx, currency in enumerate(currencies):
#     image_name = cache_image_file_name(currency['coinGeckoId'], currency['imageUrl'])
#     exists = os.path.exists('images/' + image_name)
#     print(exists, image_name)