In [0]:
%pip install web3

[43mNote: you may need to restart the kernel using %restart_python or dbutils.library.restartPython() to use updated packages.[0m


In [0]:
from web3 import Web3



# === SIMPLE PARAMETERIZATION (VARIABLES FOR REUSABILITY) ===
dbutils.widgets.text("catalog_name", "web3_data", "Catalog Name")
dbutils.widgets.text("schema_name", "ethereum_mainnet", "Schema Name")
dbutils.widgets.text("poll_interval", "15", "Polling Interval (seconds)")
dbutils.widgets.text("microbatch_size", "10", "Blocks per Microbatch")
dbutils.widgets.text("s3_managed_bucket","eth-stream-ingestion","S3 Managed Bucket")
dbutils.widgets.text("max_calls_per_second","0.8","Max calls/sec")

dbutils.widgets.text("output_catalog_name", "ethereum", "Output Catalog Name")
dbutils.widgets.text("output_schema_name", "bronze", "Output Schema Name")

# === CONFIGURATION ===
CATALOG = dbutils.widgets.get("catalog_name")
SCHEMA = dbutils.widgets.get("schema_name")
S3_MANAGED_BUCKET = dbutils.widgets.get('s3_managed_bucket')
POLL_INTERVAL = int(dbutils.widgets.get("poll_interval"))
MICROBATCH_SIZE = int(dbutils.widgets.get("microbatch_size"))
MAX_CALLS_PER_SECOND = dbutils.widgets.get("max_calls_per_second")

OUTPUT_CATALOG = dbutils.widgets.get("output_catalog_name")
OUTPUT_SCHEMA = dbutils.widgets.get("output_schema_name")


# Unity Catalog volume paths
DATA_VOLUME = f"/Volumes/{CATALOG}/{SCHEMA}/ethereum_blocks"
CHECKPOINT_VOLUME = f"/Volumes/{CATALOG}/{SCHEMA}/ethereum_checkpoints"
SCHEMA_VOLUME = f"/Volumes/{CATALOG}/{SCHEMA}/ethereum_schemas"
OUTPUT_VOLUME = f"/Volumes/{CATALOG}/{SCHEMA}/ethereum_output"

OUTPUT_DATA_VOLUME = f"/Volumes/{OUTPUT_CATALOG}/{OUTPUT_SCHEMA}/ethereum_blocks"


print(f"🔧 Using Catalog: {CATALOG}, Schema: {SCHEMA}")
print(f"⏱ Poll Interval: {POLL_INTERVAL}s")
print(f"📦 Microbatch Size: {MICROBATCH_SIZE} blocks per batch")
print(f"📁 Data: {DATA_VOLUME}")
print(f"📁 Checkpoints: {CHECKPOINT_VOLUME}")
print(f"📁 Schemas: {SCHEMA_VOLUME}")
print(f"🔧 Using Output Catalog: {OUTPUT_CATALOG}, Output Schema: {OUTPUT_SCHEMA}")


# === UNITY CATALOG SETUP ===
stmts = [
    f"CREATE CATALOG IF NOT EXISTS {CATALOG} MANAGED LOCATION 's3://{S3_MANAGED_BUCKET}/'",
    f"CREATE SCHEMA IF NOT EXISTS {CATALOG}.{SCHEMA}",
    f"CREATE VOLUME IF NOT EXISTS {CATALOG}.{SCHEMA}.ethereum_blocks",
    f"CREATE VOLUME IF NOT EXISTS {CATALOG}.{SCHEMA}.ethereum_checkpoints",
    f"CREATE VOLUME IF NOT EXISTS {CATALOG}.{SCHEMA}.ethereum_schemas",
    f"CREATE VOLUME IF NOT EXISTS {CATALOG}.{SCHEMA}.ethereum_output",
    f"CREATE CATALOG IF NOT EXISTS {OUTPUT_CATALOG} MANAGED LOCATION 's3://{S3_MANAGED_BUCKET}/'",
    f"CREATE SCHEMA IF NOT EXISTS {OUTPUT_CATALOG}.{OUTPUT_SCHEMA}",
  
]

for i, s in enumerate(stmts, 1):
    print(f"[{i}/{len(stmts)}] {s}")
    try:
        spark.sql(s)
        print("  ✅ Success")
    except Exception as e:
        print(f"  ❌ Error: {e}")


🔧 Using Catalog: web3_data, Schema: ethereum_mainnet
⏱ Poll Interval: 15s
📦 Microbatch Size: 10 blocks per batch
📁 Data: /Volumes/web3_data/ethereum_mainnet/ethereum_blocks
📁 Checkpoints: /Volumes/web3_data/ethereum_mainnet/ethereum_checkpoints
📁 Schemas: /Volumes/web3_data/ethereum_mainnet/ethereum_schemas
🔧 Using Output Catalog: ethereum, Output Schema: bronze
[1/8] CREATE CATALOG IF NOT EXISTS web3_data MANAGED LOCATION 's3://eth-stream-ingestion/'
  ✅ Success
[2/8] CREATE SCHEMA IF NOT EXISTS web3_data.ethereum_mainnet
  ✅ Success
[3/8] CREATE VOLUME IF NOT EXISTS web3_data.ethereum_mainnet.ethereum_blocks
  ✅ Success
[4/8] CREATE VOLUME IF NOT EXISTS web3_data.ethereum_mainnet.ethereum_checkpoints
  ✅ Success
[5/8] CREATE VOLUME IF NOT EXISTS web3_data.ethereum_mainnet.ethereum_schemas
  ✅ Success
[6/8] CREATE VOLUME IF NOT EXISTS web3_data.ethereum_mainnet.ethereum_output
  ✅ Success
[7/8] CREATE CATALOG IF NOT EXISTS ethereum MANAGED LOCATION 's3://eth-stream-ingestion/'
  ✅ Suc

In [0]:
import os, json, time, asyncio, aiohttp, logging
from pyspark.sql.datasource import DataSource, DataSourceStreamReader, InputPartition
from pyspark.sql.types import StructType, StructField, LongType, StringType
from pyspark.sql import Row
from web3 import AsyncWeb3, AsyncHTTPProvider
import sys

# -----------------------------
# Logging setup
# -----------------------------
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] [%(name)s] %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
    handlers=[
        logging.StreamHandler(sys.stdout)  # 👈 Send logs to notebook output
    ]
)
logger = logging.getLogger("EthereumStream")

