In [7]:
import asyncio
import json
import os
from datetime import datetime

import websockets
from web3 import Web3
from confluent_kafka import Producer, Consumer

# Infura project ID: taken from the INFURA_PROJECT_ID environment variable when
# it is set; otherwise the ID originally embedded in this notebook is used.
# SECURITY: the embedded fallback is a credential committed to source. Rotate it
# and drop the fallback once the environment variable is configured.
infura_project_id = os.environ.get('INFURA_PROJECT_ID', 'a30f6d61930e4435aecdd1b6815f2026')

# Connect to the Infura HTTP endpoint
web3 = Web3(Web3.HTTPProvider(f'https://mainnet.infura.io/v3/{infura_project_id}'))

def read_config(path="client.properties"):
    """Read client settings from a ``key=value`` properties file.

    Blank lines and lines whose first non-blank character is ``#`` are
    ignored. Every other line is split on its first ``=`` only, so a value may
    itself contain ``=``. Whitespace around both the key and the value is
    removed.

    Args:
        path: Location of the properties file. Defaults to
            ``client.properties`` in the current working directory.

    Returns:
        dict mapping each setting name to its string value. When a key is
        repeated, the last occurrence wins.

    Raises:
        OSError: If the file cannot be opened.
        ValueError: If a non-comment line contains no ``=`` separator.
    """
    config = {}
    with open(path) as fh:
        for line_number, raw_line in enumerate(fh, start=1):
            line = raw_line.strip()
            if not line or line.startswith("#"):
                continue
            parameter, separator, value = line.partition("=")
            if not separator:
                # Same exception type the tuple-unpacking used to raise, but
                # with enough context to locate the offending line.
                raise ValueError(
                    f"{path}:{line_number}: expected 'key=value', got {line!r}"
                )
            # Strip the key as well, so "key = value" does not yield "key ".
            config[parameter.strip()] = value.strip()
    return config

def delivery_report(err, msg):
    """Print the delivery outcome of one produced message.

    Used as the per-message delivery callback; it is triggered by poll() or
    flush().

    Args:
        err: The delivery error, or None when the message was delivered.
        msg: The message; its topic() and partition() are printed on success.
    """
    if err is None:
        outcome = 'Message delivered to {} [{}]'.format(msg.topic(), msg.partition())
    else:
        outcome = 'Message delivery failed: {}'.format(err)
    print(outcome)

# Producers are built once per distinct configuration and reused, instead of
# constructing a new client for every single message.
_PRODUCER_CACHE = {}


def _get_producer(config):
    """Return the cached Producer for ``config``, creating it on first use."""
    try:
        cache_key = frozenset(config.items())
    except TypeError:
        # A config value is unhashable, so no cache key can be built; fall
        # back to a dedicated producer for this call.
        return Producer(config)
    if cache_key not in _PRODUCER_CACHE:
        _PRODUCER_CACHE[cache_key] = Producer(config)
    return _PRODUCER_CACHE[cache_key]


def produce(topic, config, transaction_data):
    """Publish one transaction record to Kafka and wait for delivery.

    The record is serialized as JSON and keyed by its transaction hash. Values
    that JSON cannot encode natively are serialized through ``str``.
    ``delivery_report`` is registered as the delivery callback.

    Args:
        topic: Name of the Kafka topic to publish to.
        config: Producer configuration mapping passed to ``Producer``.
        transaction_data: Mapping describing the transaction; must contain
            the key ``'transaction_hash'``.

    Raises:
        KeyError: If ``transaction_data`` has no ``'transaction_hash'`` key.
    """
    producer = _get_producer(config)
    producer.produce(
        topic,
        key=transaction_data['transaction_hash'],
        value=json.dumps(transaction_data, default=str),
        callback=delivery_report,
    )
    producer.flush()  # Ensure all messages are delivered

