In [1]:
# Set up a local Spark context/session for processing transactions.
import findspark

# findspark.init() adds the Spark installation's Python packages to sys.path,
# so it has to run before pyspark is imported.
findspark.init()

import pyspark
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession

# Local master ('local[*]'), three scratch directories for Spark's local
# working files, and a 100g driver heap.
spark_conf = (
    pyspark.SparkConf()
    .setMaster('local[*]')
    .set('spark.local.dir', '/mnt/scratch_a/spark,/mnt/scratch_b/spark,/mnt/scratch_c/spark')
    .set('spark.driver.memory', '100g')
    .setAppName('eth')
)

# getOrCreate() reuses an already-running context, so re-running this cell in
# the same kernel no longer fails with "Cannot run multiple SparkContexts at
# once". NOTE: if a context already exists, spark_conf is not re-applied;
# restart the kernel to change the configuration.
spark_context = SparkContext.getOrCreate(conf=spark_conf)
spark_session = SparkSession(spark_context)

In [2]:
# Load the ETH price data used later when processing transactions.
from src.PricesData import PricesData
from src.TimeSpan import TimeSpan

prices = PricesData(
    'data/prices_eth_5m.csv',  # price CSV (presumably 5-minute samples -- confirm)
    TimeSpan.DAY,              # presumably the aggregation interval -- confirm in PricesData
)

In [4]:
from src.EthTransactions import EthTransactions, TimeSpan, DataFrameFormat

# Input directory, output directory, and the suffix shared by all file names
# (presumably the exported block range -- confirm against the export job).
DATA_DIR = '/mnt/ssd_large/Ethereum/data'
TARGET_DIR = 'data/transactions'
BLOCKS = '0_11761677'

# Load transactions and receipts exported from ethereum-etl.
transactions_csv = f'{DATA_DIR}/transactions_{BLOCKS}.csv'
receipts_csv = f'{DATA_DIR}/receipts_{BLOCKS}.csv'
eth_transactions = EthTransactions(spark_session, transactions_csv, receipts_csv, prices)

# Process transactions per day, then per hour: for each time span, write the
# top-500 transactions followed by the stats, both as Parquet.
for span, label in ((TimeSpan.DAY, 'daily'), (TimeSpan.HOUR, 'hourly')):
    eth_transactions.top_transactions(500, span, f'{TARGET_DIR}/{label}_top_{BLOCKS}', DataFrameFormat.PARQUET)
    eth_transactions.stats(span, f'{TARGET_DIR}/{label}_stats_{BLOCKS}', DataFrameFormat.PARQUET)