In [None]:
%%capture
### (1) Install dependencies and restart runtime
# %%capture must stay the first line of the cell; it hides all pip output,
# including install errors.
import os

# Keep these pins: the rest of the notebook uses web3 5.x APIs
# (e.g. camelCase `w3.toChecksumAddress`).
!pip install --upgrade jsonschema==4.4.0
!pip install --upgrade web3==5.26.0

# Send signal 9 (SIGKILL) to our own kernel process so the runtime restarts and
# later cells import the freshly installed packages. Everything defined in this
# cell (including `os`) is gone afterwards; continue from cell (2).
os.kill(os.getpid(), 9)  # Hack to restart the runtime after install
# This will throw an error (the runtime reports a crash) -- that is expected.

In [None]:
### (2) Verify installation
from web3 import Web3

In [None]:
### (3) Define constants
import os  # re-imported: cell (1) killed the kernel, so its `import os` is gone

# Ethereum JSON-RPC endpoint (Infura). The URL embeds the project key, so read
# it from the environment instead of committing it with the notebook. The
# fallback is the original placeholder and must be replaced before real runs.
NODE_ENDPOINT = os.environ.get("NODE_ENDPOINT", "https://mainnet.infura.io/v3/SECRET")

In [None]:
### (4) Authenticate with Google Cloud Platform to run BigQuery queries
# Interactive, Colab-only step. The BigQuery client created later is built
# without explicit credentials, so it relies on the login performed here.
from google.colab import auth
auth.authenticate_user()

# DESCRIPTION

A data pipeline for enriching and summarizing Ethereum token transfer data one day at a time.

1. Summarizing data from the public BigQuery table `bigquery-public-data.crypto_ethereum.token_transfers`.
  - Count the number of transfers for each `token_address` and keep the top 100
1. Enriching the data with information from the blockchain using the provided Infura node endpoint
  - Append token `name` and `symbol`
1. Processing one day at a time
1. Each execution of the pipeline is triggered by running the Python function `run_batch_job` (see below)
1. Storing the result of each run in a BigQuery table with the following shape:


| event_date | token_name | token_symbol | token_address | number_of_transfers |
|---|---|---|---|---|
| 2022-01-01 | USD Coin | USDC | 0x123... | 1000 | 
| 2022-01-01 | Aave | AAVE | 0x456... | 800 | 
| 2022-01-01 | Wrapped Ether | WETH | 0x789... | 600 | 

Extra:
 - The pipeline uses as few Infura lookups as possible
 - The pipeline may not keep state in memory between runs (e.g. Python variables)

Run the pipeline for the dates 2022-01-01 through 2022-01-03 (three days).

In [None]:
from google.cloud import bigquery

PROJECT = 'PROJECT_NAME'
TARGET_DATASET = 'token_dataset'
TARGET_TABLE = 'token_transfer'
TARGET_GLOSSARY = 'token_glossary'

BQ = bigquery.Client(project=PROJECT)

# web3 client used by the enrichment step to read token metadata on-chain.
w3 = Web3(Web3.HTTPProvider(NODE_ENDPOINT))

# Minimal ERC-20 ABI: only the two zero-argument functions the pipeline calls,
# `name()` and `symbol()`, each declared as returning a single string.
ABI = [
  {
    "constant": True,
    "inputs": [],
    "name": "name",
    "outputs": [
      {
        "name": "name",
        "type": "string"
      },
    ],
    "type": "function"
  },
  {
    "constant": True,
    "inputs": [],
    "name": "symbol",
    "outputs": [
      {
        # ABI output entries are described by `name` and `type`; the previous
        # non-standard "symbol" key has been corrected to "name".
        "name": "symbol",
        "type": "string"
      },
    ],
    "type": "function"
  }
]

# The glossary caches on-chain token metadata in BigQuery so that a token
# address only needs an Infura lookup the first time it is seen.
schema = [
    bigquery.SchemaField("token_address", "string", mode="REQUIRED"),
    bigquery.SchemaField("token_name", "string", mode="REQUIRED"),
    bigquery.SchemaField("token_symbol", "string", mode="REQUIRED"),
]

# Make sure the dataset exists on a fresh project; no-op when it already does.
BQ.create_dataset(f"{PROJECT}.{TARGET_DATASET}", exists_ok=True)

