In [1]:
import os
from tqdm.notebook import tqdm
import polars as pl
import json


In [2]:
# Existing dataset dir
data_dir = './data/'

# Existing plots dir
plots_dir = data_dir+'/plots/'
os.makedirs(data_dir, exist_ok=True)
os.makedirs(plots_dir, exist_ok=True)



In [3]:
parquet_file_path = './data/usd-eth.parquet'
prices_df = pl.scan_parquet(parquet_file_path).collect(streaming=True)

prices_df = prices_df.rename({'price': 'CEX_price'})

print(prices_df)

shape: (11_664_000, 2)
┌─────────────────────┬───────────┐
│ timestamp           ┆ CEX_price │
│ ---                 ┆ ---       │
│ datetime[ms]        ┆ f64       │
╞═════════════════════╪═══════════╡
│ 2023-12-31 00:00:00 ┆ 2291.17   │
│ 2023-12-31 00:00:01 ┆ 2291.78   │
│ 2023-12-31 00:00:02 ┆ 2291.78   │
│ 2023-12-31 00:00:03 ┆ 2291.78   │
│ 2023-12-31 00:00:04 ┆ 2291.78   │
│ …                   ┆ …         │
│ 2024-05-13 23:59:55 ┆ 2950.56   │
│ 2024-05-13 23:59:56 ┆ 2950.56   │
│ 2024-05-13 23:59:57 ┆ 2950.56   │
│ 2024-05-13 23:59:58 ┆ 2950.56   │
│ 2024-05-13 23:59:59 ┆ 2950.56   │
└─────────────────────┴───────────┘


In [4]:
# Function to convert hexadecimal string to signed integer
def hex_to_int(hex_str, num_bits):
    # Convert hexadecimal string to integer
    value = int(hex_str, 16)
    # Check if the value is negative
    if value & (1 << (num_bits - 1)):
        # Convert negative value to signed integer
        return value - (1 << num_bits)
    else:
        return value

# Function to convert hexadecimal string to unsigned integer
def hex_to_uint(hex_str):
    return int(hex_str, 16)


