In [8]:
import pandas as pd
import numpy as np
import pathlib

# Load ERC-20 Token List File

In [9]:
# Read the address, symbol, name and decimals columns of the token list.
# Tokens with a missing `decimals` value are given 18 decimals before the
# column is cast to an integer dtype.
erc20_tkns = (
    pd.read_csv('data/tokens.csv', usecols=['address', 'symbol', 'name', 'decimals'])
      .assign(decimals=lambda tkns: tkns['decimals'].fillna(18).astype('int'))
)
# Plain NumPy array of the token contract addresses, used for log filtering
erc20_addrs = erc20_tkns['address'].to_numpy()

# Load ERC-20 data from Ethereum Log Files

In [10]:
def _scale_token_amount(hex_value, decimals):
    """Convert a hex-encoded raw token amount into token units (float).

    The division is done on Python integers, so neither the 256-bit raw
    amount nor ``10 ** decimals`` can overflow a fixed-width integer type.
    A missing ``decimals`` value yields NaN.
    """
    if pd.isna(decimals):
        return float('nan')
    return int(hex_value, 16) / 10 ** int(decimals)


def load_erc20_log(log_file, erc20_addrs=erc20_addrs):
    """Aggregate the ERC-20 transfer events of one Ethereum event-log file.

    Parameters
    ----------
    log_file : path-like
        Parquet file providing the columns BLOCK_DATE, ADDRESS, TOPIC0,
        TOPIC1, TOPIC2, TOPIC3 and DATA.
    erc20_addrs : array-like, optional
        Contract addresses to keep. Defaults to the module-level
        ``erc20_addrs`` as it was when this function was defined.

    Returns
    -------
    pandas.DataFrame
        One row per (block_date, address) with the columns ``block_date``,
        ``address``, ``transfer_count`` and ``transfer_amount``. When the file
        holds no matching transfer, an empty frame with these same four
        columns is returned so callers always see one schema.

    Raises
    ------
    TypeError, ValueError
        If the selected amount field of a transfer row is missing or is not a
        hexadecimal string.
    """
    out_cols = ['block_date', 'address', 'transfer_count', 'transfer_amount']
    # Load Ethereum event log file
    log_cols = ['BLOCK_DATE', 'ADDRESS', 'TOPIC0', 'TOPIC1', 'TOPIC2', 'TOPIC3', 'DATA']
    log = pd.read_parquet(log_file, columns=log_cols)
    log.columns = log.columns.str.lower()
    # Select ERC-20 transfer log
    #   Step 1: ERC-20 address filtering based on the supplied token addresses (address)
    #   Step 2: ERC-20 transfer event keccak_256 hash filtering (topic0)
    #   Step 3: ERC-20 transfer source and destination null check (topic1 and topic2)
    erc20_transfer_event = '0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef'
    is_transfer = (log.address.isin(erc20_addrs)
                   & (log.topic0 == erc20_transfer_event)
                   & log.topic1.notnull()
                   & log.topic2.notnull())
    transfers = log[is_transfer]
    if transfers.empty:
        # Return the aggregated schema (not the raw log columns) so that an
        # empty result can be combined with non-empty ones.
        return (pd.DataFrame(columns=out_cols)
                  .astype({'transfer_count': 'int64', 'transfer_amount': 'Float64'}))
    # Compute transfer amount with token decimal points (data and topic3):
    # the amount is read from `data`, or from `topic3` when `data` is '0x'.
    transfers = transfers.merge(erc20_tkns[['address', 'decimals']], on='address', how='left')
    raw_hex = np.where(transfers.data == '0x', transfers.topic3, transfers.data)
    # Scale row by row with Python integers: an int64 `10 ** decimals` wraps
    # around silently once decimals exceeds 18.
    transfers['amount'] = [_scale_token_amount(hex_value, decimals)
                           for hex_value, decimals in zip(raw_hex, transfers['decimals'])]
    # Compute total transfer count and volume per date and address
    daily = (transfers.groupby(['block_date', 'address'])
                      .aggregate({'amount': ['count', 'sum']})
                      .reset_index())
    daily.columns = out_cols
    daily['transfer_amount'] = daily['transfer_amount'].astype('Float64')
    return daily

# Process All Log Data

In [11]:
# Collect file path information on Ethereum event log files
log_files       = sorted(pathlib.Path('data').glob('*.parquet'))
log_file_size   = len(log_files)
if not log_files:
    # Fail with a clear message instead of an IndexError on log_files[0]
    raise FileNotFoundError("no '*.parquet' event log files found under 'data'")
