# INIT

In [None]:
import sys
from pathlib import Path
# Make the directory two levels above the current working directory importable;
# the project-local imports below (`utils`, `config`) presumably live there — TODO confirm.
sys.path.append(str(Path.cwd().parent.parent.absolute()))
from utils.logger import get_logger
import config

# Notebook-wide logger, reused by the cells below.
logger = get_logger(__name__)

In [None]:
from tools.chain_simulator_connector import start_handler
from argparse import Namespace
from time import sleep

def chain_sim_init():
    """Start the chain simulator handler and wait for it to settle.

    The handler arguments are built from the docker-compose folder under
    ``config.HOME`` and the ``states-safeprice`` folder under
    ``config.DEFAULT_WORKSPACE``. After ``start_handler`` returns, the number of
    reported accounts is printed and the function sleeps for 10 seconds.

    Returns:
        The ``(chain_sim, found_accounts)`` pair produced by ``start_handler``.
    """
    handler_args = Namespace(
        docker_path=str(config.HOME / "Projects/testing/full-stack-docker-compose/chain-simulator"),
        state_path=str(config.DEFAULT_WORKSPACE / "states-safeprice"),
    )

    simulator, accounts = start_handler(handler_args)
    print(f'Loaded {len(accounts)} accounts')

    # Fixed pause after start-up before anything else talks to the simulator.
    sleep(10)
    return simulator, accounts

# Chain sim start

In [None]:
# Start the simulator and keep both the handler and the accounts it reported as notebook globals.
chain_sim, found_accounts = chain_sim_init()

# Context init

In [None]:
from context import Context
from tools.chain_simulator_connector import ChainSimulator

import importlib
import os

# Select the chain-simulator environment before `config` is re-evaluated below.
# presumably `config` reads MX_DEX_ENV when it is (re)imported — TODO confirm
os.environ["MX_DEX_ENV"] = "chainsim"
# NOTE(review): this rebinds `chain_sim`, replacing the handle returned by chain_sim_init()
# when the "Chain sim start" cell was run before this one.
chain_sim = ChainSimulator()
importlib.reload(config)
import config

# Shared context used by the cells below (contracts, deployer account, network provider).
context = Context()

Paths

In [None]:
# Locations of the contract build artefacts, all resolved under `config.HOME`.
# The ABI is loaded for the safe-price view queries; the three .wasm files are passed to the upgrade calls.
abi_path = config.HOME / "Projects/dex/mx-exchange-sc/dex/pair/output/safe-price-view.abi.json"
router_bytecode_path = config.HOME / "Projects/dex/mx-exchange-sc/dex/router/output/router.wasm"
pair_view_bytecode_path = config.HOME / "Projects/dex/mx-exchange-sc/dex/pair/output/safe-price-view.wasm"
template_pair_bytecode_path = config.HOME / "Projects/dex/mx-exchange-sc/dex/pair/output/pair.wasm"

# Safe price

In [None]:
from contracts.pair_contract import PairContract
from multiversx_sdk.abi import Abi, AddressValue, U64Value
from multiversx_sdk import SmartContractController, Address

# First configured v2 pair and first safe-price view contract; both are reused as globals below.
pair_contract: PairContract = context.get_contracts(config.PAIRS_V2)[0]
safe_price_view_contract = context.get_contracts(config.PAIRS_VIEW)[0]

# Controller built with the safe-price view ABI and the chain id reported by the proxy;
# it is used to create, run and parse the view queries.
abi = Abi.load(abi_path)
view_controller = SmartContractController(context.network_provider.proxy.get_network_config().chain_id, context.network_provider.proxy, abi)

In [None]:
# Query `getSafePriceByRoundOffset` on the view contract with: the pair address, an offset of 100
# and a [token identifier, 0, amount] triple for the pair's first token.
# presumably the triple is an ESDT payment (identifier, nonce, amount) — TODO confirm against the ABI
query = view_controller.create_query(Address.new_from_bech32(safe_price_view_contract.address), "getSafePriceByRoundOffset", 
                                                 [pair_contract.address, 100, [pair_contract.firstToken, 0, 1*10**18]])
response = view_controller.run_query(query)
# Fail loudly on anything but an "ok" return code before trying to parse the payload.
if response.return_code != "ok":
    raise Exception(f"Error: {response.return_code}")
parsed_response = view_controller.parse_query_response(response)
print(parsed_response)

In [None]:
from multiversx_sdk.abi import BigUIntValue, U64Value, Serializer, StructValue, Field

def get_round_from_observation(raw_entry: bytes):
    """Decode a raw price-observation storage value and return its recording round.

    Args:
        raw_entry: Serialized observation bytes as read from the pair's storage.

    Returns:
        The payload of the decoded ``recording_round`` field.
    """
    recording_round = U64Value()

    # Only the leading fields up to and including `recording_round` are decoded.
    # NOTE(review): the original cell had `recording_timestamp` and `lp_supply_accumulated`
    # fields commented out after these four — presumably trailing fields of a newer
    # observation layout; confirm before extending the struct.
    observation = StructValue([
        Field("first_token_reserve_accumulated", BigUIntValue()),
        Field("second_token_reserve_accumulated", BigUIntValue()),
        Field("weight_accumulated", U64Value()),
        Field("recording_round", recording_round),
    ])

    # The serializer is fed the hex representation of the raw bytes.
    Serializer().deserialize(raw_entry.hex(), [observation])

    return recording_round.get_payload()

## monitor state observations

In [None]:
from multiversx_sdk import ProxyNetworkProvider

def get_pair_state(pair_contract: PairContract):
    """Fetch the full account storage of `pair_contract`.

    When ``config.CURRENT_ENV.value`` is ``"mainnet"`` the storage is read through a
    ``ProxyNetworkProvider`` built from ``config.HISTORY_PROXY``; in every other
    environment the context's own proxy is used.

    Returns:
        Whatever ``get_account_storage`` returns for the pair's address.
    """
    on_mainnet = config.CURRENT_ENV.value == "mainnet"
    storage_proxy = ProxyNetworkProvider(config.HISTORY_PROXY) if on_mainnet else context.network_provider.proxy
    return storage_proxy.get_account_storage(Address.new_from_bech32(pair_contract.address))

def get_last_observation_round(pair_contract: PairContract):
    """Return the highest recording round among the pair's stored price observations.

    Every storage entry whose key contains ``"price_observations.item"`` is decoded
    with ``get_round_from_observation``; 0 is returned when no entry matches or
    no decoded round exceeds 0.
    """
    observation_rounds = [
        get_round_from_observation(entry.value)
        for entry in get_pair_state(pair_contract).entries
        if "price_observations.item" in entry.key
    ]
    # Seed with 0 so the result matches a running maximum that starts from 0.
    return max([0, *observation_rounds])

# Upgrade

Router

In [None]:
from contracts.router_contract import RouterContract
# First configured v2 router; used as a global by router_upgrade().
router_contract: RouterContract = context.get_contracts(config.ROUTER_V2)[0]


def router_upgrade():
    """Upgrade the router with the bytecode at `router_bytecode_path`, then call its resume.

    One block is advanced after each of the two calls.
    """
    deployer = context.deployer_account
    proxy = context.network_provider.proxy

    router_contract.contract_upgrade(deployer, proxy, router_bytecode_path)
    chain_sim.advance_blocks(1)

    router_contract.resume(deployer, proxy)
    chain_sim.advance_blocks(1)


router_upgrade()

New observation interval setup & default safe price rounds offset (for legacy views)

