In [6]:
"""
Convert the block parquet files into a single CSV file.
After installing cryo, run the following command first:

cryo blocks -b 18251965:18473543 --rpc <YOUR_ALCHEMY_RPC_URL> -o ./data/blocks --requests-per-second 50
"""
import polars as pl
import os
import pandas as pd

# Glob pattern for the parquet files written to ./data/blocks.
# expanduser only rewrites a leading "~", so this relative path is left as-is.
data_path = os.path.expanduser("./data/blocks/ethereum__blocks__*.parquet")


def scan_df():
    """Open the block parquet files lazily.

    Returns:
        A polars LazyFrame over every file matching the module-level
        ``data_path`` glob; nothing is materialised until it is collected.
    """
    lazy_blocks = pl.scan_parquet(data_path)
    return lazy_blocks


# Keep only the three columns needed downstream, then materialise them as a
# single numpy array (rows = blocks, columns in the order selected below).
block_columns = scan_df().select(
    pl.col("timestamp", "block_number", "base_fee_per_gas")
)
timestamps_blocks_fees = block_columns.collect(streaming=True).to_numpy()

# Re-label the columns with camelCase names and write the CSV without the
# pandas index column.
df = pd.DataFrame(
    timestamps_blocks_fees,
    columns=["timestamp", "blockNumber", "baseFeePerGas"],
)
df.to_csv("./data/blocks/timestamp_blockNumber_baseFeePerGas.csv", index=False)

