# Subgraph Queries with DataStreams

The goal of this notebook is to show and explain how to build a decentralized data pipeline powered by subgraphs (CoW Protocol and Uniswap v3) and DataStreams. Right now DataStreams acts only as a GraphQL query manager. Should DataStreams implement any preprocessing, or should that happen outside the library? Currently I am leaning towards doing the preprocessing outside the library, but still within this Jupyter notebook.

2.4.23: What needs to happen to keep the preprocessing simple enough to fit in a single notebook? How would I describe my current preprocessing approach, and what has been done so far?

- Subgraph: https://thegraph.com/hosted-service/subgraph/cowprotocol/cow
- Dune query: https://dune.com/queries/1941061

### TODO 
- Why do the `settlements.trades` and `trades` queries return different values?

## Setup Jupyter Environment

First install DataStreams with `!pip install git+https://github.com/Evan-Kim2028/DataStreams.git` in Jupyter or `pip install git+https://github.com/Evan-Kim2028/DataStreams.git` in a virtual environment. The primary DataStreams dependencies are Python 3.10, Subgrounds, and Pandas. This notebook also uses Polars and Plotly.

In [None]:
!pip install git+https://github.com/Evan-Kim2028/DataStreams.git

In [None]:
!pip install plotly polars

In [1]:
from datastreams.datastream import Streamer

import os
import pandas as pd
import polars as pl

In [2]:
# Widen the dataframe display so long values such as 0x... addresses are not truncated
pd.set_option('display.max_columns', None)
pd.set_option('display.expand_frame_repr', False)
pd.set_option('display.max_colwidth', None)

In [3]:
# Instantiate the Streamer class. Two separate Streamer instances are needed; otherwise the second query overwrites the first.
cow_ds1 = Streamer('https://api.thegraph.com/subgraphs/name/cowprotocol/cow')
cow_ds2 = Streamer('https://api.thegraph.com/subgraphs/name/cowprotocol/cow')

In [4]:
# DEFINE TIMESTAMP HERE. Queries only return data before this Unix timestamp, so a fixed value keeps the results reproducible.
timestamp = 1700000000

# Ethereum token addresses used in the CoW Swap trades query filters
weth_addr = "0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2"
usdc_addr = "0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48"

# Fixed base query size. The CoW settlements and Uniswap v3 swaps queries request multiples of this size (5x and 10x).
query_size = 5000

### Cowswap Trades Schema

In [5]:
# Two queries to the CoW subgraph are needed: one for WETH -> USDC trades and one for USDC -> WETH trades.
trades_weth_usdc_fp = cow_ds1.queryDict.get('trades')
trades_usdc_weth_fp = cow_ds2.queryDict.get('trades')

# trades query path that gets weth -> usdc trades
trades_weth_usdc_qp = trades_weth_usdc_fp(
    first=query_size,
    orderBy='timestamp',
    orderDirection='desc',
    where = {
    'timestamp_lt': timestamp, 
    'buyAmountUsd_gt': 100, 
    'sellAmountUsd_gt': 100, 
    "sellToken": weth_addr, 
    "buyToken": usdc_addr
    }
)

# trades query path that gets usdc -> weth trades
trades_usdc_weth_qp = trades_usdc_weth_fp(
    first=query_size,
    orderBy='timestamp',
    orderDirection='desc',
    where = {
    'timestamp_lt': timestamp, 
    'buyAmountUsd_gt': 100, 
    'sellAmountUsd_gt': 100, 
    "sellToken": usdc_addr, 
    "buyToken": weth_addr
    }
)

# run query
trades_weth_usdc_df = cow_ds1.runQuery(trades_weth_usdc_qp)
trades_usdc_weth_df = cow_ds2.runQuery(trades_usdc_weth_qp)

FIELD - trades
FIELD - trades
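
Each `where` dictionary above is a set of conditions that must all hold, which is why the two trade directions need two separate filters. The two filters differ only in which token is sold and which is bought, so both can be generated from one shared base. The helper below (`build_trade_filters`) is hypothetical and not part of DataStreams; it is a sketch assuming the same filter keys used in the cell above.

```python
# Hypothetical helper, not part of DataStreams: builds both directional `where` filters
def build_trade_filters(token_a: str, token_b: str, before_ts: int, min_usd: float = 100) -> list[dict]:
    # conditions shared by both directions
    base = {"timestamp_lt": before_ts, "buyAmountUsd_gt": min_usd, "sellAmountUsd_gt": min_usd}
    return [
        {**base, "sellToken": token_a, "buyToken": token_b},  # token_a -> token_b
        {**base, "sellToken": token_b, "buyToken": token_a},  # token_b -> token_a
    ]


weth_addr = "0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2"
usdc_addr = "0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48"

for where in build_trade_filters(weth_addr, usdc_addr, before_ts=1700000000):
    print(where["sellToken"][:6], "->", where["buyToken"][:6])
```

Each resulting dictionary could then be passed as `where=` to a `trades` field path, in the same way as the two hand-written filters above.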


In [6]:
# combine the trades queries together
# ignore_index avoids duplicate index labels from the two source dataframes
trades_df = pd.concat([trades_weth_usdc_df, trades_usdc_weth_df], ignore_index=True)

In [7]:
print(f'query returned {len(trades_df)} rows')

query returned 10000 rows


In [8]:
# check the unique buy-token addresses to verify that only WETH and USDC trades were returned
trades_df['trades_buyToken_id'].unique()

array(['0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48',
       '0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2'], dtype=object)

In [9]:
# replace the token addresses with their symbols
token_symbols = {weth_addr: 'WETH', usdc_addr: 'USDC'}
trades_df['trades_buyToken_id'] = trades_df['trades_buyToken_id'].replace(token_symbols)
trades_df['trades_sellToken_id'] = trades_df['trades_sellToken_id'].replace(token_symbols)

### Cowswap Trades-Settlement Merge

In [20]:
# get a query field path from the query dictionary which is automatically populated in the Streamer object
settlements_fp = cow_ds1.queryDict.get('settlements')

# add parameters to the settlements field path to build the settlements query path (5x the base query size)
settlements_qp = settlements_fp(
    first=query_size * 5,
    orderBy='firstTradeTimestamp',
    orderDirection='desc',
    where={'firstTradeTimestamp_lt': timestamp}
)

# run query
settlements_df = cow_ds1.runQuery(settlements_qp)

FIELD - settlements


In [21]:
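# .size counts cells (rows x columns), here 25,000 rows x 5 columns; use .shape or len() for the row count
settlements_df.size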

125000

In [22]:
trades_df.dtypes

trades_id                object
trades_timestamp          int64
trades_gasPrice           int64
trades_feeAmount          int64
trades_txHash            object
trades_settlement_id     object
trades_buyAmount        float64
trades_sellAmount       float64
trades_sellToken_id      object
trades_buyToken_id       object
trades_order_id          object
trades_buyAmountEth     float64
trades_sellAmountEth    float64
trades_buyAmountUsd     float64
trades_sellAmountUsd    float64
endpoint                 object
dtype: object

In [23]:
# Enforce trades_df column types before converting to Polars. Pandas object columns can hold mixed Python types, whereas each Polars column must have a single dtype, so explicit casts help avoid type errors during conversion.
trades_df['trades_buyAmount'] = trades_df['trades_buyAmount'].astype('float64')
trades_df['trades_sellAmount'] = trades_df['trades_sellAmount'].astype('float64')
trades_df['trades_buyAmountUsd'] = trades_df['trades_buyAmountUsd'].astype('float64')
trades_df['trades_sellAmountUsd'] = trades_df['trades_sellAmountUsd'].astype('float64')
trades_df['trades_timestamp'] = trades_df['trades_timestamp'].astype('int64')
trades_df['trades_buyToken_id'] = trades_df['trades_buyToken_id'].astype('str')
trades_df['trades_sellToken_id'] = trades_df['trades_sellToken_id'].astype('str')

In [24]:
# convert the dataframes into lists of row dictionaries
settlement_dict = settlements_df.to_dict('records')
trades_dict = trades_df.to_dict('records')

In [25]:
# convert the lists of row dictionaries into Polars dataframes
settlement_pl = pl.from_dicts(settlement_dict)
trades_pl = pl.from_dicts(trades_dict)

In [26]:
# merge trades and settlement dataframes on the settlement transaction hash
cow_trades_pl = trades_pl.join(other=settlement_pl, left_on='trades_settlement_id', right_on='settlements_txHash', how='inner')

In [27]:
cow_trades_pl.shape

(1505, 20)
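
The inner join keeps only trades whose `trades_settlement_id` appears among the fetched settlements, which is how the 10,000 trades shrink to 1,505 rows. Because the settlements query only returns the most recent `query_size * 5` settlements before `timestamp`, older trades may simply have no matching settlement in the fetched data. One way to inspect the dropped rows is a left join with pandas' `indicator=True`; the sketch below uses hypothetical toy data with the same column names.

```python
import pandas as pd

# hypothetical toy data: three trades, but only two of their settlements were fetched
trades = pd.DataFrame({
    "trades_id": ["t1", "t2", "t3"],
    "trades_settlement_id": ["0xaaa", "0xbbb", "0xccc"],
})
settlements = pd.DataFrame({
    "settlements_txHash": ["0xaaa", "0xbbb"],
    "settlements_solver_id": ["0xsolver1", "0xsolver2"],
})

# a left join with indicator=True marks each trade as "both" (matched)
# or "left_only" (a row that an inner join would drop)
check = trades.merge(
    settlements,
    left_on="trades_settlement_id",
    right_on="settlements_txHash",
    how="left",
    indicator=True,
)
print(check["_merge"].value_counts())

unmatched = check.loc[check["_merge"] == "left_only", "trades_id"].tolist()
print(unmatched)  # ['t3']
```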

In [28]:
cow_trades_pl.head(5)

trades_id,trades_timestamp,trades_gasPrice,trades_feeAmount,trades_txHash,trades_settlement_id,trades_buyAmount,trades_sellAmount,trades_sellToken_id,trades_buyToken_id,trades_order_id,trades_buyAmountEth,trades_sellAmountEth,trades_buyAmountUsd,trades_sellAmountUsd,endpoint,settlements_id,settlements_firstTradeTimestamp,settlements_solver_id,endpoint_right
str,i64,i64,i64,str,str,f64,f64,str,str,str,f64,f64,f64,f64,str,str,i64,str,str
"""0xff4b9b71cf38...",1676578631,61410062773,10036655057984458,"""0xbddaeefac322...","""0xbddaeefac322...",667193664.0,4e+17,"""WETH""","""USDC""","""0xff4b9b71cf38...",0.391065,0.4,667.193664,682.437643,"""cow""","""0xbddaeefac322...",1676578631,"""0x0a308697e1d3...","""cow"""
"""0xe73f2b06961f...",1676577599,39833897263,67102919,"""0x748d001020e7...","""0x748d001020e7...",1.897e+20,325000000000.0,"""USDC""","""WETH""","""0xe73f2b06961f...",189.704146,189.816638,324807.393682,325000.0,"""cow""","""0x748d001020e7...",1676577599,"""0xc9ec550bea1c...","""cow"""
"""0x0ed62f629a61...",1676577107,41145580487,8558829468340582,"""0x697d31e54fa4...","""0x697d31e54fa4...",31140000000.0,1.82e+19,"""WETH""","""USDC""","""0x0ed62f629a61...",18.18091,18.2,31139.528596,31172.224875,"""cow""","""0x697d31e54fa4...",1676577107,"""0x149d0f928233...","""cow"""
"""0xa3fbb73e02ac...",1676576375,40322628262,13413397075766624,"""0xed291288089f...","""0xed291288089f...",680956759.0,4.1e+17,"""WETH""","""USDC""","""0xa3fbb73e02ac...",0.39757,0.41,680.956759,702.246701,"""cow""","""0xed291288089f...",1676576375,"""0xc9ec550bea1c...","""cow"""
"""0x532018cd94c9...",1676576027,38815959540,10413085889647154,"""0x2231f039b899...","""0x2231f039b899...",4181700000.0,2.4524e+18,"""WETH""","""USDC""","""0x532018cd94c9...",2.441451,2.452413,4181.709518,4200.48487,"""cow""","""0x2231f039b899...",1676576027,"""0xc9ec550bea1c...","""cow"""


In [29]:
# get unique values in cow_trades_pl trades_sellToken_id column
cow_trades_pl['trades_sellToken_id'].unique()

trades_sellToken_id
str
"""USDC"""
"""WETH"""


### Cowswap Trades-Solver Merge

In [30]:
solvers = pd.read_csv('data/cowv2_solvers.csv') # loaded with pandas instead of Polars because replacing the backslash character was easier in pandas

In [31]:
# rename address to settlements_solver_id in pandas
solvers = solvers.rename(columns={"address": "settlements_solver_id"})

In [32]:
# NOTE - Dune formats addresses as \x...; replace the backslash with '0' to get the 0x... form used by the subgraph
solvers['settlements_solver_id'] = solvers['settlements_solver_id'].str.replace('\\', '0', regex=False)
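
The subgraph returns lowercase `0x...` addresses (as in the outputs above), while the Dune export writes them as `\x...`. The `str.replace` call above rewrites every backslash in the column. A slightly stricter normalization, sketched below as a hypothetical helper, rewrites only a leading `\x` and also lowercases the result so the join key is more likely to match the subgraph format.

```python
def normalize_dune_address(raw: str) -> str:
    """Hypothetical helper: convert a Dune-style '\\x...' address into the '0x...' form."""
    addr = raw.strip().lower()
    if addr.startswith("\\x"):
        addr = "0" + addr[1:]  # replace only the leading backslash
    return addr


print(normalize_dune_address("\\xABCDEF"))  # 0xabcdef
print(normalize_dune_address("0xabcdef"))   # 0xabcdef (already normalized)
```

Applied to the notebook, this could be used as `solvers['settlements_solver_id'].map(normalize_dune_address)`.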

In [33]:
# turn solvers into a dictionary
solvers_dict = solvers.to_dict('records')

# convert dict to polars
solvers_pl = pl.from_dicts(solvers_dict)

In [34]:
# inner join cow_trades_pl with solvers_pl on settlements_solver_id
cow_complete_pl = cow_trades_pl.join(solvers_pl, on="settlements_solver_id", how="inner")

In [35]:
# drop the duplicate endpoint_right column created by the settlements join
cow_complete_pl = cow_complete_pl.drop('endpoint_right')

In [36]:
cow_complete_pl.shape

(1456, 22)

#### Basic Aggregation

In [37]:
# keep only solvers in the "prod" environment
filter_df = cow_complete_pl.filter(pl.col("environment") == "prod")

In [39]:
filter_df.shape

(1438, 22)
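
Counting trades per solver is a plain group-and-count over `filter_df`: one output row per solver `name`, holding the number of prod trades that solver settled, sorted from most to fewest. The same logic is sketched below in plain Python on hypothetical rows, which can serve as a quick sanity check on a small sample.

```python
from collections import Counter

# hypothetical rows standing in for filter_df: (trades_id, solver name)
rows = [
    ("t1", "Otex"), ("t2", "PLM"), ("t3", "Otex"),
    ("t4", "Laertes"), ("t5", "Otex"), ("t6", "PLM"),
]

# count rows per solver name, then list solvers from most to fewest trades
total_trades = Counter(name for _trade_id, name in rows)
for name, count in total_trades.most_common():
    print(name, count)
```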

In [40]:
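# count trades per solver name and sort from most to fewest trades
# (older Polars releases call this method groupby and use reverse=True instead of descending=True)
grouped_df = filter_df.group_by("name").agg(
    pl.col("trades_id").count().alias("total_trades")
).sort("total_trades", descending=True)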


In [41]:
grouped_df

name,total_trades
str,u32
"""Otex""",514
"""PLM""",420
"""Laertes""",220
"""Quasilabs""",162
"""Gnosis_1inch""",63
"""Gnosis_0x""",18
"""Seasolver""",16
"""Baseline""",10
"""DexCowAgg""",7
"""Gnosis_Balance...",5


### Uniswap v3

In [42]:
# Instantiate the Streamer object.
# Unlike the CoW queries, Uniswap v3 does not need multiple Streamer instances because each swaps query path is built and run within the same loop iteration.
# If the CoW queries were updated to use the same pattern, a single Streamer object could serve all queries.
univ3_ds = Streamer('https://api.thegraph.com/subgraphs/name/messari/uniswap-v3-ethereum')

In [43]:
# get a query field path from the query dictionary which is automatically populated in the Streamer object
swaps_fp = univ3_ds.queryDict.get('swaps')

In [44]:
weth_usdc_list = [
    "0x88e6a0c2ddd26feeb64f039a2c41296fcb3f5640", # usdc/weth .05%
    "0x8ad599c3a0ff1de082011efddc58f1908eb6e6d8" #usdc/weth .3%
]

In [45]:
swaps_df_list = []

In [46]:
for lp in weth_usdc_list:
    # build the swaps query path for this pool
    swaps_qp = swaps_fp(
        first=query_size * 10,
        orderBy='timestamp',
        orderDirection='desc',
        where = {'timestamp_lt': timestamp, 'amountInUSD_gt': 250, 'amountOutUSD_gt': 250, 'pool': lp} 
        )

    # run query
    swaps_df = univ3_ds.runQuery(swaps_qp)
    swaps_df_list.append(swaps_df)

FIELD - swaps
FIELD - swaps


In [47]:
# concat swaps_df_list
swaps_df = pd.concat(swaps_df_list)

In [48]:
# replace the pool addresses with LP pool names with fees
swaps_df['swaps_pool_id'] = swaps_df['swaps_pool_id'].replace(weth_usdc_list[0], 'USDC_WETH .05%')
swaps_df['swaps_pool_id'] = swaps_df['swaps_pool_id'].replace(weth_usdc_list[1], 'USDC_WETH .3%')

# replace token addresses with symbols
swaps_df['swaps_tokenIn_id'] = swaps_df['swaps_tokenIn_id'].replace(usdc_addr, 'USDC')
swaps_df['swaps_tokenIn_id'] = swaps_df['swaps_tokenIn_id'].replace(weth_addr, 'WETH')
swaps_df['swaps_tokenOut_id'] = swaps_df['swaps_tokenOut_id'].replace(usdc_addr, 'USDC')
swaps_df['swaps_tokenOut_id'] = swaps_df['swaps_tokenOut_id'].replace(weth_addr, 'WETH')

In [49]:
print(f'query returned {len(swaps_df)} rows\n swaps_df columns are {swaps_df.columns}')

query returned 100000 rows
 swaps_df columns are Index(['swaps_id', 'swaps_hash', 'swaps_logIndex', 'swaps_protocol_id',
       'swaps_to', 'swaps_from', 'swaps_blockNumber', 'swaps_timestamp',
       'swaps_tokenIn_id', 'swaps_amountIn', 'swaps_amountInUSD',
       'swaps_tokenOut_id', 'swaps_amountOut', 'swaps_amountOutUSD',
       'swaps_pool_id', 'endpoint'],
      dtype='object')


In [50]:
# convert swaps_df to pl
swaps_dict = swaps_df.to_dict('records')
swaps_pl = pl.from_dicts(swaps_dict)

In [52]:
cow_complete_pl.columns

['trades_id',
 'trades_timestamp',
 'trades_gasPrice',
 'trades_feeAmount',
 'trades_txHash',
 'trades_settlement_id',
 'trades_buyAmount',
 'trades_sellAmount',
 'trades_sellToken_id',
 'trades_buyToken_id',
 'trades_order_id',
 'trades_buyAmountEth',
 'trades_sellAmountEth',
 'trades_buyAmountUsd',
 'trades_sellAmountUsd',
 'endpoint',
 'settlements_id',
 'settlements_firstTradeTimestamp',
 'settlements_solver_id',
 'environment',
 'name',
 'active']

### Merge Cow and Univ3 (is this necessary, or is it better to keep the dataframes separate?)

In [67]:
# Join trades and swaps on timestamp. The outer join keeps every trade and every swap; rows without a same-timestamp match get null values on the other side.
cow_uni_outer_pl = cow_complete_pl.join(other=swaps_pl, left_on='trades_timestamp', right_on='swaps_timestamp', how='outer')

In [68]:
# Keep only the columns needed for the comparison.
# Not sure yet whether the merge between Uniswap and CoW is necessary at this point.
cow_uni_trunc_pl = cow_uni_outer_pl.select([
    'trades_timestamp',
    'trades_sellToken_id',
    'trades_buyToken_id',
    'trades_sellAmountUsd',
    'trades_buyAmountUsd',
    'name',
    'swaps_pool_id',
    'swaps_tokenIn_id',
    'swaps_tokenOut_id',
    'swaps_amountInUSD',
    'swaps_amountOutUSD',
    'swaps_blockNumber'
])

In [69]:
# check the Polars dataframe shape
cow_uni_outer_pl.shape

(101104, 37)

In [70]:
# Drop rows that contain any null value. What remains are the CoW trades and Uniswap v3 swaps that share a timestamp, i.e. were executed in the same block.
cow_uni_trunc_no_nulls_pl = cow_uni_trunc_pl.drop_nulls()

In [71]:
# check the Polars dataframe shape after dropping nulls
cow_uni_trunc_no_nulls_pl.shape

(664, 12)
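
After the outer join, dropping nulls keeps only rows where both a trade and a swap were found for the same timestamp. Provided none of the selected columns is null in the source data, this yields the same rows an inner join would return directly. The sketch below checks this equivalence with pandas on hypothetical toy data, using `dropna` in place of Polars `drop_nulls`.

```python
import pandas as pd

# hypothetical toy data: two CoW trades and three Uniswap v3 swaps
trades = pd.DataFrame({
    "trades_timestamp": [100, 200],
    "trades_sellAmountUsd": [682.4, 325000.0],
})
swaps = pd.DataFrame({
    "swaps_timestamp": [100, 100, 300],
    "swaps_amountInUSD": [197797.0, 42485.5, 8365.8],
})

# outer join keeps every row from both sides; unmatched rows get NaN on the other side
outer = trades.merge(swaps, left_on="trades_timestamp", right_on="swaps_timestamp", how="outer")

# dropping rows with any NaN leaves only timestamps present on both sides
outer_no_nulls = outer.dropna()

# an inner join returns those matched rows directly
inner = trades.merge(swaps, left_on="trades_timestamp", right_on="swaps_timestamp", how="inner")


def as_rows(df):
    # compare as sorted float tuples so dtype changes (NaN turns int keys into float) do not matter
    return sorted(tuple(float(v) for v in row) for row in df.itertuples(index=False))


print(len(outer), len(outer_no_nulls), len(inner))  # 4 2 2
print(as_rows(outer_no_nulls) == as_rows(inner))    # True
```

If only the same-block matches are needed, an inner join would avoid building the 101,104-row intermediate frame; the outer join remains useful if unmatched trades or swaps are needed later.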

In [72]:
cow_uni_trunc_no_nulls_pl.head(10)

trades_timestamp,trades_sellToken_id,trades_buyToken_id,trades_sellAmountUsd,trades_buyAmountUsd,name,swaps_pool_id,swaps_tokenIn_id,swaps_tokenOut_id,swaps_amountInUSD,swaps_amountOutUSD,swaps_blockNumber
i64,str,str,f64,f64,str,str,str,str,f64,f64,i64
1676578631,"""WETH""","""USDC""",682.437643,667.193664,"""Laertes""","""USDC_WETH .05%...","""WETH""","""USDC""",197797.02241,197157.273795,16643623
1676578631,"""WETH""","""USDC""",682.437643,667.193664,"""Laertes""","""USDC_WETH .05%...","""WETH""","""USDC""",42485.488674,42339.410681,16643623
1676577599,"""USDC""","""WETH""",325000.0,324807.393682,"""Otex""","""USDC_WETH .05%...","""WETH""","""USDC""",173978.803809,173873.399827,16643538
1676576375,"""WETH""","""USDC""",702.246701,680.956759,"""Otex""","""USDC_WETH .05%...","""WETH""","""USDC""",8365.833188,8354.112665,16643437
1676576027,"""WETH""","""USDC""",4200.48487,4181.709518,"""Otex""","""USDC_WETH .05%...","""USDC""","""WETH""",240000.0,239991.738835,16643408
1676576027,"""WETH""","""USDC""",4200.48487,4181.709518,"""Otex""","""USDC_WETH .05%...","""USDC""","""WETH""",494.63523,494.521645,16643408
1676572271,"""WETH""","""USDC""",549378.154525,547914.783158,"""PLM""","""USDC_WETH .05%...","""WETH""","""USDC""",11267.52172,11236.906127,16643095
1676572271,"""WETH""","""USDC""",549378.154525,547914.783158,"""PLM""","""USDC_WETH .05%...","""WETH""","""USDC""",2076.54522,2070.94122,16643095
1676567687,"""WETH""","""USDC""",343.058645,326.991679,"""Quasilabs""","""USDC_WETH .05%...","""WETH""","""USDC""",422.589689,422.472064,16642714
1676567687,"""WETH""","""USDC""",343.058645,326.991679,"""Quasilabs""","""USDC_WETH .05%...","""WETH""","""USDC""",2051.919521,2051.339422,16642714


### CoW and Uniswap v3 trades executed in the same block (same timestamp), filtered by trade direction:
- CoW: buy WETH, sell USDC (`trades_buyToken_id == "WETH"`)
- Uniswap v3: WETH in, USDC out (`swaps_tokenIn_id == "WETH"`)

In [59]:
# get the rows where the trades_buyToken_id and swaps_tokenIn_id are both WETH
cow_uni_weth = cow_uni_trunc_no_nulls_pl.filter((pl.col("trades_buyToken_id") == "WETH") & (pl.col("swaps_tokenIn_id") == "WETH"))

In [60]:
diff_pl = cow_uni_weth.with_columns([
    (pl.col("trades_buyAmountUsd") - pl.col("trades_sellAmountUsd")).alias('cow_diff'),
    (pl.col("swaps_amountInUSD") - pl.col("swaps_amountOutUSD")).alias('uni_diff')
])

In [61]:
diff_pl = diff_pl.with_columns([
    (pl.col('cow_diff').abs()).alias('cow_diff'),
    (pl.col('uni_diff').abs()).alias('uni_diff')
])    

In [62]:
diff_pl = diff_pl.with_columns([
    (pl.col("cow_diff") / pl.col("trades_buyAmountUsd")).alias('cow_pct_diff'),
    (pl.col("uni_diff") / pl.col("swaps_amountInUSD")).alias('uni_pct_diff')
])
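
The three cells above compute, for each pair of legs, the absolute USD difference divided by one leg: `cow_pct_diff = |trades_buyAmountUsd - trades_sellAmountUsd| / trades_buyAmountUsd` and `uni_pct_diff = |swaps_amountInUSD - swaps_amountOutUSD| / swaps_amountInUSD`. Recomputing the first row of `diff_pl` by hand is a quick check of the formula; the helper name `pct_diff` is illustrative only.

```python
def pct_diff(reference_usd: float, other_usd: float) -> float:
    """Absolute USD difference between two legs, divided by the reference leg."""
    return abs(reference_usd - other_usd) / reference_usd


# first row of diff_pl: (trades_buyAmountUsd, trades_sellAmountUsd) and (swaps_amountInUSD, swaps_amountOutUSD)
cow_pct = pct_diff(324807.393682, 325000.0)
uni_pct = pct_diff(173978.803809, 173873.399827)
print(round(cow_pct, 6), round(uni_pct, 6))  # 0.000593 0.000606
```

Both values match the `cow_pct_diff` and `uni_pct_diff` columns shown below for that row.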

In [66]:
diff_pl.shape

(91, 16)

In [65]:
diff_pl.head(10)

trades_timestamp,trades_sellToken_id,trades_buyToken_id,trades_sellAmountUsd,trades_buyAmountUsd,name,swaps_pool_id,swaps_tokenIn_id,swaps_tokenOut_id,swaps_amountInUSD,swaps_amountOutUSD,swaps_blockNumber,cow_diff,uni_diff,cow_pct_diff,uni_pct_diff
i64,str,str,f64,f64,str,str,str,str,f64,f64,i64,f64,f64,f64,f64
1676577599,"""USDC""","""WETH""",325000.0,324807.393682,"""Otex""","""USDC_WETH .05%...","""WETH""","""USDC""",173978.803809,173873.399827,16643538,192.606318,105.403982,0.000593,0.000606
1676567015,"""USDC""","""WETH""",110000.0,110208.935716,"""PLM""","""USDC_WETH .05%...","""WETH""","""USDC""",6363.072497,6349.73263,16642658,208.935716,13.339867,0.001896,0.002096
1676567015,"""USDC""","""WETH""",110000.0,110208.935716,"""PLM""","""USDC_WETH .05%...","""WETH""","""USDC""",4562.271011,4551.963473,16642658,208.935716,10.307538,0.001896,0.002259
1676567015,"""USDC""","""WETH""",110000.0,110208.935716,"""PLM""","""USDC_WETH .05%...","""WETH""","""USDC""",5159.24797,5147.681646,16642658,208.935716,11.566324,0.001896,0.002242
1676563559,"""USDC""","""WETH""",17381.244451,17360.138855,"""PLM""","""USDC_WETH .05%...","""WETH""","""USDC""",69440.555418,69362.862292,16642371,21.105596,77.693126,0.001216,0.001119
1676563559,"""USDC""","""WETH""",17381.244451,17360.138855,"""PLM""","""USDC_WETH .05%...","""WETH""","""USDC""",222776.709715,222656.341637,16642371,21.105596,120.368078,0.001216,0.00054
1676563559,"""USDC""","""WETH""",17381.244451,17360.138855,"""PLM""","""USDC_WETH .05%...","""WETH""","""USDC""",10324.942584,10311.76104,16642371,21.105596,13.181544,0.001216,0.001277
1676562515,"""USDC""","""WETH""",171768.513385,171403.675813,"""Quasilabs""","""USDC_WETH .05%...","""WETH""","""USDC""",17140.367581,17152.556837,16642286,364.837572,12.189256,0.002129,0.000711
1676562515,"""USDC""","""WETH""",171768.513385,171403.675813,"""Quasilabs""","""USDC_WETH .05%...","""WETH""","""USDC""",1053.862124,1055.46451,16642286,364.837572,1.602386,0.002129,0.00152
1676562515,"""USDC""","""WETH""",171768.513385,171403.675813,"""Quasilabs""","""USDC_WETH .05%...","""WETH""","""USDC""",2571.055137,2574.945086,16642286,364.837572,3.889949,0.002129,0.001513


In [63]:
# plot the Polars dataframe with Plotly Express
import plotly.express as px

In [64]:
# pass the columns as plain Python lists and label the axes explicitly
px.scatter(
    x=diff_pl["cow_pct_diff"].to_list(),
    y=diff_pl["uni_pct_diff"].to_list(),
    title="Uni vs Cow WETH -> USDC Execution Price Percent Difference",
).update_layout(xaxis_title="cow_pct_diff", yaxis_title="uni_pct_diff")





### Write final output to CSV

In [None]:
# save the merged CoW/Uniswap v3 dataframe to CSV
# cow_uni_outer_pl.write_csv('data/cow_uni_pl.csv')