In [36]:
# Alchemy API Setup

from alchemy import Alchemy, Network
from web3 import Web3
from dotenv import load_dotenv, set_key
from pymongo import MongoClient
import os
import time
import json

# create Alchemy object using your Alchemy api key, default is "demo"
load_dotenv('../.env')
api_key = os.getenv('ALCHEMY_API_KEY')

# choose preferred network from Network, default is ETH_MAINNET
network = Network.ETH_MAINNET

# choose the maximum number of retries to perform, default is 5
max_retries = 3

# create Alchemy object
alchemy = Alchemy(api_key, network, max_retries=max_retries)

In [4]:
# Web3 API Setup
load_dotenv('../.env')
w3 = Web3(Web3.HTTPProvider(os.getenv('HTTPProvider')))

In [28]:
# Get block data from Alchemy
block_hash_alc = Alchemy.to_hex(alchemy.core.get_block('latest')['hash'])

# Get block data from Web3
block_data_w3 = w3.eth.get_block('latest')

print(f"Alchemy Block Hash: {block_hash_alc}")
print(f"Web3 Block Data: {block_data_w3}")
print("Web3 Block Data:")
for key, value in block_data_w3.items():
    if type(block_data_w3[key]) is list:
        for i in block_data_w3[key]:
            print(f"    {i}")
    else:
        print(key, value)

