In [1]:
import requests
from typing import Union
import json


def authenticate(url: str, user: str, password: str,
                 timeout: Union[float, tuple, None] = None) -> tuple:
    """Log in to the node API and build the header for authenticated calls.

    Posts the credentials as form data (``grant_type`` set to ``password``)
    to ``{url}/auth/token`` and reads ``access_token`` from the JSON reply.

    Args:
        url: Base URL of the node API. A trailing slash is ignored.
        user: Account name, sent as the ``username`` form field.
        password: Password, sent as the ``password`` form field.
        timeout: Passed unchanged to ``requests.post``. ``None`` (the
            default) waits indefinitely, as before.

    Returns:
        A ``(access_token, request_header)`` tuple. ``request_header`` is a
        dict carrying the token as a ``Bearer`` ``Authorization`` header and
        asking for a JSON reply.

    Raises:
        requests.HTTPError: If the server answers with a 4xx/5xx status.
        KeyError: If the JSON reply contains no ``access_token`` field.
    """
    # Avoid "//auth/token" when the configured base URL ends with a slash.
    base_url = url.rstrip("/")

    form_data = {
        "grant_type": "password",
        "username": user,
        "password": password
    }

    response = requests.post(
        f"{base_url}/auth/token", data=form_data, timeout=timeout)
    # Surface rejected credentials / server errors as an HTTP error instead
    # of an opaque KeyError on the missing "access_token" field below.
    response.raise_for_status()
    access_token = response.json()["access_token"]

    request_header = {
        "accept": "application/json",
        "Authorization": f"Bearer {access_token}"
    }

    return access_token, request_header


def get_block(url: str, request_header: dict, block_identifier: Union[int, str],
              full_transactions=False, timeout: Union[float, tuple, None] = None):
    """Fetch one block from the node's ``/mainnet/history/block`` endpoint.

    Args:
        url: Base URL of the node API. A trailing slash is ignored.
        request_header: HTTP headers sent with the request (the caller is
            expected to include the authorization header).
        block_identifier: Sent as the ``block_identifier`` query parameter;
            an int or a string such as ``"latest"``.
        full_transactions: Sent as the ``full_transactions`` query parameter.
            Presumably toggles full transaction objects vs. hashes -- confirm
            against the API documentation.
        timeout: Passed unchanged to ``requests.get``. ``None`` (the default)
            waits indefinitely, as before.

    Returns:
        The decoded JSON body of the response.

    Raises:
        requests.HTTPError: If the server answers with a 4xx/5xx status.
    """
    # Avoid a double slash when the configured base URL ends with "/".
    base_url = url.rstrip("/")

    params = {
        "block_identifier": block_identifier,
        "full_transactions": full_transactions,
    }

    response = requests.get(
        f"{base_url}/mainnet/history/block", params=params,
        headers=request_header, timeout=timeout)
    # Do not hand an error payload back to the caller as if it were a block.
    response.raise_for_status()

    return response.json()


def get_transaction(url: str, request_header: dict, transaction_hash: str,
                    decode_input: bool = False,
                    timeout: Union[float, tuple, None] = None):
    """Fetch one transaction from ``/mainnet/history/transaction``.

    Args:
        url: Base URL of the node API. A trailing slash is ignored.
        request_header: HTTP headers sent with the request (the caller is
            expected to include the authorization header).
        transaction_hash: Sent as the ``transaction_hash`` query parameter.
        decode_input: Sent as the ``decode_input`` query parameter.
            Presumably asks the server to decode the call data -- confirm
            against the API documentation.
        timeout: Passed unchanged to ``requests.get``. ``None`` (the default)
            waits indefinitely, as before.

    Returns:
        The decoded JSON body of the response.

    Raises:
        requests.HTTPError: If the server answers with a 4xx/5xx status.
    """
    # Avoid a double slash when the configured base URL ends with "/".
    base_url = url.rstrip("/")

    params = {
        "transaction_hash": transaction_hash,
        "decode_input": decode_input,
    }

    response = requests.get(
        f"{base_url}/mainnet/history/transaction", params=params,
        headers=request_header, timeout=timeout)
    # Do not hand an error payload back as if it were a transaction.
    response.raise_for_status()

    return response.json()