def process_swaps(file_name, prices_df, token0 = 'EUR', fee = 0.0005):

    AMOUNT_USD = 'amount1'
    AMOUNT_ETH = 'amount0'
    RESERVE_USD = 'reserve_1'
    RESERVE_ETH = 'reserve_0'
    CONST_Token0 = 1e18 #WETH 
    CONST_Token1 = 1e6 #USDC
    CONST_Token0_Token1 = CONST_Token0 / CONST_Token1

    if (token0 == 'USDC') :
        AMOUNT_USD = 'amount0'
        AMOUNT_ETH = 'amount1'
        RESERVE_USD = 'reserve_0'
        RESERVE_ETH = 'reserve_1'
        CONST_Token1 = 1e18 #WETH 
        CONST_Token0 = 1e6 #USDC
        CONST_Token0_Token1 = CONST_Token0 / CONST_Token1


    # Load the parquet file
    swaps_df = pl.scan_parquet(file_name).collect(streaming=True)

    # Convert hexadecimal strings to integers with appropriate data types
    swaps_df = swaps_df.with_columns(
        amount0=swaps_df['data'].str.slice(2, 64).apply(lambda x: hex_to_int(x, 256) / CONST_Token0),
        amount1=swaps_df['data'].str.slice(66, 64).apply(lambda x: hex_to_int(x, 256) / CONST_Token1),
        sqrtPriceX96=swaps_df['data'].str.slice(130, 64).apply(hex_to_uint),
        tick=swaps_df['data'].str.slice(258).apply(lambda x: hex_to_int(x, 256))
    )

    swaps_df = swaps_df.with_columns(
            liquidity_hex=swaps_df['data'].str.slice(194, 64))

    swaps_df = swaps_df.with_columns(
        pl.col("liquidity_hex")
        .map_elements(lambda x: hex_to_uint(x), return_dtype=pl.Float64)
        .alias("liquidity"),
    )

    #these are virtual reserves in the tick

    CONST_Liquidity = (CONST_Token0 * CONST_Token1)**0.5

    swaps_df = swaps_df.with_columns((  abs( pl.col(AMOUNT_USD) / pl.col(AMOUNT_ETH) ) ).alias('effective_price'))
    swaps_df = swaps_df.with_columns(( (pl.col('sqrtPriceX96').mul(1/2**96) ) **2 * CONST_Token0_Token1 ).alias('spot_price'))

    swaps_df = swaps_df.with_columns(( (pl.col('liquidity').mul(1/CONST_Liquidity) ).mul(1/pl.col('spot_price')**0.5) ).alias('reserve_0'))
    swaps_df = swaps_df.with_columns(( (pl.col('liquidity').mul(1/CONST_Liquidity) ).mul(pl.col('spot_price')**0.5) ).alias('reserve_1'))

    if (token0 == 'USDC') :
        swaps_df = swaps_df.with_columns(
            spot_price = 1/pl.col('spot_price')
        )

    swaps_df = swaps_df.drop("topics", "data", "liquidity_hex")

    # Merging swap data with market prices
    swaps_df = swaps_df.with_columns(pl.col("block_timestamp").dt.cast_time_unit('us'))
    prices_df = prices_df.with_columns(pl.col("timestamp").dt.cast_time_unit('us'))
    swaps_df = swaps_df.join(prices_df, left_on = "block_timestamp", right_on = "timestamp", how = 'left')

    swaps_df = swaps_df.rename(
        {AMOUNT_USD: "amount_USD",
        AMOUNT_ETH: "amount_ETH",
        RESERVE_USD: "reserve_USD",
        RESERVE_ETH: "reserve_ETH"}
    )

    #process gas fees
    swaps_df = swaps_df.with_columns(
        gas_fee_gwei = pl.col('receipt_gas_used').mul(pl.col('gas_price')/1e9)
    )
    swaps_df = swaps_df.with_columns(
        gas_fee_USD = pl.col('gas_fee_gwei').mul(pl.col('CEX_price')/1e9)
    )
    swaps_df = swaps_df.drop("gas", "gas", "receipt_gas_used", "receipt_cumulative_gas_used")

    #process LP fee
    swaps_df = swaps_df.with_columns(
        LP_fee_USD = pl.col('amount_USD').abs() * fee
    )

    #process price impact and slippage
    swaps_df = swaps_df.with_columns(
        total_slippage_USD = pl.col('amount_USD').abs() - (pl.col('spot_price').shift(-1)).mul(pl.col('amount_ETH')).abs(),
    )

    #process total feel
    swaps_df = swaps_df.with_columns(
        total_fee_USD = pl.col('gas_fee_USD') + pl.col('LP_fee_USD')  + pl.col('total_slippage_USD')
    )


    #calculate price impact
    swaps_df = swaps_df.with_columns(
        price_impact = pl.col('amount_USD').mul(1/pl.col('reserve_USD')).abs()
    )
    swaps_df = swaps_df.with_columns(pl.col('amount_USD').abs().alias('volume_USD'))

    #block slipage
    swaps_df = swaps_df.with_columns(pl.lit(0).alias('block_slippage_USD'))
    swaps_df = swaps_df.sort(['block_number', 'transaction_index'])
    swaps_df = swaps_df.with_columns(
        pl.col('amount_USD').cumsum().over('block_number').alias('block_slippage_USD')
    )
    swaps_df = swaps_df.with_columns(
        (pl.col('block_slippage_USD') - pl.col('amount_USD')).alias('block_slippage_USD')
    )
    swaps_df = swaps_df.with_columns(
        block_impact = pl.col('amount_USD').abs().mul(1/(pl.col('reserve_USD')-pl.col('block_slippage_USD'))) - pl.col('price_impact')
    )
    swaps_df = swaps_df.with_columns(
        total_fees = (pl.col('effective_price') - pl.col('spot_price')).mul(1/pl.col('amount_USD'))
    )




    return swaps_df


In [5]:
file_name = "./data/raw-data-arbitrum-Uniswap-v3-WETH-USDC.paraquet"
parquet_file_path = "./data/swaps-arbitrum-Uniswap-v3-WETH-USDC.paraquet"

swaps_df = process_swaps(file_name, prices_df)
print(swaps_df)
swaps_df.write_parquet(parquet_file_path)

del swaps_df

  amount0=swaps_df['data'].str.slice(2, 64).apply(lambda x: hex_to_int(x, 256) / CONST_Token0),
  amount1=swaps_df['data'].str.slice(66, 64).apply(lambda x: hex_to_int(x, 256) / CONST_Token1),
  sqrtPriceX96=swaps_df['data'].str.slice(130, 64).apply(hex_to_uint),
  tick=swaps_df['data'].str.slice(258).apply(lambda x: hex_to_int(x, 256))
  pl.col('amount_USD').cumsum().over('block_number').alias('block_slippage_USD')


