diff --git a/proxy/docker-compose-test.yml b/proxy/docker-compose-test.yml
index 294a16dda..55da74629 100644
--- a/proxy/docker-compose-test.yml
+++ b/proxy/docker-compose-test.yml
@@ -5,8 +5,8 @@ services:
     container_name: solana
     image: neonlabsorg/solana:${SOLANA_REVISION:-v1.7.9-resources}
     environment:
-      - SOLANA_URL=http://solana:8899
-      - RUST_LOG=solana_runtime::system_instruction_processor=trace,solana_runtime::message_processor=debug,solana_bpf_loader=debug,solana_rbpf=debug
+      SOLANA_URL: http://solana:8899
+      RUST_LOG: solana_runtime::system_instruction_processor=trace,solana_runtime::message_processor=debug,solana_bpf_loader=debug,solana_rbpf=debug
     hostname: solana
     expose:
       - "8899"
@@ -17,11 +17,28 @@ services:
     networks:
       - net

+  postgres:
+    container_name: postgres
+    image: postgres:14.0
+    command: postgres -c 'max_connections=1000'
+    environment:
+      POSTGRES_DB: neon-db
+      POSTGRES_USER: neon-proxy
+      POSTGRES_PASSWORD: neon-proxy-pass
+    hostname: postgres
+    expose:
+      - "5432"
+    networks:
+      - net
+
   proxy:
     container_name: proxy
     image: neonlabsorg/proxy:${REVISION}
     environment:
-      - CONFIG=ci
+      POSTGRES_DB: neon-db
+      POSTGRES_USER: neon-proxy
+      POSTGRES_PASSWORD: neon-proxy-pass
+      CONFIG: ci
     hostname: proxy
     ports:
       - 127.0.0.1:9090:9090
diff --git a/proxy/indexer/solana_receipts_update.py b/proxy/indexer/solana_receipts_update.py
index c7b85ec17..2bc67b1e2 100644
--- a/proxy/indexer/solana_receipts_update.py
+++ b/proxy/indexer/solana_receipts_update.py
@@ -6,15 +6,16 @@ import logging
 from solana.rpc.api import Client
 from multiprocessing.dummy import Pool as ThreadPool
-from sqlitedict import SqliteDict
 from typing import Dict, Union

 from proxy.environment import solana_url, evm_loader_id

 try:
     from utils import check_error, get_trx_results, get_trx_receipts, LogDB, Canceller
+    from sql_dict import SQLDict
 except ImportError:
     from .utils import check_error, get_trx_results, get_trx_receipts, LogDB, Canceller
+    from .sql_dict import SQLDict


 PARALLEL_REQUESTS = int(os.environ.get("PARALLEL_REQUESTS", "2"))
@@ -60,13 +61,13 @@ class Indexer:
     def __init__(self):
         self.client = Client(solana_url)
         self.canceller = Canceller()
-        self.logs_db = LogDB(filename="local.db")
-        self.blocks_by_hash = SqliteDict(filename="local.db", tablename="solana_blocks_by_hash", autocommit=True)
-        self.transaction_receipts = SqliteDict(filename="local.db", tablename="known_transactions", autocommit=True, encode=json.dumps, decode=json.loads)
-        self.ethereum_trx = SqliteDict(filename="local.db", tablename="ethereum_transactions", autocommit=True, encode=json.dumps, decode=json.loads)
-        self.eth_sol_trx = SqliteDict(filename="local.db", tablename="ethereum_solana_transactions", autocommit=True, encode=json.dumps, decode=json.loads)
-        self.sol_eth_trx = SqliteDict(filename="local.db", tablename="solana_ethereum_transactions", autocommit=True, encode=json.dumps, decode=json.loads)
-        self.constants = SqliteDict(filename="local.db", tablename="constants", autocommit=True)
+        self.logs_db = LogDB()
+        self.blocks_by_hash = SQLDict(tablename="solana_blocks_by_hash")
+        self.transaction_receipts = SQLDict(tablename="known_transactions")
+        self.ethereum_trx = SQLDict(tablename="ethereum_transactions")
+        self.eth_sol_trx = SQLDict(tablename="ethereum_solana_transactions")
+        self.sol_eth_trx = SQLDict(tablename="solana_ethereum_transactions")
+        self.constants = SQLDict(tablename="constants")
         self.last_slot = 0
         self.current_slot = 0
         self.transaction_order = []
@@ -429,7 +430,6 @@ def process_receipts(self):
                 else:
                     continue_table[storage_account] = ContinueStruct(signature, None, blocked_accounts)
                     holder_table[holder_account] = HolderStruct(storage_account)
-                    # self.add_hunged_storage(trx, storage_account)

             elif instruction_data[0] == 0x0c or instruction_data[0] == 0x15: # Cancel

diff --git a/proxy/indexer/sql_dict.py b/proxy/indexer/sql_dict.py
new file mode 100644
index 000000000..945ce0a63
--- /dev/null
+++ b/proxy/indexer/sql_dict.py
@@ -0,0 +1,126 @@
+import psycopg2
+import os
+import logging
+from collections.abc import MutableMapping
+
+POSTGRES_DB = os.environ.get("POSTGRES_DB", "neon-db")
+POSTGRES_USER = os.environ.get("POSTGRES_USER", "neon-proxy")
+POSTGRES_PASSWORD = os.environ.get("POSTGRES_PASSWORD", "neon-proxy")
+POSTGRES_HOST = os.environ.get("POSTGRES_HOST", "localhost")
+
+try:
+    from cPickle import dumps, loads, HIGHEST_PROTOCOL as PICKLE_PROTOCOL
+except ImportError:
+    from pickle import dumps, loads, HIGHEST_PROTOCOL as PICKLE_PROTOCOL
+
+
+logger = logging.getLogger(__name__)
+logger.setLevel(logging.DEBUG)
+
+
+def encode(obj):
+    """Serialize an object with pickle into a binary form suitable for a PostgreSQL BYTEA column."""
+    return psycopg2.Binary(dumps(obj, protocol=PICKLE_PROTOCOL))
+
+
+def decode(obj):
+    """Deserialize objects retrieved from PostgreSQL."""
+    return loads(bytes(obj))
+
+
+class SQLDict(MutableMapping):
+    """A dict-like key/value store backed by a PostgreSQL table."""
+
+    def __init__(self, tablename='table'):
+        self.encode = encode
+        self.decode = decode
+        self.tablename = tablename
+        self.conn = psycopg2.connect(
+            dbname=POSTGRES_DB,
+            user=POSTGRES_USER,
+            password=POSTGRES_PASSWORD,
+            host=POSTGRES_HOST
+        )
+        self.conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
+        cur = self.conn.cursor()
+        cur.execute('''
+            CREATE TABLE IF NOT EXISTS
+            {} (
+                key TEXT UNIQUE,
+                value BYTEA
+            )
+            '''.format(self.tablename)
+        )
+
+    def close(self):
+        self.conn.close()
+
+    def __len__(self):
+        cur = self.conn.cursor()
+        cur.execute('SELECT COUNT(*) FROM {}'.format(self.tablename))
+        rows = cur.fetchone()[0]
+        return rows if rows is not None else 0
+
+    def iterkeys(self):
+        cur = self.conn.cursor()
+        cur.execute('SELECT key FROM {}'.format(self.tablename))
+        rows = cur.fetchall()
+        for row in rows:
+            yield row[0]
+
+    def itervalues(self):
+        cur = self.conn.cursor()
+        cur.execute('SELECT value FROM {}'.format(self.tablename))
+        rows = cur.fetchall()
+        for value in rows:
+            yield self.decode(value[0])
+
+    def iteritems(self):
+        cur = self.conn.cursor()
+        cur.execute('SELECT key, value FROM {}'.format(self.tablename))
+        rows = cur.fetchall()
+        for row in rows:
+            yield row[0], self.decode(row[1])
+
+    def keys(self):
+        return list(self.iterkeys())
+
+    def values(self):
+        return list(self.itervalues())
+
+    def items(self):
+        return list(self.iteritems())
+
+    def __contains__(self, key):
+        cur = self.conn.cursor()
+        cur.execute('SELECT 1 FROM {} WHERE key = %s'.format(self.tablename), (key,))
+        return cur.fetchone() is not None
+
+    def __getitem__(self, key):
+        cur = self.conn.cursor()
+        cur.execute('SELECT value FROM {} WHERE key = %s'.format(self.tablename), (key,))
+        item = cur.fetchone()
+        if item is None:
+            raise KeyError(key)
+        return self.decode(item[0])
+
+    def __setitem__(self, key, value):
+        cur = self.conn.cursor()
+        cur.execute('''
+            INSERT INTO {} (key, value)
+            VALUES (%s,%s)
+            ON CONFLICT (key)
+            DO UPDATE SET
+                value = EXCLUDED.value
+            '''.format(self.tablename),
+            (key, self.encode(value))
+        )
+
+    def __delitem__(self, key):
+        cur = self.conn.cursor()
+        if key not in self:
+            raise KeyError(key)
+        cur.execute('DELETE FROM {} WHERE key = %s'.format(self.tablename), (key,))
+
+    def __iter__(self):
+        return self.iterkeys()
diff --git a/proxy/indexer/utils.py b/proxy/indexer/utils.py
index e008e0632..7b45f725e 100644
--- a/proxy/indexer/utils.py
+++ b/proxy/indexer/utils.py
@@ -4,7 +4,7 @@ import logging
 import os
 import rlp
-import sqlite3
+import psycopg2
 import subprocess
 from construct import Struct, Bytes, Int64ul
 from eth_utils import big_endian_to_int

@@ -169,10 +169,22 @@ def get_account_list(client, storage_account):
     return None


+
+
 class LogDB:
-    def __init__(self, filename="local.db"):
-        self.conn = sqlite3.connect(filename, check_same_thread=False) # multithread mode
-        # self.conn.isolation_level = None # autocommit mode
+    def __init__(self):
+        POSTGRES_DB = os.environ.get("POSTGRES_DB", "neon-db")
+        POSTGRES_USER = os.environ.get("POSTGRES_USER", "neon-proxy")
+        POSTGRES_PASSWORD = os.environ.get("POSTGRES_PASSWORD", "neon-proxy")
+        POSTGRES_HOST = os.environ.get("POSTGRES_HOST", "localhost")
+
+        self.conn = psycopg2.connect(
+            dbname=POSTGRES_DB,
+            user=POSTGRES_USER,
+            password=POSTGRES_PASSWORD,
+            host=POSTGRES_HOST
+        )
+
         cur = self.conn.cursor()
         cur.execute("""CREATE TABLE IF NOT EXISTS logs (
@@ -185,7 +197,7 @@ def __init__(self):
             transactionLogIndex INT,
             json TEXT,

-            UNIQUE(transactionLogIndex, transactionHash, topic) ON CONFLICT IGNORE
+            UNIQUE(transactionLogIndex, transactionHash, topic)
             );""")
         self.conn.commit()

@@ -208,7 +220,7 @@ def push_logs(self, logs):
         if len(rows):
             # logger.debug(rows)
             cur = self.conn.cursor()
-            cur.executemany('INSERT INTO logs VALUES (?, ?, ?, ?, ?, ?, ?)', rows)
+            cur.executemany('INSERT INTO logs VALUES (%s, %s, %s, %s, %s, %s, %s) ON CONFLICT DO NOTHING', rows)
             self.conn.commit()
         else:
             logger.debug("NO LOGS")
@@ -219,21 +231,21 @@ def get_logs(self, fromBlock = None, toBlock = None, address = None, topics = None, blockHash = None):
         params = []

         if fromBlock is not None:
-            queries.append("blockNumber >= ?")
+            queries.append("blockNumber >= %s")
             params.append(fromBlock)

         if toBlock is not None:
-            queries.append("blockNumber <= ?")
+            queries.append("blockNumber <= %s")
             params.append(toBlock)

         if blockHash is not None:
             blockHash = blockHash.lower()
-            queries.append("blockHash = ?")
+            queries.append("blockHash = %s")
             params.append(blockHash)

         if topics is not None:
             topics = [item.lower() for item in topics]
-            query_placeholder = ", ".join("?" * len(topics))
+            query_placeholder = ", ".join(["%s" for _ in range(len(topics))])
             topics_query = f"topic IN ({query_placeholder})"

             queries.append(topics_query)
@@ -242,11 +254,11 @@ def get_logs(self, fromBlock = None, toBlock = None, address = None, topics = None, blockHash = None):
         if address is not None:
             if isinstance(address, str):
                 address = address.lower()
-                queries.append("address = ?")
+                queries.append("address = %s")
                 params.append(address)
             elif isinstance(address, list):
                 address = [item.lower() for item in address]
-                query_placeholder = ", ".join("?" * len(address))
+                query_placeholder = ", ".join(["%s" for _ in range(len(address))])
                 address_query = f"address IN ({query_placeholder})"

                 queries.append(address_query)
diff --git a/proxy/plugin/solana_rest_api.py b/proxy/plugin/solana_rest_api.py
index 7d1abb36e..15b4c5c41 100644
--- a/proxy/plugin/solana_rest_api.py
+++ b/proxy/plugin/solana_rest_api.py
@@ -34,7 +34,7 @@ from ..core.acceptor.pool import proxy_id_glob
 import os
 from ..indexer.utils import get_trx_results, LogDB
-from sqlitedict import SqliteDict
+from ..indexer.sql_dict import SQLDict
 from proxy.environment import evm_loader_id, solana_cli, solana_url

 logger = logging.getLogger(__name__)

@@ -87,11 +87,11 @@ def __init__(self):
         self.client = SolanaClient(solana_url)

-        self.logs_db = LogDB(filename="local.db")
-        self.blocks_by_hash = SqliteDict(filename="local.db", tablename="solana_blocks_by_hash", autocommit=True)
-        self.ethereum_trx = SqliteDict(filename="local.db", tablename="ethereum_transactions", autocommit=True, encode=json.dumps, decode=json.loads)
-        self.eth_sol_trx = SqliteDict(filename="local.db", tablename="ethereum_solana_transactions", autocommit=True, encode=json.dumps, decode=json.loads)
-        self.sol_eth_trx = SqliteDict(filename="local.db", tablename="solana_ethereum_transactions", autocommit=True, encode=json.dumps, decode=json.loads)
+        self.logs_db = LogDB()
+        self.blocks_by_hash = SQLDict(tablename="solana_blocks_by_hash")
+        self.ethereum_trx = SQLDict(tablename="ethereum_transactions")
+        self.eth_sol_trx = SQLDict(tablename="ethereum_solana_transactions")
+        self.sol_eth_trx = SQLDict(tablename="solana_ethereum_transactions")

         with proxy_id_glob.get_lock():
             self.proxy_id = proxy_id_glob.value
diff --git a/proxy/run-proxy.sh b/proxy/run-proxy.sh
index 21696fe39..ac01288a5 100755
--- a/proxy/run-proxy.sh
+++ b/proxy/run-proxy.sh
@@ -13,6 +13,7 @@ if [ "$CONFIG" == "ci" ]; then
   [[ -z "$USE_COMBINED_START_CONTINUE" ]] && export USE_COMBINED_START_CONTINUE="YES"
   [[ -z "$CONTINUE_COUNT_FACTOR" ]] && export CONTINUE_COUNT_FACTOR="3"
   [[ -z "$MINIMAL_GAS_PRICE" ]] && export MINIMAL_GAS_PRICE=0
+  [[ -z "$POSTGRES_HOST" ]] && export POSTGRES_HOST="postgres"
   [[ -z "$CANCEL_TIMEOUT" ]] && export CANCEL_TIMEOUT=10
 elif [ "$CONFIG" == "local" ]; then
   [[ -z "$SOLANA_URL" ]] && export SOLANA_URL="http://localhost:8899"
@@ -21,6 +22,7 @@ elif [ "$CONFIG" == "local" ]; then
   [[ -z "$EXTRA_GAS" ]] && export EXTRA_GAS=0
   [[ -z "$NEON_CLI_TIMEOUT" ]] && export NEON_CLI_TIMEOUT="0.9"
   [[ -z "$MINIMAL_GAS_PRICE" ]] && export MINIMAL_GAS_PRICE=0
+  [[ -z "$POSTGRES_HOST" ]] && export POSTGRES_HOST="localhost"
   [[ -z "$CANCEL_TIMEOUT" ]] && export CANCEL_TIMEOUT=10
 elif [ "$CONFIG" == "devnet" ]; then
   [[ -z "$SOLANA_URL" ]] && export SOLANA_URL="https://api.devnet.solana.com"
@@ -29,6 +31,7 @@ elif [ "$CONFIG" == "devnet" ]; then
   [[ -z "$EXTRA_GAS" ]] && export EXTRA_GAS=90000
   [[ -z "$NEON_CLI_TIMEOUT" ]] && export NEON_CLI_TIMEOUT="10"
   [[ -z "$MINIMAL_GAS_PRICE" ]] && export MINIMAL_GAS_PRICE=1
+  [[ -z "$POSTGRES_HOST" ]] && export POSTGRES_HOST="localhost"
   [[ -z "$CANCEL_TIMEOUT" ]] && export CANCEL_TIMEOUT=60
 elif [ "$CONFIG" == "testnet" ]; then
   [[ -z "$SOLANA_URL" ]] && export SOLANA_URL="https://api.testnet.solana.com"
@@ -37,6 +40,7 @@ elif [ "$CONFIG" == "testnet" ]; then
   [[ -z "$EXTRA_GAS" ]] && export EXTRA_GAS=90000
   [[ -z "$NEON_CLI_TIMEOUT" ]] && export NEON_CLI_TIMEOUT="15"
   [[ -z "$MINIMAL_GAS_PRICE" ]] && export MINIMAL_GAS_PRICE="1"
+  [[ -z "$POSTGRES_HOST" ]] && export POSTGRES_HOST="localhost"
   [[ -z "$CANCEL_TIMEOUT" ]] && export CANCEL_TIMEOUT=60
 elif [ "$CONFIG" != "custom" ]; then
   exit 1
diff --git a/requirements.txt b/requirements.txt
index 1671a2cb1..323f726c3 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -5,6 +5,6 @@ eth-keys==0.3.3
 rlp
 web3==5.22.0
 solana==0.10.0
-sqlitedict
+psycopg2-binary
 ethereum
 py-solc-x==1.1.0
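
For reference, a minimal usage sketch of the new SQLDict from proxy/indexer/sql_dict.py, mirroring how the Indexer and the REST API construct it above. It assumes a PostgreSQL instance reachable through the POSTGRES_* environment variables introduced in this change; the "constants" table name comes from the diff, while the "last_block" key and its value are illustrative only.

    from proxy.indexer.sql_dict import SQLDict

    # Same construction pattern as in Indexer.__init__ above.
    constants = SQLDict(tablename="constants")

    # Values are pickled into a BYTEA column keyed by TEXT, so arbitrary
    # Python objects round-trip through PostgreSQL.
    constants["last_block"] = {"slot": 12345, "hash": "0xabc"}

    if "last_block" in constants:
        print(constants["last_block"])
    print(len(constants), list(constants.keys()))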