From c36c1cbb9957bff4bb595130b977c80e4b48fd4b Mon Sep 17 00:00:00 2001 From: Dmitriy Borisenko Date: Thu, 4 Nov 2021 20:38:41 +0300 Subject: [PATCH 1/2] #265 fixes --- proxy/indexer/solana_receipts_update.py | 121 +++++++++--------------- 1 file changed, 43 insertions(+), 78 deletions(-) diff --git a/proxy/indexer/solana_receipts_update.py b/proxy/indexer/solana_receipts_update.py index e1de7858b..a590833b6 100644 --- a/proxy/indexer/solana_receipts_update.py +++ b/proxy/indexer/solana_receipts_update.py @@ -45,7 +45,7 @@ def __init__(self, signature, results, accounts = None): class TransactionStruct: - def __init__(self, eth_trx, eth_signature, from_address, got_result, signatures, storage, blocked_accounts): + def __init__(self, eth_trx, eth_signature, from_address, got_result, signatures, storage, blocked_accounts, slot): # logger.debug(eth_signature) self.eth_trx = eth_trx self.eth_signature = eth_signature @@ -54,6 +54,7 @@ def __init__(self, eth_trx, eth_signature, from_address, got_result, signatures, self.signatures = signatures self.storage = storage self.blocked_accounts = blocked_accounts + self.slot = slot class Indexer: @@ -271,6 +272,7 @@ def process_receipts(self): continue_result.signatures, storage_account, continue_result.accounts, + slot ) del continue_table[storage_account] @@ -327,7 +329,16 @@ def process_receipts(self): got_result = get_trx_results(trx) if got_result is not None: # self.submit_transaction(eth_trx, eth_signature, from_address, got_result, [signature]) - trx_table[eth_signature] = TransactionStruct(eth_trx, eth_signature, from_address, got_result, [signature], None, None) + trx_table[eth_signature] = TransactionStruct( + eth_trx, + eth_signature, + from_address, + got_result, + [signature], + None, + None, + slot + ) else: logger.error("RESULT NOT FOUND IN 05\n{}".format(json.dumps(trx, indent=4, sort_keys=True))) @@ -357,7 +368,8 @@ def process_receipts(self): None, [signature], storage_account, - blocked_accounts + blocked_accounts, + slot ) if storage_account in continue_table: @@ -369,11 +381,15 @@ def process_receipts(self): trx_table[eth_signature].signatures = continue_result.signatures del continue_table[storage_account] - elif instruction_data[0] == 0x0a: # Continue - # logger.debug("{:>10} {:>6} Continue 0x{}".format(slot, counter, instruction_data.hex())) + elif instruction_data[0] == 0x0a or instruction_data[0] == 0x14: # Continue or ContinueV02 storage_account = trx['transaction']['message']['accountKeys'][instruction['accounts'][0]] - blocked_accounts = [trx['transaction']['message']['accountKeys'][acc_idx] for acc_idx in instruction['accounts'][5:]] + if instruction_data[0] == 0x0a: + # logger.debug("{:>10} {:>6} Continue 0x{}".format(slot, counter, instruction_data.hex())) + blocked_accounts = [trx['transaction']['message']['accountKeys'][acc_idx] for acc_idx in instruction['accounts'][5:]] + if instruction_data[0] == 0x14: + # logger.debug("{:>10} {:>6} ContinueV02 0x{}".format(slot, counter, instruction_data.hex())) + blocked_accounts = [trx['transaction']['message']['accountKeys'][acc_idx] for acc_idx in instruction['accounts'][5:]] got_result = get_trx_results(trx) if storage_account in continue_table: @@ -406,7 +422,9 @@ def process_receipts(self): continue_table[storage_account].signatures.append(signature) if holder_account in holder_table: - holder_table[holder_account] = HolderStruct(storage_account) + if holder_table[holder_account].storage_account != storage_account: + logger.error("Strange behavior. Pay attention. STORAGE_ACCOUNT != STORAGE_ACCOUNT") + holder_table[holder_account] = HolderStruct(storage_account) else: holder_table[holder_account] = HolderStruct(storage_account) else: @@ -440,7 +458,16 @@ def process_receipts(self): if eth_signature in trx_table: trx_table[eth_signature].signatures.append(signature) else: - trx_table[eth_signature] = TransactionStruct(eth_trx, eth_signature, from_address, got_result, [signature], storage_account, blocked_accounts) + trx_table[eth_signature] = TransactionStruct( + eth_trx, + eth_signature, + from_address, + got_result, + [signature], + storage_account, + blocked_accounts, + slot + ) elif instruction_data[0] == 0x0e: # logger.debug("{:>10} {:>6} ExecuteTrxFromAccountDataIterativeOrContinue 0x{}".format(slot, counter, instruction_data.hex())) @@ -454,9 +481,11 @@ def process_receipts(self): continue_table[storage_account].signatures.append(signature) if holder_account in holder_table: - logger.error("Strange behavior. Pay attention. HOLDER ACCOUNT FOUND") - holder_table[holder_account] = HolderStruct(storage_account) + if holder_table[holder_account].storage_account != storage_account: + logger.error("Strange behavior. Pay attention. STORAGE_ACCOUNT != STORAGE_ACCOUNT") + holder_table[holder_account] = HolderStruct(storage_account) else: + logger.error("Strange behavior. Pay attention. HOLDER ACCOUNT NOT FOUND") holder_table[holder_account] = HolderStruct(storage_account) if got_result: @@ -467,67 +496,8 @@ def process_receipts(self): continue_table[storage_account].results = got_result else: - got_result = get_trx_results(trx) - if got_result is not None: - continue_table[storage_account] = ContinueStruct(signature, got_result, blocked_accounts) - holder_table[holder_account] = HolderStruct(storage_account) - else: - self.add_hunged_storage(trx, storage_account) - - elif instruction_data[0] == 0x13: # PartialCallFromRawEthereumTXv02 - # logger.debug("{:>10} {:>6} PartialCallFromRawEthereumTXv02 0x{}".format(slot, counter, instruction_data.hex())) - - storage_account = trx['transaction']['message']['accountKeys'][instruction['accounts'][0]] - - if storage_account in continue_table: - # collateral_pool_buf = instruction_data[1:5] - # step_count = instruction_data[5:13] - # from_addr = instruction_data[13:33] - - sign = instruction_data[33:98] - unsigned_msg = instruction_data[98:] - - (eth_trx, eth_signature, from_address) = get_trx_receipts(unsigned_msg, sign) - - continue_result = continue_table[storage_account] - - self.submit_transaction(eth_trx, eth_signature, from_address, continue_result.results, continue_result.signatures) - - del continue_table[storage_account] - else: - self.add_hunged_storage(trx, storage_account) - - elif instruction_data[0] == 0x14: # ContinueV02 - # logger.debug("{:>10} {:>6} ContinueV02 0x{}".format(slot, counter, instruction_data.hex())) - - storage_account = trx['transaction']['message']['accountKeys'][instruction['accounts'][0]] - - if storage_account in continue_table: - continue_table[storage_account].signatures.append(signature) - else: - got_result = get_trx_results(trx) - if got_result is not None: - continue_table[storage_account] = ContinueStruct(signature, got_result) - else: - self.add_hunged_storage(trx, storage_account) - - elif instruction_data[0] == 0x16: # ExecuteTrxFromAccountDataIterativeV02 - # logger.debug("{:>10} {:>6} ExecuteTrxFromAccountDataIterativeV02 0x{}".format(slot, counter, instruction_data.hex())) - - holder_account = trx['transaction']['message']['accountKeys'][instruction['accounts'][0]] - storage_account = trx['transaction']['message']['accountKeys'][instruction['accounts'][1]] - - if storage_account in continue_table: - continue_table[storage_account].signatures.append(signature) - - if holder_account in holder_table: - # logger.debug("holder_account found") - # logger.debug("Strange behavior. Pay attention.") - holder_table[holder_account] = HolderStruct(storage_account) - else: - holder_table[holder_account] = HolderStruct(storage_account) - else: - self.add_hunged_storage(trx, storage_account) + continue_table[storage_account] = ContinueStruct(signature, got_result, blocked_accounts) + holder_table[holder_account] = HolderStruct(storage_account) if instruction_data[0] > 0x16: logger.debug("{:>10} {:>6} Unknown 0x{}".format(slot, counter, instruction_data.hex())) @@ -538,12 +508,12 @@ def process_receipts(self): if trx_struct.got_result: self.submit_transaction(trx_struct) elif trx_struct.storage: - self.blocked_storages[trx_struct.storage] = (trx_struct.eth_trx, trx_struct.blocked_accounts) + if abs(trx_struct.slot - self.current_slot) > 16: + self.blocked_storages[trx_struct.storage] = (trx_struct.eth_trx, trx_struct.blocked_accounts) else: logger.error(trx_struct) - def submit_transaction(self, trx_struct): (logs, status, gas_used, return_value, slot) = trx_struct.got_result (_slot, block_hash) = self.get_block(slot) @@ -605,11 +575,6 @@ def get_block(self, slot): return (slot, block_hash) - def add_hunged_storage(self, trx, storage): - if abs(trx['slot'] - self.current_slot) > 16: - self.blocked_storages.add(storage) - - def run_indexer(): logging.basicConfig(format='%(asctime)s - pid:%(process)d [%(levelname)-.1s] %(funcName)s:%(lineno)d - %(message)s') logger.setLevel(logging.DEBUG) From dec348345800a71f069e42e93cf2a2c19072bf99 Mon Sep 17 00:00:00 2001 From: Dmitriy Borisenko Date: Thu, 4 Nov 2021 21:10:57 +0300 Subject: [PATCH 2/2] #265 config defined timeout --- proxy/indexer/solana_receipts_update.py | 3 ++- proxy/run-proxy.sh | 4 ++++ 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/proxy/indexer/solana_receipts_update.py b/proxy/indexer/solana_receipts_update.py index a590833b6..90c3e4c5a 100644 --- a/proxy/indexer/solana_receipts_update.py +++ b/proxy/indexer/solana_receipts_update.py @@ -19,6 +19,7 @@ solana_url = os.environ.get("SOLANA_URL", "https://api.devnet.solana.com") evm_loader_id = os.environ.get("EVM_LOADER", "eeLSJgWzzxrqKv1UxtRVVH8FX3qCQWUs9QuAjJpETGU") PARALLEL_REQUESTS = int(os.environ.get("PARALLEL_REQUESTS", "2")) +CANCEL_TIMEOUT = int(os.environ.get("CANCEL_TIMEOUT", "60")) logger = logging.getLogger(__name__) @@ -508,7 +509,7 @@ def process_receipts(self): if trx_struct.got_result: self.submit_transaction(trx_struct) elif trx_struct.storage: - if abs(trx_struct.slot - self.current_slot) > 16: + if abs(trx_struct.slot - self.current_slot) > CANCEL_TIMEOUT: self.blocked_storages[trx_struct.storage] = (trx_struct.eth_trx, trx_struct.blocked_accounts) else: logger.error(trx_struct) diff --git a/proxy/run-proxy.sh b/proxy/run-proxy.sh index f1723d498..8ed2ba215 100755 --- a/proxy/run-proxy.sh +++ b/proxy/run-proxy.sh @@ -16,6 +16,7 @@ if [ "$CONFIG" == "ci" ]; then [[ -z "$USE_COMBINED_START_CONTINUE" ]] && export USE_COMBINED_START_CONTINUE="YES" [[ -z "$CONTINUE_COUNT_FACTOR" ]] && export CONTINUE_COUNT_FACTOR="3" [[ -z "$MINIMAL_GAS_PRICE" ]] && export MINIMAL_GAS_PRICE=0 + [[ -z "$CANCEL_TIMEOUT" ]] && export CANCEL_TIMEOUT=10 elif [ "$CONFIG" == "local" ]; then [[ -z "$SOLANA_URL" ]] && export SOLANA_URL="http://localhost:8899" [[ -z "$EVM_LOADER" ]] && export EVM_LOADER=deploy @@ -26,6 +27,7 @@ elif [ "$CONFIG" == "local" ]; then [[ -z "$EXTRA_GAS" ]] && export EXTRA_GAS=0 [[ -z "$NEON_CLI_TIMEOUT" ]] && export NEON_CLI_TIMEOUT="0.9" [[ -z "$MINIMAL_GAS_PRICE" ]] && export MINIMAL_GAS_PRICE=0 + [[ -z "$CANCEL_TIMEOUT" ]] && export CANCEL_TIMEOUT=10 elif [ "$CONFIG" == "devnet" ]; then [[ -z "$SOLANA_URL" ]] && export SOLANA_URL="https://api.devnet.solana.com" [[ -z "$EVM_LOADER" ]] && export EVM_LOADER=eeLSJgWzzxrqKv1UxtRVVH8FX3qCQWUs9QuAjJpETGU @@ -36,6 +38,7 @@ elif [ "$CONFIG" == "devnet" ]; then [[ -z "$EXTRA_GAS" ]] && export EXTRA_GAS=90000 [[ -z "$NEON_CLI_TIMEOUT" ]] && export NEON_CLI_TIMEOUT="10" [[ -z "$MINIMAL_GAS_PRICE" ]] && export MINIMAL_GAS_PRICE=1 + [[ -z "$CANCEL_TIMEOUT" ]] && export CANCEL_TIMEOUT=60 elif [ "$CONFIG" == "testnet" ]; then [[ -z "$SOLANA_URL" ]] && export SOLANA_URL="https://api.testnet.solana.com" [[ -z "$EVM_LOADER" ]] && export EVM_LOADER=eeLSJgWzzxrqKv1UxtRVVH8FX3qCQWUs9QuAjJpETGU @@ -46,6 +49,7 @@ elif [ "$CONFIG" == "testnet" ]; then [[ -z "$EXTRA_GAS" ]] && export EXTRA_GAS=90000 [[ -z "$NEON_CLI_TIMEOUT" ]] && export NEON_CLI_TIMEOUT="15" [[ -z "$MINIMAL_GAS_PRICE" ]] && export MINIMAL_GAS_PRICE="1" + [[ -z "$CANCEL_TIMEOUT" ]] && export CANCEL_TIMEOUT=60 elif [ "$CONFIG" != "custom" ]; then exit 1 fi