In [None]:
from contracts.router_contract import RouterContract
def set_safe_price_round_save_interval(new_observation_interval: int):
    """Set the safe price round save interval through the first configured v2 router.

    Args:
        new_observation_interval: Value forwarded to the router's
            ``set_safe_price_round_save_interval`` call.

    One block is advanced after the call.
    """
    router: RouterContract = context.get_contracts(config.ROUTER_V2)[0]
    router.set_safe_price_round_save_interval(
        context.deployer_account,
        context.network_provider.proxy,
        new_observation_interval,
    )
    chain_sim.advance_blocks(1)


set_safe_price_round_save_interval(10)

In [None]:
from contracts.router_contract import RouterContract
def set_default_safe_price_rounds_offset(new_default_safe_price_rounds_offset: int):
    """Set the default safe price rounds offset through the first configured v2 router.

    Args:
        new_default_safe_price_rounds_offset: Value forwarded to the router's
            ``set_default_safe_price_rounds_offset`` call.

    One block is advanced after the call.
    """
    router: RouterContract = context.get_contracts(config.ROUTER_V2)[0]
    router.set_default_safe_price_rounds_offset(
        context.deployer_account,
        context.network_provider.proxy,
        new_default_safe_price_rounds_offset,
    )
    chain_sim.advance_blocks(1)


set_default_safe_price_rounds_offset(600)

Safeprice view

In [None]:
# First configured safe-price view contract; used as a global by pair_view_upgrade().
pair_view_contract: PairContract = context.get_contracts(config.PAIRS_VIEW)[0]


def pair_view_upgrade():
    """Upgrade the safe-price view contract with the bytecode at `pair_view_bytecode_path`.

    The upgrade is called with an empty argument list and a trailing positional ``True``,
    then one block is advanced.
    """
    upgrade_args = []
    pair_view_contract.contract_upgrade(
        context.deployer_account,
        context.network_provider.proxy,
        pair_view_bytecode_path,
        upgrade_args,
        True,
    )
    chain_sim.advance_blocks(1)


pair_view_upgrade()

Template

In [None]:
from contracts.router_contract import RouterContract
from contracts.pair_contract import PairContract

# Resolve the pair template through the first configured v2 router and load it as a PairContract.
router_contract: RouterContract = context.get_contracts(config.ROUTER_V2)[0]
template_pair_address = router_contract.get_pair_template_address(context.network_provider.proxy)
print(template_pair_address)
template_pair = PairContract.load_contract_by_address(template_pair_address)


def template_pair_upgrade():
    """Upgrade the template pair with the bytecode at `template_pair_bytecode_path`.

    The upgrade is called with an empty argument list and ``no_init=True``,
    then one block is advanced.
    """
    upgrade_args = []
    template_pair.contract_upgrade(
        context.deployer_account,
        context.network_provider.proxy,
        template_pair_bytecode_path,
        upgrade_args,
        no_init=True,
    )
    chain_sim.advance_blocks(1)


template_pair_upgrade()

Pair

In [None]:
from contracts.router_contract import RouterContract
# First configured v2 router; used as a global by pair_upgrade().
router_contract: RouterContract = context.get_contracts(config.ROUTER_V2)[0]


def pair_upgrade():
    """Ask the router to upgrade the pair identified by `pair_contract`'s two tokens.

    One block is advanced after the call.
    """
    pair_tokens = [pair_contract.firstToken, pair_contract.secondToken]
    router_contract.pair_contract_upgrade(
        context.deployer_account,
        context.network_provider.proxy,
        pair_tokens,
    )
    chain_sim.advance_blocks(1)


pair_upgrade()

# SWAPS functions

In [None]:
# Show which tokens the operating pair trades; the swap helpers below use these identifiers.
print(f"firstToken: {pair_contract.firstToken}")
print(f"secondToken: {pair_contract.secondToken}")

Swap logger

In [None]:
from contracts.pair_contract import SwapFixedOutputEvent, SwapFixedInputEvent
from typing import Any
import csv

class SwapLogger:
    """CSV-backed log of swap events, used both to record swaps and to replay them.

    Every row stores the round in which the event was logged, the event class name and
    the event's token/amount fields. Rows already present in the file are loaded on
    construction and can be consumed in order with :meth:`get_next_event`.
    """

    # Column order of the CSV file.
    HEADER = ["round", "event_type", "token_in", "amount_in", "token_out", "amount_out"]

    def __init__(self, filename: str):
        """Open the log file, creating it with a header row when it does not exist yet.

        Args:
            filename: Path of the CSV file, resolved against the directory two levels
                above the current working directory.
        """
        self.data = []
        self.filename = Path.cwd().parent.parent / filename
        if self.filename.exists():
            self.data = self.__load(self.filename)
        else:
            # First use: make sure the target folder exists, then write the header.
            self.filename.parent.mkdir(parents=True, exist_ok=True)
            # newline='' is what the csv module expects; it avoids blank rows on Windows.
            with open(self.filename, "w", newline='') as f:
                csv.writer(f).writerow(self.HEADER)

    def log_event(self, event: SwapFixedInputEvent | SwapFixedOutputEvent):
        """Append `event` to the CSV file and to the in-memory list, tagged with the current round.

        The event is not stored (an error is logged instead) when the current round is
        lower than the round of the last stored entry.
        """
        current_round = context.network_provider.get_round(1)
        if self.data and current_round < self.data[-1]["round"]:
            logger.error(f"Current round {current_round} is less than the last logged round {self.data[-1]['round']}")
            return

        event_type = type(event).__name__
        if isinstance(event, SwapFixedInputEvent):
            token_in, amount_in, token_out, amount_out = event.tokenA, event.amountA, event.tokenB, event.amountBmin
        else:
            # A fixed-output event carries its output amount in `amountB`. Logging `tokenB`
            # here (as before) made the row impossible to replay: get_next_event() calls int() on it.
            token_in, amount_in, token_out, amount_out = event.tokenA, event.amountAmax, event.tokenB, event.amountB

        entry = {
            "round": current_round,
            "event_type": event_type,
            "token_in": token_in,
            "amount_in": amount_in,
            "token_out": token_out,
            "amount_out": amount_out
        }

        with open(self.filename, "a", newline='') as f:
            csv.writer(f).writerow([entry[column] for column in self.HEADER])

        self.data.append(entry)

    @classmethod
    def __load(cls, filename: str) -> list[dict[str, Any]]:
        """Read every row of the CSV file; `round` is converted to int, other fields stay strings."""
        data = []
        with open(filename, "r", newline='') as f:
            for row in csv.DictReader(f):
                row["round"] = int(row["round"])
                data.append(row)
        return data

    def get_next_event(self) -> tuple[int, SwapFixedInputEvent | SwapFixedOutputEvent | None]:
        """Pop the oldest stored entry and rebuild its swap event.

        Returns:
            ``(round, event)`` for the popped entry, or ``(0, None)`` when nothing is left.

        Raises:
            Exception: If the stored event type is neither of the two swap event classes.
        """
        if not self.data:
            return 0, None

        element = self.data.pop(0)
        print(element)
        if element["event_type"] == "SwapFixedInputEvent":
            return element["round"], SwapFixedInputEvent(element["token_in"], int(element["amount_in"]), element["token_out"], int(element["amount_out"]))
        elif element["event_type"] == "SwapFixedOutputEvent":
            return element["round"], SwapFixedOutputEvent(element["token_in"], int(element["amount_in"]), element["token_out"], int(element["amount_out"]))
        else:
            raise Exception(f"Unknown event type: {element['event_type']}")


Snipe the observation round

In [None]:
from time import sleep
from utils.contract_retrievers import PairContractDataFetcher
from contracts.pair_contract import SwapFixedInputEvent
from multiversx_sdk.abi import TokenIdentifierValue, BigUIntValue

