In [1]:
import sys
sys.path.append('..') # for import src

import os
import cloudpickle
import lzma
import pandas as pd
import numpy as np
from scipy.stats import pearsonr
import matplotlib.pyplot as plt
import ccxt

import src
cloudpickle.register_pickle_by_value(src) # for model portability

In [2]:
import numpy as np
import pandas as pd
import time
from gql import gql, Client
from gql.transport.requests import RequestsHTTPTransport

def execute_query(client, q):
    """Wrap a GraphQL selection in a named ``Query`` operation and run it.

    Args:
        client: Object exposing ``execute(document)``; the cells in this
            notebook pass a ``gql.Client``.
        q: Selection text (including its outer braces) that is appended
            after ``query Query ``.

    Returns:
        Whatever ``client.execute`` returns for the parsed document.
    """
    document = gql(f'query Query {q}')
    return client.execute(document)

In [4]:
# Token address used to filter pairs; the fetched rows show it with symbol WETH.
weth_id = '0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2'

def fetch_pair(client, created_at=0, weth_is_token1=False):
    """Request one page (at most 1000 rows) of pairs that contain ``weth_id``.

    Args:
        client: GraphQL client forwarded to ``execute_query``.
        created_at: Lower bound (inclusive, ``_gte``) on ``createdAtTimestamp``.
        weth_is_token1: When true the filter is on ``token1``, otherwise on
            ``token0``.

    Returns:
        The result of ``execute_query`` for the rendered query; callers in
        this notebook read the ``'pairs'`` key from it.
    """
    token_slot = 1 if weth_is_token1 else 0
    query_template = """
    {{
      pairs(orderBy: createdAtTimestamp, where: {{ createdAtTimestamp_gte: {}, token{}: "{}" }}, first: 1000) {{
        id
        createdAtTimestamp
        token0 {{
          id
          symbol
        }}
        token1 {{
          id
          symbol
        }}
      }}
    }}
    """
    rendered = query_template.format(created_at, token_slot, weth_id)
    return execute_query(client, rendered)

def do_fetch_all_pairs(client, created_at=1, weth_is_token1=False):
    """Page through all pairs returned by ``fetch_pair`` from ``created_at`` on.

    Each page is requested with ``fetch_pair``; the next page starts at the
    largest ``createdAtTimestamp`` seen (inclusive), so rows on the page
    boundary are fetched twice and removed again with ``drop_duplicates``.

    Args:
        client: GraphQL client forwarded to ``fetch_pair``.
        created_at: Inclusive lower bound for ``createdAtTimestamp``.
        weth_is_token1: Forwarded to ``fetch_pair`` to pick the token slot.

    Returns:
        DataFrame indexed by pair ``id`` (sorted) with columns ``timestamp``
        (int), ``token0_id``, ``token0_symbol``, ``token1_id`` and
        ``token1_symbol``. When nothing matches, an empty frame with that
        same layout is returned.

    Raises:
        RuntimeError: If a full page does not advance the cursor, because
            repeating the identical query would never terminate.
    """
    page_size = 1000
    dfs = []

    while True:
        print('created_at', created_at)
        try:
            res = fetch_pair(client, created_at, weth_is_token1)
        except Exception as e:
            # Best-effort: any request failure is logged and retried after a pause.
            print(e)
            print('retry')
            time.sleep(10)
            continue
        pairs = res['pairs']
        if len(pairs) == 0:
            break
        print('len', len(pairs))

        # Flatten the nested token objects into scalar columns without
        # mutating the response rows.
        records = []
        for row in pairs:
            record = {k: v for k, v in row.items() if k not in ('token0', 'token1')}
            record['token0_id'] = row['token0']['id']
            record['token0_symbol'] = row['token0']['symbol']
            record['token1_id'] = row['token1']['id']
            record['token1_symbol'] = row['token1']['symbol']
            records.append(record)
        dfs.append(pd.DataFrame(records))

        if len(pairs) < page_size:
            break

        # Compare timestamps numerically: the raw values may be strings, whose
        # max() would be lexicographic.
        next_created_at = max(int(row['createdAtTimestamp']) for row in pairs)
        if str(next_created_at) == str(created_at):
            raise RuntimeError(
                'pagination stalled: {} or more pairs share createdAtTimestamp {}'.format(
                    page_size, created_at))
        created_at = next_created_at

    if not dfs:
        # pd.concat([]) raises ValueError; hand back an empty frame with the
        # usual layout so callers can concatenate and filter it as normal.
        empty = pd.DataFrame(
            columns=['timestamp', 'token0_id', 'token0_symbol', 'token1_id', 'token1_symbol'],
            index=pd.Index([], name='id', dtype='object'),
        )
        return empty.astype({'timestamp': 'int'})

    df = pd.concat(dfs).drop_duplicates()
    df = df.rename(columns={
        'createdAtTimestamp': 'timestamp'
    })
    df['timestamp'] = df['timestamp'].astype('int')
    df = df.set_index(['id']).sort_index()
    return df