async def subscribe_new_blocks(config, topic):
    """Stream new block headers from Infura and forward each full block.

    Opens a WebSocket connection to Infura mainnet and sends an
    ``eth_subscribe`` request for ``newHeads``. For every received message
    that carries ``params``, the block is fetched with its full transactions
    through the module-level ``web3`` client and handed to ``analyze_block``.

    When the connection closes, a new connection is opened and the
    subscription request is sent again. The coroutine loops until it is
    cancelled or an unhandled exception propagates.

    Args:
        config: Kafka client configuration, forwarded to ``analyze_block``.
        topic: Kafka topic name, forwarded to ``analyze_block``.
    """
    # SECURITY: the embedded fallback is a credential committed to source.
    # Prefer setting INFURA_PROJECT_ID, then rotate and remove the fallback.
    infura_project_id = os.environ.get('INFURA_PROJECT_ID', 'a30f6d61930e4435aecdd1b6815f2026')
    infura_url = f'wss://mainnet.infura.io/ws/v3/{infura_project_id}'

    subscription_request = json.dumps({
        "jsonrpc": "2.0",
        "method": "eth_subscribe",
        "params": ["newHeads"],
        "id": 1
    })

    while True:
        try:
            # A fresh connection per iteration: a closed socket can never
            # deliver again, so reconnecting means connecting and subscribing anew.
            async with websockets.connect(infura_url) as websocket:
                await websocket.send(subscription_request)
                print("Subscribed to new block headers")

                while True:
                    response = await websocket.recv()
                    response_json = json.loads(response)

                    # Messages without 'params' are ignored.
                    if 'params' in response_json:
                        block_header = response_json['params']['result']
                        block_number = block_header['number']
                        # NOTE(review): get_block and analyze_block are called
                        # without await, so they run on the event loop and no
                        # other task progresses until they return.
                        block = web3.eth.get_block(block_number, full_transactions=True)
                        print(f"New block received: {block['number']}")
                        analyze_block(block, config, topic)

        except websockets.ConnectionClosed:
            print("Connection closed, attempting to reconnect")
            await asyncio.sleep(1)

def analyze_block(block, config, topic):
    """Publish one Kafka record for every transaction in ``block``.

    For each transaction the receipt is fetched through the module-level
    ``web3`` client, a flat record is assembled from the transaction and its
    receipt, a short summary is printed, and the record is sent with
    ``produce``.

    Args:
        block: Block fetched with full transaction objects. It is read via
            ``block['number']``, ``block['timestamp']`` and
            ``block.transactions``.
        config: Kafka client configuration, forwarded to ``produce``.
        topic: Kafka topic name, forwarded to ``produce``.
    """
    # fromtimestamp() without a tz argument renders the block time in the
    # machine's local timezone.
    # NOTE(review): confirm that consumers do not expect UTC here.
    timestamp = datetime.fromtimestamp(block['timestamp']).strftime('%Y-%m-%d %H:%M:%S')

    for tx in block.transactions:
        # One receipt lookup per transaction.
        receipt = web3.eth.get_transaction_receipt(tx['hash'])
        transaction_data = {
            'block_number': block['number'],
            'timestamp': timestamp,
            'transaction_hash': tx['hash'].hex(),
            'from': tx['from'],
            'to': tx['to'],
            'value': float(Web3.from_wei(tx['value'], 'ether')),
            'gas': tx['gas'],
            'gas_price': float(Web3.from_wei(tx['gasPrice'], 'gwei')),
            'nonce': tx['nonce'],
            'transaction_index': tx['transactionIndex'],
            'gas_used': receipt['gasUsed'],
            'cumulative_gas_used': receipt['cumulativeGasUsed'],
            # Same source field as 'gas'; both keys are emitted.
            'gas_limit': tx['gas'],
            'receipt_status': receipt['status'],
            'transaction_fee': float(Web3.from_wei(receipt['gasUsed'] * tx['gasPrice'], 'ether')),
            'effective_gas_price': float(Web3.from_wei(receipt['effectiveGasPrice'], 'gwei'))
        }

        print(f"Transaction Hash: {transaction_data['transaction_hash']}")
        print(f"From: {transaction_data['from']}")
        print(f"To: {transaction_data['to']}")
        print(f"Value: {transaction_data['value']} ETH")
        print(f"Gas: {transaction_data['gas']}")
        print(f"Gas Price: {transaction_data['gas_price']} GWEI")

        # Send transaction data to Kafka
        produce(topic, config, transaction_data)
        