def snipe_observation_round():
    """Endlessly send a burst of three swaps timed around the pair's next observation round.

    Each iteration reads the latest recorded observation round and the pair's save interval,
    computes the next round aligned to that interval, polls until two rounds before it, then
    sends: a large forward swap (first -> second token), a tiny forward swap of 1001 units,
    and a backward swap (second -> first token). Every swap is recorded with a SwapLogger
    writing to "dump/swap_events_snipe_1.csv".

    The loop has no exit condition; interrupt the kernel to stop it.
    """
    swap_logger = SwapLogger("dump/swap_events_snipe_1.csv")
    amount_to_swap = 1000 * 10**18
    while True:
        last_observation_round = get_last_observation_round(pair_contract)
        print(f"current round: {context.network_provider.get_round()}")
        print(f"latest observation round: {last_observation_round}")

        observation_interval = pair_contract.get_safe_price_round_save_interval(context.network_provider.proxy)
        current_round = context.network_provider.get_round()
        # Next round strictly after `current_round` that sits a whole number of intervals
        # after the last recorded observation.
        next_observation_round = current_round + observation_interval - (current_round - last_observation_round) % observation_interval

        # wait until two rounds before the targeted observation round
        print(f"targetting observation round {next_observation_round}")
        while context.network_provider.get_round() < next_observation_round - 2:
            sleep(0.2)
            # chain_sim.advance_blocks(1)

        # Estimate the forward swap's output up front; the same figure is reused below
        # as the input amount of the backward swap.
        # NOTE(review): the estimate is taken before any swap is sent — confirm that is intended.
        pair_data_fetcher = PairContractDataFetcher(Address.new_from_bech32(pair_contract.address), context.network_provider.proxy.url)
        amount_out = pair_data_fetcher.get_data("getAmountOut",[TokenIdentifierValue(pair_contract.firstToken), BigUIntValue(amount_to_swap)])

        # 1) large forward swap, with a minimum output of 1
        event_forwards = SwapFixedInputEvent(pair_contract.firstToken, amount_to_swap,
                                    pair_contract.secondToken, 1)
        pair_contract.swap_fixed_input(context.network_provider, context.deployer_account, event_forwards)
        swap_logger.log_event(event_forwards)

        print(f"swaps sent in round: {context.network_provider.get_round()}")
        # TODO: to remove if using chain sim manual blocks advance
        sleep(1)

        # 2) tiny forward swap one second later
        event_consolidation = SwapFixedInputEvent(pair_contract.firstToken, 1001,
                                    pair_contract.secondToken, 1)
        pair_contract.swap_fixed_input(context.network_provider, context.deployer_account, event_consolidation)
        swap_logger.log_event(event_consolidation)

        sleep(1)

        # 3) backward swap spending the amount estimated above
        event_backwards = SwapFixedInputEvent(pair_contract.secondToken, amount_out,
                                    pair_contract.firstToken, 1)
        pair_contract.swap_fixed_input(context.network_provider, context.deployer_account, event_backwards)
        swap_logger.log_event(event_backwards)
        
        # chain_sim.advance_blocks(1)

Random swaps

In [None]:
from utils.utils_chain import get_all_token_nonces_details_for_account
from utils.contract_retrievers import PairContractDataFetcher
from contracts.pair_contract import SwapFixedInputEvent
from time import sleep
import random

def random_swaps(logging_path: str):
    """Endlessly send random fixed-input swaps on `pair_contract` from the deployer account.

    Each iteration picks one of the pair's two tokens at random, waits a random
    200-2000 ms, spends a random 10-30% of the deployer's balance of that token and
    records the swap event. Iterations in which the deployer holds none of the chosen
    token are skipped after the same wait.

    Args:
        logging_path: CSV path handed to `SwapLogger`.

    The loop has no exit condition; interrupt the kernel to stop it.
    """
    swap_logger = SwapLogger(logging_path)
    spend_range_percent = [10, 30]
    swaps_delay_ms = [200, 2000]
    while True:
        # select a random token to spend
        spend_token = random.choice([pair_contract.firstToken, pair_contract.secondToken])
        other_token = pair_contract.firstToken if spend_token == pair_contract.secondToken else pair_contract.secondToken

        # get the balance of the spent token
        response = get_all_token_nonces_details_for_account(spend_token, context.deployer_account.address.bech32(), context.network_provider.proxy)[0]
        balance = int(response.get("balance", 0))

        # determine delay between swaps
        delay = random.randint(swaps_delay_ms[0], swaps_delay_ms[1])

        if balance == 0:
            # Nothing to spend: do not send a zero-amount swap. Still wait, so the loop
            # does not spin while the balance stays empty.
            logger.warning(f"No balance for {spend_token}")
            sleep(delay / 1000)
            continue

        spend_amount = random.randint(spend_range_percent[0], spend_range_percent[1]) * balance // 100
        logger.debug(f"Swapping {spend_amount} {spend_token} for {other_token}. Delay: {delay}ms")

        sleep(delay / 1000)

        # prepare swap (minimum output of 1)
        event = SwapFixedInputEvent(spend_token, spend_amount, other_token, 1)
        pair_contract.swap_fixed_input(context.network_provider, context.deployer_account, event)
        swap_logger.log_event(event)

Replayed swaps

In [None]:
def replay_swaps(filename: str):
    """Replay the swaps stored in a SwapLogger CSV file at their recorded rounds.

    The current round is read once at start. Events recorded for an earlier round are
    skipped with a warning; for a later round the function polls every 0.2 s until that
    round is reached and then treats it as the new reference round. Returns when the
    log has no more events.

    Args:
        filename: CSV path handed to `SwapLogger`.
    """
    swap_logger = SwapLogger(filename)
    reference_round = context.network_provider.get_round(1)
    while True:
        event_round, event = swap_logger.get_next_event()
        if event is None:
            logger.warning("No more swaps to replay")
            return

        if event_round < reference_round:
            logger.warning(f"Skipping swap at round {event_round} because current round is {reference_round}")
            continue

        if event_round > reference_round:
            logger.debug(f"Waiting for round {event_round} to replay swap")
            while context.network_provider.get_round(1) < event_round:
                sleep(0.2)
            reference_round = event_round

        # Dispatch on the event class: anything that is not a fixed-input event
        # is sent as a fixed-output swap.
        if not isinstance(event, SwapFixedInputEvent):
            logger.debug(f"Swapping {event.amountAmax} {event.tokenA} for {event.amountB} {event.tokenB}")
            pair_contract.swap_fixed_output(context.network_provider, context.deployer_account, event)
            continue

        logger.debug(f"Swapping {event.amountA} {event.tokenA} for {event.amountBmin} {event.tokenB}")
        pair_contract.swap_fixed_input(context.network_provider, context.deployer_account, event)

Random liquidity provisioning

In [None]:
from utils.utils_chain import get_all_token_nonces_details_for_account
from utils.contract_retrievers import PairContractDataFetcher
from contracts.pair_contract import SwapFixedInputEvent
from time import sleep
import random