table_glossary = bigquery.Table(f"{PROJECT}.{TARGET_DATASET}.{TARGET_GLOSSARY}", schema=schema)
# exists_ok=True makes this cell idempotent: re-running it (or Restart & Run
# All) reuses the existing table instead of failing with a Conflict error.
table_create_glossary = BQ.create_table(table_glossary, exists_ok=True)


In [None]:
# Target table holding the daily top-token summary described above.
# NOTE: column order may be relied on positionally if rows are inserted as
# tuples, so keep it stable.
schema = [
    bigquery.SchemaField("event_date", "DATE"),
    bigquery.SchemaField("token_address", "STRING"),
    bigquery.SchemaField("number_of_transfers", "INTEGER"),
    bigquery.SchemaField("token_name", "STRING"),
    bigquery.SchemaField("token_symbol", "STRING"),
]

table = bigquery.Table(f"{PROJECT}.{TARGET_DATASET}.{TARGET_TABLE}", schema=schema)
# exists_ok=True: re-running the cell reuses the existing table instead of
# raising a Conflict error (resolves the old "check if table doesn't exist" TODO).
table_create = BQ.create_table(table, exists_ok=True)

In [None]:
from google.cloud import bigquery
import pandas as pd

# Number of top tokens (by transfer count) kept per day.
# NOTE(review): the pipeline description asks for the top 100; 10 is presumably
# a reduced value to limit Infura/BigQuery usage while developing -- confirm.
LIMIT_NUMBER = 10

from typing import Optional, Tuple


def _call_token_field(contract, field: str, token_address: str) -> Optional[str]:
  """Call the zero-argument contract function ``field`` and return its result.

  Returns None (after printing the error) when the call fails, e.g. because the
  contract has no such function or the node call itself errors.
  """
  try:
    return getattr(contract.functions, field)().call()
  except Exception as exc:  # best effort: one odd token must not abort the batch
    print(f"Errors with w3.eth.contract: {field}() failed for {token_address}: {exc}")
    return None


def enrich_token_data(token_address: str, web3_client=None, abi=None) -> Tuple[Optional[str], Optional[str]]:
  """Look up a token's ``name`` and ``symbol`` on-chain.

  Args:
    token_address: hex contract address (any case; it is checksummed here).
    web3_client: Web3 instance to use; defaults to the notebook's ``w3``.
    abi: contract ABI to use; defaults to the notebook's ``ABI``.

  Returns:
    ``(name, symbol)``. Either element is None if that lookup failed. Costs two
    node calls per token.
  """
  client = w3 if web3_client is None else web3_client
  contract_abi = ABI if abi is None else abi
  contract = client.eth.contract(client.toChecksumAddress(token_address), abi=contract_abi)
  # Fetch each field independently so a failing symbol() keeps a good name.
  # The old code raised UnboundLocalError on `return` whenever either call failed.
  name = _call_token_field(contract, "name", token_address)
  symbol = _call_token_field(contract, "symbol", token_address)
  return name, symbol

def _run_date_job_config(date: str) -> bigquery.QueryJobConfig:
  """Query config binding ``date`` to the ``@run_date`` DATE parameter."""
  return bigquery.QueryJobConfig(
      query_parameters=[bigquery.ScalarQueryParameter("run_date", "DATE", date)]
  )


def _top_tokens_for_date(date: str) -> pd.DataFrame:
  """Return the LIMIT_NUMBER token addresses with the most transfers on ``date``.

  Columns: ``token_address``, ``number_of_transfers`` (in descending order).
  """
  # `date` is bound as a query parameter rather than spliced into the SQL text.
  # LIMIT_NUMBER is a trusted notebook constant.
  sql = f"""
    SELECT token_address, COUNT(*) AS number_of_transfers
    FROM `bigquery-public-data.crypto_ethereum.token_transfers`
    WHERE DATE(block_timestamp) = @run_date
    GROUP BY token_address
    ORDER BY number_of_transfers DESC
    LIMIT {int(LIMIT_NUMBER)}
  """
  return BQ.query(sql, job_config=_run_date_job_config(date)).to_dataframe()


def _addresses_already_loaded(date: str) -> set:
  """Token addresses that already have a row for ``date`` in the target table."""
  sql = f"""
    SELECT DISTINCT token_address
    FROM `{PROJECT}.{TARGET_DATASET}.{TARGET_TABLE}`
    WHERE event_date = @run_date
  """
  rows = BQ.query(sql, job_config=_run_date_job_config(date)).result()
  return {row["token_address"] for row in rows}