def fetch_all_pairs(client, created_at=1):
    """Collect pairs for both token slots and merge them into one frame.

    ``do_fetch_all_pairs`` is run first with ``weth_is_token1=False`` and then
    with ``weth_is_token1=True``; the two results are concatenated and sorted
    by index.

    Args:
        client: GraphQL client forwarded to ``do_fetch_all_pairs``.
        created_at: Inclusive lower bound forwarded to ``do_fetch_all_pairs``.

    Returns:
        The concatenated DataFrame, sorted by its index.
    """
    as_token0 = do_fetch_all_pairs(client, created_at, False)
    as_token1 = do_fetch_all_pairs(client, created_at, True)
    merged = pd.concat([as_token0, as_token1])
    return merged.sort_index()

# Build a gql client against the Uniswap V2 subgraph endpoint.
transport = RequestsHTTPTransport(url="https://api.thegraph.com/subgraphs/name/uniswap/uniswap-v2")
client = Client(transport=transport, fetch_schema_from_transport=True)

# Fetch pairs created at or after 1668000000, report how many pair ids are
# duplicated in the index, display the frame and cache it for later cells.
df_pair = fetch_all_pairs(client, 1668000000)
print('index duplicated count {}'.format(df_pair[df_pair.index.duplicated(keep='last')].shape[0]))
display(df_pair)
df_pair.to_pickle('/tmp/df_pair.pkl')

created_at 1668000000
len 297
created_at 1668000000
len 816
index duplicated count 0


Unnamed: 0_level_0,timestamp,token0_id,token0_symbol,token1_id,token1_symbol
id,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1
0x002c755bc6b99f61d59a76047a13d6be41bb8fd1,1668312539,0x22e7250df8dc9eb3cf2f3e21de58b1a4e2c36946,IM,0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2,WETH
0x00513eedf43b040301315473f193152143f28cbd,1668456983,0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2,WETH,0xc2089ed438021ac6a07b4f814787c02b4e34f109,test2
0x0084224f7c5635bbd8d8e436ca246f3b4f0e7197,1668116723,0x85d913b0afab2630544660089d6cbb85d4560393,CR7,0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2,WETH
0x00b7bebc45ae2aac8af760db9315826c657cd8f4,1668377891,0x83fca6b2b527e51c4d2dc4b12fed40752794238d,ToM,0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2,WETH
0x00b7e4a0f0b2aeffcefb92d6a78197b888196463,1668425507,0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2,WETH,0xecead914cbf1751369fdb278704621926d1caac0,Pepe-TV
...,...,...,...,...,...
0xfee78a4782064296fbb5dd1ced086676bedb0d59,1668402131,0x19cce203a8563df72fcc17fea1f5d4d7de5b8e40,GAINS,0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2,WETH
0xff3daea632eccaa05759f0c4d8dba46ebd555402,1668465203,0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2,WETH,0xe1f8ad437bfadc5d2f42a4126403ee6c4200ee4e,MTLS
0xff6354df9854ee901593b93384dcff979599f3fa,1668119975,0x9dd9a8379d73ea7e5bc7b7d1af85c40ac93262c6,COKEBEAR,0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2,WETH
0xff769f62d94283001b39f77c41c28f279e73d4ae,1668435539,0xa630422573fc622c6f389bd60608722d73117d6b,TIMP,0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2,WETH