In [2]:

# Load the Kafka client settings. The `web3` client is already created in the
# first cell, so it is not rebuilt here; the module-level `global web3`
# statement that used to sit here had no effect.
config = read_config()

In [8]:
process = await subscribe_new_blocks(config)

Subscribed to new block headers
New block received: 20221860
Transaction Hash: 0xe1198c2da669e443c325b29d6acbfa9eb8dcab4bd02d1b841cb1367acc1621e0
From: 0xCE28dD4dF311cC2A81405262d1f6a0531f472eD5
To: 0x7a250d5630B4cF539739dF2C5dAcb4c659F2488D
Value: 0.0 ETH
Gas: 195271
Gas Price: 7.991762669 GWEI
{'block_number': 20221860, 'timestamp': '2024-07-02 17:38:59', 'transaction_hash': '0xe1198c2da669e443c325b29d6acbfa9eb8dcab4bd02d1b841cb1367acc1621e0', 'from': '0xCE28dD4dF311cC2A81405262d1f6a0531f472eD5', 'to': '0x7a250d5630B4cF539739dF2C5dAcb4c659F2488D', 'value': 0.0, 'gas': 195271, 'gas_price': 7.991762669, 'nonce': 1858, 'transaction_index': 0, 'input': HexBytes('0x38ed1739000000000000000000000000000000000000000000000000120a871cc002000000000000000000000000000000000000000000000000b209bbbbb40fe213a4b600000000000000000000000000000000000000000000000000000000000000a0000000000000000000000000ce28dd4df311cc2a81405262d1f6a0531f472ed500000000000000000000000000000000000000000000000000000000668481fa0

CancelledError: 

In [10]:
# Sample transaction record, in the shape of the records sent to Kafka.
tx = dict([
    ('block_number', 20221860),
    ('timestamp', '2024-07-02 17:38:59'),
    ('transaction_hash', '0xf30ecd7d4be0b7c655cee30ca2492adbd602e09c95483c4aad9ee2e2a8298b4a'),
    ('from', '0x95222290DD7278Aa3Ddd389Cc1E1d165CC4BAfe5'),
    ('to', '0x388C818CA8B9251b393131C08a736A67ccB19297'),
    ('value', 0.06090040081799748),
    ('gas', 23100),
    ('gas_price', 5.491762669),
    ('nonce', 1141720),
    ('transaction_index', 93),
    ('input', "HexBytes('0x')"),
    ('gas_used', 22111),
    ('cumulative_gas_used', 9277625),
    ('gas_limit', 23100),
    ('receipt_status', 1),
    ('transaction_fee', 0.000121428364374259),
    ('effective_gas_price', 5.491762669),
])



In [11]:
# Display the sample transaction record as the cell's output.
tx

{'block_number': 20221860,
 'timestamp': '2024-07-02 17:38:59',
 'transaction_hash': '0xf30ecd7d4be0b7c655cee30ca2492adbd602e09c95483c4aad9ee2e2a8298b4a',
 'from': '0x95222290DD7278Aa3Ddd389Cc1E1d165CC4BAfe5',
 'to': '0x388C818CA8B9251b393131C08a736A67ccB19297',
 'value': 0.06090040081799748,
 'gas': 23100,
 'gas_price': 5.491762669,
 'nonce': 1141720,
 'transaction_index': 93,
 'input': "HexBytes('0x')",
 'gas_used': 22111,
 'cumulative_gas_used': 9277625,
 'gas_limit': 23100,
 'receipt_status': 1,
 'transaction_fee': 0.000121428364374259,
 'effective_gas_price': 5.491762669}