# -----------------------------
# Partition class
# -----------------------------
class BlockRangePartition(InputPartition):
    def __init__(self, start, end):
        self.start = start
        self.end = end
        logger.debug(f"Created partition: blocks {start} to {end}")

# -----------------------------
# Async Block Fetcher
# -----------------------------
async def fetch_block(aweb3, blk_num):
    """Fetch a single block asynchronously."""
    block = await aweb3.eth.get_block(blk_num, full_transactions=False)
    return {
        "block_number": block.number,
        "block_hash": block.hash.hex(),
        "timestamp": block.timestamp,
        "tx_count": len(block.transactions)
    }

async def fetch_blocks_concurrent(provider_uri, start, end, max_conc):
    """Fetch blocks in parallel with bounded concurrency."""
    async with aiohttp.ClientSession() as session:
        aweb3 = AsyncWeb3(AsyncHTTPProvider(provider_uri))  # remove session param
        sem = asyncio.Semaphore(max_conc)

        async def guarded(n):
            async with sem:
                return await fetch_block(aweb3, n)

        return await asyncio.gather(*(guarded(n) for n in range(start, end + 1)))

# -----------------------------
# StreamReader
# -----------------------------
class EthereumStreamReader(DataSourceStreamReader):
    def __init__(self, schema, options):
        logger.info("=" * 60)
        logger.info("Initializing EthereumStreamReader")
        logger.info("=" * 60)

        self.schema = schema
        self.options = options
        self.provider_uri = options.get("provider_uri")
        self.start_block = int(options.get("start_block", 1))
        self.output_dir = options.get("output_dir", "dbfs:/Volumes/web3_data/ethereum_mainnet/ethereum_blocks/raw")
        self.enable_local_write = options.get("enable_local_write", "false").lower() == "true"
        self.max_concurrency = int(options.get("max_concurrency", 8))
        self.current_block = self.start_block

        logger.info("Configuration:")
        logger.info(f"  - Provider URI: {self.provider_uri}")
        logger.info(f"  - Start block: {self.start_block}")
        logger.info(f"  - Output directory: {self.output_dir}")
        logger.info(f"  - Enable local write: {self.enable_local_write}")
        logger.info(f"  - Max concurrency: {self.max_concurrency}")
        logger.info("=" * 60)

    # -------------------------
    # Offsets
    # -------------------------
    def initialOffset(self):
        offset = {"offset": self.start_block}
        logger.info(f"initialOffset() -> {offset}")
        return offset

    def latestOffset(self):
        from web3 import Web3
        try:
            w3 = Web3(Web3.HTTPProvider(self.provider_uri))
            if not w3.is_connected():
                raise RuntimeError("Failed to connect to Ethereum provider")
            latest = w3.eth.block_number
            offset = {"offset": latest}
            logger.info(f"latestOffset() -> {offset}")
            return offset
        except Exception as e:
            logger.error(f"Error in latestOffset(): {e}", exc_info=True)
            raise

    # -------------------------
    # Partition Planning
    # -------------------------
    def partitions(self, start_offset, end_offset):
        """Legacy method name - calls planPartitions internally"""
        return self.planPartitions(start_offset, end_offset)
    
    def planPartitions(self, start_offset, end_offset):
        start = start_offset.get("offset", self.start_block) if start_offset else self.start_block
        end = end_offset.get("offset", start + 1000) if end_offset else start + 1000
        step = int(self.options.get("batch_size", 10))

        partitions = [
            BlockRangePartition(i, min(i + step - 1, end))
            for i in range(start, end + 1, step)
        ]

        logger.info(f"Planned {len(partitions)} partitions: {start}-{end}, batch={step}")
        return partitions

    # -------------------------
    # Read Partition (Async)
    # -------------------------
    def read(self, partition):
        logger.info(f"Reading partition: {partition.start}-{partition.end}")
        blocks = asyncio.run(fetch_blocks_concurrent(
            self.provider_uri, partition.start, partition.end, self.max_concurrency
        ))

        for blk in blocks:
            if self.enable_local_write:
                # Atomic write: temp → rename
                tmp_path = os.path.join(self.output_dir, f"block_{blk['block_number']}.json.tmp")
                final_path = os.path.join(self.output_dir, f"block_{blk['block_number']}.json")
                with open(tmp_path, "w") as f:
                    json.dump(blk, f, indent=2)
                os.replace(tmp_path, final_path)
                logger.debug(f"Wrote {final_path}")

            yield Row(**blk)
        logger.info(f"Completed reading partition {partition.start}-{partition.end}")

    # -------------------------
    # Commit Checkpoint
    # -------------------------
    def commit(self, end_offset):
        logger.info(f"Committing offset: {end_offset}")
        logger.info(f"✓ Committed up to block {end_offset.get('offset')}")