In [5]:

class PairFetcher:
    """Fetcher object for pair metadata, identified by ``data_id`` ``'univ2_pair'``."""

    def __init__(self):
        self.keys = {}
        self.data_id = 'univ2_pair'

    def fetch(self, last_timestamp=None):
        """Fetch pairs created after ``last_timestamp``.

        Args:
            last_timestamp: Timestamp of the newest row already held, or
                ``None`` to start from timestamp 1.

        Returns:
            The frame from ``fetch_all_pairs`` restricted to rows whose
            ``timestamp`` is more than 300 below the largest fetched
            ``timestamp``.
        """
        client = Client(
            transport=RequestsHTTPTransport(url="https://api.thegraph.com/subgraphs/name/uniswap/uniswap-v2"),
            fetch_schema_from_transport=True,
        )

        if last_timestamp is None:
            created_at = 1
        else:
            created_at = last_timestamp + 1
        pairs = fetch_all_pairs(client, created_at)

        # buffer: hold back the newest 300 seconds of rows
        cutoff = pairs['timestamp'].max() - 300
        is_settled = pairs['timestamp'] < cutoff
        return pairs.loc[is_settled]


In [22]:
# Exercise PairFetcher twice: once from the beginning (last_timestamp=None)
# and once incrementally from a known timestamp.
fetcher = PairFetcher()
df = fetcher.fetch(last_timestamp=None)
display(df)
df = fetcher.fetch(last_timestamp=1668000000)
display(df)

created_at 1
len 1000
created_at 1597546980
len 1000
created_at 1600545834
len 1000
created_at 1602639738
len 1000
created_at 1604158136
{'message': 'Failed to get entities from store: canceling statement due to conflict with recovery, query = /* controller=\'filter\',application=\'sgd213744\',route=\'c9b0081408fe305-baa7bbcd35867dc\',action=\'15972885\' */\nselect \'Pair\' as entity, to_jsonb(c.*) as data from (select  * \n  from "sgd213744"."pair" c\n where c.block_range @> $1 and coalesce(upper("block_range"), 2147483647) > $2 and lower("block_range") <= $3 and (c."created_at_timestamp" >= $4::numeric and c."token_0" = $5)\n\n order by "created_at_timestamp" asc, "id" asc\n limit 1000) c -- binds: [15972885, 15972885, 15972885, "1604158136", "0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2"]'}
retry
created_at 1604158136
len 1000
created_at 1606678827
len 1000
created_at 1613250431
len 1000
created_at 1619758213
len 1000
created_at 1622475131
len 1000
created_at 1623884729
len 1000
creat

KeyboardInterrupt: 

In [13]:
from joblib import Parallel, delayed

def fetch_pair_hour_data(client, hour_start_unix=0, end_unix=0, pair_ids=None):
    """Request one page (at most 1000 rows) of ``pairHourDatas`` for some pairs.

    Args:
        client: GraphQL client forwarded to ``execute_query``.
        hour_start_unix: Inclusive lower bound (``hourStartUnix_gte``).
        end_unix: Exclusive upper bound (``hourStartUnix_lt``).
        pair_ids: Iterable of pair ids for the ``pair_in`` filter. It is
            iterated directly, so the ``None`` default is not usable.

    Returns:
        The result of ``execute_query`` for the rendered query; callers in
        this notebook read the ``'pairHourDatas'`` key from it.
    """
    quoted_ids = ['"{}"'.format(pair_id) for pair_id in pair_ids]
    pair_filter = ','.join(quoted_ids)
    query_template = """
    {{
      pairHourDatas(orderBy: hourStartUnix, first: 1000, where: {{ hourStartUnix_gte: {}, hourStartUnix_lt: {}, pair_in: [{}], }}) {{
        id
        hourStartUnix
        pair {{
          id
        }}
        reserve0
        reserve1
        reserveUSD
        hourlyVolumeToken0
        hourlyVolumeToken1
        hourlyVolumeUSD
        hourlyTxns
      }}
    }}
    """
    rendered = query_template.format(hour_start_unix, end_unix, pair_filter)
    return execute_query(client, rendered)