# NOTE(review): this cell sits under the "Random liquidity provisioning" heading but
# redefines `random_swaps` from the "Random swaps" cell; when the notebook is run top to
# bottom this later definition is the one in effect. Presumably a placeholder for a
# liquidity helper that was never written — confirm and rename or remove.
def random_swaps(logging_path: str):
    """Endlessly send random fixed-input swaps on `pair_contract` from the deployer account.

    Each iteration picks one of the pair's two tokens at random, waits a random
    200-2000 ms, spends a random 10-30% of the deployer's balance of that token and
    records the swap event. Iterations in which the deployer holds none of the chosen
    token are skipped after the same wait.

    Args:
        logging_path: CSV path handed to `SwapLogger`.

    The loop has no exit condition; interrupt the kernel to stop it.
    """
    swap_logger = SwapLogger(logging_path)
    spend_range_percent = [10, 30]
    swaps_delay_ms = [200, 2000]
    while True:
        # select a random token to spend
        spend_token = random.choice([pair_contract.firstToken, pair_contract.secondToken])
        other_token = pair_contract.firstToken if spend_token == pair_contract.secondToken else pair_contract.secondToken

        # get the balance of the spent token
        response = get_all_token_nonces_details_for_account(spend_token, context.deployer_account.address.bech32(), context.network_provider.proxy)[0]
        balance = int(response.get("balance", 0))

        # determine delay between swaps
        delay = random.randint(swaps_delay_ms[0], swaps_delay_ms[1])

        if balance == 0:
            # Nothing to spend: do not send a zero-amount swap. Still wait, so the loop
            # does not spin while the balance stays empty.
            logger.warning(f"No balance for {spend_token}")
            sleep(delay / 1000)
            continue

        spend_amount = random.randint(spend_range_percent[0], spend_range_percent[1]) * balance // 100
        logger.debug(f"Swapping {spend_amount} {spend_token} for {other_token}. Delay: {delay}ms")

        sleep(delay / 1000)

        # prepare swap (minimum output of 1)
        event = SwapFixedInputEvent(spend_token, spend_amount, other_token, 1)
        pair_contract.swap_fixed_input(context.network_provider, context.deployer_account, event)
        swap_logger.log_event(event)

# SWAPS Executables

In [None]:
from contracts.pair_contract import SwapFixedInputEvent

# One-off fixed-input swap: spend 1000 units of the first token for at least 1 unit of the
# second token from the deployer account, then advance one block.
event = SwapFixedInputEvent(pair_contract.firstToken, 1000,
                            pair_contract.secondToken, 1)
pair_contract.swap_fixed_input(context.network_provider, context.deployer_account, event)
chain_sim.advance_blocks(1)

Snipe observations

In [None]:
# Loops forever (no exit condition inside the function); interrupt the kernel to stop.
snipe_observation_round()

Random swaps

In [None]:
# Loops forever (no exit condition inside the function); interrupt the kernel to stop.
random_swaps("dump/swap_events_random_4.csv")

Replayed swaps

In [None]:
# Replays the recorded swaps from this file; returns once the log has no more events.
replay_swaps("dump/swap_events_random_2.csv")

Manual checks

In [None]:
from contracts.router_contract import RouterContract
from contracts.pair_contract import PairContract

# Manual check: resolve the pair template through the first configured v2 router,
# load it as a PairContract and print both the address and the loaded object.
router_contract: RouterContract
router_contract = context.get_contracts(config.ROUTER_V2)[0]
template_pair_address = router_contract.get_pair_template_address(context.network_provider.proxy)
print(template_pair_address)
template_pair = PairContract.load_contract_by_address(template_pair_address)
print(template_pair)

In [None]:
# Manual check: print every storage entry of the pair whose key contains "price".
# NOTE(review): this prints `entry.raw`, while get_last_observation_round() reads
# `entry.value` — confirm both attributes exist on the storage entries.
state = get_pair_state(pair_contract)
for entry in state.entries:
    if "price" in entry.key:
        print(f"{entry.key}: {entry.raw}")

In [None]:
from enum import Enum
from typing import Any
class Timebase(Enum):
    """Time base of a safe-price view query; the member value is spliced into the endpoint name."""
    # Produces the "...ByRoundOffset" endpoints.
    ROUND = "Round"
    # Produces the "...ByTimestampOffset" endpoints.
    TIMESTAMP = "Timestamp"

def get_lp_safe_price_by_offset(timebase: Timebase, context: Context, abi: Abi, pair_contract: PairContract, 
                                offset: int, reference_amount: int) -> list[Any]:
    """Query the safe-price view for the tokens underlying an LP amount.

    Calls ``getLpTokensSafePriceBy<Round|Timestamp>Offset`` on the first configured
    safe-price view contract with the pair address, the offset and the LP amount.

    Args:
        timebase: Selects the round- or timestamp-based endpoint.
        context: Source of the view contract and the network proxy.
        abi: ABI used to encode the query and decode the response.
        pair_contract: Pair whose LP token is priced.
        offset: Offset forwarded to the endpoint.
        reference_amount: LP token amount to price.

    Returns:
        The parsed response: a list of namespaces for the two underlying tokens, each with
        ``token_identifier``, ``nonce`` and ``amount``. An empty list is returned when the
        query does not come back with return code ``"ok"``.
    """
    safe_price_view_contract = context.get_contracts(config.PAIRS_VIEW)[0]
    proxy = context.network_provider.proxy
    view_controller = SmartContractController(proxy.get_network_config().chain_id, proxy, abi)

    endpoint = f"getLpTokensSafePriceBy{timebase.value}Offset"
    query = view_controller.create_query(Address.new_from_bech32(safe_price_view_contract.address), endpoint,
                                         [pair_contract.address, offset, reference_amount])
    response = view_controller.run_query(query)

    # A response object is truthy even for a failed query, so the return code has to be
    # checked explicitly (same criterion as the ad-hoc safe price query cell above).
    if not response or response.return_code != "ok":
        if response:
            logger.warning(f"{endpoint} query failed: {response.return_code} {response.return_message}")
        return []

    return view_controller.parse_query_response(response)

def get_safe_price_by_offset(timebase: Timebase, context: Context, abi: Abi, pair_contract: PairContract, 
                             offset: int, token: str, reference_amount: int) -> tuple[int, str]:
    """Query the safe price of `token` at the given offset.

    Calls ``getSafePriceBy<Round|Timestamp>Offset`` on the first configured safe-price view
    contract with the pair address, the offset and a ``[token, 0, reference_amount]`` triple.

    Args:
        timebase: Selects the round- or timestamp-based endpoint.
        context: Source of the view contract and the network proxy.
        abi: ABI used to encode the query and decode the response.
        pair_contract: Pair to price against.
        offset: Offset forwarded to the endpoint.
        token: Identifier of the input token.
        reference_amount: Input amount to price.

    Returns:
        ``(amount, token_identifier)`` taken from the first parsed result, or ``(0, "")``
        when the query does not come back with return code ``"ok"`` or yields no result.
    """
    safe_price_view_contract = context.get_contracts(config.PAIRS_VIEW)[0]
    proxy = context.network_provider.proxy
    view_controller = SmartContractController(proxy.get_network_config().chain_id, proxy, abi)

    endpoint = f"getSafePriceBy{timebase.value}Offset"
    query = view_controller.create_query(Address.new_from_bech32(safe_price_view_contract.address), endpoint,
                                         [pair_contract.address, offset, [token, 0, reference_amount]])
    response = view_controller.run_query(query)

    # A response object is truthy even for a failed query, so the return code has to be
    # checked explicitly (same criterion as the ad-hoc safe price query cell above).
    if not response or response.return_code != "ok":
        if response:
            logger.warning(f"{endpoint} query failed: {response.return_code} {response.return_message}")
        return 0, ""

    # Parse once and reuse the result for both returned fields.
    parsed = view_controller.parse_query_response(response)
    if not parsed:
        return 0, ""

    payment = parsed[0]
    return payment.amount, payment.token_identifier


In [None]:
# Manual check: underlying tokens of 1 * 10**18 LP units at a round offset of 100.
tokens = get_lp_safe_price_by_offset(Timebase.ROUND, context, abi, pair_contract, 100, 1 * 10**18)
print([(t.token_identifier, t.amount) for t in tokens])

In [None]:
# Manual check of the default-offset LP endpoint: it only takes the pair address and the
# LP amount (no explicit offset).
safe_price_view_contract = context.get_contracts(config.PAIRS_VIEW)[0]
view_controller = SmartContractController(context.network_provider.proxy.get_network_config().chain_id, context.network_provider.proxy, abi)

endpoint = f"getLpTokensSafePriceByDefaultOffset"
query = view_controller.create_query(Address.new_from_bech32(safe_price_view_contract.address), endpoint, 
                                                [pair_contract.address, 1 * 10**18])
