In [1]:
import asyncio
import websockets
import json
import time

import sys
sys.path.append("./chainklik")
import pandas as pd
import requests
from web3 import Web3

import libs.common.utils as utils
import libs.common.payload as payload
import config.config as cfg

In [2]:
from abc import ABC, abstractmethod

class Timer(ABC):
    def __init__(self, start, end, frequency):
        self.start = start
        self.end = end
        self.frequency = frequency
        assert self.start <= self.end

    @abstractmethod
    def is_active(self, ctx):
        pass

    @abstractmethod
    def is_expired(self, ctx):
        pass
        
class BlockTimer(Timer):
    def __init__(self, start, end, frequency):
        super().__init__(start, end, frequency)

    def is_active(self, ctx):
        if ctx["block_time"] - self.start >= 0:
            self.start = ((ctx["block_time"] - self.start) // self.frequency + 1) * self.frequency + self.start
            return True
        else:
            return False

    def is_expired(self, ctx):
        if self.start >= self.end:
            return True
        return False

class TimerDecorator:
    def __init__(self, timer_attr):
        self.timer_attr = timer_attr

    def __call__(self, func):
        def wrapper(*args, **kwargs):
            timer = getattr(args[0], self.timer_attr)
            ctx = args[1]
            if not timer.is_expired(ctx) and timer.is_active(ctx):
                func(*args, **kwargs)
        return wrapper

In [3]:
class Node(ABC):
    def __init__(self, id: str, deps: dict, params: dict):
        self.id = id
        self.deps = deps
        self.params = params
        self.output = None
        self.active = False
        self.finalized = True

    @abstractmethod
    def run(self, ctx: dict, values: dict) -> None:
        pass

In [4]:
class DataNode(Node):
    def __init__(self, id: str, deps: dict, params: dict, timer: Timer = None):
        super().__init__(id, deps, params)
        self.timer = timer
    
    @TimerDecorator("timer")
    def run(self, ctx: dict, values: dict) -> None:
        self.output = values
        self.active = True
        self.finalized = True

In [5]:
class TxNode(Node):
    def __init__(self, id: str, deps: dict, params: dict):
        super().__init__(id, deps, params)
        self.wallet = params["wallet"]
        self.to = params["to"]
        self.amount = params["amount"]
        self.finality = params["finality"] if "finality" in params else 2
        self.gas = params["gas"] if "gas" in params else 21000
        self.output = None
    
    def run(self, ctx: dict, values: dict):
        if self.finalized: # a new or finalized txn
            latest_block = w3.eth.get_block("latest")
            base_fee_per_gas = latest_block.baseFeePerGas   # Base fee in the latest block (in wei)
            max_priority_fee_per_gas = w3.to_wei(1, 'gwei') # Priority fee to include the transaction in the block
            max_fee_per_gas = (5 * base_fee_per_gas) + max_priority_fee_per_gas # Maximum amount you’re willing to pay 
            
            transaction_params = {
                'from': self.wallet,
                'to': self.to,
                'value': w3.to_wei(self.amount, 'ether'),
                'nonce': w3.eth.get_transaction_count(self.wallet),
                'gas': self.gas, 
                'maxFeePerGas': max_fee_per_gas, # Maximum amount you’re willing to pay 
                'maxPriorityFeePerGas': max_priority_fee_per_gas, # Priority fee to include the transaction in the block
                'chainId': 11155111 # ChainId of Sepolia Testnet
            }

            print('New transaction.', self.id)
            
            transaction = w3.eth.account.sign_transaction(transaction_params, private_key)
            transaction_hash = w3.eth.send_raw_transaction(transaction.rawTransaction)
            transaction_receipt = w3.eth.wait_for_transaction_receipt(transaction_hash)
            
            if transaction_receipt.status:
                print('Transaction successful!', self.id)
                self.tx_block = transaction_receipt["blockNumber"]
                self.finalized = False
            else:
                print('Transaction failed.', self.id)
        elif ctx["block_time"] - self.tx_block >= self.finality:
            print('Transaction finalized.', self.id)
            self.finalized = True

In [6]:
# Connect to an Ethereum node
w3 = Web3(Web3.HTTPProvider(cfg.config["eth_sepolia_http_url"]))

# Set sender and recipient addresses
sender_address = '0x5bA4D4264Bf9A8C3aaF7e1fea6f83f50643A3Fd7'
recipient_address = '0xaf6667a2F847beeca6a6604126Dc28344518840b'

# Set private key for the sender's account. 
private_key = '1dab201501e8b882ca3413edcdfed263e4834cd8ea4c9586aca7fb699c51d681'

In [7]:
w3.eth.get_block("latest")["number"]

5015793

In [67]:
bt = BlockTimer(w3.eth.get_block("latest")["number"], w3.eth.get_block("latest")["number"]+10, 10)
data_node = DataNode("data", {}, {}, bt)

In [68]:
tx_node1 = TxNode("tx1",{"trigger":"data"},{"wallet":sender_address,"to":recipient_address,"amount":0.001})
tx_node2 = TxNode("tx2",{"trigger":"data"},{"wallet":sender_address,"to":recipient_address,"amount":0.002})
tx_node3 = TxNode("tx3",{"trigger":"tx1"},{"wallet":sender_address,"to":recipient_address,"amount":0.003})

In [69]:
node_layers = [
    ["data"],
    ["tx1", "tx2"],
    ["tx3"]
]

In [70]:
nodes = {
    "data": data_node, 
    "tx1": tx_node1,
    "tx2": tx_node2,
    "tx3": tx_node3
}

In [71]:
nodes["data"].__dict__

{'id': 'data',
 'deps': {},
 'params': {},
 'output': None,
 'active': False,
 'finalized': True,
 'timer': <__main__.BlockTimer at 0x11edd4fd0>}

In [72]:
def print_nodes_state(node_layers):
    for node_layer in node_layers:
        print([(nodes[key].active,nodes[key].finalized) for key in node_layer])

In [73]:
async def process_message(block_info):
    print(int(block_info["number"], 16))
    ctx = {
        "block_time" : int(block_info["number"], 16)
    }
    values = {}
    for key in nodes:
        if nodes[key].finalized:
            nodes[key].active = False

    print_nodes_state(node_layers)
    
    for node_layer in node_layers:
        for key in node_layer:
            print(key)
            if len(nodes[key].deps) == 0: # no deps
                nodes[key].active = True
            else:
                for dep in nodes[key].deps:
                    if nodes[nodes[key].deps[dep]].active and nodes[nodes[key].deps[dep]].finalized:
                        nodes[key].active = True
            if nodes[key].active:
                nodes[key].run(ctx, values)

    print_nodes_state(node_layers)

    for node_layer in node_layers:
        for key in node_layer:
            print(nodes[key].__dict__)

In [74]:
async def message():
    async with websockets.connect(cfg.config["eth_sepolia_ws_url"]) as ws:
        sub_newheads = { "id":1, "jsonrpc":"2.0", "method":"eth_subscribe", "params":["newHeads"] }
        await ws.send(json.dumps(sub_newheads))
        sub_res = await ws.recv()
        print(json.loads(sub_res))
        while True:
            message = await asyncio.wait_for(ws.recv(), timeout=60)
            block_info = json.loads(message)["params"]["result"]
            # start = time.time()
            await process_message(block_info)
            # end = time.time()
            # print(end-start)

In [None]:
import nest_asyncio
nest_asyncio.apply()

loop = asyncio.get_event_loop()
while True:
    loop.run_until_complete(message())

{'jsonrpc': '2.0', 'id': 1, 'result': '0xc75443fbdc2069339f8fc64f8ff83bc2'}
5015877
[(False, True)]
[(False, True), (False, True)]
[(False, True)]
data
tx1
New transaction. tx1
Transaction successful! tx1
tx2
New transaction. tx2
Transaction successful! tx2
tx3
[(True, True)]
[(True, False), (True, False)]
[(False, True)]
{'id': 'data', 'deps': {}, 'params': {}, 'output': {}, 'active': True, 'finalized': True, 'timer': <__main__.BlockTimer object at 0x11edd4fd0>}
{'id': 'tx1', 'deps': {'trigger': 'data'}, 'params': {'wallet': '0x5bA4D4264Bf9A8C3aaF7e1fea6f83f50643A3Fd7', 'to': '0xaf6667a2F847beeca6a6604126Dc28344518840b', 'amount': 0.001}, 'output': None, 'active': True, 'finalized': False, 'wallet': '0x5bA4D4264Bf9A8C3aaF7e1fea6f83f50643A3Fd7', 'to': '0xaf6667a2F847beeca6a6604126Dc28344518840b', 'amount': 0.001, 'finality': 2, 'gas': 21000, 'tx_block': 5015878}
{'id': 'tx2', 'deps': {'trigger': 'data'}, 'params': {'wallet': '0x5bA4D4264Bf9A8C3aaF7e1fea6f83f50643A3Fd7', 'to': '0xaf6667

In [9]:
balance_sender = w3.from_wei(w3.eth.get_balance(sender_address), 'ether')
balance_recipient = w3.from_wei(w3.eth.get_balance(recipient_address), 'ether')

print(f'The balance of { sender_address } is: { balance_sender } ETH')
print(f'The balance of { recipient_address } is: { balance_recipient } ETH')

The balance of 0x5bA4D4264Bf9A8C3aaF7e1fea6f83f50643A3Fd7 is: 0.483341897565598 ETH
The balance of 0xaf6667a2F847beeca6a6604126Dc28344518840b is: 0.016 ETH