def do_fetch_all_pair_hour_datas(client, pair_ids=None, raise_error=False, hour_start_unix=None, end_unix=None):
    """Page through hourly rows for ``pair_ids`` in ``[hour_start_unix, end_unix)``.

    Args:
        client: Accepted for signature compatibility only; it is replaced by
            a client constructed inside this function.
        pair_ids: Sequence of pair ids passed to ``fetch_pair_hour_data``.
        raise_error: When true, a failed request is re-raised; otherwise it
            is logged and retried after a 10 second sleep.
        hour_start_unix: Inclusive start of the window; also the paging cursor.
        end_unix: Exclusive end of the window.

    Returns:
        DataFrame indexed by ``(timestamp, pair_id)`` (sorted) with float
        reserve/volume/txn columns, or ``None`` when no rows were fetched.
    """
    # NOTE(review): the incoming ``client`` is overwritten here; presumably so
    # that every worker thread owns its transport - confirm before removing.
    client = Client(
        transport=RequestsHTTPTransport(url="https://api.thegraph.com/subgraphs/name/uniswap/uniswap-v2"),
        fetch_schema_from_transport=True,
    )

    pages = []
    while hour_start_unix < end_unix:
        print('pair_ids hour_start_unix end_unix', pair_ids[0], pair_ids[-1], hour_start_unix, end_unix)
        try:
            res = fetch_pair_hour_data(client, hour_start_unix=hour_start_unix, end_unix=end_unix, pair_ids=pair_ids)
        except Exception as e:
            if raise_error:
                raise
            print(e)
            print('retry')
            time.sleep(10)
            continue

        rows = res['pairHourDatas']
        row_count = len(rows)
        print('len', row_count)
        if row_count == 0:
            break

        # Lift the nested pair id into a flat column before framing the page.
        for row in rows:
            row['pair_id'] = row['pair']['id']
        page = pd.DataFrame(rows).drop(columns=['pair'])
        pages.append(page)

        # The next page starts at the last hour seen (inclusive); overlapping
        # rows are removed by drop_duplicates below.
        hour_start_unix = page['hourStartUnix'].max()
        if row_count < 1000:
            break

    if not pages:
        return None

    dtypes = {'timestamp': 'int'}
    for column in (
        'reserve0',
        'reserve1',
        'reserveUSD',
        'hourlyVolumeToken0',
        'hourlyVolumeToken1',
        'hourlyVolumeUSD',
        'hourlyTxns',
    ):
        dtypes[column] = 'float'

    combined = (
        pd.concat(pages)
        .drop_duplicates()
        .rename(columns={'hourStartUnix': 'timestamp'})
        .drop(columns=['id'])
        .astype(dtypes)
    )
    return combined.set_index(['timestamp', 'pair_id']).sort_index()

def _empty_pair_hour_frame():
    """Return an empty frame laid out like a populated hourly result."""
    index = pd.MultiIndex.from_arrays(
        [pd.Index([], dtype='int64'), pd.Index([], dtype='object')],
        names=['timestamp', 'pair_id'],
    )
    columns = [
        'reserve0',
        'reserve1',
        'reserveUSD',
        'hourlyVolumeToken0',
        'hourlyVolumeToken1',
        'hourlyVolumeUSD',
        'hourlyTxns',
    ]
    return pd.DataFrame(index=index, columns=columns, dtype='float64')


def fetch_all_pair_hour_datas(client, pair_ids=None, hour_start_unix=None, end_unix=None):
    """Fetch hourly rows for many pairs by running batches of 100 ids in threads.

    Args:
        client: Forwarded to ``do_fetch_all_pair_hour_datas`` for each batch.
        pair_ids: Sequence of pair ids; ``None`` or an empty sequence yields
            an empty result without scheduling any work.
        hour_start_unix: Inclusive start of the window, forwarded per batch.
        end_unix: Exclusive end of the window, forwarded per batch.

    Returns:
        DataFrame indexed by ``(timestamp, pair_id)``, sorted by index. If
        there are no pairs, or no batch produced rows, an empty frame with
        the same index names and columns is returned.
    """
    if pair_ids is None or len(pair_ids) == 0:
        return _empty_pair_hour_frame()

    batch_size = 100
    # Slicing already clamps at the end of the sequence, so no min() is needed.
    tasks = [
        delayed(do_fetch_all_pair_hour_datas)(
            client,
            pair_ids=pair_ids[start:start + batch_size],
            raise_error=False,
            hour_start_unix=hour_start_unix,
            end_unix=end_unix,
        )
        for start in range(0, len(pair_ids), batch_size)
    ]

    results = Parallel(n_jobs=16, backend='threading')(tasks)
    # Batches without data come back as None.
    frames = [x for x in results if x is not None]
    if not frames:
        # pd.concat([]) raises ValueError; an empty window is not an error.
        return _empty_pair_hour_frame()

    return pd.concat(frames).sort_index()