In [7]:
"""
Extract the timestamp and open price from the CSV file downloaded from https://www.binance.com/en/landing/data

Put the original data in raw/ and add that path to .gitignore to avoid the upload error.
"""
# The raw file has no header row, so polars names the columns column_1,
# column_2, ...; only the first two are kept.
# NOTE(review): the variable says "usdc" but the file read here is ETHUSDT.
eth_usdc_1s_df = pl.read_csv(
    "./data/cex_price/raw/ETHUSDT-1s-2023-10.csv", has_header=False
).select(
    # integer-divide by 1000 (presumably milliseconds -> seconds; confirm
    # against the Binance file format)
    (pl.col("column_1") // 1000).alias("timestamp"),
    pl.col("column_2").alias("price"),
)
eth_usdc_1s_df.write_csv("./data/cex_price/ETHUSDT-1s-2023-10-opens.csv")

In [8]:
"""
Filter on-chain events (Swap, Mint, Burn, Sync) and save them to CSV files.
"""
from dotenv import load_dotenv
import web3
from datetime import datetime, timezone
from utils import *
from decimal import Decimal
import time

# Load variables from a .env file into the process environment.
load_dotenv()

############################################################

DEX = "UNI_V2"
PAIR = "WETH_USDT"

# Everything below is read from the environment: the RPC endpoint, the pool
# address (looked up as <DEX>_<PAIR>_ADDRESS) and the contract ABI.
rpc_url = os.getenv("ALCHEMY_URL")
pool_address = os.getenv(f"{DEX}_{PAIR}_ADDRESS")
pool_abi = os.getenv("UNI_V2_ABI")

w3 = web3.Web3(web3.Web3.HTTPProvider(rpc_url))
uni_v2_contract = w3.eth.contract(address=pool_address, abi=pool_abi)

############################################################

# date range
# Both bounds are midnight UTC, expressed as whole Unix seconds.
range_start_ts = int(datetime(2023, 10, 1, tzinfo=timezone.utc).timestamp())
range_end_ts = int(datetime(2023, 11, 1, tzinfo=timezone.utc).timestamp())

# get_block_from_timestamp is not defined in this notebook (presumably it comes
# from `utils`); it is unpacked here as a (block number, block timestamp) pair.
from_block_number, from_block_timestamp = get_block_from_timestamp(
    w3, range_start_ts
)
to_block_number, to_block_timestamp = get_block_from_timestamp(w3, range_end_ts)

############################################################

# fetch events and block number; if error occurs reduce the chunk size.
chunk_size = 1800  # blocks = 6 hours / 12 seconds ; should be less than 2000 due to alchemy's policy.


def _fetch_event_rows(event, first_block, last_block, columns):
    """Fetch the logs of one event type and flatten them into row dicts.

    Args:
        event: contract event class, e.g. ``uni_v2_contract.events.Swap``.
        first_block: first block of the query window (inclusive).
        last_block: last block of the query window (inclusive).
        columns: mapping of output column name -> event argument name.

    Returns:
        One dict per log, holding ``blockNumber`` followed by every requested
        argument converted to ``Decimal``.
    """
    logs = event().get_logs(fromBlock=first_block, toBlock=last_block)
    return [
        {
            "blockNumber": log["blockNumber"],
            **{
                column: Decimal(log["args"][argument])
                for column, argument in columns.items()
            },
        }
        for log in logs
    ]


swaps = []
mints = []
burns = []
syncs = []

# The chunk starts cover the half-open range [from_block_number, to_block_number),
# so the last block that may be queried is to_block_number - 1. Without this
# clamp the final chunk would read up to chunk_size - 1 blocks past the end of
# the date range.
last_block_number = to_block_number - 1

for block_number in range(from_block_number, to_block_number, chunk_size):
    # pause between chunks (presumably to stay under the RPC rate limit)
    time.sleep(1)

    chunk_end = min(block_number + chunk_size - 1, last_block_number)

    # swaps
    swaps.extend(
        _fetch_event_rows(
            uni_v2_contract.events.Swap,
            block_number,
            chunk_end,
            {
                "amount0In": "amount0In",
                "amount1In": "amount1In",
                "amount0Out": "amount0Out",
                "amount1Out": "amount1Out",
            },
        )
    )

    # mints
    mints.extend(
        _fetch_event_rows(
            uni_v2_contract.events.Mint,
            block_number,
            chunk_end,
            {"amount0Minted": "amount0", "amount1Minted": "amount1"},
        )
    )

    # burns
    burns.extend(
        _fetch_event_rows(
            uni_v2_contract.events.Burn,
            block_number,
            chunk_end,
            {"amount0Burnt": "amount0", "amount1Burnt": "amount1"},
        )
    )

    # syncs
    syncs.extend(
        _fetch_event_rows(
            uni_v2_contract.events.Sync,
            block_number,
            chunk_end,
            {"reserve0": "reserve0", "reserve1": "reserve1"},
        )
    )

# record the initial reserves
# The initial state is the last Sync emitted strictly BEFORE from_block_number.
# The look-back window therefore ends at from_block_number - 1: the event loop
# already fetches from_block_number itself, so including it here would record a
# Sync from inside the range as the "initial" state and duplicate it in `syncs`.
# The window is still chunk_size blocks wide.
prior_sync_logs = uni_v2_contract.events.Sync().get_logs(
    fromBlock=from_block_number - chunk_size, toBlock=from_block_number - 1
)
if not prior_sync_logs:
    # Same exception type as indexing an empty result, but with an explanation.
    raise IndexError(
        f"No Sync event found in the {chunk_size} blocks before block "
        f"{from_block_number}; cannot determine the initial reserves."
    )

# relies on get_logs returning logs in chain order, so the last one is the latest
initial_reserves = prior_sync_logs[-1]["args"]
syncs.insert(
    0,
    {
        "blockNumber": from_block_number,
        "reserve0": Decimal(initial_reserves["reserve0"]),
        "reserve1": Decimal(initial_reserves["reserve1"]),
    },
)

############################################################

# convert to DataFrame
swaps_df, mints_df, burns_df, syncs_df = (
    pl.DataFrame(rows) for rows in (swaps, mints, burns, syncs)
)

# save as csv files: one file per event type, named after the DEX and pair
for event_frame, csv_path in (
    (swaps_df, f"data/onchain_events/{DEX}_{PAIR}_swaps.csv"),
    (mints_df, f"data/onchain_events/{DEX}_{PAIR}_mints.csv"),
    (burns_df, f"data/onchain_events/{DEX}_{PAIR}_burns.csv"),
    (syncs_df, f"data/onchain_events/{DEX}_{PAIR}_syncs.csv"),
):
    event_frame.write_csv(csv_path)

print(f"Query for events on {DEX} {PAIR} is done!")

############################################################

Query for events on UNI_V2 WETH_USDT is done!