shape: (2_400_000, 26)
┌───────────┬───────────┬───────────┬───────────┬───┬───────────┬───────────┬───────────┬──────────┐
│ transacti ┆ transacti ┆ log_index ┆ block_num ┆ … ┆ volume_US ┆ block_sli ┆ block_imp ┆ total_fe │
│ on_hash   ┆ on_index  ┆ ---       ┆ ber       ┆   ┆ D         ┆ ppage_USD ┆ act       ┆ es       │
│ ---       ┆ ---       ┆ i64       ┆ ---       ┆   ┆ ---       ┆ ---       ┆ ---       ┆ ---      │
│ str       ┆ i64       ┆           ┆ i64       ┆   ┆ f64       ┆ f64       ┆ f64       ┆ f64      │
╞═══════════╪═══════════╪═══════════╪═══════════╪═══╪═══════════╪═══════════╪═══════════╪══════════╡
│ 0xe74d49e ┆ 4         ┆ 13        ┆ 187373628 ┆ … ┆ 8633.4852 ┆ 0.0       ┆ 0.0       ┆ 0.000202 │
│ 88dfdb4e2 ┆           ┆           ┆           ┆   ┆ 51        ┆           ┆           ┆          │
│ a45b70635 ┆           ┆           ┆           ┆   ┆           ┆           ┆           ┆          │
│ 8168b…    ┆           ┆           ┆           ┆   ┆           ┆   

In [6]:
file_name = "./data/raw-data-arbitrum-Uniswap-v3-WETH-USDCe.paraquet"
parquet_file_path = "./data/swaps-arbitrum-Uniswap-v3-WETH-USDCe.paraquet"

swaps_df = process_swaps(file_name, prices_df)
print(swaps_df)
swaps_df.write_parquet(parquet_file_path)

del swaps_df

  amount0=swaps_df['data'].str.slice(2, 64).apply(lambda x: hex_to_int(x, 256) / CONST_Token0),
  amount1=swaps_df['data'].str.slice(66, 64).apply(lambda x: hex_to_int(x, 256) / CONST_Token1),
  sqrtPriceX96=swaps_df['data'].str.slice(130, 64).apply(hex_to_uint),
  tick=swaps_df['data'].str.slice(258).apply(lambda x: hex_to_int(x, 256))
  pl.col('amount_USD').cumsum().over('block_number').alias('block_slippage_USD')


shape: (2_258_469, 26)
┌───────────┬───────────┬───────────┬───────────┬───┬───────────┬───────────┬───────────┬──────────┐
│ transacti ┆ transacti ┆ log_index ┆ block_num ┆ … ┆ volume_US ┆ block_sli ┆ block_imp ┆ total_fe │
│ on_hash   ┆ on_index  ┆ ---       ┆ ber       ┆   ┆ D         ┆ ppage_USD ┆ act       ┆ es       │
│ ---       ┆ ---       ┆ i64       ┆ ---       ┆   ┆ ---       ┆ ---       ┆ ---       ┆ ---      │
│ str       ┆ i64       ┆           ┆ i64       ┆   ┆ f64       ┆ f64       ┆ f64       ┆ f64      │
╞═══════════╪═══════════╪═══════════╪═══════════╪═══╪═══════════╪═══════════╪═══════════╪══════════╡
│ 0xf4cf0f5 ┆ 4         ┆ 14        ┆ 165788868 ┆ … ┆ 99.802524 ┆ 0.0       ┆ 0.0       ┆ 0.011425 │
│ 9d3ccef4a ┆           ┆           ┆           ┆   ┆           ┆           ┆           ┆          │
│ f054e2d47 ┆           ┆           ┆           ┆   ┆           ┆           ┆           ┆          │
│ 2092b…    ┆           ┆           ┆           ┆   ┆           ┆   

In [7]:
file_name = "./data/raw-data-base-Uniswap-v3-WETH-USDC.paraquet"
parquet_file_path = "./data/swaps-base-Uniswap-v3-WETH-USDC.paraquet"

swaps_df = process_swaps(file_name, prices_df)
print(swaps_df)
swaps_df.write_parquet(parquet_file_path)

del swaps_df

  amount0=swaps_df['data'].str.slice(2, 64).apply(lambda x: hex_to_int(x, 256) / CONST_Token0),
  amount1=swaps_df['data'].str.slice(66, 64).apply(lambda x: hex_to_int(x, 256) / CONST_Token1),
  sqrtPriceX96=swaps_df['data'].str.slice(130, 64).apply(hex_to_uint),
  tick=swaps_df['data'].str.slice(258).apply(lambda x: hex_to_int(x, 256))
  pl.col('amount_USD').cumsum().over('block_number').alias('block_slippage_USD')


