# Fetching Ethereum transactions

This notebook queries historical Ethereum transactions from Google BigQuery, converts them to a Dask DataFrame, and saves them in Parquet format.

Note: the queries are run one month at a time, because querying a longer period in a single request hangs.
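The month-by-month split can also be expressed with pandas periods: each monthly `Period` knows its first and last day, so the (start, end) pairs come straight out of `pd.period_range`. This is a sketch, not the notebook's own code; `month_ranges` is a hypothetical helper, and unlike the `date_range` pairing used further down it includes the end month itself.

```python
import pandas as pd

def month_ranges(start_month: str, end_month: str) -> list[tuple[str, str]]:
    """Return (first_day, last_day) pairs, one per calendar month.

    Both start_month and end_month (e.g. "2020-11") are included.
    """
    periods = pd.period_range(start_month, end_month, freq="M")
    return [
        # start_time is midnight on the 1st; end_time is the last instant of the month.
        (p.start_time.strftime("%Y-%m-%d"), p.end_time.strftime("%Y-%m-%d"))
        for p in periods
    ]

print(month_ranges("2020-11", "2021-02"))
```

Because each pair covers exactly one calendar month, month lengths and leap years are handled by pandas rather than by hand.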

In [1]:
import os
import pandas as pd
from google.cloud import bigquery
import dask.dataframe as dd
from dask.distributed import Client, LocalCluster
from pathlib import Path

## Configurations

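Choose the month range to query and save to disk. Note that the end month itself is not fetched: with END_DATE set to "2021-03", the last month queried is February 2021.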

In [2]:

configs = {
    "RANGE": {
        "START_DATE": "2019-01",
        "END_DATE": "2021-03"
    },
    "PATHS": {
        "BIG_QUERY_CREDENTIALS_PATH": "../credentials/gcp_cred.json",
    }
}

## Dask client

In [3]:
# DEFAULT USAGE
cluster = LocalCluster()
client = Client(cluster)
# USAGE WITH REMOTE CLUSTER
# client = Client('127.0.0.1:46363')
# client.restart()
client

Client
  Scheduler: tcp://localhost:46363
  Dashboard: http://localhost:46365/status
Cluster
  Workers: 5
  Cores: 10
  Memory: 40.00 GB


In [4]:
# Point the Google client libraries at the service-account key set in configs
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = configs["PATHS"]["BIG_QUERY_CREDENTIALS_PATH"]

In [5]:
big_query_client = bigquery.Client()

## BigQuery

In [6]:
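def fetch_eth_blocks(client: bigquery.Client, start_date: str, end_date: str) -> pd.DataFrame:
    """Fetch block timestamp, block number, gas price and gas provided for every
    Ethereum transaction whose block date lies in [start_date, end_date] (inclusive)."""
    sql = f"""
    SELECT txs.block_timestamp, txs.block_number, txs.gas_price, txs.gas
    FROM `bigquery-public-data.crypto_ethereum.transactions` AS txs
    WHERE DATE(txs.block_timestamp) >= DATE('{start_date}') AND DATE(txs.block_timestamp) <= DATE('{end_date}')
    ORDER BY txs.block_timestamp
    """
    df = client.query(sql).to_dataframe()
    # A block holds many transactions, so block_number is a non-unique index.
    df = df.set_index("block_number")
    return df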
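The query above interpolates the dates straight into the SQL text and compares `DATE(block_timestamp)`. A minimal alternative sketch, assuming the same table and columns, validates both dates with `date.fromisoformat` before they reach the f-string and filters the raw timestamp on a half-open interval. `build_tx_query` is a hypothetical helper; whether this changes the bytes scanned compared with the `DATE(...)` form is something to check in BigQuery's bytes-processed estimate rather than assume. The `google-cloud-bigquery` client also supports proper query parameters (`bigquery.QueryJobConfig` with `bigquery.ScalarQueryParameter`), which avoids string interpolation entirely.

```python
from datetime import date, timedelta

def build_tx_query(start_date: str, end_date: str) -> str:
    """Return SQL for all transactions whose block date lies in [start_date, end_date]."""
    # fromisoformat raises ValueError for anything that is not a valid date,
    # so arbitrary text never reaches the f-string below.
    start = date.fromisoformat(start_date)
    end = date.fromisoformat(end_date)
    if end < start:
        raise ValueError(f"end_date {end} is before start_date {start}")
    # Half-open interval on the raw timestamp: [start 00:00, day after end 00:00).
    end_exclusive = end + timedelta(days=1)
    return f"""
    SELECT block_timestamp, block_number, gas_price, gas
    FROM `bigquery-public-data.crypto_ethereum.transactions`
    WHERE block_timestamp >= TIMESTAMP('{start.isoformat()}')
      AND block_timestamp < TIMESTAMP('{end_exclusive.isoformat()}')
    ORDER BY block_timestamp
    """

print(build_tx_query("2021-02-01", "2021-02-28"))
```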

In [7]:
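# First day ('MS') and last day ('M') of each month in the configured range.
# pandas >= 2.2 deprecates 'M' here in favour of 'ME'.
starts = pd.date_range(configs["RANGE"]["START_DATE"], configs["RANGE"]["END_DATE"],
                       freq='MS').strftime("%Y-%m-%d").tolist()

# END_DATE "2021-03" is parsed as 2021-03-01, so the last month end is 2021-02-28.
ends = pd.date_range(configs["RANGE"]["START_DATE"], configs["RANGE"]["END_DATE"],
                     freq='M').strftime("%Y-%m-%d").tolist()

# zip() pairs each month start with its month end; the trailing start
# (the END_DATE month) has no matching end and is skipped.
for start, end in zip(starts, ends):
    print(f"Fetching from {start} to  {end} ...")
    df = fetch_eth_blocks(big_query_client, start, end)
    ddf = dd.from_pandas(df, npartitions=10)
    ddf.to_parquet(f'datasets/{start}_to_{end}_gas')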

Fetching from 2020-08-01 to  2020-08-31 ...
Fetching from 2020-09-01 to  2020-09-30 ...
Fetching from 2020-10-01 to  2020-10-31 ...
Fetching from 2020-11-01 to  2020-11-30 ...
Fetching from 2020-12-01 to  2020-12-31 ...
Fetching from 2021-01-01 to  2021-01-31 ...
Fetching from 2021-02-01 to  2021-02-28 ...
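Since a long run can stall partway, one way to resume is to skip months whose output directory already exists. A minimal sketch, assuming the directory naming used by the loop above; `pending_months` is a hypothetical helper. One caveat: a directory left behind by an interrupted `to_parquet` call would also count as done, so delete partial outputs before resuming.

```python
from pathlib import Path

def pending_months(pairs, out_dir="datasets"):
    """Return the (start, end) pairs whose monthly output directory does not exist yet.

    Directory names follow the fetch loop's pattern: f"{start}_to_{end}_gas".
    """
    out = Path(out_dir)
    return [(s, e) for s, e in pairs if not (out / f"{s}_to_{e}_gas").exists()]

# Usage inside the notebook (not run here):
# for start, end in pending_months(zip(starts, ends)):
#     ...fetch and write exactly as in the loop above...
```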


In [8]:
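# Read every directory under datasets/ back, sorted so months stay in chronological order.
# Note: this also picks up any other entry there, including the combined output
# written by the next cell if the notebook is re-run.
datasets = []
for p in sorted(Path("datasets").iterdir()):
    datasets.append(dd.read_parquet(p))
ddf = dd.concat(datasets, axis=0)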
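Because `Path("datasets").iterdir()` returns every entry in the folder, the combined directory written at the end of this notebook (for example `2019-01_to_2021-03_gas`) would be read back alongside the monthly ones on a re-run. A minimal sketch that keeps only the per-month directories by matching their name pattern; `monthly_dirs` and `MONTHLY_DIR` are hypothetical names, and the pattern assumes the `{start}_to_{end}_gas` naming from the fetch loop.

```python
import re
from pathlib import Path

# Matches only per-month directories such as "2021-02-01_to_2021-02-28_gas",
# not the combined "2019-01_to_2021-03_gas" output.
MONTHLY_DIR = re.compile(r"^\d{4}-\d{2}-\d{2}_to_\d{4}-\d{2}-\d{2}_gas$")

def monthly_dirs(root="datasets"):
    """Return the monthly Parquet directories under root, sorted by name.

    ISO dates sort lexicographically, so name order is chronological order.
    """
    return sorted(
        p for p in Path(root).iterdir() if p.is_dir() and MONTHLY_DIR.match(p.name)
    )

# Usage in the notebook (not run here):
# ddf = dd.concat([dd.read_parquet(p) for p in monthly_dirs()], axis=0)
```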

## Save to Parquet files

We use 500 partitions here; adjust this value to suit your hardware (in particular, the memory available per worker) and the length of the date range.

In [9]:

ddf = ddf.repartition(npartitions=500)
ddf.to_parquet(f'datasets/{configs["RANGE"]["START_DATE"]}_to_{configs["RANGE"]["END_DATE"]}_gas')
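Instead of fixing 500 partitions by hand, the count can be estimated from the row count and a rough per-row size. A minimal sketch: `partitions_for` is a hypothetical helper, the 100 MiB target follows the commonly cited guideline of partitions around 100 MB, and a per-row size of about 32 bytes (four 8-byte values: timestamp, block number index, gas price, gas) is an assumption about the dtypes, not a measurement. Dask can also make this estimate itself via `ddf.repartition(partition_size="100MB")`, at the cost of an extra pass to measure memory usage.

```python
import math

def partitions_for(n_rows: int, bytes_per_row: int, target_bytes: int = 100 * 2**20) -> int:
    """Number of partitions so each holds roughly target_bytes of in-memory data."""
    if n_rows <= 0:
        return 1
    return max(1, math.ceil(n_rows * bytes_per_row / target_bytes))

# Illustrative numbers only: 10 million rows at ~32 bytes each is ~305 MiB,
# which rounds up to 4 partitions of at most ~100 MiB.
print(partitions_for(10_000_000, 32))
```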