# -----------------------------
# DataSource Wrapper
# -----------------------------
class EthereumDataSource(DataSource):
    @classmethod
    def name(cls):
        return "ethereum"

    def schema(self):
        return StructType([
            StructField("block_number", LongType()),
            StructField("block_hash", StringType()),
            StructField("timestamp", LongType()),
            StructField("tx_count", LongType())
        ])

    def streamReader(self, schema):
        return EthereumStreamReader(schema, self.options)


In [0]:

#import uuid
import time
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, LongType, StringType, ArrayType, MapType
from web3 import Web3


# Hard-coded constants
ETH_PROVIDER_URI = "https://mainnet.infura.io/v3/6504e6a7883c4b49ac1cf17099e2ea3a"

# Connect to an Ethereum node (Infura, Alchemy, etc.)
w3 = Web3(Web3.HTTPProvider(ETH_PROVIDER_URI))

# Get latest block number
start_block = w3.eth.block_number
print("Latest Ethereum block number:", start_block)

# --- Spark session ---
spark = SparkSession.builder.appName("EthereumStream").getOrCreate()

eth_schema = StructType([
    StructField("block_number", LongType(), True),
    StructField("block_hash", StringType(), True),
    StructField("timestamp", LongType(), True),
    StructField("tx_count", LongType(), True)
])