def _load_glossary() -> dict:
  """Map token_address -> (token_name, token_symbol) for every cached token."""
  sql = f"""
    SELECT token_address, token_name, token_symbol
    FROM `{PROJECT}.{TARGET_DATASET}.{TARGET_GLOSSARY}`
  """
  return {
      row["token_address"]: (row["token_name"], row["token_symbol"])
      for row in BQ.query(sql).result()
  }


def run_batch_job(date: str):
  """Summarize and enrich one day of token transfers into TARGET_TABLE.

  Steps:
    1. Count transfers per token on ``date`` in the public
       ``crypto_ethereum.token_transfers`` table and keep the top LIMIT_NUMBER.
    2. Skip tokens already loaded for that date, so re-runs do not duplicate rows.
    3. Resolve name/symbol from the glossary table. Only unseen addresses fall
       back to an on-chain lookup, and complete results are cached in the glossary.
    4. Stream the enriched rows into TARGET_TABLE.

  All state is read from BigQuery on each call; nothing is kept in Python
  memory between runs.

  Args:
    date: ISO day to process, e.g. ``'2022-01-01'``.

  Raises:
    RuntimeError: if BigQuery rejects rows of the target-table insert.
  """
  # TODO (from original author): this version targets an upgraded-tier account;
  # add a check for that.
  top_tokens = _top_tokens_for_date(date)
  already_loaded = _addresses_already_loaded(date)
  pending = top_tokens[~top_tokens["token_address"].isin(already_loaded)]

  if pending.empty:
    print('No new rows to insert')
    return

  glossary = _load_glossary()
  target_rows = []
  glossary_rows = []
  for token_address, number_of_transfers in zip(pending["token_address"], pending["number_of_transfers"]):
    if token_address in glossary:
      print(f"Token address in Glossary table found")
      name, symbol = glossary[token_address]
    else:
      print(f"Token address in glossary table is not found")
      name, symbol = enrich_token_data(token_address)
      # Glossary columns are REQUIRED, so only complete lookups are cached.
      # An uncached token is simply looked up again on a later run.
      if name is not None and symbol is not None:
        glossary_rows.append(
            {"token_address": token_address, "token_name": name, "token_symbol": symbol}
        )
    target_rows.append({
        "event_date": date,
        "token_address": token_address,
        # Plain int: numpy integers are not JSON-serializable for streaming inserts.
        "number_of_transfers": int(number_of_transfers),
        "token_name": name,
        "token_symbol": symbol,
    })

  if glossary_rows:
    glossary_table = BQ.get_table(f"{PROJECT}.{TARGET_DATASET}.{TARGET_GLOSSARY}")
    glossary_errors = BQ.insert_rows(glossary_table, glossary_rows)
    if glossary_errors:
      # The glossary is only a cache: report the failure and continue.
      print(f"Failed to cache token info in {TARGET_GLOSSARY}: {glossary_errors}")
    else:
      for cached in glossary_rows:
        print(f"New token info for -- {cached['token_name']} -- inserted in the glossary")

  # Fetch the live table so rows are serialized against its actual schema.
  target_table = BQ.get_table(f"{PROJECT}.{TARGET_DATASET}.{TARGET_TABLE}")
  errors = BQ.insert_rows(target_table, target_rows)
  if errors:
    raise RuntimeError(f"Failed to insert rows into {TARGET_TABLE}: {errors}")
  print(f"{TARGET_TABLE} successfully updated")


import datetime

# Process the requested date range (2022-01-01 .. 2022-01-03), with one
# run_batch_job call per day.
START_DATE = datetime.date(2022, 1, 1)
NUM_DAYS = 3
RUN_DATES = [(START_DATE + datetime.timedelta(days=offset)).isoformat() for offset in range(NUM_DAYS)]

for run_date in RUN_DATES:
  print(f"Running batch job for {run_date}")
  run_batch_job(run_date)

Token address in glossary table is not found
New token info for -- GoldHunter -- inserted in the glossary
Token address in glossary table is not found
New token info for -- Phanta Bear -- inserted in the glossary
Token address in glossary table is not found
New token info for -- SHIBA INU -- inserted in the glossary
Token address in glossary table is not found
New token info for -- Matic Token -- inserted in the glossary
Token address in glossary table is not found
New token info for -- SOS -- inserted in the glossary
token_transfer successfully updated
