In [1]:
import pandas as pd 
import numpy as np 
import matplotlib.pyplot as plt 
import seaborn as sns 
import requests
import pprint
import time
import textwrap
import json
from dotenv import load_dotenv
import os
# Read variables from a .env file into the process environment; the API key is
# looked up from the environment (GRAPH_API_KEY) further down.
load_dotenv()
# Seaborn defaults for every plot: "notebook" context with the "white" style.
sns.set_theme('notebook', 'white')

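Create a `.env` file (in the notebook's working directory or one of its parents) containing your The Graph API key as `GRAPH_API_KEY=<your key>`. `load_dotenv()` above loads it into the environment, and the cells below read it from there.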

In [3]:
API_KEY = os.environ.get("GRAPH_API_KEY") 
def run_query(query: str, variables: dict | None = None, *, timeout: float = 60.0):
    """POST a GraphQL query to ``ENDPOINT`` and return its ``data`` payload.

    ``ENDPOINT`` is a module-level name assigned in a later cell; it is looked
    up when the function is called, not when it is defined.

    Args:
        query: GraphQL document to execute.
        variables: Values for the query's variables; ``None`` sends ``{}``.
        timeout: Seconds to wait for the server before giving up.

    Returns:
        The value under the ``"data"`` key of the JSON response.

    Raises:
        requests.HTTPError: The server answered with a 4xx/5xx status.
        requests.Timeout: No response arrived within ``timeout`` seconds.
        RuntimeError: The response body contains a GraphQL ``errors`` entry.
    """
    payload = {"query": query, "variables": variables or {}}
    # requests waits indefinitely by default; an explicit timeout keeps a
    # stalled connection from hanging the notebook cell forever.
    r = requests.post(ENDPOINT, json=payload, timeout=timeout)
    r.raise_for_status()
    data = r.json()
    if "errors" in data:
        raise RuntimeError(data["errors"])
    return data["data"]

Choose a subgraph

In [4]:
# Deployment ID of the subgraph to query; change this value to point the
# following cells at a different subgraph.
SUBGRAPH_ID = "JCNWRypm7FYwV8fx5HhzZPSFaMxgkPuw4TnR3Gpi81zk" 
# Gateway URL the request helpers post to. It embeds API_KEY, so the key must
# already be loaded; later cells reassign ENDPOINT for other sections.
ENDPOINT = f"https://gateway.thegraph.com/api/{API_KEY}/subgraphs/id/{SUBGRAPH_ID}"

Print its root query fields (each entity appears twice: a singular lookup and a plural list query)

In [5]:

# Introspection query: ask the schema for the names of all fields exposed on
# the root query type.
introspection = """
{
__schema {
queryType {
fields { name }
}
}
}
"""
schema = run_query(introspection)["__schema"]
root_fields = schema["queryType"]["fields"]
field_names = [field["name"] for field in root_fields]
pprint.pprint(field_names)

['token',
 'tokens',
 'rewardToken',
 'rewardTokens',
 'interestRate',
 'interestRates',
 'fee',
 'fees',
 'revenueDetail',
 'revenueDetails',
 'oracle',
 'oracles',
 'lendingProtocol',
 'lendingProtocols',
 'marketList',
 'marketLists',
 'usageMetricsDailySnapshot',
 'usageMetricsDailySnapshots',
 'usageMetricsHourlySnapshot',
 'usageMetricsHourlySnapshots',
 'financialsDailySnapshot',
 'financialsDailySnapshots',
 'market',
 'markets',
 'marketDailySnapshot',
 'marketDailySnapshots',
 'marketHourlySnapshot',
 'marketHourlySnapshots',
 'account',
 'accounts',
 'position',
 'positions',
 'positionSnapshot',
 'positionSnapshots',
 'activeAccount',
 'activeAccounts',
 'txSigner',
 'txSigners',
 'positionCounter',
 'positionCounters',
 'deposit',
 'deposits',
 'withdraw',
 'withdraws',
 'borrow',
 'borrows',
 'repay',
 'repays',
 'liquidate',
 'liquidates',
 'transfer',
 'transfers',
 'flashloan',
 'flashloans',
 'defaultOracle',
 'defaultOracles',
 'flashLoanPremium',
 'flashLoanPremiums']

In [6]:
def gql(q: str, v: dict | None = None, *, timeout: float = 60.0):
    """Execute GraphQL document ``q`` against ``ENDPOINT`` and return its data.

    Args:
        q: GraphQL document to execute.
        v: Values for the query's variables; ``None`` sends ``{}``.
        timeout: Seconds to wait for the server before giving up.

    Returns:
        The value under the ``"data"`` key of the JSON response.

    Raises:
        requests.HTTPError: The server answered with a 4xx/5xx status.
        requests.Timeout: No response arrived within ``timeout`` seconds.
        RuntimeError: The response contains GraphQL errors; the message is the
            error list pretty-printed as JSON.
    """
    # An explicit timeout prevents a stalled connection from blocking forever.
    r = requests.post(ENDPOINT, json={"query": q, "variables": v or {}}, timeout=timeout)
    r.raise_for_status()
    data = r.json()
    if "errors" in data:
        raise RuntimeError(json.dumps(data["errors"], indent=2))
    return data["data"]

def show_fields(type_name: str):
    """Print the field names the subgraph schema declares on ``type_name``.

    Args:
        type_name: Name of a schema type, e.g. ``"Liquidate"``.

    Raises:
        ValueError: The schema has no type called ``type_name``.
    """
    q = """
    query($t: String!){
      __type(name: $t){
        name
        fields { name }
      }
    }"""
    type_info = gql(textwrap.dedent(q), {"t": type_name})["__type"]
    # Introspection returns null for an unknown type name; report that clearly
    # instead of failing on a None subscript.
    if type_info is None:
        raise ValueError(f"Type {type_name!r} not found in the subgraph schema.")
    # `fields` is null for types that carry no fields (scalars, enums, ...).
    fields = type_info["fields"] or []
    print(f"\nFields on {type_name}:")
    print(", ".join(f["name"] for f in fields))

In [7]:
# List the fields available on the Liquidate entity (the liquidation query
# later in the notebook selects from these).
show_fields("Liquidate")


Fields on Liquidate:
id, hash, nonce, logIndex, gasPrice, gasUsed, gasLimit, blockNumber, timestamp, liquidator, liquidatee, market, positions, asset, amount, amountUSD, profitUSD


# AAVE

## Hourly snapshots of relevant pools

In [None]:
# Token contract addresses, used as the `inputToken` filter when looking up
# each market in get_data_coin_aave.
USDC_ADDRESS = "0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48"
USDT_ADDRESS = "0xdac17f958d2ee523a2206206994597c13d831ec7"
DAI_ADDRESS  = "0x6B175474E89094C44Da98b954EedeAC495271d0F"
WETH_ADDRESS = "0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2"
USDE_ADDRESS = "0x4c9EDD5852cd905f086C759E8383e09bff1E68B3"

def gql(query: str, variables: dict | None = None, *,
        timeout: float = 60.0, max_retries: int = 10) -> dict:
    """Run a GraphQL query against ``ENDPOINT``, backing off while rate-limited.

    Args:
        query: GraphQL document to execute.
        variables: Values for the query's variables; ``None`` sends ``{}``.
        timeout: Seconds to wait for each HTTP response.
        max_retries: How many HTTP 429 responses to wait out before giving up.

    Returns:
        The value under the ``"data"`` key of the JSON response.

    Raises:
        requests.HTTPError: Non-success status, including a 429 that is still
            returned after ``max_retries`` waits.
        requests.Timeout: No response arrived within ``timeout`` seconds.
        RuntimeError: The response contains GraphQL errors (pretty-printed JSON).
    """
    payload = {"query": query, "variables": variables or {}}
    retries_left = max(max_retries, 0)
    while True:
        r = requests.post(ENDPOINT, json=payload, timeout=timeout)
        if r.status_code == 429 and retries_left > 0:   # rate limit → wait & retry
            retries_left -= 1
            # Retry-After may be fractional or an HTTP-date rather than an
            # integer; fall back to the 2 s default when it cannot be used.
            try:
                delay = float(r.headers.get("Retry-After", "2"))
            except ValueError:
                delay = 2.0
            if not delay >= 0:                          # negative or NaN
                delay = 2.0
            time.sleep(delay)
            continue
        # Once the retry budget is spent a 429 is raised here like any other
        # error status instead of looping forever.
        r.raise_for_status()
        data = r.json()
        if "errors" in data:
            raise RuntimeError(json.dumps(data["errors"], indent=2))
        return data["data"]

def get_data_coin_aave(name):
  """Download one market's hourly snapshots and save them as a parquet file.

  The coin key is mapped to a token address, the market whose input token is
  that address is looked up, every ``marketHourlySnapshots`` row of the market
  is fetched, and each snapshot is flattened into one row. The rows are then
  aligned to a gap-free hourly index (missing hours are forward-filled) and
  written to ``aave_v3_{name}.parquet``.

  Rates are stored exactly as the subgraph reports them; no unit conversion is
  applied.

  Args:
    name: One of ``"usdt"``, ``"usdc"``, ``"dai"``, ``"eth"``, ``"usde"``.

  Raises:
    KeyError: ``name`` is not one of the supported keys.
    ValueError: No market uses the token as input token, or the market has no
      hourly snapshots.
  """
  coins = {
      "usdt": USDT_ADDRESS,
      "usdc": USDC_ADDRESS,
      "dai": DAI_ADDRESS,
      "eth": WETH_ADDRESS,
      "usde" : USDE_ADDRESS
  }

  coin_address = coins[name]

  MKT_QUERY = """
  query($token: String!) {
    markets(where: {inputToken: $token}) {
      id
      name
      inputToken { symbol }
    }
  }
  """
  markets = gql(MKT_QUERY, {"token": coin_address})["markets"]
  if not markets:
    # Name the coin that was actually requested rather than a fixed "USDT".
    raise ValueError(f"{name.upper()} market not found in this subgraph.")
  MARKET_ID = markets[0]["id"]
  print("Found market:", markets[0]["name"], "→", MARKET_ID)


  SNAP_QUERY = """
  query($mkt: String!, $first: Int!, $skip: Int!) {
    marketHourlySnapshots(
      where: {market: $mkt}
      orderBy: timestamp
      orderDirection: asc
      first: $first
      skip: $skip
    ) {
      id
      timestamp
      blockNumber
      totalValueLockedUSD
      totalDepositBalanceUSD
      totalBorrowBalanceUSD
      hourlyLiquidateUSD
      inputTokenBalance
      rates {
        side          # LENDER / BORROWER
        type          # VARIABLE / STABLE
        rate          # per-second ray (1e27)
      }
    }
  }
  """

  def fetch_hourly_snapshots(mkt_id: str, batch: int = 1000) -> list[dict]:
    """Page through every hourly snapshot of ``mkt_id``, oldest first."""
    out, skip = [], 0
    while True:
      chunk = gql(SNAP_QUERY, {"mkt": mkt_id, "first": batch, "skip": skip})["marketHourlySnapshots"]
      if not chunk:
        break
      out.extend(chunk)
      skip += len(chunk)
    return out

  raw_snaps = fetch_hourly_snapshots(MARKET_ID)
  print("Fetched", len(raw_snaps), "hourly snapshots")
  if not raw_snaps:
    # Without rows the frame below has no "timestamp" column; fail clearly.
    raise ValueError(f"No hourly snapshots returned for market {MARKET_ID}.")

  def to_row(snap: dict) -> dict:
    """Flatten one snapshot (including its nested rates) into a flat dict."""
    row = {
        "timestamp"      : int(snap["timestamp"]),
        "block"          : int(snap["blockNumber"]),
        "TVL_USD"        : float(snap["totalValueLockedUSD"]),
        "supplied_USD"   : float(snap["totalDepositBalanceUSD"]),
        "borrowed_USD"   : float(snap["totalBorrowBalanceUSD"]),
        "liquidations"   : float(snap['hourlyLiquidateUSD']),
        "token_balance"  : float(snap["inputTokenBalance"]),
    }

    # Initialise the APR columns with NaN (not None) so they stay numeric even
    # when a side/type combination is never reported for this market.
    for side in ("lender", "borrower"):
      for typ in ("variable", "stable"):
        row[f"{side}_{typ}_apr"] = float("nan")

    for r in snap["rates"]:
      col = f"{r['side'].lower()}_{r['type'].lower()}_apr"
      row[col] = float(r["rate"])
    return row

  df = pd.DataFrame(map(to_row, raw_snaps))
  df["date"] = pd.to_datetime(df["timestamp"], unit="s")
  df = df.set_index("date").sort_index()

  # Round every timestamp up to the hour and average snapshots that land on
  # the same hour, so the index is unique before reindexing.
  df.index = df.index.ceil('h')
  df = df.groupby(level = 0).mean()
  # The index is already hour-aligned and sorted, so its ends bound the range.
  # Lowercase "h" replaces the deprecated "H" alias.
  full_idx = pd.date_range(df.index[0], df.index[-1], freq="h")
  df = df.reindex(full_idx,method = 'ffill')
  df.to_parquet(f'aave_v3_{name}.parquet')

In [26]:
# Download and save the hourly snapshot file for every supported coin key.
for coin in ('usdt', 'usdc', 'dai', 'eth', 'usde'):
    get_data_coin_aave(coin)

Found market: Aave Ethereum USDT → 0x23878914efe38d27c4d67ab83ed1b93a74d4086a
Fetched 23212 daily snapshots


  full_idx = pd.date_range(df.index[0].ceil("H"),
  df.index[-1].ceil("H"),


Found market: Aave Ethereum USDC → 0x98c23e9d8f34fefb1b7bd6a91b7ff122f4e16f5c
Fetched 24665 daily snapshots


  full_idx = pd.date_range(df.index[0].ceil("H"),
  df.index[-1].ceil("H"),


Found market: Aave Ethereum DAI → 0x018008bfb33d285247a21d44e50697654f754e63
Fetched 19193 daily snapshots


  full_idx = pd.date_range(df.index[0].ceil("H"),
  df.index[-1].ceil("H"),


Found market: Aave Ethereum WETH → 0x4d5f47fa6a74757f35c14fd3a6ef8e3c9bc514e8
Fetched 24937 daily snapshots


  full_idx = pd.date_range(df.index[0].ceil("H"),
  df.index[-1].ceil("H"),


Found market: Aave Ethereum USDe → 0x4f5923fc5fd4a93352581b38b7cd26943012decf
Fetched 8380 daily snapshots


  full_idx = pd.date_range(df.index[0].ceil("H"),
  df.index[-1].ceil("H"),


## Hourly $ amount of liquidations

In [27]:
import os, time, json, requests
import pandas as pd

# Gateway settings for the liquidation pull. If GRAPH_API_KEY is unset or
# empty, a placeholder key is substituted into the URL.
API_KEY = os.environ.get("GRAPH_API_KEY") or "YOUR_API_KEY_HERE"
SUBGRAPH_ID = "JCNWRypm7FYwV8fx5HhzZPSFaMxgkPuw4TnR3Gpi81zk"
ENDPOINT = f"https://gateway.thegraph.com/api/{API_KEY}/subgraphs/id/{SUBGRAPH_ID}"

# ---------- GraphQL helper ---------------------------------------------------
def gql(query: str, variables: dict | None = None, *,
        timeout: float = 60.0, max_retries: int = 10) -> dict:
    """Run a GraphQL query against ``ENDPOINT``, backing off while rate-limited.

    Args:
        query: GraphQL document to execute.
        variables: Values for the query's variables; ``None`` sends ``{}``.
        timeout: Seconds to wait for each HTTP response.
        max_retries: How many HTTP 429 responses to wait out before giving up.

    Returns:
        The value under the ``"data"`` key of the JSON response.

    Raises:
        requests.HTTPError: Non-success status, including a 429 that is still
            returned after ``max_retries`` waits.
        requests.Timeout: No response arrived within ``timeout`` seconds.
        RuntimeError: The response contains GraphQL errors (pretty-printed JSON).
    """
    payload = {"query": query, "variables": variables or {}}
    retries_left = max(max_retries, 0)
    while True:
        r = requests.post(ENDPOINT, json=payload, timeout=timeout)
        if r.status_code == 429 and retries_left > 0:   # rate-limit → wait & retry
            retries_left -= 1
            # Retry-After may be fractional or an HTTP-date rather than an
            # integer; fall back to the 2 s default when it cannot be used.
            try:
                delay = float(r.headers.get("Retry-After", "2"))
            except ValueError:
                delay = 2.0
            if not delay >= 0:                          # negative or NaN
                delay = 2.0
            time.sleep(delay)
            continue
        # Once the retry budget is spent a 429 is raised here like any other
        # error status instead of looping forever.
        r.raise_for_status()
        data = r.json()
        if "errors" in data:
            raise RuntimeError(json.dumps(data["errors"], indent=2))
        return data["data"]

# ---------- liquidation fetcher (all collaterals) ---------------------------
# Paginated query over the `liquidates` entity, ordered by timestamp ascending.
# `$first` (page size) and `$skip` (offset) are supplied by
# fetch_all_liquidations. The nested `market` and `positions` objects are
# requested with only their `id`, which the fetcher later flattens.
LIQ_QUERY_ALL = """
query($first:Int!, $skip:Int!){
  liquidates(
    orderBy: timestamp
    orderDirection: asc
    first: $first
    skip:  $skip
  ){
    id
    hash
    nonce
    logIndex
    gasPrice
    gasUsed
    gasLimit
    blockNumber
    timestamp
    liquidator
    liquidatee
    market { id }
    positions { id }
    asset
    amount
    amountUSD
    profitUSD
  }
}
"""

def fetch_all_liquidations(batch: int = 1000) -> pd.DataFrame:
    """Download every liquidation event and return them as one DataFrame.

    Pages through ``LIQ_QUERY_ALL`` in chunks of ``batch`` until an empty page
    comes back. The result is indexed by a UTC ``datetime`` built from each
    event's ``timestamp`` and sorted by that index; ``amountUSD`` is converted
    to numeric (unparseable values become NaN). All other columns keep the
    values returned by the API, except ``market`` (reduced to its id) and
    ``positions`` (ids joined with commas).

    Returns an empty DataFrame when there are no events.
    """
    events: list[dict] = []
    offset = 0
    while page := gql(LIQ_QUERY_ALL, {"first": batch, "skip": offset})["liquidates"]:
        events.extend(page)
        offset += len(page)

    if not events:
        return pd.DataFrame()

    # Flatten the nested objects so each cell holds a plain scalar/string.
    for event in events:
        market = event["market"]
        event["market"] = market["id"] if market else None
        positions = event["positions"]
        event["positions"] = ",".join(p["id"] for p in positions) if positions else ""

    frame = pd.DataFrame(events)

    # Epoch seconds → timezone-aware datetime index, oldest first.
    frame["datetime"] = pd.to_datetime(frame["timestamp"].astype(int), unit="s", utc=True)
    frame = frame.set_index("datetime").sort_index()

    frame["amountUSD"] = pd.to_numeric(frame["amountUSD"], errors="coerce")
    return frame

def fetch_hourly_liquidations_usd(batch: int = 1000) -> pd.DataFrame:
    """Return total liquidated USD per hour across all liquidation events.

    Args:
        batch: Page size forwarded to ``fetch_all_liquidations``.

    Returns:
        A DataFrame with a single ``liquidation_usd`` column on an hourly UTC
        ``datetime`` index. Hours without liquidations hold 0.0. When there
        are no events at all, the frame is empty but keeps the same column
        and index type, so callers see a consistent schema.
    """
    df = fetch_all_liquidations(batch=batch)
    if df.empty:
        return pd.DataFrame(
            columns=["liquidation_usd"],
            index=pd.DatetimeIndex([], tz="UTC", name="datetime"),
            dtype="float64",
        )

    # Aggregate to hourly bins. Lowercase "h" replaces the deprecated "H"
    # alias, which emitted a FutureWarning.
    hourly = (
        df["amountUSD"]
        .resample("1h")
        .sum()
        .fillna(0.0)
        .to_frame(name="liquidation_usd")
    )
    return hourly

# Runs when the cell is executed (the recorded output below comes from it) and
# would also run if this code were executed as a script.
if __name__ == "__main__":
    hourly_liqs = fetch_hourly_liquidations_usd(batch=1000)
    # Quick sanity check: show the first few hourly totals.
    print(hourly_liqs.head())

    # Persist the hourly series to a parquet file in the working directory.
    hourly_liqs.to_parquet("aave_hourly_liquidations.parquet")

                           liquidation_usd
datetime                                  
2023-01-30 19:00:00+00:00      1322.616413
2023-01-30 20:00:00+00:00         0.000000
2023-01-30 21:00:00+00:00         0.000000
2023-01-30 22:00:00+00:00         0.000000
2023-01-30 23:00:00+00:00         0.000000


  .resample("1H")


# Uniswap

## Relevant Pool Data

### stable-stable pools

In [46]:
# Point the request helpers at the subgraph used for the Uniswap pool queries.
# API_KEY is None when GRAPH_API_KEY is not set in the environment.
API_KEY = os.getenv("GRAPH_API_KEY")
SUBGRAPH_ID = "5zvR82QoaXYFyDEKLZ9t6v9adgnptxYpKpSbxtgVENFV"
ENDPOINT = f"https://gateway.thegraph.com/api/{API_KEY}/subgraphs/id/{SUBGRAPH_ID}"

def run_query(query: str, variables: dict | None = None, *, timeout: float = 60.0):
    """POST a GraphQL query to ``ENDPOINT`` and return its ``data`` payload.

    Args:
        query: GraphQL document to execute.
        variables: Values for the query's variables; ``None`` sends ``{}``.
        timeout: Seconds to wait for the server before giving up.

    Returns:
        The value under the ``"data"`` key of the JSON response.

    Raises:
        requests.HTTPError: The server answered with a 4xx/5xx status.
        requests.Timeout: No response arrived within ``timeout`` seconds.
        RuntimeError: The response body contains a GraphQL ``errors`` entry.
    """
    payload = {"query": query, "variables": variables or {}}
    # requests waits indefinitely by default; an explicit timeout keeps a
    # stalled connection from hanging the paginated downloads forever.
    r = requests.post(ENDPOINT, json=payload, timeout=timeout)
    r.raise_for_status()
    data = r.json()
    if "errors" in data:
        raise RuntimeError(data["errors"])
    return data["data"]

In [62]:
def collect_stable_pool(token0,token1):
    """Export hourly and daily data for one stablecoin pool to parquet files.

    Looks up a pool whose two tokens are ``token0`` and ``token1`` (in either
    order), downloads all of its ``poolHourDatas`` and ``poolDayDatas`` rows
    and writes two files named after the pool's own token ordering:

    * ``{t0}_{t1}_pool``: hourly means of both token prices (columns named
      after the token symbols), ``volumeUSD`` and ``tvlUSD``.
    * ``{t0}_{t1}_pool_volume_daily``: daily ``volumeUSD``, ``tvlUSD`` and
      ``vol_rate`` (= ``volumeUSD / tvlUSD``).

    Args:
        token0: Symbol of one token: "USDC", "USDT", "DAI" or "USDE".
        token1: Symbol of the other token, from the same set.

    Raises:
        KeyError: A symbol is not one of the supported stablecoins.
        ValueError: No matching pool exists, or the pool has no hourly/daily
            rows.
    """
    stables = {
        "USDC": "0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48",
        "USDT": "0xdac17f958d2ee523a2206206994597c13d831ec7",
        "DAI":  "0x6b175474e89094c44da98b954eedeac495271d0f",
        "USDE": "0x4c9edd5852cd905f086c759e8383e09bff1e68b3",
    }
    # Reverse lookup: token address → symbol, used to name the output columns.
    stables_r = {address: symbol for symbol, address in stables.items()}
    pools_query = """
    query ($token0: String!, $token1: String!) {
    pools(
        where: {
        token0_in: [$token0, $token1]
        token1_in: [$token0, $token1]
        }
    ) {
        id
        feeTier
        totalValueLockedUSD
        token0 { id symbol }
        token1 { id symbol }
    }
    }
    """

    pools = run_query(pools_query, {"token0": stables[token0], "token1": stables[token1]})["pools"]
    if not pools:
        raise ValueError(f"No {token0}/{token1} pool found in this subgraph.")
    # NOTE(review): the query has no orderBy, so when several pools match
    # (e.g. different fee tiers) the first one returned is used. Confirm that
    # is the intended pool.
    pool = pools[0]
    POOL_ID = pool['id']
    t0 = stables_r[pool["token0"]["id"]]
    t1 = stables_r[pool["token1"]["id"]]

    def fetch_all(query: str, entity: str) -> list[dict]:
        """Page through ``entity`` rows of the selected pool until exhausted."""
        rows, skip = [], 0
        while True:
            batch = run_query(query, {"poolId": POOL_ID, "skip": skip})[entity]
            if not batch:
                break
            rows.extend(batch)
            skip += len(batch)
        if not rows:
            # An empty frame would otherwise fail later with an opaque KeyError.
            raise ValueError(f"No {entity} rows returned for pool {POOL_ID}.")
        return rows

    pool_hour_query = """
    query ($poolId: String!, $skip: Int!) {
    poolHourDatas(
        first: 1000
        skip: $skip
        orderBy: periodStartUnix
        orderDirection: asc
        where: { pool: $poolId }
    ) {
        periodStartUnix
        token0Price
        token1Price
        volumeToken0
        volumeToken1
        volumeUSD
        tvlUSD
    }
    }
    """

    hourly = pd.DataFrame(fetch_all(pool_hour_query, "poolHourDatas"))
    hourly["timestamp"] = pd.to_datetime(hourly["periodStartUnix"].astype(int), unit="s")
    hourly_cols = ["token0Price", "token1Price", "volumeUSD", "tvlUSD"]
    # The API returns numbers as strings; only the columns kept are converted.
    hourly[hourly_cols] = hourly[hourly_cols].astype(float)

    # keep nice columns
    hourly = hourly[["timestamp", *hourly_cols]]
    # Drop the first 100 hourly rows. NOTE(review): presumably this skips the
    # pool's earliest, thinly traded history; confirm the cutoff.
    hourly = hourly.iloc[100:]
    hourly = hourly.rename(columns = {'token1Price' : t1,'token0Price' : t0})
    hourly = hourly.set_index('timestamp')
    hourly = hourly.resample('h').mean()
    hourly.to_parquet(f'{t0}_{t1}_pool')

    pool_day_query = """
    query ($poolId: String!, $skip: Int!) {
    poolDayDatas(
        first: 1000
        skip: $skip
        orderBy: date
        orderDirection: asc
        where: { pool: $poolId }
    ) {
        date
        token0Price
        token1Price
        volumeToken0
        volumeToken1
        volumeUSD
        tvlUSD
    }
    }
    """

    daily = pd.DataFrame(fetch_all(pool_day_query, "poolDayDatas"))
    daily["timestamp"] = pd.to_datetime(daily["date"].astype(int), unit="s")
    daily_cols = ["volumeUSD", "tvlUSD"]
    daily[daily_cols] = daily[daily_cols].astype(float)
    daily = daily[["timestamp", *daily_cols]]
    # Drop the first daily row. NOTE(review): presumably the pool's partial
    # first day; confirm.
    daily = daily.iloc[1:]
    daily = daily.set_index('timestamp')
    daily = daily.resample('D').mean()
    # Daily turnover: traded volume relative to the value locked in the pool.
    daily['vol_rate'] = daily['volumeUSD']/daily['tvlUSD']
    daily.to_parquet(f'{t0}_{t1}_pool_volume_daily')

In [63]:
# Export the hourly and daily pool files for each stable-stable pair.
for pair in [['USDT', 'USDC'], ['USDC', 'DAI'], ['USDE', 'USDT']]:
    collect_stable_pool(*pair)