def get_events(url: str, request_header: dict, contract_address: str,
               from_block: Union[int, str] = 0,
               to_block: Union[int, str] = "latest",
               decode_events: bool = False,
               timeout: Union[float, tuple, None] = None):
    """Fetch a contract's events from ``/mainnet/contract/events``.

    Args:
        url: Base URL of the node API. A trailing slash is ignored.
        request_header: HTTP headers sent with the request (the caller is
            expected to include the authorization header).
        contract_address: Sent as the ``contract_address`` query parameter.
        from_block: Sent as the ``from_block`` query parameter; defaults to 0.
        to_block: Sent as the ``to_block`` query parameter; defaults to
            ``"latest"``.
        decode_events: Sent as the ``decode_events`` query parameter.
            Presumably asks the server to decode the event logs -- confirm
            against the API documentation.
        timeout: Passed unchanged to ``requests.get``. ``None`` (the default)
            waits indefinitely, as before.

    Returns:
        The decoded JSON body of the response.

    Raises:
        requests.HTTPError: If the server answers with a 4xx/5xx status.
    """
    # Avoid a double slash when the configured base URL ends with "/".
    base_url = url.rstrip("/")

    params = {
        "contract_address": contract_address,
        "from_block": from_block,
        "to_block": to_block,
        "decode_events": decode_events,
    }

    response = requests.get(
        f"{base_url}/mainnet/contract/events", params=params,
        headers=request_header, timeout=timeout)
    # Do not hand an error payload back as if it were a list of events.
    response.raise_for_status()

    return response.json()



In [2]:
from dotenv import dotenv_values

# Read the key/value settings from the .env file one directory above the
# notebook into a dict, without touching os.environ.
config = dotenv_values(dotenv_path="../.env")


In [3]:
# Check the settings loaded from ../.env up front so that a missing file or
# key fails with a clear message rather than a bare KeyError or a None value.
# NOTE(review): assumes `config` behaves like a dict with None for
# value-less keys (python-dotenv's documented behaviour) -- confirm.
missing_settings = [
    key for key in ("DESS_NODE_URL", "DESS_NODE_USR", "DESS_NODE_PWD")
    if config.get(key) is None
]
if missing_settings:
    raise KeyError(
        f"Missing settings in ../.env: {', '.join(missing_settings)}")

url = config['DESS_NODE_URL']
user = config['DESS_NODE_USR']
password = config['DESS_NODE_PWD']

# get access token and request header
access_token, request_header = authenticate(url, user, password)


In [6]:
# Request the block identified by the "latest" tag; full_transactions is
# forwarded to the API as False.
block_identifier, full_transactions = "latest", False

block = get_block(
    url,
    request_header,
    block_identifier=block_identifier,
    full_transactions=full_transactions,
)

In [7]:
# Show the value returned by get_block as this cell's output.
block

{'baseFeePerGas': 11546816686,
 'difficulty': 0,
 'extraData': '0xd883010b05846765746888676f312e32302e32856c696e7578',
 'gasLimit': 30000000,
 'gasUsed': 8098032,
 'hash': '0xb47d27589f701d273f03098b78742f6ac8f3db634e0f636df9b368e7cb27bff5',
 'logsBloom': '0xa831c1a36bd8626d9c0402e08250028890a3340d3fa0b104400989229580a80502281344c040420182713b18069125100b178c129a2228814306c1691928e0ce05024088456089fe38a262eb902080fc8aed800009674a0025560426807820c08b900cce9f1497814141f5c042ae094d80160423406a040b42422818024a40000698025002854940a4484e0d0b0200083ccc8489b500409c40a8806580f4089c267225ca113168096c3b09f4d056850037800cee428791208a68f00651ec444a11582892800004c54030861604805e008c62012e9042a0140a20504a30a8e00c39f3fb8c428819c008051080a2808248888092167fd20a704099b1096c11b598',
 'miner': '0x388C818CA8B9251b393131C08a736A67ccB19297',
 'mixHash': '0x06bc753dfcdec4b9f38e76bfe2f26f0c9b7b0b84e854278a7872aa504add5a8a',
 'nonce': '0x0000000000000000',
 'number': 18212917,
 'parentHash': '0x012c0366ea74c9f15

In [8]:
# Persist the fetched block as JSON indented by four spaces.
with open("../data/block_dess.json", "w") as out_file:
    out_file.write(json.dumps(block, indent=4))


In [9]:
# Look up a single transaction by its hash, then store the API reply as
# JSON indented by four spaces.
transaction_hash = "0xd6d0b55a0d12f6d0a23da1d37e99969aca04af0311bd5d69095be7c9fb9b9756"
decode_input = False

transaction = get_transaction(
    url,
    request_header,
    transaction_hash=transaction_hash,
    decode_input=decode_input,
)

with open("../data/transaction_dess.json", "w") as out_file:
    out_file.write(json.dumps(transaction, indent=4))


In [None]:
# Query the events of one contract from block 18122866 up to "latest",
# then store the API reply as JSON indented by four spaces.
contract_address = "0x889edC2eDab5f40e902b864aD4d7AdE8E412F9B1"
from_block, to_block = 18122866, "latest"
decode_events = False

events = get_events(
    url,
    request_header,
    contract_address=contract_address,
    from_block=from_block,
    to_block=to_block,
    decode_events=decode_events,
)

with open("../data/events_dess.json", "w") as out_file:
    out_file.write(json.dumps(events, indent=4))