Alchemy Block Hash: 0x056074cc61544459cafad6c08f17078ddc2aa5a99a8748902eb16183350a2697
Web3 Block Data: AttributeDict({'baseFeePerGas': 1669403693, 'blobGasUsed': 0, 'difficulty': 0, 'excessBlobGas': 393216, 'extraData': HexBytes('0x6265617665726275696c642e6f7267'), 'gasLimit': 30000000, 'gasUsed': 14310994, 'hash': HexBytes('0x056074cc61544459cafad6c08f17078ddc2aa5a99a8748902eb16183350a2697'), 'logsBloom': HexBytes('0x83a129abe5ab01789a8db43786094c3d3665e087ad5599aa200d44980fe98e8d16df9d28a8395d12bac37b0117bc590f3691c9ab9ce3eb3566bb62496529210df1569d2d1197994f392661cdc8e161f30315607d907048397d468d2cd26746c6186252f952ac77ff8b09569d5b88fd953f63846b49d2fc94570133fc80d894b175e0b2f658d2db8e1e4fabc2abec41760897d5a1a375c15afe245b4f51b2ad888fe68e2fafa7aefe1bd133c05ce02d5bc42dad1878be1a689c2d4a6328baf4754d837c4ab2152bfb41492b0b4ab99aec65b00a93ee0c649c26a9623e48b7a58a677be828939804b99d2699c08ad111570528b37d93eb26d43a41db077ce99c2e'), 'miner': '0x95222290DD7278Aa3Ddd389Cc1E1d165CC4BAfe5', 'mixHa

In [34]:
# Get block data from Alchemy
block_hash_alc = Alchemy.to_hex(alchemy.core.get_block('latest')['hash'])
block_num_alc = Alchemy.to_hex(alchemy.core.get_block('latest')['number'])

# Get block data from Web3
block_hash_w3 = w3.eth.get_block('latest')['hash']
block_num_w3 = w3.eth.get_block('latest')['number']

# Cross reference block has between Alchemy query and Web3 query
print(block_hash_alc)
print(block_data_w3.hex())
print(int(block_num_alc,16))
print(block_num_w3)

# Gets all transaction receipts for a given block hash.
block_receipts_alc = alchemy.core.get_transaction_receipts(block_hash=block_hash_alc)
block_receipts_alc = w3.eth.get_block('latest')['number']

# Set a block number as the 'progenitor block' of the database.
set_key(dotenv_path='../.env', key_to_set="PROGENITOR_BLOCK", value_to_set=block_num_w3)

# Create function to listen for new block after progenitor block


# Create function to dig existing blocks starting from progenitor block
def export_latest_blocks(interval=60, blocks_to_export=5):
    while True:
        latest_block_number = alchemy.core.get_block_number()
        for i in range(latest_block_number):
            block = alchemy.core.get_block_with_transactions(latest_block_number - i)
            export_block_to_mongo(block)
        time.sleep(interval)

# Create function to send new blocks to mongodb
# Create funciton to send old blocks to mongodb

# Disconnection Functions
# https://chatgpt.com/share/8c7a65c4-b5da-4db5-b9c5-d7fa1ebbd28e
# Create function to detect shutdown of mongodb client
# Create function for storing last stable state before shutdown of mongodb client
# Create function for graceful shutdown

# Startup Functions
# Create function to check if mongodb client is running
# Create function to start from last stable state
# Create function to run send-to-mongodb functions if client is running


0x95435f66e3eb905ef1a0fd15fd699809adda098c6f53ca547d1ba6998f7bbf53
0xd5f87fb7af787f02014dd721e5e42b2b076aa32d249c50f798f48c5209d2d6ef
20526733
20526733


In [None]:
# 1st startup
# Latest block: 50
# F1 Gets transaction details of latest block: 50
# F1 sets object latest_block_current(50)
# On finish, F1 sets latest_block_finished(50) and sets latest_block_current(51)
# F1 waits for matching block

# F2 gets transaction details of latest block -1
# F2 sets object archive_block_current(49)
# On finish archive F2 sets object archive_block_finished(49) and sets archive_block_current(48)

In [None]:
# Initial setup
# Latest block: 50
# Set latest_block_current = 50
# Set archive_block_current = 49

# Function F1 (Latest block processor)
# F1 gets transaction details of latest_block_current
# F1 processes and stores data for latest_block_current
# On finish, F1 sets latest_block_finished = latest_block_current
# F1 increments latest_block_current
# F1 waits for new block if latest_block_current > chain height

# Function F2 (Archive block processor)
# F2 gets transaction details of archive_block_current
# F2 processes and stores data for archive_block_current
# On finish, F2 sets archive_block_finished = archive_block_current
# F2 decrements archive_block_current
# F2 stops if archive_block_current < 0 or reaches desired historical point

# Error handling and retry mechanism for both F1 and F2

# Periodic consistency check between latest_block_finished and archive_block_finished

# Logging and monitoring for both functions

In [None]:
# Get unique column names

unique_keys = set()

for receipt in block_receipts:
    unique_keys.update(receipt.keys())

print("Unique keys in block receipts: ")
for key in unique_keys:
    print(key)

In [None]:
# CSV Write Function

import csv
from datetime import datetime

# Define the CSV file name
csv_filename = f"transaction_receipts_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv"

# Write to CSV
with open(csv_filename, mode='w', newline='') as file:
    writer = csv.DictWriter(file, fieldnames=columns)
    
    # Write the header
    writer.writeheader()
    
    # Write the data
    for receipt in block_receipts:
        row = {column: receipt.get(column, None) for column in columns}
        writer.writerow(row)

print(f"Transaction receipts exported to {csv_filename}")

In [None]:
# https://chatgpt.com/share/9d0b1717-9189-44c8-bc8e-62a1e96b4bf5
# 2.2.alchemy-sdk.py/web3.py (Jupyter_Notebook) > 2.5 MongoDB > 3.Kafka

# Set up MongoDB connection
client = MongoClient('mongodb://localhost:27017/')
db = client['blockchain']
collection = db['blocks']

# Function to export block data to MongoDB
def export_block_to_mongo(block):
    block_data = {
        'number': block['number'],
        'hash': block['hash'],
        'parentHash': block['parentHash'],
        'nonce': block['nonce'],
        'transactions': [
            {
                'hash': tx['hash'],
                'from': tx['from'],
                'to': tx.get('to'),
                'value': tx['value'],
                'gas': tx['gas'],
                'gasPrice': tx['gasPrice'],
                'input': tx['input'],
            } for tx in block['transactions']
        ]
    }
    collection.insert_one(block_data)
    print(f"Exported block {block['number']} to MongoDB")

# Main loop to fetch and export the latest blocks at intervals
def export_latest_blocks(interval=60, blocks_to_export=5):
    while True:
        latest_block_number = alchemy.core.get_block_number()
        for i in range(blocks_to_export):
            block = alchemy.core.get_block_with_transactions(latest_block_number - i)
            export_block_to_mongo(block)
        time.sleep(interval)

# Main guard
# if __name__ == "__main__":
#     export_latest_blocks(interval=60, blocks_to_export=5)

In [None]:
# https://chatgpt.com/share/7d318aa6-1b26-4350-a8f3-44db254c73b9

# 1. Structure Data for Kafka >

import json

block_data = json.dumps(latest_block, default=str)

# 2. Send Data to Kafka

from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers='localhost:9092', value_serializer=lambda v: json.dumps(v).encode('utf-8'))

producer.send('ethereum-blocks', block_data)
producer.flush()

# 3. Kafka to Spark Integration

# In terminal

# pyspark --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.1

In [None]:
# https://chatgpt.com/share/beb9384f-36ae-4f4a-8802-84d1a9346bea

# 1. Create Kafka Topics
# In terminal
# kafka-topics.sh --create --topic ethereum-data --bootstrap-server localhost:9092 --partitions 3 --replication-factor 1

# 2. Send Data to Kafka

from confluent_kafka import Producer
import json

producer_config = {'bootstrap.servers': 'localhost:9092'}
producer = Producer(producer_config)

def delivery_report(err, msg):
    if err is not None:
        print(f"Message delivery failed: {err}")
    else:
        print(f"Message delivered to {msg.topic()} [{msg.partition()}]")

def produce_message(data):
    producer.produce('ethereum-data', key='key', value=json.dumps(data), callback=delivery_report)
    producer.flush()

# Example data from Ethereum-Alchemy pipeline
eth_data = {
    'block_number': 123456,
    'transaction_hash': '0xabc123...',
    'value': 10.5
}
produce_message(eth_data)

# 3. Install PySpark
# In Terminal
# pip install pyspark

# 4. Configure Spark Streaming with Kafka

from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StructField, StringType, DoubleType

# Initialize Spark session
spark = SparkSession.builder \
    .appName("EthereumKafkaSpark") \
    .getOrCreate()

# Define the schema for your Ethereum data
schema = StructType([
    StructField("block_number", StringType(), True),
    StructField("transaction_hash", StringType(), True),
    StructField("value", DoubleType(), True)
])

# Read stream from Kafka topic
df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "ethereum-data") \
    .load()

# Decode the JSON data and apply schema
json_df = df.selectExpr("CAST(value AS STRING)") \
            .select(from_json(col("value"), schema).alias("data")) \
            .select("data.*")

# Process and display the streaming data
query = json_df.writeStream \
               .outputMode("append") \
               .format("console") \
               .start()

query.awaitTermination()

# 5. Run Spark Streaming Job