response = view_controller.run_query(query)
# NOTE(review): only the truthiness of `response` is tested, not `response.return_code`;
# the return message is printed before parsing, presumably to help diagnose failed queries.
if response:
    print(response.return_message)
    print(view_controller.parse_query_response(response))

# SET STATE for user

In [None]:
# Overwrite the simulator state of `user` with an empty account: nonce 0, a balance of
# 100000000000000000000, no code and no storage pairs.
user = "erd1pye4dsy3fs956skp8wgf5pjtfynvw5y52gwdmsrrum9jcv3y2vusk7we6t"
chain_sim.apply_states([[{
        "address": user,
        "nonce": 0,
        "balance": "100000000000000000000",
        "username": "",
        "code": "",
        "developerReward": "0",
        "ownerAddress": "",
        "pairs": {}
    }]])

In [None]:
# Rewrite the ESDT storage entry of `user` for token `esdt` so that it carries `amount`.
user = "erd1ss6u80ruas2phpmr82r42xnkd6rxy40g9jl69frppl4qez9w2jpsqj8x97"
esdt = "WEGLD-bd4d79"
amount = 10000 * 10**18

from utils.utils_chain import dec_to_padded_hex
# The current entry must exist: its first byte is reused as the header of the new value.
current_entry = context.network_provider.proxy.get_account_storage_entry(Address.new_from_bech32(user), f"ELRONDesdt{esdt}")
if not current_entry:
    raise Exception("No entry found")

print(current_entry.value.hex())
print(dec_to_padded_hex(amount))
header = current_entry.value.hex()[:2]
# New value layout (hex): <header byte><length><00><amount>, where <length> is the number of
# amount bytes plus one for the "00" byte. The samples at the end of the cell follow this layout.
# presumably this mirrors the protobuf encoding of the ESDT balance — TODO confirm
new_entry = f"{header}{dec_to_padded_hex(len(dec_to_padded_hex(amount)) // 2 + 1)}{'00'}{dec_to_padded_hex(amount)}"
print(new_entry)

# Only the touched storage pair is sent, keyed by the hex-encoded storage key.
chain_sim.apply_states([[{
        "address": user,
        "pairs": {
            current_entry.key.encode().hex(): new_entry
        }
    }]])

# 120600174876e800
# 12070038f9f643173a
# 1208003877543179f11e
# 120900d02ab486cedc0000
# 120900059309ed4f0a1e0d


# FEES setup for pairs

In [None]:
from contracts.pair_contract import PairContract
from utils.contract_retrievers import retrieve_pair_by_address

# NOTE(review): rebinds the global `pair_contract` that the earlier cells selected from the context.
pair_contract: PairContract
pair_contract = retrieve_pair_by_address("erd1qqqqqqqqqqqqqpgqph6g8569lnvpgd3x569hd6n6qse2aw0w0n4sms6nzv")    # operating pair
mex_contract = retrieve_pair_by_address("erd1qqqqqqqqqqqqqpgqa0fsfshnff4n76jhcye6k7uvd7qacsq42jpsp6shh2") # egldmex contract

In [None]:
from contracts.pair_contract import PairContract
from utils.contract_retrievers import retrieve_pair_by_address

# Alternative operating pair: running this cell replaces the `pair_contract` chosen in the
# previous cell, while `mex_contract` stays as it was.
pair_contract: PairContract
pair_contract = retrieve_pair_by_address("erd1qqqqqqqqqqqqqpgq0e9pmlzr0nk5nkulzcmessttsjkzr4xf0n4sue4r8e")    # operating pair

In [None]:
# Whitelist the operating pair in the egldmex pair, so that it can swap there without fees.
mex_contract.whitelist_contract(context.deployer_account, context.network_provider.proxy, pair_contract.address)

In [None]:
# Register the egldmex pair (address plus its two tokens) as a trusted swap pair of the
# operating pair: this sets where the fees are swapped.
pair_contract.add_trusted_swap_pair(context.deployer_account, context.network_provider.proxy,
                                    [
                                        mex_contract.address,
                                        mex_contract.firstToken,
                                        mex_contract.secondToken
                                    ])

In [None]:
# Set the two fee percents of the operating pair to 300 and 100.
# presumably [total fee, special fee] in the contract's own units — TODO confirm
pair_contract.set_fees_percents(context.deployer_account, context.network_provider.proxy,
                                [300, 100])

In [None]:
from contracts.router_contract import RouterContract
# Turn the fee on for the operating pair through the first configured v2 router, passing the
# zero contract address and the egldmex pair's second token.
# presumably (fee destination address, fee token) — TODO confirm
router_contract: RouterContract
router_contract = context.get_contracts(config.ROUTER_V2)[0]

pair_contract.set_fee_on_via_router(context.deployer_account, context.network_provider.proxy, router_contract, 
                                [
                                    config.ZERO_CONTRACT_ADDRESS,
                                    mex_contract.secondToken
                                ])

In [None]:
from contracts.pair_contract import AddLiquidityEvent

# Add liquidity from the deployer account: fixed amounts of both tokens, each passed together with a 1
# (presumably the minimum accepted amount — TODO confirm against AddLiquidityEvent).
event = AddLiquidityEvent(pair_contract.firstToken, 127791780000000000000, 1, pair_contract.secondToken, 5000000000000000000, 1)
pair_contract.add_liquidity(context.network_provider, context.deployer_account, event)

In [None]:
# Resume the operating pair once the fee setup above is done.
pair_contract.resume(context.deployer_account, context.network_provider.proxy)

# Regression

## Functions

In [None]:
from utils.utils_scenarios import PhaseDictsCollector
from utils.utils_chain import Account, WrapperAddress
from typing import Any, List, Tuple
from utils.contract_data_fetchers import PairContractDataFetcher
import webbrowser


def users_init() -> list[Account]:
    """Fund the simulator accounts with LP tokens and build the list of test users.

    Prints the deployer address, syncs the deployer nonce and funds every account in the
    global `found_accounts` with 1000 * 10**18 units of the pair's LP token. An `Account`
    is then built for each address except the deployer's; only accounts in shard 1 are
    kept, with their nonce synced.

    Returns:
        The shard-1 user accounts.
    """
    deployer = context.deployer_account
    proxy = context.network_provider.proxy

    print(deployer.address.bech32())
    deployer.sync_nonce(proxy)
    chain_sim.fund_users_w_esdt_from_mainnet(found_accounts, pair_contract.lpToken, 1000 * 10 ** 18)

    users = []
    for bech32_address in found_accounts:
        # the deployer is not a test user
        if bech32_address == deployer.address.bech32():
            continue

        # Every user is built from the default accounts PEM, with the address overridden.
        account = Account(pem_file=config.DEFAULT_ACCOUNTS)
        account.address = WrapperAddress(bech32_address)

        # select only shard 1 accounts due to limitation in system account token attributes retrieval
        if account.address.get_shard() == 1:
            account.sync_nonce(proxy)
            users.append(account)

    return users

def open_tx_in_explorer(tx_hash: str, open_in_browser: bool = True):
    """Open the explorer page of a transaction in the default web browser.

    Args:
        tx_hash: Hash of the transaction to show.
        open_in_browser: When False the function returns without doing anything.

    Only the "chainsim" and "shadowfork4" environments have an explorer URL; for any
    other value of ``config.CURRENT_ENV.value`` nothing is opened.
    """
    if open_in_browser:
        explorer_urls = (
            ("chainsim", f"https://custom-network.internal-explorer.multiversx.com/transactions/{tx_hash}"),
            ("shadowfork4", f"https://testnet-tc-shadowfork-four.internal-explorer.multiversx.com/transactions/{tx_hash}"),
        )
        current_env = config.CURRENT_ENV.value
        for env_name, url in explorer_urls:
            if current_env == env_name:
                webbrowser.open(url)
                break

