In [1]:
import ipyparallel as ipp
import os

In [2]:
# spin up four kernals
rc = ipp.Cluster(n=10).start_and_connect_sync()

# each engine will require a separately prefunded EOA. Note, funds must be on the Orbit L3
rc.wait_for_engines(n=10)
rc.ids

Starting 10 engines with <class 'ipyparallel.cluster.launcher.LocalEngineSetLauncher'>
100%|██████████| 10/10 [00:04<00:00,  2.03engine/s]


[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

In [6]:
def executeTransaction(pk):
    from web3 import Web3, AsyncWeb3
    from web3.middleware import construct_sign_and_send_raw_middleware
    import time
    import json
    import random
    from retrying import retry

    # connect to the chain using the conduit RPC -- testnet so no need for access key
    w3 = Web3(Web3.HTTPProvider('https://nitrorpc-xps-l3-sepolia-arbitrum-anytru-yk5h2umn72.t.conduit.xyz'))

    # Instantiate an Account object from your key:
    send_acct = w3.eth.account.from_key(pk)

    # Add acct2 as auto-signer:
    w3.middleware_onion.add(construct_sign_and_send_raw_middleware(send_acct))

    # simple function to convert recipient address to 32 byte. Note, the recipient can be anything for testing (i.e., bob) but must be <32 bytes or it will fail
    def to_bytes32(input_string):
        # Encode the string to bytes
        encoded = input_string.encode()

        # Hash the encoded bytes using KECCAK-256 (referred to as SHA3 in Ethereum)
        return Web3.keccak(encoded)

    # contract call

    # deployed contract address
    contract_address = Web3.to_checksum_address('0x32B5ae10fEFA3a89BD4007EF44c2f1870fC3e0A8')

    # deployed contract abi
    abi = json.loads('''[
        {
        "anonymous": false,
        "inputs": [
            {
            "indexed": true,
            "internalType": "bytes32",
            "name": "recipient",
            "type": "bytes32"
            },
            {
            "indexed": false,
            "internalType": "string",
            "name": "message",
            "type": "string"
            }
        ],
        "name": "MessageSent",
        "type": "event"
        },
        {
        "inputs": [
            {
            "internalType": "bytes32",
            "name": "recipient",
            "type": "bytes32"
            },
            {
            "internalType": "string",
            "name": "message",
            "type": "string"
            }
        ],
        "name": "send",
        "outputs": [],
        "stateMutability": "payable",
        "type": "function"
        }
    ]''')

    # Reference the deployed contract:
    send_message = w3.eth.contract(address=contract_address, abi=abi)

    # Method arguments
    # recipient's wallet address
    recipient_address = to_bytes32('0xeBFc913dFfA597C1200b4c4daDe9D90415019CC2')

    # example IPFS CID
    string_arg = 'QmPK1s3pNYLi9ERiq3BDxKa4XosgWwFRQUydHUtz4YgpqB'

    # tx execution function
    # set retry logic
    @retry(stop_max_attempt_number=5, wait_exponential_multiplier=1000, wait_exponential_max=10000)
    def send_raw_tx_with_retry(signed_tx):
        tx_hash = w3.eth.send_raw_transaction(signed_tx.rawTransaction)

        return tx_hash

    tx_count = 1
    while tx_count <= 50000:
        if tx_count % 10 == 0:
            print(f"executing tx {tx_count}")
        # print(f"executing tx {tx_count}")
        # print(f"nonce is {w3.eth.get_transaction_count(acct2.address)}")
        # call the send function -- note the function will be renamed to sendMessage() soon
        # Manually build and sign a transaction:
        unsent_send_message_tx = send_message.functions.send(recipient_address,string_arg).build_transaction({
            'chainId': 7431004833520153,
            "from": send_acct.address,
            "nonce": w3.eth.get_transaction_count(send_acct.address),
        })
        signed_tx = w3.eth.account.sign_transaction(unsent_send_message_tx, private_key=send_acct.key)

        # Send the raw transaction:
        try:
            tx_hash = send_raw_tx_with_retry(signed_tx)
        except Exception as e:
            print(f"failed to send tx after retries {e}")

        # iterate
        tx_count = tx_count + 1
        
        # wait a second before sending the next tx
        random_integer = random.uniform(.99, 1.01)
        time.sleep(random_integer)

    return f"{send_acct.address} worker completed job"

In [7]:
rc[:].map_sync(executeTransaction, [os.environ.get('PRIVATE_KEY0'),os.environ.get('PRIVATE_KEY1'),
                                    os.environ.get('PRIVATE_KEY2'),os.environ.get('PRIVATE_KEY3'),
                                    os.environ.get('PRIVATE_KEY4'),os.environ.get('PRIVATE_KEY5'),
                                    os.environ.get('PRIVATE_KEY6'),os.environ.get('PRIVATE_KEY7'),
                                    os.environ.get('PRIVATE_KEY8'),os.environ.get('PRIVATE_KEY9')])

<AsyncMapResult(executeTransaction): pending>