checkpoint_path = f"{CHECKPOINT_VOLUME}/"
output_path = f"{OUTPUT_VOLUME}/"
eth_provider_uri = f"{ETH_PROVIDER_URI}"
start_block = f"{start_block}"
poll_interval = f"{POLL_INTERVAL}"

print("✅ Using checkpoint:", checkpoint_path)
print("✅ Using output:", output_path)
print("✅ Using eth_provider_uri:", eth_provider_uri)
print("✅ Using start_block:", start_block)
print("✅ Using poll_interval:", poll_interval)

spark.dataSource.register(EthereumDataSource)

# --- Read from custom Ethereum source ---
df = spark.readStream \
    .format("ethereum") \
    .schema(eth_schema) \
    .option("provider_uri",eth_provider_uri) \
    .option("start_block", start_block) \
    .option("poll_interval", poll_interval) \
    .load()

# --- Write with dynamic paths ---
query = df.writeStream \
    .format("delta") \
    .outputMode("append") \
    .option("checkpointLocation", checkpoint_path) \
    .trigger(availableNow=True) \
    .toTable("ethereum.bronze.blocks_json")

logger.info("Streaming query started. Awaiting termination...")



Latest Ethereum block number: 23536998
✅ Using checkpoint: /Volumes/web3_data/ethereum_mainnet/ethereum_checkpoints/
✅ Using output: /Volumes/web3_data/ethereum_mainnet/ethereum_output/
✅ Using eth_provider_uri: https://mainnet.infura.io/v3/6504e6a7883c4b49ac1cf17099e2ea3a
✅ Using start_block: 23536998
✅ Using poll_interval: 15


2025-10-09 01:47:56 [INFO] [EthereumStream] Streaming query started. Awaiting termination...


In [0]:
%sql SELECT * FROM ethereum.bronze.blocks_json


block_number,block_hash,timestamp,tx_count
23536861,42934da04481d8a272fac6ae90607e10e0c13a38f4f9cfb26c3d6aa4eaa97566,1759972811,253
23536862,f9606d689beaf5ad74846aad0fae7cab42e1481d82bc01472b752bb9f957bc05,1759972823,272
23536863,9e43663e0b26bf3a717d03b646dc24567df346f4940f04b697bb32ac3c137ee6,1759972835,180
23536864,9b5de85685ae90348cd8b23605440c1b348f72150ea3a1a5c42c37ed5b140376,1759972847,168
23536865,9200c239e0861a99a5c500c42bb37b88b4365c73bdd02ed7c3fa77ae1e37b4c0,1759972859,258
23536866,961a6247eb594688bfbce308ba8d7ed479252c3e69f4aba89bb52d165530d2c0,1759972871,175
23536867,790c56567910c798f533251f69d8b02763be763235e19eb66dc16c6481ee8e42,1759972883,175
23536868,01872c45f01342a3ed7fb078fd1b4792f6a6003bceebd12edaec6b5e783470a7,1759972895,171
23536869,b310cbd558ba51feaab23f004b97374e8489cbef5dbbe6ed97609abdac2e7294,1759972907,258
23536870,cbb29f9b4c4aafab8c2d5792631b2bb101ccdd3f6a27cef3359100423e2054ee,1759972919,146