def collect_general_test_data(step: int, collector: PhaseDictsCollector, pair: PairContract):
    """Record the pair's reserves in `collector` under the key ``token_reserve_<step>``.

    Args:
        step: Index used to build the collection key.
        collector: Collector receiving the data, under the "Token reserves" description.
        pair: Pair whose reserves are fetched.

    The stored dict holds the reserve of each of the two tokens and the raw result of the
    ``getReservesAndTotalSupply`` view.
    """
    fetcher = PairContractDataFetcher(Address.new_from_bech32(pair.address), context.network_provider.proxy.url)

    first_reserve = fetcher.get_token_reserve(pair.firstToken)
    second_reserve = fetcher.get_token_reserve(pair.secondToken)
    reserves_and_supply = fetcher.get_data("getReservesAndTotalSupply")

    collector.add(
        f"token_reserve_{step}",
        {
            "first_token_reserve": first_reserve,
            "second_token_reserve": second_reserve,
            "state_reserves": reserves_and_supply,
        },
        "Token reserves",
    )

def init_pre_update_test(initial_blocks: int) -> Tuple[Any, PhaseDictsCollector, list[Account]]:
    """Prepare the "before" phase of a regression run on a freshly started simulator.

    Starts the chain simulator (updating the global `found_accounts`), initialises the
    users, opens the "before" phase on a new collector, advances the chain until
    `initial_blocks` blocks have been produced in total and collects the pair's reserves
    as step 0.

    Args:
        initial_blocks: Total number of blocks to have advanced before collecting data.

    Returns:
        ``(chain_sim, collector, users)``.

    Raises:
        Exception: If `initial_blocks` is lower than the blocks already consumed here.
    """
    global found_accounts
    chain_sim, found_accounts = chain_sim_init()

    users = users_init()

    collector = PhaseDictsCollector()
    collector.set_phase("before")

    chain_sim.advance_blocks(3)
    consumed_blocks = 3

    # Same guard as init_post_upgrade_test: without it a too-small `initial_blocks` would
    # ask the simulator to advance a negative number of blocks.
    if initial_blocks < consumed_blocks:
        raise Exception(f"Initial blocks {initial_blocks} is less than consumed blocks {consumed_blocks}. Skewed results will occur.")

    chain_sim.advance_blocks(initial_blocks - consumed_blocks)

    sleep(2)
    collect_general_test_data(0, collector, pair_contract)

    return chain_sim, collector, users

def init_post_upgrade_test(initial_blocks: int, chain_sim: ChainSimulator, collector: PhaseDictsCollector):
    """Prepare the "after" phase of a regression run: restart the simulator and upgrade everything.

    Stops the given simulator, starts a new one, initialises the users, opens the "after"
    phase on `collector`, runs the router / safe-price view / template / pair upgrades and
    the two router settings, then advances the chain until `initial_blocks` blocks have
    been produced in total and collects the pair's reserves as step 0.

    Args:
        initial_blocks: Total number of blocks to have advanced before collecting data.
        chain_sim: Handle of the running simulator; it is stopped and replaced.
        collector: Collector that already holds the "before" phase.

    Returns:
        ``(chain_sim, users)`` where `chain_sim` is the handle of the restarted simulator.

    Raises:
        Exception: If `initial_blocks` is lower than the blocks consumed by the setup.
    """
    global found_accounts
    # input("Restart Chain Simulator then press Enter to continue...")

    chain_sim.stop()
    # Keep the handle and accounts of the restarted simulator; the result used to be
    # discarded, leaving the stopped handle in use (and returned) below.
    # The upgrade helpers called further down use the notebook-global `chain_sim`, not this local.
    chain_sim, found_accounts = chain_sim_init()

    users = users_init()

    collector.set_phase("after")

    chain_sim.advance_blocks(1)
    consumed_blocks = 1

    # Each helper advances the chain itself; mirror that in the block accounting.
    router_upgrade()
    consumed_blocks += 2
    set_safe_price_round_save_interval(10)
    consumed_blocks += 1
    set_default_safe_price_rounds_offset(6000)
    consumed_blocks += 1
    pair_view_upgrade()
    consumed_blocks += 1
    template_pair_upgrade()
    consumed_blocks += 1
    pair_upgrade()
    consumed_blocks += 1

    chain_sim.advance_blocks(2)
    consumed_blocks += 2

    if initial_blocks < consumed_blocks:
        raise Exception(f"Initial blocks {initial_blocks} is less than consumed blocks {consumed_blocks}. Skewed results will occur.")

    block_diff = initial_blocks - consumed_blocks
    chain_sim.advance_blocks(block_diff)

    sleep(2)
    collect_general_test_data(0, collector, pair_contract)

    return chain_sim, users

def report_test_data(collector: PhaseDictsCollector, assert_no_differences: bool = True):
    """Print the collected data and the outcome of comparing the phases.

    Args:
        collector: Collector whose collections are printed and compared.
        assert_no_differences: When True, an AssertionError listing the differences is
            raised if the comparison found any.
    """
    collector.print_collections()

    differences = collector.compare_all()
    if not differences:
        print("All comparisons passed!")
    else:
        print("Found differences:")
        for difference in differences:
            print(f"- {difference}")

    if assert_no_differences:
        assert not differences, "\n".join(differences)

In [None]:
from utils.utils_scenarios import get_token_in_account
from contracts.pair_contract import AddLiquidityEvent, RemoveLiquidityEvent, SwapFixedInputEvent, SwapFixedOutputEvent
from multiversx_sdk.abi import TokenIdentifierValue, BigUIntValue

def add_liquidity_for_user(user_account: Account, percentage: int = 100, slippage_percentage: int = 1):
    """Add liquidity to `pair_contract` with a share of the user's balances of both tokens.

    Args:
        user_account: Account providing the liquidity; its nonce is synced first.
        percentage: Share (in percent) of each token balance to provide.
        slippage_percentage: The minimum amount passed for each token is this percentage
            of the provided amount.

    Returns:
        The transaction hash returned by the pair's ``add_liquidity``.

    Raises:
        Exception: If the user holds none of either token.
    """
    proxy = context.network_provider.proxy
    user_account.sync_nonce(proxy)

    _, first_balance, _ = get_token_in_account(proxy, user_account, pair_contract.firstToken)
    _, second_balance, _ = get_token_in_account(proxy, user_account, pair_contract.secondToken)

    if 0 in (first_balance, second_balance):
        raise Exception("Not enough tokens in account")

    first_amount = first_balance * percentage // 100
    second_amount = second_balance * percentage // 100
    first_amount_min = first_amount * slippage_percentage // 100
    second_amount_min = second_amount * slippage_percentage // 100

    logger.debug(f"Adding liquidity for user {user_account.address.bech32()} with {first_amount} {pair_contract.firstToken} and {second_amount} {pair_contract.secondToken}")
    event = AddLiquidityEvent(
        pair_contract.firstToken, first_amount, first_amount_min,
        pair_contract.secondToken, second_amount, second_amount_min,
    )
    return pair_contract.add_liquidity(context.network_provider, user_account, event)