shape: (1_687_530, 26)
┌───────────┬───────────┬───────────┬───────────┬───┬───────────┬───────────┬───────────┬──────────┐
│ transacti ┆ transacti ┆ log_index ┆ block_num ┆ … ┆ volume_US ┆ block_sli ┆ block_imp ┆ total_fe │
│ on_hash   ┆ on_index  ┆ ---       ┆ ber       ┆   ┆ D         ┆ ppage_USD ┆ act       ┆ es       │
│ ---       ┆ ---       ┆ i64       ┆ ---       ┆   ┆ ---       ┆ ---       ┆ ---       ┆ ---      │
│ str       ┆ i64       ┆           ┆ i64       ┆   ┆ f64       ┆ f64       ┆ f64       ┆ f64      │
╞═══════════╪═══════════╪═══════════╪═══════════╪═══╪═══════════╪═══════════╪═══════════╪══════════╡
│ 0x9a3d769 ┆ 7         ┆ 40        ┆ 8638929   ┆ … ┆ 2.150611  ┆ 0.0       ┆ 0.0       ┆ 0.5294   │
│ c9149d6ae ┆           ┆           ┆           ┆   ┆           ┆           ┆           ┆          │
│ 7d6d35c72 ┆           ┆           ┆           ┆   ┆           ┆           ┆           ┆          │
│ b9fde…    ┆           ┆           ┆           ┆   ┆           ┆   

In [8]:
file_name = "./data/raw-data-ethereum-Uniswap-v3-WETH-USDC.paraquet"
parquet_file_path = "./data/swaps-ethereum-Uniswap-v3-WETH-USDC.paraquet"

swaps_df = process_swaps(file_name, prices_df, "USDC")
print(swaps_df)
swaps_df.write_parquet(parquet_file_path)

del swaps_df

  amount0=swaps_df['data'].str.slice(2, 64).apply(lambda x: hex_to_int(x, 256) / CONST_Token0),
  amount1=swaps_df['data'].str.slice(66, 64).apply(lambda x: hex_to_int(x, 256) / CONST_Token1),
  sqrtPriceX96=swaps_df['data'].str.slice(130, 64).apply(hex_to_uint),
  tick=swaps_df['data'].str.slice(258).apply(lambda x: hex_to_int(x, 256))
  pl.col('amount_USD').cumsum().over('block_number').alias('block_slippage_USD')


shape: (761_005, 26)
┌───────────┬───────────┬───────────┬───────────┬───┬───────────┬───────────┬───────────┬──────────┐
│ transacti ┆ transacti ┆ log_index ┆ block_num ┆ … ┆ volume_US ┆ block_sli ┆ block_imp ┆ total_fe │
│ on_hash   ┆ on_index  ┆ ---       ┆ ber       ┆   ┆ D         ┆ ppage_USD ┆ act       ┆ es       │
│ ---       ┆ ---       ┆ i64       ┆ ---       ┆   ┆ ---       ┆ ---       ┆ ---       ┆ ---      │
│ str       ┆ i64       ┆           ┆ i64       ┆   ┆ f64       ┆ f64       ┆ f64       ┆ f64      │
╞═══════════╪═══════════╪═══════════╪═══════════╪═══╪═══════════╪═══════════╪═══════════╪══════════╡
│ 0xe746189 ┆ 92        ┆ 185       ┆ 18908896  ┆ … ┆ 700.0     ┆ 0.0       ┆ 0.0       ┆ 0.001628 │
│ 39ea1dc79 ┆           ┆           ┆           ┆   ┆           ┆           ┆           ┆          │
│ 7d57008f6 ┆           ┆           ┆           ┆   ┆           ┆           ┆           ┆          │
│ e82aa…    ┆           ┆           ┆           ┆   ┆           ┆     

In [9]:
file_name = "./data/raw-data-optimism-Uniswap-v3-WETH-USDC.paraquet"
parquet_file_path = "./data/swaps-optimism-Uniswap-v3-WETH-USDC.paraquet"

swaps_df = process_swaps(file_name, prices_df)
print(swaps_df)
swaps_df.write_parquet(parquet_file_path)

