In [1]:
import pandas as pd
from ethtx import EthTx, EthTxConfig
from ethtx.models.decoded_model import DecodedTransaction
from typing import List

In [2]:
ethtx_config = EthTxConfig(
    mongo_connection_string="mongomock://localhost/ethtx",  ##MongoDB connection string,
    etherscan_api_key="VQXUYMB6T6I2E9SF67JPKP6YCYFW5P645A",  ##Etherscan API key,
    web3nodes={
        "mainnet": {
            "hook": "https://api.archivenode.io/58db800c-c787-41c1-96d5-ed35e126eb35",  # multiple nodes supported, separate them with comma
            "poa": False
        }
    },
    default_chain="mainnet",
    etherscan_urls={"mainnet": "https://api.etherscan.io/api", },
)

ethtx = EthTx.initialize(ethtx_config)
""" decoded_transaction: DecodedTransaction = ethtx.decoders.decode_transaction(
    '0x1594180567438ad258c98e263b412f55f62a23ebb5e9056089e9f11a513d4816')

print(decoded_transaction) """

" decoded_transaction: DecodedTransaction = ethtx.decoders.decode_transaction(\n    '0x1594180567438ad258c98e263b412f55f62a23ebb5e9056089e9f11a513d4816')\n\nprint(decoded_transaction) "

In [3]:
web3provider = ethtx.providers.web3provider

from ethtx.models.w3_model import W3Transaction, W3Block, W3Receipt, W3CallTree

# read raw transaction data directly from the node
w3transaction: W3Transaction = web3provider.get_transaction(
    '0x1594180567438ad258c98e263b412f55f62a23ebb5e9056089e9f11a513d4816')
w3block: W3Block = web3provider.get_block(w3transaction.blockNumber)
w3receipt: W3Receipt = web3provider.get_receipt(w3transaction.hash.hex())
w3calls: W3CallTree = web3provider.get_calls(w3transaction.hash.hex())


In [4]:
from ethtx.models.decoded_model import (
    DecodedTransfer,
    DecodedBalance,
    DecodedEvent, DecodedCall,
)
from ethtx.models.objects_model import Transaction, Event, Block, Call

# read the raw transaction from the node
transaction = Transaction.from_raw(
    w3transaction=w3transaction, w3receipt=w3receipt, w3calltree=w3calls
)

# get proxies used in the transaction
proxies = ethtx.decoders.get_proxies(transaction.root_call, "mainnet")

block: Block = Block.from_raw(
    w3block=web3provider.get_block(transaction.metadata.block_number),
    chain_id="mainnet",
)

# decode transaction components
abi_decoded_events: List[Event] = ethtx.decoders.abi_decoder.decode_events(
    transaction.events, block.metadata, transaction.metadata
)
abi_decoded_calls: DecodedCall = ethtx.decoders.abi_decoder.decode_calls(
    transaction.root_call, block.metadata, transaction.metadata, proxies
)
abi_decoded_transfers: List[
    DecodedTransfer
] = ethtx.decoders.abi_decoder.decode_transfers(abi_decoded_calls, abi_decoded_events)
abi_decoded_balances: List[DecodedBalance] = ethtx.decoders.abi_decoder.decode_balances(
    abi_decoded_transfers
)


In [5]:

# decode a single event
raw_event: Event = transaction.events[0]
abi_decoded_event: DecodedEvent = ethtx.decoders.abi_decoder.decode_event(
    raw_event, block.metadata, transaction.metadata
)

# decode a single call
raw_call: Call = transaction.root_call.subcalls[0]
abi_decoded_call: DecodedCall = ethtx.decoders.abi_decoder.decode_call(
    raw_call, block.metadata, transaction.metadata, proxies
)


In [6]:
from ethtx.models.decoded_model import DecodedTransactionMetadata

# semantically decode transaction components
decoded_metadata: DecodedTransactionMetadata = (
    ethtx.decoders.semantic_decoder.decode_metadata(
        block.metadata, transaction.metadata, "mainnet"
    )
)
decoded_events: List[DecodedEvent] = ethtx.decoders.semantic_decoder.decode_events(
    abi_decoded_events, decoded_metadata, proxies
)

decoded_calls: Call = ethtx.decoders.semantic_decoder.decode_calls(
    abi_decoded_calls, decoded_metadata, proxies
)
decoded_transfers: List[
    DecodedTransfer
] = ethtx.decoders.semantic_decoder.decode_transfers(
    abi_decoded_transfers, decoded_metadata
)
decoded_balances: List[
    DecodedBalance
] = ethtx.decoders.semantic_decoder.decode_balances(
    abi_decoded_balances, decoded_metadata
)


In [7]:

""" # semantically decode a single event
decoded_event: DecodedEvent = ethtx.decoders.semantic_decoder.decode_event(
    abi_decoded_events[0], decoded_metadata, proxies
)
# semantically decode a single call
decoded_call: Call = ethtx.decoders.semantic_decoder.decode_call(
    abi_decoded_calls.subcalls[0], decoded_metadata, proxies
) """


In [15]:
""" import sys
import json

sys.path.append("../src/")
from com.cryptobot.utils.logger import PrettyLogger

transaction_dict = transaction.dict()
abi_decoded_events_dict = [ev.dict() for ev in abi_decoded_events]
abi_decoded_calls_dict = decoded_calls.dict()
abi_decoded_transfers_dict = [t.dict() for t in abi_decoded_transfers]
abi_decoded_balances_dict = [b.dict() for b in abi_decoded_balances]

with open('../data/0x1594180567438ad258c98e263b412f55f62a23ebb5e9056089e9f11a513d4816_transaction.json', mode='a') as fp:
    json.dump(transaction_dict, fp, default=str)
    fp.close()

with open('../data/0x1594180567438ad258c98e263b412f55f62a23ebb5e9056089e9f11a513d4816_events.json', mode='a') as fp:
    json.dump(abi_decoded_events_dict, fp, default=str)
    fp.close()

with open('../data/0x1594180567438ad258c98e263b412f55f62a23ebb5e9056089e9f11a513d4816_calls.json', mode='a') as fp:
    json.dump(abi_decoded_calls_dict, fp, default=str)
    fp.close()

with open('../data/0x1594180567438ad258c98e263b412f55f62a23ebb5e9056089e9f11a513d4816_transfers.json', mode='a') as fp:
    json.dump(abi_decoded_transfers_dict, fp, default=str)
    fp.close()

with open('../data/0x1594180567438ad258c98e263b412f55f62a23ebb5e9056089e9f11a513d4816_balances.json', mode='a') as fp:
    json.dump(abi_decoded_balances_dict, fp, default=str)
    fp.close()
 """