def remove_liquidity_for_user(user_account: Account, percentage: int = 100, slippage_percentage: int = 1):
    """Remove a share of the user's LP position from `pair_contract`.

    Args:
        user_account: Account owning the LP tokens; its nonce is synced first.
        percentage: Share (in percent) of the LP balance to remove.
        slippage_percentage: The minimum amount passed for each token is this percentage
            of the amount estimated by ``getTokensForGivenPosition``.

    Returns:
        The transaction hash returned by the pair's ``remove_liquidity``.

    Raises:
        Exception: If the user holds no LP tokens.
    """
    proxy = context.network_provider.proxy
    user_account.sync_nonce(proxy)

    _, lp_balance, _ = get_token_in_account(proxy, user_account, pair_contract.lpToken)
    if lp_balance == 0:
        raise Exception("Not enough lp tokens in account")

    lp_to_remove = lp_balance * percentage // 100

    # Ask the pair what the position is worth; both results are hex-encoded EsdtTokenPayment values.
    fetcher = PairContractDataFetcher(Address.new_from_bech32(pair_contract.address), proxy.url)
    estimates = fetcher.get_data("getTokensForGivenPosition", [BigUIntValue(lp_to_remove)])
    first_estimate = abi.decode_custom_type("EsdtTokenPayment", bytes.fromhex(estimates[0])).amount
    second_estimate = abi.decode_custom_type("EsdtTokenPayment", bytes.fromhex(estimates[1])).amount

    logger.debug(f"Removing liquidity for user {user_account.address.bech32()} with {lp_to_remove} {pair_contract.lpToken}")
    event = RemoveLiquidityEvent(
        lp_to_remove,
        pair_contract.firstToken, first_estimate * slippage_percentage // 100,
        pair_contract.secondToken, second_estimate * slippage_percentage // 100,
    )
    return pair_contract.remove_liquidity(context.network_provider, user_account, event)

def swap_fixed_input_for_user(user_account: Account, token_in: str, percentage: int = 100, slippage_percentage: int = 1):
    """Swap part of the user's ``token_in`` balance through the pair (fixed input).

    Reads the account's balance of ``token_in``, takes ``percentage`` percent of
    it, queries the pair's ``getAmountOut`` view for that amount and sends the
    swap through ``pair_contract.swap_fixed_input``.

    Args:
        user_account: Account whose nonce is synced, whose balance is read and
            which is handed to ``pair_contract.swap_fixed_input``.
        token_in: Identifier of the token being paid in. The requested token is
            ``pair_contract.firstToken`` when ``token_in`` equals
            ``pair_contract.secondToken``, otherwise ``pair_contract.secondToken``.
        percentage: Percent of the ``token_in`` balance to swap (integer division).
        slippage_percentage: Percent of the ``getAmountOut`` estimate passed as the
            last argument of ``SwapFixedInputEvent`` (presumably the minimum output
            accepted -- confirm against ``SwapFixedInputEvent``). A result of 0 is
            replaced by 1.

    Returns:
        The value returned by ``pair_contract.swap_fixed_input`` (callers bind it
        to ``tx_hash``).

    Raises:
        Exception: If the account holds none of ``token_in``.
    """
    proxy = context.network_provider.proxy
    user_account.sync_nonce(proxy)

    _, balance_in, _ = get_token_in_account(proxy, user_account, token_in)

    if token_in == pair_contract.secondToken:
        token_out = pair_contract.firstToken
    else:
        token_out = pair_contract.secondToken

    if balance_in == 0:
        raise Exception("Not enough tokens in account")

    amount_in = balance_in * percentage // 100

    fetcher = PairContractDataFetcher(Address.new_from_bech32(pair_contract.address), proxy.url)
    estimated_out = fetcher.get_data("getAmountOut", [TokenIdentifierValue(token_in), BigUIntValue(amount_in)])

    scaled_out = estimated_out * slippage_percentage // 100
    # A computed value of 0 is bumped to 1 before it is put in the event.
    amount_out_arg = 1 if scaled_out == 0 else scaled_out

    logger.debug(f"Swapping {amount_in} {token_in} for {token_out}")
    event = SwapFixedInputEvent(token_in, amount_in, token_out, amount_out_arg)
    return pair_contract.swap_fixed_input(context.network_provider, user_account, event)

def swap_fixed_output_for_user(user_account: Account, token_in: str, percentage: int = 100, slippage_percentage: int = 100):
    """Swap part of the user's ``token_in`` balance through the pair (fixed output).

    Reads the account's balance of ``token_in``, takes ``percentage`` percent of
    it, queries the pair's ``getAmountOut`` view for that amount and sends the
    swap through ``pair_contract.swap_fixed_output``.

    Args:
        user_account: Account whose nonce is synced, whose balance is read and
            which is handed to ``pair_contract.swap_fixed_output``.
        token_in: Identifier of the token being paid in. The requested token is
            ``pair_contract.firstToken`` when ``token_in`` equals
            ``pair_contract.secondToken``, otherwise ``pair_contract.secondToken``.
        percentage: Percent of the ``token_in`` balance offered (integer division).
        slippage_percentage: Percent of the ``getAmountOut`` estimate passed as the
            last argument of ``SwapFixedOutputEvent`` (presumably the exact output
            requested -- confirm against ``SwapFixedOutputEvent``). A result of 0
            is replaced by 1.

    Returns:
        The value returned by ``pair_contract.swap_fixed_output`` (callers bind it
        to ``tx_hash``).

    Raises:
        Exception: If the account holds none of ``token_in``.
    """
    proxy = context.network_provider.proxy
    user_account.sync_nonce(proxy)

    _, balance_in, _ = get_token_in_account(proxy, user_account, token_in)

    if token_in == pair_contract.secondToken:
        token_out = pair_contract.firstToken
    else:
        token_out = pair_contract.secondToken

    if balance_in == 0:
        raise Exception("Not enough tokens in account")

    amount_in = balance_in * percentage // 100

    fetcher = PairContractDataFetcher(Address.new_from_bech32(pair_contract.address), proxy.url)
    estimated_out = fetcher.get_data("getAmountOut", [TokenIdentifierValue(token_in), BigUIntValue(amount_in)])

    scaled_out = estimated_out * slippage_percentage // 100
    # A computed value of 0 is bumped to 1 before it is put in the event.
    amount_out_arg = 1 if scaled_out == 0 else scaled_out

    logger.debug(f"Swapping {amount_in} {token_in} for {token_out}")
    event = SwapFixedOutputEvent(token_in, amount_in, token_out, amount_out_arg)
    return pair_contract.swap_fixed_output(context.network_provider, user_account, event)

## Executables

In [None]:
# users_init() is defined outside this section; its result is iterated below as
# objects exposing `.address.bech32()`.
users = users_init()
# Show the bech32 address of every user.
print([user.address.bech32() for user in users])

In [None]:
# Print the bech32 address of every entry in the module-level `users` list.
print([user.address.bech32() for user in users])

In [None]:
# Manual one-off: remove liquidity for the first user with the default
# percentage / slippage_percentage arguments.
user = users[0]
remove_liquidity_for_user(user)
# Advance the chain simulator by one block after sending the transaction.
chain_sim.advance_blocks(1)

### Positive scenarios

In [None]:
# Forwarded to open_tx_in_explorer() for every transaction sent by run_scenario().
show_in_browser = False

# Index into `users` selecting the account that runs the scenario.
user_index = 0
# Forwarded to init_pre_update_test() / init_post_upgrade_test()
# (presumably a number of blocks -- confirm against those helpers).
initial_blocks = 15