del swaps_df

  amount0=swaps_df['data'].str.slice(2, 64).apply(lambda x: hex_to_int(x, 256) / CONST_Token0),
  amount1=swaps_df['data'].str.slice(66, 64).apply(lambda x: hex_to_int(x, 256) / CONST_Token1),
  sqrtPriceX96=swaps_df['data'].str.slice(130, 64).apply(hex_to_uint),
  tick=swaps_df['data'].str.slice(258).apply(lambda x: hex_to_int(x, 256))
  pl.col('amount_USD').cumsum().over('block_number').alias('block_slippage_USD')


shape: (1_186_780, 26)
┌───────────┬───────────┬───────────┬───────────┬───┬───────────┬───────────┬───────────┬──────────┐
│ transacti ┆ transacti ┆ log_index ┆ block_num ┆ … ┆ volume_US ┆ block_sli ┆ block_imp ┆ total_fe │
│ on_hash   ┆ on_index  ┆ ---       ┆ ber       ┆   ┆ D         ┆ ppage_USD ┆ act       ┆ es       │
│ ---       ┆ ---       ┆ i64       ┆ ---       ┆   ┆ ---       ┆ ---       ┆ ---       ┆ ---      │
│ str       ┆ i64       ┆           ┆ i64       ┆   ┆ f64       ┆ f64       ┆ f64       ┆ f64      │
╞═══════════╪═══════════╪═══════════╪═══════════╪═══╪═══════════╪═══════════╪═══════════╪══════════╡
│ 0x8acce4d ┆ 7         ┆ 149       ┆ 114234215 ┆ … ┆ 0.037804  ┆ 0.0       ┆ 0.0       ┆ 30.33947 │
│ 9a719a520 ┆           ┆           ┆           ┆   ┆           ┆           ┆           ┆ 7        │
│ a239dc813 ┆           ┆           ┆           ┆   ┆           ┆           ┆           ┆          │
│ 41003…    ┆           ┆           ┆           ┆   ┆           ┆   

In [10]:
file_name = "./data/raw-data-zkSync-Uniswap-v3-WETH-USDC.paraquet"
parquet_file_path = "./data/swaps-zkSync-Uniswap-v3-WETH-USDC.paraquet"

swaps_df = process_swaps(file_name, prices_df, "USDC", 0.0030)
print(swaps_df)
swaps_df.write_parquet(parquet_file_path)

del swaps_df

  amount0=swaps_df['data'].str.slice(2, 64).apply(lambda x: hex_to_int(x, 256) / CONST_Token0),
  amount1=swaps_df['data'].str.slice(66, 64).apply(lambda x: hex_to_int(x, 256) / CONST_Token1),
  sqrtPriceX96=swaps_df['data'].str.slice(130, 64).apply(hex_to_uint),
  tick=swaps_df['data'].str.slice(258).apply(lambda x: hex_to_int(x, 256))


shape: (46_417, 26)
┌───────────┬───────────┬───────────┬───────────┬───┬───────────┬───────────┬───────────┬──────────┐
│ transacti ┆ transacti ┆ log_index ┆ block_num ┆ … ┆ volume_US ┆ block_sli ┆ block_imp ┆ total_fe │
│ on_hash   ┆ on_index  ┆ ---       ┆ ber       ┆   ┆ D         ┆ ppage_USD ┆ act       ┆ es       │
│ ---       ┆ ---       ┆ i64       ┆ ---       ┆   ┆ ---       ┆ ---       ┆ ---       ┆ ---      │
│ str       ┆ i64       ┆           ┆ i64       ┆   ┆ f64       ┆ f64       ┆ f64       ┆ f64      │
╞═══════════╪═══════════╪═══════════╪═══════════╪═══╪═══════════╪═══════════╪═══════════╪══════════╡
│ 0x7e2b2b7 ┆ 0         ┆ 5         ┆ 22909923  ┆ … ┆ 324.52611 ┆ 0.0       ┆ 0.0       ┆ 0.02025  │
│ 049e471c7 ┆           ┆           ┆           ┆   ┆ 8         ┆           ┆           ┆          │
│ d0999ac99 ┆           ┆           ┆           ┆   ┆           ┆           ┆           ┆          │
│ a777d…    ┆           ┆           ┆           ┆   ┆           ┆      

  pl.col('amount_USD').cumsum().over('block_number').alias('block_slippage_USD')


In [11]:
#file_name = "./data/raw-data-zkSync-Uniswap-v3-WETH-USDC.paraquet"
#parquet_file_path = "./data/swaps-zkSync-Uniswap-v3-WETH-USDC.paraquet"

#swaps_df = process_swaps(file_name, prices_df, "USDC")
#swaps_df