# Build a gql client against the Uniswap V2 subgraph endpoint.
transport = RequestsHTTPTransport(url="https://api.thegraph.com/subgraphs/name/uniswap/uniswap-v2")
client = Client(transport=transport, fetch_schema_from_transport=True)

# Reload the pair table cached by the earlier cell.
df_pair = pd.read_pickle('/tmp/df_pair.pkl')

# Fetch one day (3600 * 24 seconds) of hourly rows for every cached pair id.
pair_ids = sorted(df_pair.index.unique().tolist())
df_hour = fetch_all_pair_hour_datas(client, pair_ids, 1668000000, 1668000000 + 3600 * 24)

# Report duplicated (timestamp, pair_id) index entries, cache and show the result.
print('index duplicated count {}'.format(df_hour[df_hour.index.duplicated(keep='last')].shape[0]))
df_hour.to_pickle('/tmp/df_hour.pkl')
display(df_hour)

pair_ids hour_start_unix end_unixpair_ids hour_start_unix end_unix 0x181a2324e7e91d5d7d63b30822dd3aafae7107bb 0x318786ccbfc2f29e61e6c754b515ca000cb4df00 1668000000 1668086400
 0x002c755bc6b99f61d59a76047a13d6be41bb8fd1 0x17ec506ad257c6b0e65d2b56208b321b28d20a9c 1668000000 1668086400
pair_ids hour_start_unix end_unix 0x31af37362a6086d4ea56f540cd590627317faf9c 0x491ee5d66d17b614fba2e3dddda959babdbde28c 1668000000 1668086400
pair_ids hour_start_unix end_unix 0x49502c46d9d5c1becca441dadfcbc9ce2f9c88e5 0x605ca33ca2a3187a822c79aa1bb648713143de8f 1668000000 1668086400
pair_ids hour_start_unix end_unix 0x606f7dc0b7d35a37deb57823de8d22f3cdfc61aa 0x79bf693606be51f6c32120acbc5d84014554e6e6 1668000000 1668086400
pair_ids hour_start_unix end_unix 0x79c2bad90adb9befb35963d105e679d0629dcb8a 0x8ffb234857247554435df90f5bd5e1e3fd78900b 1668000000 1668086400
pair_ids hour_start_unix end_unix 0x904c18f2caaba96222e4ac5e5ec158da40a443ea 0xa29a328510c024fc232b456efb1dd608362fdfe4 1668000000 1668086400
pair_i