def run_scenario():
    """Run the positive scenario and record every step in ``collector``.

    The scenario is: remove liquidity, swap fixed input (first token, then
    second token), swap fixed output (first token, then second token) and add
    liquidity. Swaps use 50 percent of the balance; every other argument is
    left at the helper's default.

    For each step (numbered from 1) the function logs the current block nonce,
    sends the transaction, advances the chain simulator by one block, waits two
    seconds, stores the transaction operations under ``ops_<step>`` and its raw
    logs under ``logs_<step>``, calls ``open_tx_in_explorer`` and finally
    ``collect_general_test_data``.

    Reads the notebook globals ``user``, ``chain_sim``, ``collector``,
    ``context``, ``pair_contract`` and ``show_in_browser`` at call time.

    Returns:
        int: Number of steps executed.
    """
    # (label used in the log line and logs description, ops description, transaction sender).
    # Senders are lambdas so `user` and the pair tokens are looked up when the step runs.
    scenario = [
        ("Exit liquidity", "Exit liquidity ops",
         lambda: remove_liquidity_for_user(user)),
        ("Swap fixed input first token in", "Swap fixed input first token in",
         lambda: swap_fixed_input_for_user(user, pair_contract.firstToken, 50)),
        ("Swap fixed input second token in", "Swap fixed input second token in",
         lambda: swap_fixed_input_for_user(user, pair_contract.secondToken, 50)),
        ("Swap fixed output first token in", "Swap fixed output first token in",
         lambda: swap_fixed_output_for_user(user, pair_contract.firstToken, 50)),
        ("Swap fixed output second token in", "Swap fixed output second token in",
         lambda: swap_fixed_output_for_user(user, pair_contract.secondToken, 50)),
        ("Add liquidity", "Add liquidity",
         lambda: add_liquidity_for_user(user)),
    ]

    for step, (label, ops_description, send_tx) in enumerate(scenario, start=1):
        logger.info(f"{label} at block: {context.network_provider.proxy.get_network_status().block_nonce}")
        tx_hash = send_tx()
        chain_sim.advance_blocks(1)
        sleep(2)  # wait after advancing the block before querying the transaction

        ops = context.network_provider.get_tx_operations(tx_hash, True)
        collector.add(f"ops_{step}", ops, ops_description)
        logs = context.network_provider.proxy.get_transaction(tx_hash).raw['logs']
        collector.add(f"logs_{step}", logs, f"{label} logs")
        open_tx_in_explorer(tx_hash, show_in_browser)

        collect_general_test_data(step, collector, pair_contract)

    return len(scenario)

# First pass: set up via init_pre_update_test() and run the scenario.
chain_sim, collector, users = init_pre_update_test(initial_blocks)
# run_scenario() reads the module-level `user`, so it must be bound before each run.
user = users[user_index]

run_scenario()

################################################################################
# Second pass: init_post_upgrade_test() receives the same chain_sim and collector,
# then the identical scenario is run again.
chain_sim, users = init_post_upgrade_test(initial_blocks, chain_sim, collector)
user = users[user_index]

# `steps` holds whatever run_scenario() returns; it is not read again in this cell.
steps = run_scenario()

report_test_data(collector, False)

In [None]:
# Print the recorded operations of both runs side by side, one step at a time.
# run_scenario() numbers its steps from 1 (keys "ops_1", "ops_2", ...), so the
# loop starts at 1 instead of looking up "ops_0".
for i in range(1, 7):
    print(collector.collections["before"][f"ops_{i}"])
    print(collector.collections["after"][f"ops_{i}"])
    print("-"*100)

### Negative scenarios

In [None]:
# Forwarded to open_tx_in_explorer() for every transaction sent by run_scenario().
show_in_browser = True

# Index into `users` selecting the account that runs the scenario.
user_index = 0
# Forwarded to init_pre_update_test() / init_post_upgrade_test()
# (presumably a number of blocks -- confirm against those helpers).
initial_blocks = 15

def run_scenario():
    """Run the negative scenario and record every step in ``collector``.

    Each step calls one of the liquidity / swap helpers with non-default
    ``percentage`` and ``slippage_percentage`` arguments (for example 101, 0 or
    a very large value).

    For each step (numbered from 1) the function logs the current block nonce,
    sends the transaction, advances the chain simulator by one block, waits two
    seconds, stores the transaction operations under ``ops_<step>``, calls
    ``open_tx_in_explorer`` and finally ``collect_general_test_data``.

    Reads the notebook globals ``user``, ``chain_sim``, ``collector``,
    ``context``, ``pair_contract`` and ``show_in_browser`` at call time.

    Returns:
        int: Number of steps executed.
    """
    # (label used in the log line, ops description, transaction sender).
    # Senders are lambdas so `user` and the pair tokens are looked up when the step runs.
    scenario = [
        ("Exit liquidity", "Exit liquidity ops",
         lambda: remove_liquidity_for_user(user, 50, 101)),
        ("Exit liquidity", "Exit liquidity ops",
         lambda: remove_liquidity_for_user(user, 50, 50)),
        ("Swap fixed input first token in", "Swap fixed input first token in",
         lambda: swap_fixed_input_for_user(user, pair_contract.firstToken, 50, 101)),
        ("Swap fixed input first token in", "Swap fixed input first token in",
         lambda: swap_fixed_input_for_user(user, pair_contract.firstToken, 0, 1)),
        ("Swap fixed input second token in", "Swap fixed input second token in",
         lambda: swap_fixed_input_for_user(user, pair_contract.secondToken, 50, 10000000000000000000000000)),
        ("Swap fixed output first token in", "Swap fixed output first token in",
         lambda: swap_fixed_output_for_user(user, pair_contract.firstToken, 50, 101)),
        ("Swap fixed output first token in", "Swap fixed output first token in",
         lambda: swap_fixed_output_for_user(user, pair_contract.firstToken, 50, 10000000000000000000000000)),
        ("Swap fixed output second token in", "Swap fixed output second token in",
         lambda: swap_fixed_output_for_user(user, pair_contract.secondToken, 50, 0)),
        ("Swap fixed output second token in", "Swap fixed output second token in",
         lambda: swap_fixed_output_for_user(user, pair_contract.secondToken, 0, 100)),
        ("Add liquidity", "Add liquidity",
         lambda: add_liquidity_for_user(user, 100, 101)),
    ]

    for step, (label, ops_description, send_tx) in enumerate(scenario, start=1):
        logger.info(f"{label} at block: {context.network_provider.proxy.get_network_status().block_nonce}")
        tx_hash = send_tx()
        chain_sim.advance_blocks(1)
        sleep(2)  # wait after advancing the block before querying the transaction

        ops = context.network_provider.get_tx_operations(tx_hash, True)
        collector.add(f"ops_{step}", ops, ops_description)
        open_tx_in_explorer(tx_hash, show_in_browser)

        collect_general_test_data(step, collector, pair_contract)

    return len(scenario)

# First pass: set up via init_pre_update_test() and run the scenario.
chain_sim, collector, users = init_pre_update_test(initial_blocks)
# run_scenario() reads the module-level `user`, so it must be bound before each run.
user = users[user_index]

run_scenario()

################################################################################
# Second pass: init_post_upgrade_test() receives the same chain_sim and collector,
# then the identical scenario is run again.
chain_sim, users = init_post_upgrade_test(initial_blocks, chain_sim, collector)
user = users[user_index]

# `steps` holds whatever run_scenario() returns; it is not read again in this cell.
steps = run_scenario()

report_test_data(collector, False)

In [None]:
# Ad-hoc probe: query the pair's getTokensForGivenPosition view for a fixed
# position of 10000000 and show the raw result.
pair_data_fetcher = PairContractDataFetcher(Address.new_from_bech32(pair_contract.address), context.network_provider.proxy.url)
estimates = pair_data_fetcher.get_data("getTokensForGivenPosition", [BigUIntValue(10000000)])
print(estimates)

# Decode the first entry (a hex string) as an EsdtTokenPayment and display its amount.
abi.decode_custom_type("EsdtTokenPayment", bytes.fromhex(estimates[0])).amount

In [None]:
from pprint import pprint
# Debug aid: pretty-print the raw logs of one hard-coded transaction hash.
# NOTE(review): the hash presumably only exists in the chain state it was recorded from -- confirm before re-running.
pprint(context.network_provider.proxy.get_transaction("fc9d4129dfadb1a12637cbef4f086ac26d9378e2b797d826bb92d1470e9e9baa").raw['logs'])