Skip to content

sfermion/ethereum-etl-airflow

 
 

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

Ethereum ETL Airflow

Read this article: https://cloud.google.com/blog/products/data-analytics/ethereum-bigquery-how-we-built-dataset

Setting up Airflow DAGs using Google Cloud Composer

Create BigQuery Datasets

Create Google Cloud Storage bucket

Create Google Cloud Composer environment

Create environment here, https://console.cloud.google.com/composer, use Python version 3, In PYPI Packages tab add ethereum-etl==1.2.3.

Create variables in Airflow (Admin > Variables in the UI):

Variable Description
ethereum_output_bucket GCS bucket to store exported files
ethereum_provider_uri URI of Ethereum node
ethereum_destination_dataset_project_id Project ID of BigQuery datasets
notification_emails email for notifications

Check other variables in dags/variables.py.

Upload DAGs

> ./upload_dags.sh <airflow_bucket>

Miscellaneous

To upload CSVs to BigQuery:

> cd output
> gsutil -m rsync -r . gs://<your_bucket>/ethereumetl/export
  • Sign in to BigQuery https://bigquery.cloud.google.com/

  • Create a new dataset called ethereum_blockchain_raw and ethereum_blockchain

  • Load the files from the bucket to BigQuery:

> git clone https://github.com/medvedev1088/ethereum-etl-airflow.git
> cd ethereum-etl-airflow/dags/resources/stages
> bq --location=US load --replace --source_format=CSV --skip_leading_rows=1 ethereum_blockchain_raw.blocks gs://<your_bucket>/ethereumetl/export/blocks/*.csv ./raw/schemas/blocks.json
> bq --location=US load --replace --source_format=CSV --skip_leading_rows=1 ethereum_blockchain_raw.transactions gs://<your_bucket>/ethereumetl/export/transactions/*.csv ./raw/schemas/transactions.json
> bq --location=US load --replace --source_format=CSV --skip_leading_rows=1 ethereum_blockchain_raw.token_transfers gs://<your_bucket>/ethereumetl/export/token_transfers/*.csv ./raw/schemas/token_transfers.json
> bq --location=US load --replace --source_format=CSV --skip_leading_rows=1 ethereum_blockchain_raw.receipts gs://<your_bucket>/ethereumetl/export/receipts/*.csv ./raw/schemas/receipts.json
> bq --location=US load --replace --source_format=NEWLINE_DELIMITED_JSON ethereum_blockchain_raw.logs gs://<your_bucket>/ethereumetl/export/logs/*.json ./raw/schemas/logs.json
> bq --location=US load --replace --source_format=NEWLINE_DELIMITED_JSON ethereum_blockchain_raw.contracts gs://<your_bucket>/ethereumetl/export/contracts/*.json ./raw/schemas/contracts.json
> bq --location=US load --replace --source_format=CSV --skip_leading_rows=1 --allow_quoted_newlines ethereum_blockchain_raw.tokens gs://<your_bucket>/ethereumetl/export/tokens/*.csv ./raw/schemas/tokens.json

Note that NEWLINE_DELIMITED_JSON is used to support REPEATED mode for the columns with lists.

Enrich blocks:

> bq mk --table --description "$(cat ./enrich/descriptions/blocks.txt | tr '\n' ' ')" --time_partitioning_field timestamp ethereum_blockchain.blocks ./enrich/schemas/blocks.json
> bq --location=US query --destination_table ethereum_blockchain.blocks --use_legacy_sql=false "$(cat ./enrich/sqls/blocks.sql | tr '\n' ' ')"

Enrich transactions:

> bq mk --table --description "$(cat ./enrich/descriptions/transactions.txt | tr '\n' ' ')" --time_partitioning_field block_timestamp ethereum_blockchain.transactions ./enrich/schemas/transactions.json
> bq --location=US query --destination_table ethereum_blockchain.transactions --use_legacy_sql=false "$(cat ./enrich/sqls/transactions.sql | tr '\n' ' ')"

Enrich token_transfers:

> bq mk --table --description "$(cat ./enrich/descriptions/token_transfers.txt | tr '\n' ' ')" --time_partitioning_field block_timestamp ethereum_blockchain.token_transfers ./enrich/schemas/token_transfers.json
> bq --location=US query --destination_table ethereum_blockchain.token_transfers --use_legacy_sql=false "$(cat ./enrich/sqls/token_transfers.sql | tr '\n' ' ')"

Enrich logs:

> bq mk --table --description "$(cat ./enrich/descriptions/logs.txt | tr '\n' ' ')" --time_partitioning_field block_timestamp ethereum_blockchain.logs ./enrich/schemas/logs.json
> bq --location=US query --destination_table ethereum_blockchain.logs --use_legacy_sql=false "$(cat ./enrich/sqls/logs.sql | tr '\n' ' ')"

Enrich contracts:

> bq mk --table --description "$(cat ./enrich/descriptions/contracts.txt | tr '\n' ' ')" --time_partitioning_field block_timestamp ethereum_blockchain.contracts ./enrich/schemas/contracts.json
> bq --location=US query --destination_table ethereum_blockchain.contracts --use_legacy_sql=false "$(cat ./enrich/sqls/contracts.sql | tr '\n' ' ')"

Enrich tokens:

> bq mk --table --description "$(cat ./enrich/descriptions/tokens.txt | tr '\n' ' ')" ethereum_blockchain.tokens ./enrich/schemas/tokens.json
> bq --location=US query --destination_table ethereum_blockchain.tokens --use_legacy_sql=false "$(cat ./enrich/sqls/tokens.sql | tr '\n' ' ')"

About

Insight DE project: Adding additional feature to ethereum-etl-airflow. Implemented fallback mechanism in case export task failed.

Topics

Resources

License

Stars

Watchers

Forks

Packages

No packages published

Languages

  • Python 99.5%
  • Shell 0.5%