log_file_first  = log_files[0]
log_file_rest   = log_files[1:]
# Load log data and compute daily ERC-20 transfer count and volume per address.
# Each file is aggregated on its own; the per-file results are collected and
# combined once at the end rather than re-merging the running total per file.
daily_frames = []
for ix, log_file in enumerate(log_files, start=1):
    print(f'{ix:5}/{log_file_size} : reading {log_file} ...')
    df          = load_erc20_log(log_file)
    # Files without ERC-20 transfers contribute nothing (this now also
    # covers the first file).
    if df.empty: continue
    daily_frames.append(df)
#
result_cols = ['block_date', 'address', 'transfer_count', 'transfer_amount']
if daily_frames:
    # The same (block_date, address) pair can occur in several files, so the
    # partial counts and amounts are summed per pair.
    erc20_df = (pd.concat(daily_frames, ignore_index=True)
                  .groupby(['block_date', 'address'], as_index=False)[['transfer_count', 'transfer_amount']]
                  .sum())
else:
    erc20_df = pd.DataFrame(columns=result_cols)
erc20_df['transfer_count']  = erc20_df['transfer_count'].astype('Int64')
erc20_df['block_date']      = pd.to_datetime(erc20_df.block_date)


    1/561 : reading data/part-00000-tid-3338437147332302119-5415983f-a704-4c49-97f7-48587e09bee7-85-1-c000.snappy.parquet ...
    2/561 : reading data/part-00001-tid-3338437147332302119-5415983f-a704-4c49-97f7-48587e09bee7-86-1-c000.snappy.parquet ...
    3/561 : reading data/part-00002-tid-3338437147332302119-5415983f-a704-4c49-97f7-48587e09bee7-87-1-c000.snappy.parquet ...
    4/561 : reading data/part-00003-tid-3338437147332302119-5415983f-a704-4c49-97f7-48587e09bee7-88-1-c000.snappy.parquet ...
    5/561 : reading data/part-00004-tid-3338437147332302119-5415983f-a704-4c49-97f7-48587e09bee7-89-1-c000.snappy.parquet ...
    6/561 : reading data/part-00005-tid-3338437147332302119-5415983f-a704-4c49-97f7-48587e09bee7-90-1-c000.snappy.parquet ...
    7/561 : reading data/part-00006-tid-3338437147332302119-5415983f-a704-4c49-97f7-48587e09bee7-91-1-c000.snappy.parquet ...
    8/561 : reading data/part-00007-tid-3338437147332302119-5415983f-a704-4c49-97f7-48587e09bee7-92-1-c000.snappy.parq

# Daily, Weekly, and Monthly Max Transfer Count and Amount per Address

In [12]:
# Output settings: the columns written for each measure, and the file-name
# prefix and pandas period alias used for each time view.
out_config = {
    'count' : ['address', 'transfer_count' ],
    'amount': ['address', 'transfer_amount'],
    'date'  : { 'prefix' : 'daily',   'period' : 'D' },
    'week'  : { 'prefix' : 'weekly',  'period' : 'W' },
    'month' : { 'prefix' : 'monthly', 'period' : 'M' }
}
#
views       = ['date', 'week', 'month']
measures    = ['count', 'amount']
# Create the destination directory so to_csv does not fail when it is missing
pathlib.Path('output').mkdir(parents=True, exist_ok=True)
for view in views:
    group_col = 'block_' + view
    # `block_date` already exists; the week and month period columns are
    # derived from it and added to erc20_df.
    if view != 'date':
        erc20_df[group_col] = erc20_df.block_date.dt.to_period(out_config[view]['period'])
    for measure in measures:
        cols        = [group_col] + out_config[measure]
        out_file    = 'output/' + out_config[view]['prefix'] + '_transfer_' + measure + '.csv'
        # Keep, for every period, the single (block_date, address) row with the
        # largest value. For the week and month views this is the largest
        # single-day value inside the period, not a per-period total.
        # NOTE(review): if period totals per address were intended, the values
        # need to be summed per (period, address) before taking the max.
        df_max      = erc20_df.loc[erc20_df.groupby(group_col)['transfer_' + measure].idxmax()]
        print(f'generate {out_file} ...')
        df_max[cols].to_csv(out_file, index=False)

generate output/daily_transfer_count.csv ...
generate output/daily_transfer_amount.csv ...
generate output/weekly_transfer_count.csv ...
generate output/weekly_transfer_amount.csv ...
generate output/monthly_transfer_count.csv ...
generate output/monthly_transfer_amount.csv ...