Unnamed: 0_level_0,Unnamed: 1_level_0,reserve0,reserve1,reserveUSD,hourlyVolumeToken0,hourlyVolumeToken1,hourlyVolumeUSD,hourlyTxns
timestamp,pair_id,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1
1668002400,0x1150847f011903811a7a9b7498ca47c8dc5eba14,6.916400e-14,1.500000e-08,1.660351e-10,9.405168e-01,1.692831e+05,0.0,33.0
1668002400,0x5015f3a3720e3f92fb2271f61010c606078c8a11,7.425819e+05,6.739205e-01,8.257793e+02,2.574181e+05,1.739205e-01,0.0,11.0
1668002400,0x692b4c51dbc783a38fce2b243610c548bbe7c40e,3.852801e+06,3.063660e+00,7.508029e+03,7.363457e+06,2.546046e+00,0.0,89.0
1668002400,0x7b95f771711776f5d704a186d74a2584cb2206c2,8.960493e+05,2.024545e+00,4.959672e+03,2.882444e+05,7.938714e-01,0.0,27.0
1668002400,0x7eb264f23f376aa7918101aec0c0b1a6753053f7,7.329538e+08,1.368924e+00,1.677367e+03,3.272406e+08,4.321272e-01,0.0,46.0
...,...,...,...,...,...,...,...,...
1668085200,0x98b57cce2f7f07e2da7922ddf18a1e6ec7b54e63,1.340660e+08,1.157978e+01,2.968942e+04,5.257543e+06,4.633416e-01,0.0,4.0
1668085200,0xb82d7c2c0363b2970820827322b7d066039ca7f2,5.897471e+00,1.719100e+05,1.469192e+04,2.050969e+00,6.545758e+04,0.0,13.0
1668085200,0xbbe5fa89eb01576da378351208e583e50cbcf339,2.695175e+00,7.486225e+08,7.004726e+03,2.868236e+00,5.962016e+08,0.0,50.0
1668085200,0xd5dea2e160a4b651369b5f300201d3676ebe2eea,4.821719e+05,3.958788e+00,1.023979e+04,9.374787e+05,5.283928e+00,0.0,97.0


In [16]:
from google.cloud import bigquery

class HourFetcher:
    """Fetcher object for hourly pair data, identified by ``data_id`` ``'univ2_hour'``."""

    def __init__(self):
        self.keys = {}
        self.data_id = 'univ2_hour'

    def fetch(self, last_timestamp=None):
        """Fetch up to 30 days of hourly rows following ``last_timestamp``.

        Args:
            last_timestamp: Timestamp of the newest row already held, or
                ``None`` for the initial load.

        Returns:
            The frame from ``fetch_all_pair_hour_datas`` without the rows of
            the largest fetched ``timestamp``.
        """
        client = Client(
            transport=RequestsHTTPTransport(url="https://api.thegraph.com/subgraphs/name/uniswap/uniswap-v2"),
            fetch_schema_from_transport=True,
        )

        # https://etherscan.io/block/10000834
        factory_deploy_at = 1588566790
        window_seconds = 30 * 24 * 3600

        pair_ids = self._get_pair_ids()
        if last_timestamp is None:
            # Initial load: start at 1 and stop 30 days after factory_deploy_at.
            start_unix = 1
            end_unix = factory_deploy_at + window_seconds
        else:
            start_unix = last_timestamp + 1
            end_unix = start_unix + window_seconds
        hourly = fetch_all_pair_hour_datas(client, pair_ids, start_unix, end_unix)

        # remove partial: the newest hour is dropped
        hours = hourly.index.get_level_values('timestamp')
        return hourly.loc[hours < hours.max()]

    def _get_pair_ids(self):
        """Return the sorted ``id`` values of the ``univ2_pair`` BigQuery table.

        The project comes from the ``GC_PROJECT_ID`` environment variable and
        the dataset from ``ALPHAPOOL_DATASET``.
        """
        project_id = os.getenv('GC_PROJECT_ID')
        dataset_name = os.getenv('ALPHAPOOL_DATASET')

        bq_client = bigquery.Client(project=project_id)
        rows = bq_client.query(f'SELECT id FROM `{dataset_name}.univ2_pair`')
        return sorted(row['id'] for row in rows)

In [13]:
# Exercise HourFetcher incrementally from 1668000000 rounded down to a whole
# hour. The initial-load call is left disabled below.
fetcher = HourFetcher()
# df = fetcher.fetch(last_timestamp=None)
# display(df)
df = fetcher.fetch(last_timestamp=(1668000000 // 3600) * 3600)
display(df)

DefaultCredentialsError: Could not automatically determine credentials. Please set GOOGLE_APPLICATION_CREDENTIALS or explicitly create credentials and re-run the application. For more information, please see https://cloud.google.com/docs/authentication/getting-started

In [17]:
# Bundle both fetchers into a single list for export.
fetchers = [
    PairFetcher(),
    HourFetcher(),
]

# Serialize with cloudpickle, compress with lzma and write the bytes to disk.
data = cloudpickle.dumps(fetchers)
data = lzma.compress(data)
with open('/home/jovyan/data/20221114_univ2.xz', 'wb') as f:
    f.write(data)