From 628ab095e7cb6e032e0c7ebccfa8ffb5157cf32a Mon Sep 17 00:00:00 2001 From: Dmitriy Borisenko Date: Fri, 22 Oct 2021 12:26:50 +0300 Subject: [PATCH 01/13] #243 Use `write-ahead log` journal_mode in connection --- proxy/indexer/solana_receipts_update.py | 14 +++++++------- proxy/indexer/utils.py | 1 + proxy/plugin/solana_rest_api.py | 8 ++++---- proxy/test_cancel_hanged.py | 6 +++--- 4 files changed, 15 insertions(+), 14 deletions(-) diff --git a/proxy/indexer/solana_receipts_update.py b/proxy/indexer/solana_receipts_update.py index 2e1a888a9..8bdfbc5c4 100644 --- a/proxy/indexer/solana_receipts_update.py +++ b/proxy/indexer/solana_receipts_update.py @@ -48,12 +48,12 @@ def __init__(self): self.client = Client(solana_url) self.canceller = Canceller() self.logs_db = LogDB(filename="local.db") - self.blocks_by_hash = SqliteDict(filename="local.db", tablename="solana_blocks_by_hash", autocommit=True) - self.transaction_receipts = SqliteDict(filename="local.db", tablename="known_transactions", autocommit=True, encode=json.dumps, decode=json.loads) - self.ethereum_trx = SqliteDict(filename="local.db", tablename="ethereum_transactions", autocommit=True, encode=json.dumps, decode=json.loads) - self.eth_sol_trx = SqliteDict(filename="local.db", tablename="ethereum_solana_transactions", autocommit=True, encode=json.dumps, decode=json.loads) - self.sol_eth_trx = SqliteDict(filename="local.db", tablename="solana_ethereum_transactions", autocommit=True, encode=json.dumps, decode=json.loads) - self.constants = SqliteDict(filename="local.db", tablename="constants", autocommit=True) + self.blocks_by_hash = SqliteDict(filename="local.db", tablename="solana_blocks_by_hash", autocommit=True, journal_mode='WAL') + self.transaction_receipts = SqliteDict(filename="local.db", tablename="known_transactions", autocommit=True, journal_mode='WAL', encode=json.dumps, decode=json.loads) + self.ethereum_trx = SqliteDict(filename="local.db", tablename="ethereum_transactions", autocommit=True, journal_mode='WAL', encode=json.dumps, decode=json.loads) + self.eth_sol_trx = SqliteDict(filename="local.db", tablename="ethereum_solana_transactions", autocommit=True, journal_mode='WAL', encode=json.dumps, decode=json.loads) + self.sol_eth_trx = SqliteDict(filename="local.db", tablename="solana_ethereum_transactions", autocommit=True, journal_mode='WAL', encode=json.dumps, decode=json.loads) + self.constants = SqliteDict(filename="local.db", tablename="constants", autocommit=True, journal_mode='WAL') self.last_slot = 0 self.current_slot = 0 self.transaction_order = [] @@ -72,7 +72,7 @@ def run(self, loop = True): logger.debug("Start getting blocks") self.gather_blocks() logger.debug("Unlock accounts") - self.canceller.unlock_accounts(self.blocked_storages) + # self.canceller.unlock_accounts(self.blocked_storages) self.blocked_storages = set() except Exception as err: logger.debug("Got exception while indexing. Type(err):%s, Exception:%s", type(err), err) diff --git a/proxy/indexer/utils.py b/proxy/indexer/utils.py index 876b1eada..003dd573c 100644 --- a/proxy/indexer/utils.py +++ b/proxy/indexer/utils.py @@ -172,6 +172,7 @@ def get_account_list(client, storage_account): class LogDB: def __init__(self, filename="local.db"): self.conn = sqlite3.connect(filename, check_same_thread=False) # multithread mode + self.conn.execute("PRAGMA journal_mode=WAL") # self.conn.isolation_level = None # autocommit mode cur = self.conn.cursor() cur.execute("""CREATE TABLE IF NOT EXISTS diff --git a/proxy/plugin/solana_rest_api.py b/proxy/plugin/solana_rest_api.py index 115ef7d16..bf9b4862a 100644 --- a/proxy/plugin/solana_rest_api.py +++ b/proxy/plugin/solana_rest_api.py @@ -88,10 +88,10 @@ def __init__(self): self.client = SolanaClient(solana_url) self.logs_db = LogDB(filename="local.db") - self.blocks_by_hash = SqliteDict(filename="local.db", tablename="solana_blocks_by_hash", autocommit=True) - self.ethereum_trx = SqliteDict(filename="local.db", tablename="ethereum_transactions", autocommit=True, encode=json.dumps, decode=json.loads) - self.eth_sol_trx = SqliteDict(filename="local.db", tablename="ethereum_solana_transactions", autocommit=True, encode=json.dumps, decode=json.loads) - self.sol_eth_trx = SqliteDict(filename="local.db", tablename="solana_ethereum_transactions", autocommit=True, encode=json.dumps, decode=json.loads) + self.blocks_by_hash = SqliteDict(filename="local.db", tablename="solana_blocks_by_hash", autocommit=True, journal_mode='WAL') + self.ethereum_trx = SqliteDict(filename="local.db", tablename="ethereum_transactions", autocommit=True, journal_mode='WAL', encode=json.dumps, decode=json.loads) + self.eth_sol_trx = SqliteDict(filename="local.db", tablename="ethereum_solana_transactions", autocommit=True, journal_mode='WAL', encode=json.dumps, decode=json.loads) + self.sol_eth_trx = SqliteDict(filename="local.db", tablename="solana_ethereum_transactions", autocommit=True, journal_mode='WAL', encode=json.dumps, decode=json.loads) with proxy_id_glob.get_lock(): self.proxy_id = proxy_id_glob.value diff --git a/proxy/test_cancel_hanged.py b/proxy/test_cancel_hanged.py index 4c92f2573..05eaff04f 100644 --- a/proxy/test_cancel_hanged.py +++ b/proxy/test_cancel_hanged.py @@ -231,9 +231,9 @@ def create_storage_account(self, seed): def test_canceled(self): print("\ntest_canceled") - trx_receipt = proxy.eth.wait_for_transaction_receipt(self.tx_hash) - print('trx_receipt:', trx_receipt) - self.assertEqual(trx_receipt['status'], 0) + # trx_receipt = proxy.eth.wait_for_transaction_receipt(self.tx_hash) + # print('trx_receipt:', trx_receipt) + # self.assertEqual(trx_receipt['status'], 0) From 5f7e1540496485b61eec17696f03a83aa4a15e8d Mon Sep 17 00:00:00 2001 From: Dmitriy Borisenko Date: Mon, 25 Oct 2021 20:03:47 +0300 Subject: [PATCH 02/13] 'off' journal mode --- proxy/indexer/solana_receipts_update.py | 12 ++++++------ proxy/indexer/utils.py | 2 +- proxy/plugin/solana_rest_api.py | 8 ++++---- 3 files changed, 11 insertions(+), 11 deletions(-) diff --git a/proxy/indexer/solana_receipts_update.py b/proxy/indexer/solana_receipts_update.py index 8bdfbc5c4..8c2338638 100644 --- a/proxy/indexer/solana_receipts_update.py +++ b/proxy/indexer/solana_receipts_update.py @@ -48,12 +48,12 @@ def __init__(self): self.client = Client(solana_url) self.canceller = Canceller() self.logs_db = LogDB(filename="local.db") - self.blocks_by_hash = SqliteDict(filename="local.db", tablename="solana_blocks_by_hash", autocommit=True, journal_mode='WAL') - self.transaction_receipts = SqliteDict(filename="local.db", tablename="known_transactions", autocommit=True, journal_mode='WAL', encode=json.dumps, decode=json.loads) - self.ethereum_trx = SqliteDict(filename="local.db", tablename="ethereum_transactions", autocommit=True, journal_mode='WAL', encode=json.dumps, decode=json.loads) - self.eth_sol_trx = SqliteDict(filename="local.db", tablename="ethereum_solana_transactions", autocommit=True, journal_mode='WAL', encode=json.dumps, decode=json.loads) - self.sol_eth_trx = SqliteDict(filename="local.db", tablename="solana_ethereum_transactions", autocommit=True, journal_mode='WAL', encode=json.dumps, decode=json.loads) - self.constants = SqliteDict(filename="local.db", tablename="constants", autocommit=True, journal_mode='WAL') + self.blocks_by_hash = SqliteDict(filename="local.db", tablename="solana_blocks_by_hash", autocommit=True, journal_mode='OFF') + self.transaction_receipts = SqliteDict(filename="local.db", tablename="known_transactions", autocommit=True, journal_mode='OFF', encode=json.dumps, decode=json.loads) + self.ethereum_trx = SqliteDict(filename="local.db", tablename="ethereum_transactions", autocommit=True, journal_mode='OFF', encode=json.dumps, decode=json.loads) + self.eth_sol_trx = SqliteDict(filename="local.db", tablename="ethereum_solana_transactions", autocommit=True, journal_mode='OFF', encode=json.dumps, decode=json.loads) + self.sol_eth_trx = SqliteDict(filename="local.db", tablename="solana_ethereum_transactions", autocommit=True, journal_mode='OFF', encode=json.dumps, decode=json.loads) + self.constants = SqliteDict(filename="local.db", tablename="constants", autocommit=True, journal_mode='OFF') self.last_slot = 0 self.current_slot = 0 self.transaction_order = [] diff --git a/proxy/indexer/utils.py b/proxy/indexer/utils.py index 003dd573c..a3c247011 100644 --- a/proxy/indexer/utils.py +++ b/proxy/indexer/utils.py @@ -172,7 +172,7 @@ def get_account_list(client, storage_account): class LogDB: def __init__(self, filename="local.db"): self.conn = sqlite3.connect(filename, check_same_thread=False) # multithread mode - self.conn.execute("PRAGMA journal_mode=WAL") + self.conn.execute("PRAGMA journal_mode=OFF") # self.conn.isolation_level = None # autocommit mode cur = self.conn.cursor() cur.execute("""CREATE TABLE IF NOT EXISTS diff --git a/proxy/plugin/solana_rest_api.py b/proxy/plugin/solana_rest_api.py index bf9b4862a..3782342fa 100644 --- a/proxy/plugin/solana_rest_api.py +++ b/proxy/plugin/solana_rest_api.py @@ -88,10 +88,10 @@ def __init__(self): self.client = SolanaClient(solana_url) self.logs_db = LogDB(filename="local.db") - self.blocks_by_hash = SqliteDict(filename="local.db", tablename="solana_blocks_by_hash", autocommit=True, journal_mode='WAL') - self.ethereum_trx = SqliteDict(filename="local.db", tablename="ethereum_transactions", autocommit=True, journal_mode='WAL', encode=json.dumps, decode=json.loads) - self.eth_sol_trx = SqliteDict(filename="local.db", tablename="ethereum_solana_transactions", autocommit=True, journal_mode='WAL', encode=json.dumps, decode=json.loads) - self.sol_eth_trx = SqliteDict(filename="local.db", tablename="solana_ethereum_transactions", autocommit=True, journal_mode='WAL', encode=json.dumps, decode=json.loads) + self.blocks_by_hash = SqliteDict(filename="local.db", tablename="solana_blocks_by_hash", autocommit=True, journal_mode='OFF') + self.ethereum_trx = SqliteDict(filename="local.db", tablename="ethereum_transactions", autocommit=True, journal_mode='OFF', encode=json.dumps, decode=json.loads) + self.eth_sol_trx = SqliteDict(filename="local.db", tablename="ethereum_solana_transactions", autocommit=True, journal_mode='OFF', encode=json.dumps, decode=json.loads) + self.sol_eth_trx = SqliteDict(filename="local.db", tablename="solana_ethereum_transactions", autocommit=True, journal_mode='OFF', encode=json.dumps, decode=json.loads) with proxy_id_glob.get_lock(): self.proxy_id = proxy_id_glob.value From 89c3519b350a6874d7eac46278225c89da4139fe Mon Sep 17 00:00:00 2001 From: Dmitriy Borisenko Date: Tue, 2 Nov 2021 23:09:47 +0300 Subject: [PATCH 03/13] #243 Use differsnt files for tables --- proxy/indexer/solana_receipts_update.py | 14 +++++++------- proxy/plugin/solana_rest_api.py | 10 +++++----- 2 files changed, 12 insertions(+), 12 deletions(-) diff --git a/proxy/indexer/solana_receipts_update.py b/proxy/indexer/solana_receipts_update.py index 8c2338638..5ab31e7b1 100644 --- a/proxy/indexer/solana_receipts_update.py +++ b/proxy/indexer/solana_receipts_update.py @@ -47,13 +47,13 @@ class Indexer: def __init__(self): self.client = Client(solana_url) self.canceller = Canceller() - self.logs_db = LogDB(filename="local.db") - self.blocks_by_hash = SqliteDict(filename="local.db", tablename="solana_blocks_by_hash", autocommit=True, journal_mode='OFF') - self.transaction_receipts = SqliteDict(filename="local.db", tablename="known_transactions", autocommit=True, journal_mode='OFF', encode=json.dumps, decode=json.loads) - self.ethereum_trx = SqliteDict(filename="local.db", tablename="ethereum_transactions", autocommit=True, journal_mode='OFF', encode=json.dumps, decode=json.loads) - self.eth_sol_trx = SqliteDict(filename="local.db", tablename="ethereum_solana_transactions", autocommit=True, journal_mode='OFF', encode=json.dumps, decode=json.loads) - self.sol_eth_trx = SqliteDict(filename="local.db", tablename="solana_ethereum_transactions", autocommit=True, journal_mode='OFF', encode=json.dumps, decode=json.loads) - self.constants = SqliteDict(filename="local.db", tablename="constants", autocommit=True, journal_mode='OFF') + self.logs_db = LogDB(filename="log.db") + self.blocks_by_hash = SqliteDict(filename="solana_blocks_by_hash.db", autocommit=True, journal_mode='OFF') + self.transaction_receipts = SqliteDict(filename="known_transactions.db", autocommit=True, journal_mode='OFF', encode=json.dumps, decode=json.loads) + self.ethereum_trx = SqliteDict(filename="ethereum_transactions.db", autocommit=True, journal_mode='OFF', encode=json.dumps, decode=json.loads) + self.eth_sol_trx = SqliteDict(filename="ethereum_solana_transactions.db", autocommit=True, journal_mode='OFF', encode=json.dumps, decode=json.loads) + self.sol_eth_trx = SqliteDict(filename="solana_ethereum_transactions.db", autocommit=True, journal_mode='OFF', encode=json.dumps, decode=json.loads) + self.constants = SqliteDict(filename="constants.db", autocommit=True, journal_mode='OFF') self.last_slot = 0 self.current_slot = 0 self.transaction_order = [] diff --git a/proxy/plugin/solana_rest_api.py b/proxy/plugin/solana_rest_api.py index 3782342fa..dfe59f572 100644 --- a/proxy/plugin/solana_rest_api.py +++ b/proxy/plugin/solana_rest_api.py @@ -87,11 +87,11 @@ def __init__(self): self.client = SolanaClient(solana_url) - self.logs_db = LogDB(filename="local.db") - self.blocks_by_hash = SqliteDict(filename="local.db", tablename="solana_blocks_by_hash", autocommit=True, journal_mode='OFF') - self.ethereum_trx = SqliteDict(filename="local.db", tablename="ethereum_transactions", autocommit=True, journal_mode='OFF', encode=json.dumps, decode=json.loads) - self.eth_sol_trx = SqliteDict(filename="local.db", tablename="ethereum_solana_transactions", autocommit=True, journal_mode='OFF', encode=json.dumps, decode=json.loads) - self.sol_eth_trx = SqliteDict(filename="local.db", tablename="solana_ethereum_transactions", autocommit=True, journal_mode='OFF', encode=json.dumps, decode=json.loads) + self.logs_db = LogDB(filename="log.db") + self.blocks_by_hash = SqliteDict(filename="solana_blocks_by_hash.db", autocommit=True, journal_mode='OFF') + self.ethereum_trx = SqliteDict(filename="ethereum_transactions.db", autocommit=True, journal_mode='OFF', encode=json.dumps, decode=json.loads) + self.eth_sol_trx = SqliteDict(filename="ethereum_solana_transactions.db", autocommit=True, journal_mode='OFF', encode=json.dumps, decode=json.loads) + self.sol_eth_trx = SqliteDict(filename="solana_ethereum_transactions.db", autocommit=True, journal_mode='OFF', encode=json.dumps, decode=json.loads) with proxy_id_glob.get_lock(): self.proxy_id = proxy_id_glob.value From a000c7563cca3bffeb8e6e4113213ba889b3423c Mon Sep 17 00:00:00 2001 From: Dmitriy Borisenko Date: Tue, 2 Nov 2021 23:19:09 +0300 Subject: [PATCH 04/13] #243 fixes --- proxy/indexer/utils.py | 2 +- proxy/test_cancel_hanged.py | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/proxy/indexer/utils.py b/proxy/indexer/utils.py index 82dad2d12..34a5ab1d2 100644 --- a/proxy/indexer/utils.py +++ b/proxy/indexer/utils.py @@ -173,7 +173,7 @@ def get_account_list(client, storage_account): class LogDB: - def __init__(self, filename="local.db"): + def __init__(self, filename="log.db"): self.conn = sqlite3.connect(filename, check_same_thread=False) # multithread mode self.conn.execute("PRAGMA journal_mode=OFF") # self.conn.isolation_level = None # autocommit mode diff --git a/proxy/test_cancel_hanged.py b/proxy/test_cancel_hanged.py index c2da6c1f5..7b3b0152d 100644 --- a/proxy/test_cancel_hanged.py +++ b/proxy/test_cancel_hanged.py @@ -237,9 +237,9 @@ def create_storage_account(self, seed): # @unittest.skip("a.i.") def test_canceled(self): print("\ntest_canceled") - # trx_receipt = proxy.eth.wait_for_transaction_receipt(self.tx_hash) - # print('trx_receipt:', trx_receipt) - # self.assertEqual(trx_receipt['status'], 0) + trx_receipt = proxy.eth.wait_for_transaction_receipt(self.tx_hash) + print('trx_receipt:', trx_receipt) + self.assertEqual(trx_receipt['status'], 0) From 4fe6e573ad896b5433e48ef7215d884bc565d5a9 Mon Sep 17 00:00:00 2001 From: Dmitriy Borisenko Date: Wed, 3 Nov 2021 22:10:48 +0300 Subject: [PATCH 05/13] #243 use journal mode from environment --- proxy/indexer/solana_receipts_update.py | 13 +++++++------ proxy/indexer/utils.py | 3 ++- proxy/plugin/solana_rest_api.py | 9 +++++---- proxy/run-proxy.sh | 4 ++++ 4 files changed, 18 insertions(+), 11 deletions(-) diff --git a/proxy/indexer/solana_receipts_update.py b/proxy/indexer/solana_receipts_update.py index 9f52fd53c..ed69768db 100644 --- a/proxy/indexer/solana_receipts_update.py +++ b/proxy/indexer/solana_receipts_update.py @@ -19,6 +19,7 @@ solana_url = os.environ.get("SOLANA_URL", "https://api.devnet.solana.com") evm_loader_id = os.environ.get("EVM_LOADER", "eeLSJgWzzxrqKv1UxtRVVH8FX3qCQWUs9QuAjJpETGU") PARALLEL_REQUESTS = int(os.environ.get("PARALLEL_REQUESTS", "2")) +JOURNAL_MODE = os.environ.get("JOURNAL_MODE", "DELETE") logger = logging.getLogger(__name__) @@ -61,12 +62,12 @@ def __init__(self): self.client = Client(solana_url) self.canceller = Canceller() self.logs_db = LogDB(filename="log.db") - self.blocks_by_hash = SqliteDict(filename="solana_blocks_by_hash.db", autocommit=True, journal_mode='OFF') - self.transaction_receipts = SqliteDict(filename="known_transactions.db", autocommit=True, journal_mode='OFF', encode=json.dumps, decode=json.loads) - self.ethereum_trx = SqliteDict(filename="ethereum_transactions.db", autocommit=True, journal_mode='OFF', encode=json.dumps, decode=json.loads) - self.eth_sol_trx = SqliteDict(filename="ethereum_solana_transactions.db", autocommit=True, journal_mode='OFF', encode=json.dumps, decode=json.loads) - self.sol_eth_trx = SqliteDict(filename="solana_ethereum_transactions.db", autocommit=True, journal_mode='OFF', encode=json.dumps, decode=json.loads) - self.constants = SqliteDict(filename="constants.db", autocommit=True, journal_mode='OFF') + self.blocks_by_hash = SqliteDict(filename="solana_blocks_by_hash.db", autocommit=True, journal_mode=JOURNAL_MODE) + self.transaction_receipts = SqliteDict(filename="known_transactions.db", autocommit=True, journal_mode=JOURNAL_MODE, encode=json.dumps, decode=json.loads) + self.ethereum_trx = SqliteDict(filename="ethereum_transactions.db", autocommit=True, journal_mode=JOURNAL_MODE, encode=json.dumps, decode=json.loads) + self.eth_sol_trx = SqliteDict(filename="ethereum_solana_transactions.db", autocommit=True, journal_mode=JOURNAL_MODE, encode=json.dumps, decode=json.loads) + self.sol_eth_trx = SqliteDict(filename="solana_ethereum_transactions.db", autocommit=True, journal_mode=JOURNAL_MODE, encode=json.dumps, decode=json.loads) + self.constants = SqliteDict(filename="constants.db", autocommit=True, journal_mode=JOURNAL_MODE) self.last_slot = 0 self.current_slot = 0 self.transaction_order = [] diff --git a/proxy/indexer/utils.py b/proxy/indexer/utils.py index 34a5ab1d2..7300b481c 100644 --- a/proxy/indexer/utils.py +++ b/proxy/indexer/utils.py @@ -31,6 +31,7 @@ incinerator = "1nc1nerator11111111111111111111111111111111" system = "11111111111111111111111111111111" +JOURNAL_MODE = os.environ.get("JOURNAL_MODE", "DELETE") logger = logging.getLogger(__name__) logger.setLevel(logging.DEBUG) @@ -175,7 +176,7 @@ def get_account_list(client, storage_account): class LogDB: def __init__(self, filename="log.db"): self.conn = sqlite3.connect(filename, check_same_thread=False) # multithread mode - self.conn.execute("PRAGMA journal_mode=OFF") + self.conn.execute("PRAGMA journal_mode={}".format(JOURNAL_MODE)) # self.conn.isolation_level = None # autocommit mode cur = self.conn.cursor() cur.execute("""CREATE TABLE IF NOT EXISTS diff --git a/proxy/plugin/solana_rest_api.py b/proxy/plugin/solana_rest_api.py index 4885b2fb0..f53eddac3 100644 --- a/proxy/plugin/solana_rest_api.py +++ b/proxy/plugin/solana_rest_api.py @@ -44,6 +44,7 @@ chainId = os.environ.get("NEON_CHAIN_ID", "0x6e") # default value 110 EXTRA_GAS = int(os.environ.get("EXTRA_GAS", "0")) +JOURNAL_MODE = os.environ.get("JOURNAL_MODE", "DELETE") class PermanentAccounts: def __init__(self, client, signer, proxy_id): @@ -88,10 +89,10 @@ def __init__(self): self.client = SolanaClient(solana_url) self.logs_db = LogDB(filename="log.db") - self.blocks_by_hash = SqliteDict(filename="solana_blocks_by_hash.db", autocommit=True, journal_mode='OFF') - self.ethereum_trx = SqliteDict(filename="ethereum_transactions.db", autocommit=True, journal_mode='OFF', encode=json.dumps, decode=json.loads) - self.eth_sol_trx = SqliteDict(filename="ethereum_solana_transactions.db", autocommit=True, journal_mode='OFF', encode=json.dumps, decode=json.loads) - self.sol_eth_trx = SqliteDict(filename="solana_ethereum_transactions.db", autocommit=True, journal_mode='OFF', encode=json.dumps, decode=json.loads) + self.blocks_by_hash = SqliteDict(filename="solana_blocks_by_hash.db", autocommit=True, journal_mode=JOURNAL_MODE) + self.ethereum_trx = SqliteDict(filename="ethereum_transactions.db", autocommit=True, journal_mode=JOURNAL_MODE, encode=json.dumps, decode=json.loads) + self.eth_sol_trx = SqliteDict(filename="ethereum_solana_transactions.db", autocommit=True, journal_mode=JOURNAL_MODE, encode=json.dumps, decode=json.loads) + self.sol_eth_trx = SqliteDict(filename="solana_ethereum_transactions.db", autocommit=True, journal_mode=JOURNAL_MODE, encode=json.dumps, decode=json.loads) with proxy_id_glob.get_lock(): self.proxy_id = proxy_id_glob.value diff --git a/proxy/run-proxy.sh b/proxy/run-proxy.sh index f1723d498..e832670f0 100755 --- a/proxy/run-proxy.sh +++ b/proxy/run-proxy.sh @@ -16,6 +16,7 @@ if [ "$CONFIG" == "ci" ]; then [[ -z "$USE_COMBINED_START_CONTINUE" ]] && export USE_COMBINED_START_CONTINUE="YES" [[ -z "$CONTINUE_COUNT_FACTOR" ]] && export CONTINUE_COUNT_FACTOR="3" [[ -z "$MINIMAL_GAS_PRICE" ]] && export MINIMAL_GAS_PRICE=0 + [[ -z "$JOURNAL_MODE" ]] && export JOURNAL_MODE="OFF" elif [ "$CONFIG" == "local" ]; then [[ -z "$SOLANA_URL" ]] && export SOLANA_URL="http://localhost:8899" [[ -z "$EVM_LOADER" ]] && export EVM_LOADER=deploy @@ -26,6 +27,7 @@ elif [ "$CONFIG" == "local" ]; then [[ -z "$EXTRA_GAS" ]] && export EXTRA_GAS=0 [[ -z "$NEON_CLI_TIMEOUT" ]] && export NEON_CLI_TIMEOUT="0.9" [[ -z "$MINIMAL_GAS_PRICE" ]] && export MINIMAL_GAS_PRICE=0 + [[ -z "$JOURNAL_MODE" ]] && export JOURNAL_MODE="OFF" elif [ "$CONFIG" == "devnet" ]; then [[ -z "$SOLANA_URL" ]] && export SOLANA_URL="https://api.devnet.solana.com" [[ -z "$EVM_LOADER" ]] && export EVM_LOADER=eeLSJgWzzxrqKv1UxtRVVH8FX3qCQWUs9QuAjJpETGU @@ -36,6 +38,7 @@ elif [ "$CONFIG" == "devnet" ]; then [[ -z "$EXTRA_GAS" ]] && export EXTRA_GAS=90000 [[ -z "$NEON_CLI_TIMEOUT" ]] && export NEON_CLI_TIMEOUT="10" [[ -z "$MINIMAL_GAS_PRICE" ]] && export MINIMAL_GAS_PRICE=1 + [[ -z "$JOURNAL_MODE" ]] && export JOURNAL_MODE="WAL" elif [ "$CONFIG" == "testnet" ]; then [[ -z "$SOLANA_URL" ]] && export SOLANA_URL="https://api.testnet.solana.com" [[ -z "$EVM_LOADER" ]] && export EVM_LOADER=eeLSJgWzzxrqKv1UxtRVVH8FX3qCQWUs9QuAjJpETGU @@ -46,6 +49,7 @@ elif [ "$CONFIG" == "testnet" ]; then [[ -z "$EXTRA_GAS" ]] && export EXTRA_GAS=90000 [[ -z "$NEON_CLI_TIMEOUT" ]] && export NEON_CLI_TIMEOUT="15" [[ -z "$MINIMAL_GAS_PRICE" ]] && export MINIMAL_GAS_PRICE="1" + [[ -z "$JOURNAL_MODE" ]] && export JOURNAL_MODE="WAL" elif [ "$CONFIG" != "custom" ]; then exit 1 fi From d8ed225e1bb64e81d69531ea77b977ac75332fed Mon Sep 17 00:00:00 2001 From: Dmitriy Borisenko Date: Thu, 4 Nov 2021 19:53:47 +0300 Subject: [PATCH 06/13] sync --- proxy/docker-compose-test.yml | 18 ++++++++++- proxy/indexer/sql_dict.py | 59 +++++++++++++++++++++++++++++++++++ requirements.txt | 2 +- 3 files changed, 77 insertions(+), 2 deletions(-) create mode 100644 proxy/indexer/sql_dict.py diff --git a/proxy/docker-compose-test.yml b/proxy/docker-compose-test.yml index 294a16dda..f64bee961 100644 --- a/proxy/docker-compose-test.yml +++ b/proxy/docker-compose-test.yml @@ -17,11 +17,27 @@ services: networks: - net + postgres: + container_name: postgres + image: postgres:14.0 + environment: + POSTGRES_DB: "neon-db" + POSTGRES_USER: "neon-proxy" + POSTGRES_PASSWORD: "neon-proxy-pass" + hostname: postgres + expose: + - "5432" + networks: + - net + proxy: container_name: proxy image: neonlabsorg/proxy:${REVISION} environment: - - CONFIG=ci + POSTGRES_DB: "neon-db" + POSTGRES_USER: "neon-proxy" + POSTGRES_PASSWORD: "neon-proxy-pass" + CONFIG: "ci" hostname: proxy ports: - 127.0.0.1:9090:9090 diff --git a/proxy/indexer/sql_dict.py b/proxy/indexer/sql_dict.py new file mode 100644 index 000000000..9fc8b25e0 --- /dev/null +++ b/proxy/indexer/sql_dict.py @@ -0,0 +1,59 @@ +import psycopg + + +class SQLDict(dict): + def __init__(self, filename=None): + self.conn = psycopg.connect(dbname='database', user='db_user', password='mypassword', host='localhost') + self.conn.execute("CREATE TABLE IF NOT EXISTS kv (key text unique, value text)") + + def close(self): + self.conn.commit() + self.conn.close() + + def __len__(self): + rows = self.conn.execute('SELECT COUNT(*) FROM kv').fetchone()[0] + return rows if rows is not None else 0 + + def iterkeys(self): + c = self.conn.cursor() + for row in self.conn.execute('SELECT key FROM kv'): + yield row[0] + + def itervalues(self): + c = self.conn.cursor() + for row in c.execute('SELECT value FROM kv'): + yield row[0] + + def iteritems(self): + c = self.conn.cursor() + for row in c.execute('SELECT key, value FROM kv'): + yield row[0], row[1] + + def keys(self): + return list(self.iterkeys()) + + def values(self): + return list(self.itervalues()) + + def items(self): + return list(self.iteritems()) + + def __contains__(self, key): + return self.conn.execute('SELECT 1 FROM kv WHERE key = ?', (key,)).fetchone() is not None + + def __getitem__(self, key): + item = self.conn.execute('SELECT value FROM kv WHERE key = ?', (key,)).fetchone() + if item is None: + raise KeyError(key) + return item[0] + + def __setitem__(self, key, value): + self.conn.execute('REPLACE INTO kv (key, value) VALUES (?,?)', (key, value)) + + def __delitem__(self, key): + if key not in self: + raise KeyError(key) + self.conn.execute('DELETE FROM kv WHERE key = ?', (key,)) + + def __iter__(self): + return self.iterkeys() diff --git a/requirements.txt b/requirements.txt index 1671a2cb1..a9a0d5d79 100644 --- a/requirements.txt +++ b/requirements.txt @@ -5,6 +5,6 @@ eth-keys==0.3.3 rlp web3==5.22.0 solana==0.10.0 -sqlitedict +psycopg2 ethereum py-solc-x==1.1.0 From c36c1cbb9957bff4bb595130b977c80e4b48fd4b Mon Sep 17 00:00:00 2001 From: Dmitriy Borisenko Date: Thu, 4 Nov 2021 20:38:41 +0300 Subject: [PATCH 07/13] #265 fixes --- proxy/indexer/solana_receipts_update.py | 121 +++++++++--------------- 1 file changed, 43 insertions(+), 78 deletions(-) diff --git a/proxy/indexer/solana_receipts_update.py b/proxy/indexer/solana_receipts_update.py index e1de7858b..a590833b6 100644 --- a/proxy/indexer/solana_receipts_update.py +++ b/proxy/indexer/solana_receipts_update.py @@ -45,7 +45,7 @@ def __init__(self, signature, results, accounts = None): class TransactionStruct: - def __init__(self, eth_trx, eth_signature, from_address, got_result, signatures, storage, blocked_accounts): + def __init__(self, eth_trx, eth_signature, from_address, got_result, signatures, storage, blocked_accounts, slot): # logger.debug(eth_signature) self.eth_trx = eth_trx self.eth_signature = eth_signature @@ -54,6 +54,7 @@ def __init__(self, eth_trx, eth_signature, from_address, got_result, signatures, self.signatures = signatures self.storage = storage self.blocked_accounts = blocked_accounts + self.slot = slot class Indexer: @@ -271,6 +272,7 @@ def process_receipts(self): continue_result.signatures, storage_account, continue_result.accounts, + slot ) del continue_table[storage_account] @@ -327,7 +329,16 @@ def process_receipts(self): got_result = get_trx_results(trx) if got_result is not None: # self.submit_transaction(eth_trx, eth_signature, from_address, got_result, [signature]) - trx_table[eth_signature] = TransactionStruct(eth_trx, eth_signature, from_address, got_result, [signature], None, None) + trx_table[eth_signature] = TransactionStruct( + eth_trx, + eth_signature, + from_address, + got_result, + [signature], + None, + None, + slot + ) else: logger.error("RESULT NOT FOUND IN 05\n{}".format(json.dumps(trx, indent=4, sort_keys=True))) @@ -357,7 +368,8 @@ def process_receipts(self): None, [signature], storage_account, - blocked_accounts + blocked_accounts, + slot ) if storage_account in continue_table: @@ -369,11 +381,15 @@ def process_receipts(self): trx_table[eth_signature].signatures = continue_result.signatures del continue_table[storage_account] - elif instruction_data[0] == 0x0a: # Continue - # logger.debug("{:>10} {:>6} Continue 0x{}".format(slot, counter, instruction_data.hex())) + elif instruction_data[0] == 0x0a or instruction_data[0] == 0x14: # Continue or ContinueV02 storage_account = trx['transaction']['message']['accountKeys'][instruction['accounts'][0]] - blocked_accounts = [trx['transaction']['message']['accountKeys'][acc_idx] for acc_idx in instruction['accounts'][5:]] + if instruction_data[0] == 0x0a: + # logger.debug("{:>10} {:>6} Continue 0x{}".format(slot, counter, instruction_data.hex())) + blocked_accounts = [trx['transaction']['message']['accountKeys'][acc_idx] for acc_idx in instruction['accounts'][5:]] + if instruction_data[0] == 0x14: + # logger.debug("{:>10} {:>6} ContinueV02 0x{}".format(slot, counter, instruction_data.hex())) + blocked_accounts = [trx['transaction']['message']['accountKeys'][acc_idx] for acc_idx in instruction['accounts'][5:]] got_result = get_trx_results(trx) if storage_account in continue_table: @@ -406,7 +422,9 @@ def process_receipts(self): continue_table[storage_account].signatures.append(signature) if holder_account in holder_table: - holder_table[holder_account] = HolderStruct(storage_account) + if holder_table[holder_account].storage_account != storage_account: + logger.error("Strange behavior. Pay attention. STORAGE_ACCOUNT != STORAGE_ACCOUNT") + holder_table[holder_account] = HolderStruct(storage_account) else: holder_table[holder_account] = HolderStruct(storage_account) else: @@ -440,7 +458,16 @@ def process_receipts(self): if eth_signature in trx_table: trx_table[eth_signature].signatures.append(signature) else: - trx_table[eth_signature] = TransactionStruct(eth_trx, eth_signature, from_address, got_result, [signature], storage_account, blocked_accounts) + trx_table[eth_signature] = TransactionStruct( + eth_trx, + eth_signature, + from_address, + got_result, + [signature], + storage_account, + blocked_accounts, + slot + ) elif instruction_data[0] == 0x0e: # logger.debug("{:>10} {:>6} ExecuteTrxFromAccountDataIterativeOrContinue 0x{}".format(slot, counter, instruction_data.hex())) @@ -454,9 +481,11 @@ def process_receipts(self): continue_table[storage_account].signatures.append(signature) if holder_account in holder_table: - logger.error("Strange behavior. Pay attention. HOLDER ACCOUNT FOUND") - holder_table[holder_account] = HolderStruct(storage_account) + if holder_table[holder_account].storage_account != storage_account: + logger.error("Strange behavior. Pay attention. STORAGE_ACCOUNT != STORAGE_ACCOUNT") + holder_table[holder_account] = HolderStruct(storage_account) else: + logger.error("Strange behavior. Pay attention. HOLDER ACCOUNT NOT FOUND") holder_table[holder_account] = HolderStruct(storage_account) if got_result: @@ -467,67 +496,8 @@ def process_receipts(self): continue_table[storage_account].results = got_result else: - got_result = get_trx_results(trx) - if got_result is not None: - continue_table[storage_account] = ContinueStruct(signature, got_result, blocked_accounts) - holder_table[holder_account] = HolderStruct(storage_account) - else: - self.add_hunged_storage(trx, storage_account) - - elif instruction_data[0] == 0x13: # PartialCallFromRawEthereumTXv02 - # logger.debug("{:>10} {:>6} PartialCallFromRawEthereumTXv02 0x{}".format(slot, counter, instruction_data.hex())) - - storage_account = trx['transaction']['message']['accountKeys'][instruction['accounts'][0]] - - if storage_account in continue_table: - # collateral_pool_buf = instruction_data[1:5] - # step_count = instruction_data[5:13] - # from_addr = instruction_data[13:33] - - sign = instruction_data[33:98] - unsigned_msg = instruction_data[98:] - - (eth_trx, eth_signature, from_address) = get_trx_receipts(unsigned_msg, sign) - - continue_result = continue_table[storage_account] - - self.submit_transaction(eth_trx, eth_signature, from_address, continue_result.results, continue_result.signatures) - - del continue_table[storage_account] - else: - self.add_hunged_storage(trx, storage_account) - - elif instruction_data[0] == 0x14: # ContinueV02 - # logger.debug("{:>10} {:>6} ContinueV02 0x{}".format(slot, counter, instruction_data.hex())) - - storage_account = trx['transaction']['message']['accountKeys'][instruction['accounts'][0]] - - if storage_account in continue_table: - continue_table[storage_account].signatures.append(signature) - else: - got_result = get_trx_results(trx) - if got_result is not None: - continue_table[storage_account] = ContinueStruct(signature, got_result) - else: - self.add_hunged_storage(trx, storage_account) - - elif instruction_data[0] == 0x16: # ExecuteTrxFromAccountDataIterativeV02 - # logger.debug("{:>10} {:>6} ExecuteTrxFromAccountDataIterativeV02 0x{}".format(slot, counter, instruction_data.hex())) - - holder_account = trx['transaction']['message']['accountKeys'][instruction['accounts'][0]] - storage_account = trx['transaction']['message']['accountKeys'][instruction['accounts'][1]] - - if storage_account in continue_table: - continue_table[storage_account].signatures.append(signature) - - if holder_account in holder_table: - # logger.debug("holder_account found") - # logger.debug("Strange behavior. Pay attention.") - holder_table[holder_account] = HolderStruct(storage_account) - else: - holder_table[holder_account] = HolderStruct(storage_account) - else: - self.add_hunged_storage(trx, storage_account) + continue_table[storage_account] = ContinueStruct(signature, got_result, blocked_accounts) + holder_table[holder_account] = HolderStruct(storage_account) if instruction_data[0] > 0x16: logger.debug("{:>10} {:>6} Unknown 0x{}".format(slot, counter, instruction_data.hex())) @@ -538,12 +508,12 @@ def process_receipts(self): if trx_struct.got_result: self.submit_transaction(trx_struct) elif trx_struct.storage: - self.blocked_storages[trx_struct.storage] = (trx_struct.eth_trx, trx_struct.blocked_accounts) + if abs(trx_struct.slot - self.current_slot) > 16: + self.blocked_storages[trx_struct.storage] = (trx_struct.eth_trx, trx_struct.blocked_accounts) else: logger.error(trx_struct) - def submit_transaction(self, trx_struct): (logs, status, gas_used, return_value, slot) = trx_struct.got_result (_slot, block_hash) = self.get_block(slot) @@ -605,11 +575,6 @@ def get_block(self, slot): return (slot, block_hash) - def add_hunged_storage(self, trx, storage): - if abs(trx['slot'] - self.current_slot) > 16: - self.blocked_storages.add(storage) - - def run_indexer(): logging.basicConfig(format='%(asctime)s - pid:%(process)d [%(levelname)-.1s] %(funcName)s:%(lineno)d - %(message)s') logger.setLevel(logging.DEBUG) From dec348345800a71f069e42e93cf2a2c19072bf99 Mon Sep 17 00:00:00 2001 From: Dmitriy Borisenko Date: Thu, 4 Nov 2021 21:10:57 +0300 Subject: [PATCH 08/13] #265 config defined timeout --- proxy/indexer/solana_receipts_update.py | 3 ++- proxy/run-proxy.sh | 4 ++++ 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/proxy/indexer/solana_receipts_update.py b/proxy/indexer/solana_receipts_update.py index a590833b6..90c3e4c5a 100644 --- a/proxy/indexer/solana_receipts_update.py +++ b/proxy/indexer/solana_receipts_update.py @@ -19,6 +19,7 @@ solana_url = os.environ.get("SOLANA_URL", "https://api.devnet.solana.com") evm_loader_id = os.environ.get("EVM_LOADER", "eeLSJgWzzxrqKv1UxtRVVH8FX3qCQWUs9QuAjJpETGU") PARALLEL_REQUESTS = int(os.environ.get("PARALLEL_REQUESTS", "2")) +CANCEL_TIMEOUT = int(os.environ.get("CANCEL_TIMEOUT", "60")) logger = logging.getLogger(__name__) @@ -508,7 +509,7 @@ def process_receipts(self): if trx_struct.got_result: self.submit_transaction(trx_struct) elif trx_struct.storage: - if abs(trx_struct.slot - self.current_slot) > 16: + if abs(trx_struct.slot - self.current_slot) > CANCEL_TIMEOUT: self.blocked_storages[trx_struct.storage] = (trx_struct.eth_trx, trx_struct.blocked_accounts) else: logger.error(trx_struct) diff --git a/proxy/run-proxy.sh b/proxy/run-proxy.sh index f1723d498..8ed2ba215 100755 --- a/proxy/run-proxy.sh +++ b/proxy/run-proxy.sh @@ -16,6 +16,7 @@ if [ "$CONFIG" == "ci" ]; then [[ -z "$USE_COMBINED_START_CONTINUE" ]] && export USE_COMBINED_START_CONTINUE="YES" [[ -z "$CONTINUE_COUNT_FACTOR" ]] && export CONTINUE_COUNT_FACTOR="3" [[ -z "$MINIMAL_GAS_PRICE" ]] && export MINIMAL_GAS_PRICE=0 + [[ -z "$CANCEL_TIMEOUT" ]] && export CANCEL_TIMEOUT=10 elif [ "$CONFIG" == "local" ]; then [[ -z "$SOLANA_URL" ]] && export SOLANA_URL="http://localhost:8899" [[ -z "$EVM_LOADER" ]] && export EVM_LOADER=deploy @@ -26,6 +27,7 @@ elif [ "$CONFIG" == "local" ]; then [[ -z "$EXTRA_GAS" ]] && export EXTRA_GAS=0 [[ -z "$NEON_CLI_TIMEOUT" ]] && export NEON_CLI_TIMEOUT="0.9" [[ -z "$MINIMAL_GAS_PRICE" ]] && export MINIMAL_GAS_PRICE=0 + [[ -z "$CANCEL_TIMEOUT" ]] && export CANCEL_TIMEOUT=10 elif [ "$CONFIG" == "devnet" ]; then [[ -z "$SOLANA_URL" ]] && export SOLANA_URL="https://api.devnet.solana.com" [[ -z "$EVM_LOADER" ]] && export EVM_LOADER=eeLSJgWzzxrqKv1UxtRVVH8FX3qCQWUs9QuAjJpETGU @@ -36,6 +38,7 @@ elif [ "$CONFIG" == "devnet" ]; then [[ -z "$EXTRA_GAS" ]] && export EXTRA_GAS=90000 [[ -z "$NEON_CLI_TIMEOUT" ]] && export NEON_CLI_TIMEOUT="10" [[ -z "$MINIMAL_GAS_PRICE" ]] && export MINIMAL_GAS_PRICE=1 + [[ -z "$CANCEL_TIMEOUT" ]] && export CANCEL_TIMEOUT=60 elif [ "$CONFIG" == "testnet" ]; then [[ -z "$SOLANA_URL" ]] && export SOLANA_URL="https://api.testnet.solana.com" [[ -z "$EVM_LOADER" ]] && export EVM_LOADER=eeLSJgWzzxrqKv1UxtRVVH8FX3qCQWUs9QuAjJpETGU @@ -46,6 +49,7 @@ elif [ "$CONFIG" == "testnet" ]; then [[ -z "$EXTRA_GAS" ]] && export EXTRA_GAS=90000 [[ -z "$NEON_CLI_TIMEOUT" ]] && export NEON_CLI_TIMEOUT="15" [[ -z "$MINIMAL_GAS_PRICE" ]] && export MINIMAL_GAS_PRICE="1" + [[ -z "$CANCEL_TIMEOUT" ]] && export CANCEL_TIMEOUT=60 elif [ "$CONFIG" != "custom" ]; then exit 1 fi From eab872b539933390d6fa0c279d2dfae3d3308c5f Mon Sep 17 00:00:00 2001 From: Dmitriy Borisenko Date: Fri, 5 Nov 2021 12:06:49 +0300 Subject: [PATCH 09/13] #243 update sqldict --- proxy/indexer/sql_dict.py | 67 ++++++++++++++++++++++++++++++--------- proxy/run-proxy.sh | 8 ++--- 2 files changed, 56 insertions(+), 19 deletions(-) diff --git a/proxy/indexer/sql_dict.py b/proxy/indexer/sql_dict.py index 9fc8b25e0..ceef3a5e7 100644 --- a/proxy/indexer/sql_dict.py +++ b/proxy/indexer/sql_dict.py @@ -1,33 +1,66 @@ import psycopg +import os + +POSTGRES_DB = os.environ.get("POSTGRES_DB", "neon-db") +POSTGRES_USER = os.environ.get("POSTGRES_USER", "neon-proxy") +POSTGRES_PASSWORD = os.environ.get("POSTGRES_PASSWORD", "neon-proxy") +POSTGRES_HOST = os.environ.get("POSTGRES_HOST", "localhost") + +try: + from cPickle import dumps, loads, HIGHEST_PROTOCOL as PICKLE_PROTOCOL +except ImportError: + from pickle import dumps, loads, HIGHEST_PROTOCOL as PICKLE_PROTOCOL + + +def encode(obj): + """Serialize an object using pickle to a binary format accepted by SQLite.""" + return psycopg.Binary(dumps(obj, protocol=PICKLE_PROTOCOL)) + + +def decode(obj): + """Deserialize objects retrieved from SQLite.""" + return loads(bytes(obj)) class SQLDict(dict): - def __init__(self, filename=None): - self.conn = psycopg.connect(dbname='database', user='db_user', password='mypassword', host='localhost') - self.conn.execute("CREATE TABLE IF NOT EXISTS kv (key text unique, value text)") + def __init__(self, tablename='table', dbname=POSTGRES_DB, user=POSTGRES_USER, password=POSTGRES_PASSWORD, host=POSTGRES_HOST, encode=encode, decode=decode): + self.encode = encode + self.decode = decode + self.tablename = tablename + self.conn = psycopg.connect(dbname, user, password, host) + self.conn.execute(""" + CREATE TABLE IF NOT EXISTS + {} ( + key TEXT UNIQUE, + value BLOB + )""".format(self.tablename)) def close(self): self.conn.commit() self.conn.close() def __len__(self): - rows = self.conn.execute('SELECT COUNT(*) FROM kv').fetchone()[0] + REQUEST = 'SELECT COUNT(*) FROM {}'.format(self.tablename) + rows = self.conn.execute(REQUEST).fetchone()[0] return rows if rows is not None else 0 def iterkeys(self): + REQUEST = 'SELECT key FROM {}'.format(self.tablename) c = self.conn.cursor() - for row in self.conn.execute('SELECT key FROM kv'): - yield row[0] + for key in c.execute(REQUEST): + yield key[0] def itervalues(self): + REQUEST = 'SELECT value FROM {}'.format(self.tablename) c = self.conn.cursor() - for row in c.execute('SELECT value FROM kv'): - yield row[0] + for value in c.execute(REQUEST): + yield self.decode(value[0]) def iteritems(self): + REQUEST = 'SELECT key, value FROM {}'.format(self.tablename) c = self.conn.cursor() - for row in c.execute('SELECT key, value FROM kv'): - yield row[0], row[1] + for key, value in c.execute(REQUEST): + yield key, self.decode(value) def keys(self): return list(self.iterkeys()) @@ -39,21 +72,25 @@ def items(self): return list(self.iteritems()) def __contains__(self, key): - return self.conn.execute('SELECT 1 FROM kv WHERE key = ?', (key,)).fetchone() is not None + REQUEST = 'SELECT 1 FROM {} WHERE key = ?'.format(self.tablename) + return self.conn.execute(REQUEST, (key,)).fetchone() is not None def __getitem__(self, key): - item = self.conn.execute('SELECT value FROM kv WHERE key = ?', (key,)).fetchone() + REQUEST = 'SELECT value FROM {} WHERE key = ?'.format(self.tablename) + item = self.conn.execute(REQUEST, (key,)).fetchone() if item is None: raise KeyError(key) - return item[0] + return self.decode(item[0]) def __setitem__(self, key, value): - self.conn.execute('REPLACE INTO kv (key, value) VALUES (?,?)', (key, value)) + REQUEST = 'REPLACE INTO {} (key, value) VALUES (?,?)'.format(self.tablename) + self.conn.execute(REQUEST, (key, self.encode(value))) def __delitem__(self, key): + REQUEST = 'DELETE FROM {} WHERE key = ?'.format(self.tablename) if key not in self: raise KeyError(key) - self.conn.execute('DELETE FROM kv WHERE key = ?', (key,)) + self.conn.execute(REQUEST, (key,)) def __iter__(self): return self.iterkeys() diff --git a/proxy/run-proxy.sh b/proxy/run-proxy.sh index e832670f0..c84945e1c 100755 --- a/proxy/run-proxy.sh +++ b/proxy/run-proxy.sh @@ -16,7 +16,7 @@ if [ "$CONFIG" == "ci" ]; then [[ -z "$USE_COMBINED_START_CONTINUE" ]] && export USE_COMBINED_START_CONTINUE="YES" [[ -z "$CONTINUE_COUNT_FACTOR" ]] && export CONTINUE_COUNT_FACTOR="3" [[ -z "$MINIMAL_GAS_PRICE" ]] && export MINIMAL_GAS_PRICE=0 - [[ -z "$JOURNAL_MODE" ]] && export JOURNAL_MODE="OFF" + [[ -z "$POSTGRES_HOST" ]] && export POSTGRES_HOST="postgres" elif [ "$CONFIG" == "local" ]; then [[ -z "$SOLANA_URL" ]] && export SOLANA_URL="http://localhost:8899" [[ -z "$EVM_LOADER" ]] && export EVM_LOADER=deploy @@ -27,7 +27,7 @@ elif [ "$CONFIG" == "local" ]; then [[ -z "$EXTRA_GAS" ]] && export EXTRA_GAS=0 [[ -z "$NEON_CLI_TIMEOUT" ]] && export NEON_CLI_TIMEOUT="0.9" [[ -z "$MINIMAL_GAS_PRICE" ]] && export MINIMAL_GAS_PRICE=0 - [[ -z "$JOURNAL_MODE" ]] && export JOURNAL_MODE="OFF" + [[ -z "$POSTGRES_HOST" ]] && export POSTGRES_HOST="localhost" elif [ "$CONFIG" == "devnet" ]; then [[ -z "$SOLANA_URL" ]] && export SOLANA_URL="https://api.devnet.solana.com" [[ -z "$EVM_LOADER" ]] && export EVM_LOADER=eeLSJgWzzxrqKv1UxtRVVH8FX3qCQWUs9QuAjJpETGU @@ -38,7 +38,7 @@ elif [ "$CONFIG" == "devnet" ]; then [[ -z "$EXTRA_GAS" ]] && export EXTRA_GAS=90000 [[ -z "$NEON_CLI_TIMEOUT" ]] && export NEON_CLI_TIMEOUT="10" [[ -z "$MINIMAL_GAS_PRICE" ]] && export MINIMAL_GAS_PRICE=1 - [[ -z "$JOURNAL_MODE" ]] && export JOURNAL_MODE="WAL" + [[ -z "$POSTGRES_HOST" ]] && export POSTGRES_HOST="localhost" elif [ "$CONFIG" == "testnet" ]; then [[ -z "$SOLANA_URL" ]] && export SOLANA_URL="https://api.testnet.solana.com" [[ -z "$EVM_LOADER" ]] && export EVM_LOADER=eeLSJgWzzxrqKv1UxtRVVH8FX3qCQWUs9QuAjJpETGU @@ -49,7 +49,7 @@ elif [ "$CONFIG" == "testnet" ]; then [[ -z "$EXTRA_GAS" ]] && export EXTRA_GAS=90000 [[ -z "$NEON_CLI_TIMEOUT" ]] && export NEON_CLI_TIMEOUT="15" [[ -z "$MINIMAL_GAS_PRICE" ]] && export MINIMAL_GAS_PRICE="1" - [[ -z "$JOURNAL_MODE" ]] && export JOURNAL_MODE="WAL" + [[ -z "$POSTGRES_HOST" ]] && export POSTGRES_HOST="localhost" elif [ "$CONFIG" != "custom" ]; then exit 1 fi From 0531f68dfcfce65673bf254db5b764d1c19b531d Mon Sep 17 00:00:00 2001 From: Dmitriy Borisenko Date: Fri, 5 Nov 2021 14:47:41 +0300 Subject: [PATCH 10/13] #243 sync updaes --- .buildkite/steps/deploy-test.sh | 2 +- proxy/indexer/solana_receipts_update.py | 18 ++++++++++-------- proxy/indexer/sql_dict.py | 7 ++++--- proxy/indexer/utils.py | 14 +++++++++----- proxy/plugin/solana_rest_api.py | 12 ++++++------ requirements.txt | 2 +- 6 files changed, 31 insertions(+), 24 deletions(-) diff --git a/.buildkite/steps/deploy-test.sh b/.buildkite/steps/deploy-test.sh index 9ffd4f3e1..92b5d876d 100755 --- a/.buildkite/steps/deploy-test.sh +++ b/.buildkite/steps/deploy-test.sh @@ -5,7 +5,7 @@ wait-for-proxy() { PROXY_URL="$1" - for i in {1..40}; do + for i in {1..15}; do if curl -s --header "Content-Type: application/json" --data '{"method":"eth_blockNumber","params":[],"id":93,"jsonrpc":"2.0"}' $PROXY_URL > /dev/null; then echo `date +%H:%M:%S`" proxy is available" diff --git a/proxy/indexer/solana_receipts_update.py b/proxy/indexer/solana_receipts_update.py index ed69768db..0004ec08d 100644 --- a/proxy/indexer/solana_receipts_update.py +++ b/proxy/indexer/solana_receipts_update.py @@ -6,14 +6,16 @@ import logging from solana.rpc.api import Client from multiprocessing.dummy import Pool as ThreadPool -from sqlitedict import SqliteDict +# from sqlitedict import SqliteDict from typing import Dict, Union try: from utils import check_error, get_trx_results, get_trx_receipts, LogDB, Canceller + from sql_dict import SQLDict except ImportError: from .utils import check_error, get_trx_results, get_trx_receipts, LogDB, Canceller + from .sql_dict import SQLDict solana_url = os.environ.get("SOLANA_URL", "https://api.devnet.solana.com") @@ -61,13 +63,13 @@ class Indexer: def __init__(self): self.client = Client(solana_url) self.canceller = Canceller() - self.logs_db = LogDB(filename="log.db") - self.blocks_by_hash = SqliteDict(filename="solana_blocks_by_hash.db", autocommit=True, journal_mode=JOURNAL_MODE) - self.transaction_receipts = SqliteDict(filename="known_transactions.db", autocommit=True, journal_mode=JOURNAL_MODE, encode=json.dumps, decode=json.loads) - self.ethereum_trx = SqliteDict(filename="ethereum_transactions.db", autocommit=True, journal_mode=JOURNAL_MODE, encode=json.dumps, decode=json.loads) - self.eth_sol_trx = SqliteDict(filename="ethereum_solana_transactions.db", autocommit=True, journal_mode=JOURNAL_MODE, encode=json.dumps, decode=json.loads) - self.sol_eth_trx = SqliteDict(filename="solana_ethereum_transactions.db", autocommit=True, journal_mode=JOURNAL_MODE, encode=json.dumps, decode=json.loads) - self.constants = SqliteDict(filename="constants.db", autocommit=True, journal_mode=JOURNAL_MODE) + self.logs_db = LogDB() + self.blocks_by_hash = SQLDict(tablename="solana_blocks_by_hash") + self.transaction_receipts = SQLDict(tablename="known_transactions") + self.ethereum_trx = SQLDict(tablename="ethereum_transactions") + self.eth_sol_trx = SQLDict(tablename="ethereum_solana_transactions") + self.sol_eth_trx = SQLDict(tablename="solana_ethereum_transactions") + self.constants = SQLDict(tablename="constants") self.last_slot = 0 self.current_slot = 0 self.transaction_order = [] diff --git a/proxy/indexer/sql_dict.py b/proxy/indexer/sql_dict.py index ceef3a5e7..939bff844 100644 --- a/proxy/indexer/sql_dict.py +++ b/proxy/indexer/sql_dict.py @@ -1,4 +1,4 @@ -import psycopg +import psycopg2 import os POSTGRES_DB = os.environ.get("POSTGRES_DB", "neon-db") @@ -23,11 +23,12 @@ def decode(obj): class SQLDict(dict): - def __init__(self, tablename='table', dbname=POSTGRES_DB, user=POSTGRES_USER, password=POSTGRES_PASSWORD, host=POSTGRES_HOST, encode=encode, decode=decode): + def __init__(self, tablename='table', encode=encode, decode=decode): self.encode = encode self.decode = decode self.tablename = tablename - self.conn = psycopg.connect(dbname, user, password, host) + self.conn = psycopg2.connect(dbname=POSTGRES_DB, user=POSTGRES_USER, password=POSTGRES_PASSWORD, host=POSTGRES_HOST, autocommit=True) + self.conn.isolation_level self.conn.execute(""" CREATE TABLE IF NOT EXISTS {} ( diff --git a/proxy/indexer/utils.py b/proxy/indexer/utils.py index 7300b481c..8d811cbd0 100644 --- a/proxy/indexer/utils.py +++ b/proxy/indexer/utils.py @@ -4,7 +4,7 @@ import logging import os import rlp -import sqlite3 +import psycopg2 import subprocess from construct import Struct, Bytes, Int64ul from eth_utils import big_endian_to_int @@ -173,11 +173,15 @@ def get_account_list(client, storage_account): return None +POSTGRES_DB = os.environ.get("POSTGRES_DB", "neon-db") +POSTGRES_USER = os.environ.get("POSTGRES_USER", "neon-proxy") +POSTGRES_PASSWORD = os.environ.get("POSTGRES_PASSWORD", "neon-proxy") +POSTGRES_HOST = os.environ.get("POSTGRES_HOST", "localhost") + + class LogDB: - def __init__(self, filename="log.db"): - self.conn = sqlite3.connect(filename, check_same_thread=False) # multithread mode - self.conn.execute("PRAGMA journal_mode={}".format(JOURNAL_MODE)) - # self.conn.isolation_level = None # autocommit mode + def __init__(self): + self.conn = psycopg2.connect(dbname=POSTGRES_DB, user=POSTGRES_USER, password=POSTGRES_PASSWORD, host=POSTGRES_HOST) cur = self.conn.cursor() cur.execute("""CREATE TABLE IF NOT EXISTS logs ( diff --git a/proxy/plugin/solana_rest_api.py b/proxy/plugin/solana_rest_api.py index f53eddac3..f2954ee44 100644 --- a/proxy/plugin/solana_rest_api.py +++ b/proxy/plugin/solana_rest_api.py @@ -34,7 +34,7 @@ from ..core.acceptor.pool import proxy_id_glob import os from ..indexer.utils import get_trx_results, LogDB -from sqlitedict import SqliteDict +from ..indexer.sql_dict import SQLDict logger = logging.getLogger(__name__) logger.setLevel(logging.DEBUG) @@ -88,11 +88,11 @@ def __init__(self): self.client = SolanaClient(solana_url) - self.logs_db = LogDB(filename="log.db") - self.blocks_by_hash = SqliteDict(filename="solana_blocks_by_hash.db", autocommit=True, journal_mode=JOURNAL_MODE) - self.ethereum_trx = SqliteDict(filename="ethereum_transactions.db", autocommit=True, journal_mode=JOURNAL_MODE, encode=json.dumps, decode=json.loads) - self.eth_sol_trx = SqliteDict(filename="ethereum_solana_transactions.db", autocommit=True, journal_mode=JOURNAL_MODE, encode=json.dumps, decode=json.loads) - self.sol_eth_trx = SqliteDict(filename="solana_ethereum_transactions.db", autocommit=True, journal_mode=JOURNAL_MODE, encode=json.dumps, decode=json.loads) + self.logs_db = LogDB() + self.blocks_by_hash = SQLDict(tablename="solana_blocks_by_hash") + self.ethereum_trx = SQLDict(tablename="ethereum_transactions") + self.eth_sol_trx = SQLDict(tablename="ethereum_solana_transactions") + self.sol_eth_trx = SQLDict(tablename="solana_ethereum_transactions") with proxy_id_glob.get_lock(): self.proxy_id = proxy_id_glob.value diff --git a/requirements.txt b/requirements.txt index a9a0d5d79..323f726c3 100644 --- a/requirements.txt +++ b/requirements.txt @@ -5,6 +5,6 @@ eth-keys==0.3.3 rlp web3==5.22.0 solana==0.10.0 -psycopg2 +psycopg2-binary ethereum py-solc-x==1.1.0 From 529b817bb4aa01e9d9871f85293d26cb958074dc Mon Sep 17 00:00:00 2001 From: Dmitriy Borisenko Date: Mon, 8 Nov 2021 12:18:53 +0300 Subject: [PATCH 11/13] #243 sync --- .buildkite/steps/deploy-test.sh | 2 +- proxy/indexer/solana_receipts_update.py | 3 +- proxy/indexer/sql_dict.py | 95 ++++++++++++++++--------- proxy/indexer/utils.py | 11 ++- 4 files changed, 73 insertions(+), 38 deletions(-) diff --git a/.buildkite/steps/deploy-test.sh b/.buildkite/steps/deploy-test.sh index 92b5d876d..49bef0e12 100755 --- a/.buildkite/steps/deploy-test.sh +++ b/.buildkite/steps/deploy-test.sh @@ -5,7 +5,7 @@ wait-for-proxy() { PROXY_URL="$1" - for i in {1..15}; do + for i in {1..12}; do if curl -s --header "Content-Type: application/json" --data '{"method":"eth_blockNumber","params":[],"id":93,"jsonrpc":"2.0"}' $PROXY_URL > /dev/null; then echo `date +%H:%M:%S`" proxy is available" diff --git a/proxy/indexer/solana_receipts_update.py b/proxy/indexer/solana_receipts_update.py index 0004ec08d..d9fd460b0 100644 --- a/proxy/indexer/solana_receipts_update.py +++ b/proxy/indexer/solana_receipts_update.py @@ -610,7 +610,8 @@ def get_block(self, slot): def add_hunged_storage(self, trx, storage): if abs(trx['slot'] - self.current_slot) > 16: - self.blocked_storages.add(storage) + # self.blocked_storages.add(storage) + pass def run_indexer(): diff --git a/proxy/indexer/sql_dict.py b/proxy/indexer/sql_dict.py index 939bff844..945ce0a63 100644 --- a/proxy/indexer/sql_dict.py +++ b/proxy/indexer/sql_dict.py @@ -1,5 +1,7 @@ import psycopg2 import os +import logging +from collections.abc import MutableMapping POSTGRES_DB = os.environ.get("POSTGRES_DB", "neon-db") POSTGRES_USER = os.environ.get("POSTGRES_USER", "neon-proxy") @@ -12,9 +14,13 @@ from pickle import dumps, loads, HIGHEST_PROTOCOL as PICKLE_PROTOCOL +logger = logging.getLogger(__name__) +logger.setLevel(logging.DEBUG) + + def encode(obj): """Serialize an object using pickle to a binary format accepted by SQLite.""" - return psycopg.Binary(dumps(obj, protocol=PICKLE_PROTOCOL)) + return psycopg2.Binary(dumps(obj, protocol=PICKLE_PROTOCOL)) def decode(obj): @@ -22,46 +28,59 @@ def decode(obj): return loads(bytes(obj)) -class SQLDict(dict): - def __init__(self, tablename='table', encode=encode, decode=decode): +class SQLDict(MutableMapping): + """Serialize an object using pickle to a binary format accepted by SQLite.""" + + def __init__(self, tablename='table'): self.encode = encode self.decode = decode self.tablename = tablename - self.conn = psycopg2.connect(dbname=POSTGRES_DB, user=POSTGRES_USER, password=POSTGRES_PASSWORD, host=POSTGRES_HOST, autocommit=True) - self.conn.isolation_level - self.conn.execute(""" - CREATE TABLE IF NOT EXISTS - {} ( - key TEXT UNIQUE, - value BLOB - )""".format(self.tablename)) + self.conn = psycopg2.connect( + dbname=POSTGRES_DB, + user=POSTGRES_USER, + password=POSTGRES_PASSWORD, + host=POSTGRES_HOST + ) + self.conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT) + cur = self.conn.cursor() + cur.execute(''' + CREATE TABLE IF NOT EXISTS + {} ( + key TEXT UNIQUE, + value BYTEA + ) + '''.format(self.tablename) + ) def close(self): - self.conn.commit() self.conn.close() def __len__(self): - REQUEST = 'SELECT COUNT(*) FROM {}'.format(self.tablename) - rows = self.conn.execute(REQUEST).fetchone()[0] + cur = self.conn.cursor() + cur.execute('SELECT COUNT(*) FROM {}'.format(self.tablename)) + rows = cur.fetchone()[0] return rows if rows is not None else 0 def iterkeys(self): - REQUEST = 'SELECT key FROM {}'.format(self.tablename) - c = self.conn.cursor() - for key in c.execute(REQUEST): - yield key[0] + cur = self.conn.cursor() + cur.execute('SELECT key FROM {}'.format(self.tablename)) + rows = cur.fetchall() + for row in rows: + yield row[0] def itervalues(self): - REQUEST = 'SELECT value FROM {}'.format(self.tablename) - c = self.conn.cursor() - for value in c.execute(REQUEST): + cur = self.conn.cursor() + cur.execute('SELECT value FROM {}'.format(self.tablename)) + rows = cur.fetchall() + for value in rows: yield self.decode(value[0]) def iteritems(self): - REQUEST = 'SELECT key, value FROM {}'.format(self.tablename) - c = self.conn.cursor() - for key, value in c.execute(REQUEST): - yield key, self.decode(value) + cur = self.conn.cursor() + cur.execute('SELECT key, value FROM {}'.format(self.tablename)) + rows = cur.fetchall() + for row in rows: + yield row[0], self.decode(row[1]) def keys(self): return list(self.iterkeys()) @@ -73,25 +92,35 @@ def items(self): return list(self.iteritems()) def __contains__(self, key): - REQUEST = 'SELECT 1 FROM {} WHERE key = ?'.format(self.tablename) - return self.conn.execute(REQUEST, (key,)).fetchone() is not None + cur = self.conn.cursor() + cur.execute('SELECT 1 FROM {} WHERE key = %s'.format(self.tablename), (key,)) + return cur.fetchone() is not None def __getitem__(self, key): - REQUEST = 'SELECT value FROM {} WHERE key = ?'.format(self.tablename) - item = self.conn.execute(REQUEST, (key,)).fetchone() + cur = self.conn.cursor() + cur.execute('SELECT value FROM {} WHERE key = %s'.format(self.tablename), (key,)) + item = cur.fetchone() if item is None: raise KeyError(key) return self.decode(item[0]) def __setitem__(self, key, value): - REQUEST = 'REPLACE INTO {} (key, value) VALUES (?,?)'.format(self.tablename) - self.conn.execute(REQUEST, (key, self.encode(value))) + cur = self.conn.cursor() + cur.execute(''' + INSERT INTO {} (key, value) + VALUES (%s,%s) + ON CONFLICT (key) + DO UPDATE SET + value = EXCLUDED.value + '''.format(self.tablename), + (key, self.encode(value)) + ) def __delitem__(self, key): - REQUEST = 'DELETE FROM {} WHERE key = ?'.format(self.tablename) + cur = self.conn.cursor() if key not in self: raise KeyError(key) - self.conn.execute(REQUEST, (key,)) + cur.execute('DELETE FROM {} WHERE key = %s'.format(self.tablename), (key,)) def __iter__(self): return self.iterkeys() diff --git a/proxy/indexer/utils.py b/proxy/indexer/utils.py index 8d811cbd0..6ef84eece 100644 --- a/proxy/indexer/utils.py +++ b/proxy/indexer/utils.py @@ -181,7 +181,12 @@ def get_account_list(client, storage_account): class LogDB: def __init__(self): - self.conn = psycopg2.connect(dbname=POSTGRES_DB, user=POSTGRES_USER, password=POSTGRES_PASSWORD, host=POSTGRES_HOST) + self.conn = psycopg2.connect( + dbname=POSTGRES_DB, + user=POSTGRES_USER, + password=POSTGRES_PASSWORD, + host=POSTGRES_HOST + ) cur = self.conn.cursor() cur.execute("""CREATE TABLE IF NOT EXISTS logs ( @@ -194,7 +199,7 @@ def __init__(self): transactionLogIndex INT, json TEXT, - UNIQUE(transactionLogIndex, transactionHash, topic) ON CONFLICT IGNORE + UNIQUE(transactionLogIndex, transactionHash, topic) );""") self.conn.commit() @@ -217,7 +222,7 @@ def push_logs(self, logs): if len(rows): # logger.debug(rows) cur = self.conn.cursor() - cur.executemany('INSERT INTO logs VALUES (?, ?, ?, ?, ?, ?, ?)', rows) + cur.executemany('INSERT INTO logs VALUES (?, ?, ?, ?, ?, ?, ?) ON CONFLICT DO NOTHING', rows) self.conn.commit() else: logger.debug("NO LOGS") From 951aa03d3feda15a778ad24c6cb2011d238c83f8 Mon Sep 17 00:00:00 2001 From: Dmitriy Borisenko Date: Mon, 8 Nov 2021 13:37:05 +0300 Subject: [PATCH 12/13] #243 stabilize --- proxy/docker-compose-test.yml | 19 ++++++++++--------- proxy/indexer/utils.py | 14 +++++++------- 2 files changed, 17 insertions(+), 16 deletions(-) diff --git a/proxy/docker-compose-test.yml b/proxy/docker-compose-test.yml index f64bee961..55da74629 100644 --- a/proxy/docker-compose-test.yml +++ b/proxy/docker-compose-test.yml @@ -5,8 +5,8 @@ services: container_name: solana image: neonlabsorg/solana:${SOLANA_REVISION:-v1.7.9-resources} environment: - - SOLANA_URL=http://solana:8899 - - RUST_LOG=solana_runtime::system_instruction_processor=trace,solana_runtime::message_processor=debug,solana_bpf_loader=debug,solana_rbpf=debug + SOLANA_URL: http://solana:8899 + RUST_LOG: solana_runtime::system_instruction_processor=trace,solana_runtime::message_processor=debug,solana_bpf_loader=debug,solana_rbpf=debug hostname: solana expose: - "8899" @@ -20,10 +20,11 @@ services: postgres: container_name: postgres image: postgres:14.0 + command: postgres -c 'max_connections=1000' environment: - POSTGRES_DB: "neon-db" - POSTGRES_USER: "neon-proxy" - POSTGRES_PASSWORD: "neon-proxy-pass" + POSTGRES_DB: neon-db + POSTGRES_USER: neon-proxy + POSTGRES_PASSWORD: neon-proxy-pass hostname: postgres expose: - "5432" @@ -34,10 +35,10 @@ services: container_name: proxy image: neonlabsorg/proxy:${REVISION} environment: - POSTGRES_DB: "neon-db" - POSTGRES_USER: "neon-proxy" - POSTGRES_PASSWORD: "neon-proxy-pass" - CONFIG: "ci" + POSTGRES_DB: neon-db + POSTGRES_USER: neon-proxy + POSTGRES_PASSWORD: neon-proxy-pass + CONFIG: ci hostname: proxy ports: - 127.0.0.1:9090:9090 diff --git a/proxy/indexer/utils.py b/proxy/indexer/utils.py index 6ef84eece..ec79eda0e 100644 --- a/proxy/indexer/utils.py +++ b/proxy/indexer/utils.py @@ -222,7 +222,7 @@ def push_logs(self, logs): if len(rows): # logger.debug(rows) cur = self.conn.cursor() - cur.executemany('INSERT INTO logs VALUES (?, ?, ?, ?, ?, ?, ?) ON CONFLICT DO NOTHING', rows) + cur.executemany('INSERT INTO logs VALUES (%s, %s, %s, %s, %s, %s, %s) ON CONFLICT DO NOTHING', rows) self.conn.commit() else: logger.debug("NO LOGS") @@ -233,21 +233,21 @@ def get_logs(self, fromBlock = None, toBlock = None, address = None, topics = No params = [] if fromBlock is not None: - queries.append("blockNumber >= ?") + queries.append("blockNumber >= %s") params.append(fromBlock) if toBlock is not None: - queries.append("blockNumber <= ?") + queries.append("blockNumber <= %s") params.append(toBlock) if blockHash is not None: blockHash = blockHash.lower() - queries.append("blockHash = ?") + queries.append("blockHash = %s") params.append(blockHash) if topics is not None: topics = [item.lower() for item in topics] - query_placeholder = ", ".join("?" * len(topics)) + query_placeholder = ", ".join(["%s" for _ in range(len(topics))]) topics_query = f"topic IN ({query_placeholder})" queries.append(topics_query) @@ -256,11 +256,11 @@ def get_logs(self, fromBlock = None, toBlock = None, address = None, topics = No if address is not None: if isinstance(address, str): address = address.lower() - queries.append("address = ?") + queries.append("address = %s") params.append(address) elif isinstance(address, list): address = [item.lower() for item in address] - query_placeholder = ", ".join("?" * len(address)) + query_placeholder = ", ".join(["%s" for _ in range(len(address))]) address_query = f"address IN ({query_placeholder})" queries.append(address_query) From 47fbf2157392402e2d8fe237aeb4e9b39280149b Mon Sep 17 00:00:00 2001 From: Dmitriy Borisenko Date: Mon, 8 Nov 2021 13:58:26 +0300 Subject: [PATCH 13/13] #243 clean code --- .buildkite/steps/deploy-test.sh | 2 +- proxy/indexer/solana_receipts_update.py | 2 -- proxy/indexer/utils.py | 11 ++++++----- proxy/plugin/solana_rest_api.py | 1 - 4 files changed, 7 insertions(+), 9 deletions(-) diff --git a/.buildkite/steps/deploy-test.sh b/.buildkite/steps/deploy-test.sh index 49bef0e12..9ffd4f3e1 100755 --- a/.buildkite/steps/deploy-test.sh +++ b/.buildkite/steps/deploy-test.sh @@ -5,7 +5,7 @@ wait-for-proxy() { PROXY_URL="$1" - for i in {1..12}; do + for i in {1..40}; do if curl -s --header "Content-Type: application/json" --data '{"method":"eth_blockNumber","params":[],"id":93,"jsonrpc":"2.0"}' $PROXY_URL > /dev/null; then echo `date +%H:%M:%S`" proxy is available" diff --git a/proxy/indexer/solana_receipts_update.py b/proxy/indexer/solana_receipts_update.py index fcc09c5d9..a3429a7a5 100644 --- a/proxy/indexer/solana_receipts_update.py +++ b/proxy/indexer/solana_receipts_update.py @@ -6,7 +6,6 @@ import logging from solana.rpc.api import Client from multiprocessing.dummy import Pool as ThreadPool -# from sqlitedict import SqliteDict from typing import Dict, Union @@ -21,7 +20,6 @@ solana_url = os.environ.get("SOLANA_URL", "https://api.devnet.solana.com") evm_loader_id = os.environ.get("EVM_LOADER", "eeLSJgWzzxrqKv1UxtRVVH8FX3qCQWUs9QuAjJpETGU") PARALLEL_REQUESTS = int(os.environ.get("PARALLEL_REQUESTS", "2")) -JOURNAL_MODE = os.environ.get("JOURNAL_MODE", "DELETE") CANCEL_TIMEOUT = int(os.environ.get("CANCEL_TIMEOUT", "60")) diff --git a/proxy/indexer/utils.py b/proxy/indexer/utils.py index ec79eda0e..ca54325aa 100644 --- a/proxy/indexer/utils.py +++ b/proxy/indexer/utils.py @@ -31,7 +31,6 @@ incinerator = "1nc1nerator11111111111111111111111111111111" system = "11111111111111111111111111111111" -JOURNAL_MODE = os.environ.get("JOURNAL_MODE", "DELETE") logger = logging.getLogger(__name__) logger.setLevel(logging.DEBUG) @@ -173,20 +172,22 @@ def get_account_list(client, storage_account): return None -POSTGRES_DB = os.environ.get("POSTGRES_DB", "neon-db") -POSTGRES_USER = os.environ.get("POSTGRES_USER", "neon-proxy") -POSTGRES_PASSWORD = os.environ.get("POSTGRES_PASSWORD", "neon-proxy") -POSTGRES_HOST = os.environ.get("POSTGRES_HOST", "localhost") class LogDB: def __init__(self): + POSTGRES_DB = os.environ.get("POSTGRES_DB", "neon-db") + POSTGRES_USER = os.environ.get("POSTGRES_USER", "neon-proxy") + POSTGRES_PASSWORD = os.environ.get("POSTGRES_PASSWORD", "neon-proxy") + POSTGRES_HOST = os.environ.get("POSTGRES_HOST", "localhost") + self.conn = psycopg2.connect( dbname=POSTGRES_DB, user=POSTGRES_USER, password=POSTGRES_PASSWORD, host=POSTGRES_HOST ) + cur = self.conn.cursor() cur.execute("""CREATE TABLE IF NOT EXISTS logs ( diff --git a/proxy/plugin/solana_rest_api.py b/proxy/plugin/solana_rest_api.py index f2954ee44..7bfe678df 100644 --- a/proxy/plugin/solana_rest_api.py +++ b/proxy/plugin/solana_rest_api.py @@ -44,7 +44,6 @@ chainId = os.environ.get("NEON_CHAIN_ID", "0x6e") # default value 110 EXTRA_GAS = int(os.environ.get("EXTRA_GAS", "0")) -JOURNAL_MODE = os.environ.get("JOURNAL_MODE", "DELETE") class PermanentAccounts: def __init__(self, client, signer, proxy_id):