Skip to content

Commit cf6b250

Browse files
otselniks-medvedev
andauthored
#243 Database lock fixes (#261)
* #243 Use `write-ahead log` journal_mode in connection * 'off' journal mode * #243 Use differsnt files for tables * #243 fixes * #243 use journal mode from environment * sync * #265 fixes * #265 config defined timeout * #243 update sqldict * #243 sync updaes * #243 sync * #243 stabilize * #243 clean code Co-authored-by: s-medvedev <40623263+s-medvedev@users.noreply.github.com>
1 parent f635c1e commit cf6b250

File tree

7 files changed

+190
-31
lines changed

7 files changed

+190
-31
lines changed

proxy/docker-compose-test.yml

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,8 @@ services:
55
container_name: solana
66
image: neonlabsorg/solana:${SOLANA_REVISION:-v1.7.9-resources}
77
environment:
8-
- SOLANA_URL=http://solana:8899
9-
- RUST_LOG=solana_runtime::system_instruction_processor=trace,solana_runtime::message_processor=debug,solana_bpf_loader=debug,solana_rbpf=debug
8+
SOLANA_URL: http://solana:8899
9+
RUST_LOG: solana_runtime::system_instruction_processor=trace,solana_runtime::message_processor=debug,solana_bpf_loader=debug,solana_rbpf=debug
1010
hostname: solana
1111
expose:
1212
- "8899"
@@ -17,11 +17,28 @@ services:
1717
networks:
1818
- net
1919

20+
postgres:
21+
container_name: postgres
22+
image: postgres:14.0
23+
command: postgres -c 'max_connections=1000'
24+
environment:
25+
POSTGRES_DB: neon-db
26+
POSTGRES_USER: neon-proxy
27+
POSTGRES_PASSWORD: neon-proxy-pass
28+
hostname: postgres
29+
expose:
30+
- "5432"
31+
networks:
32+
- net
33+
2034
proxy:
2135
container_name: proxy
2236
image: neonlabsorg/proxy:${REVISION}
2337
environment:
24-
- CONFIG=ci
38+
POSTGRES_DB: neon-db
39+
POSTGRES_USER: neon-proxy
40+
POSTGRES_PASSWORD: neon-proxy-pass
41+
CONFIG: ci
2542
hostname: proxy
2643
ports:
2744
- 127.0.0.1:9090:9090

proxy/indexer/solana_receipts_update.py

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -6,15 +6,16 @@
66
import logging
77
from solana.rpc.api import Client
88
from multiprocessing.dummy import Pool as ThreadPool
9-
from sqlitedict import SqliteDict
109
from typing import Dict, Union
1110
from proxy.environment import solana_url, evm_loader_id
1211

1312

1413
try:
1514
from utils import check_error, get_trx_results, get_trx_receipts, LogDB, Canceller
15+
from sql_dict import SQLDict
1616
except ImportError:
1717
from .utils import check_error, get_trx_results, get_trx_receipts, LogDB, Canceller
18+
from .sql_dict import SQLDict
1819

1920

2021
PARALLEL_REQUESTS = int(os.environ.get("PARALLEL_REQUESTS", "2"))
@@ -60,13 +61,13 @@ class Indexer:
6061
def __init__(self):
6162
self.client = Client(solana_url)
6263
self.canceller = Canceller()
63-
self.logs_db = LogDB(filename="local.db")
64-
self.blocks_by_hash = SqliteDict(filename="local.db", tablename="solana_blocks_by_hash", autocommit=True)
65-
self.transaction_receipts = SqliteDict(filename="local.db", tablename="known_transactions", autocommit=True, encode=json.dumps, decode=json.loads)
66-
self.ethereum_trx = SqliteDict(filename="local.db", tablename="ethereum_transactions", autocommit=True, encode=json.dumps, decode=json.loads)
67-
self.eth_sol_trx = SqliteDict(filename="local.db", tablename="ethereum_solana_transactions", autocommit=True, encode=json.dumps, decode=json.loads)
68-
self.sol_eth_trx = SqliteDict(filename="local.db", tablename="solana_ethereum_transactions", autocommit=True, encode=json.dumps, decode=json.loads)
69-
self.constants = SqliteDict(filename="local.db", tablename="constants", autocommit=True)
64+
self.logs_db = LogDB()
65+
self.blocks_by_hash = SQLDict(tablename="solana_blocks_by_hash")
66+
self.transaction_receipts = SQLDict(tablename="known_transactions")
67+
self.ethereum_trx = SQLDict(tablename="ethereum_transactions")
68+
self.eth_sol_trx = SQLDict(tablename="ethereum_solana_transactions")
69+
self.sol_eth_trx = SQLDict(tablename="solana_ethereum_transactions")
70+
self.constants = SQLDict(tablename="constants")
7071
self.last_slot = 0
7172
self.current_slot = 0
7273
self.transaction_order = []
@@ -429,7 +430,6 @@ def process_receipts(self):
429430
else:
430431
continue_table[storage_account] = ContinueStruct(signature, None, blocked_accounts)
431432
holder_table[holder_account] = HolderStruct(storage_account)
432-
# self.add_hunged_storage(trx, storage_account)
433433

434434

435435
elif instruction_data[0] == 0x0c or instruction_data[0] == 0x15: # Cancel

proxy/indexer/sql_dict.py

Lines changed: 126 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,126 @@
1+
import psycopg2
2+
import os
3+
import logging
4+
from collections.abc import MutableMapping
5+
6+
POSTGRES_DB = os.environ.get("POSTGRES_DB", "neon-db")
7+
POSTGRES_USER = os.environ.get("POSTGRES_USER", "neon-proxy")
8+
POSTGRES_PASSWORD = os.environ.get("POSTGRES_PASSWORD", "neon-proxy")
9+
POSTGRES_HOST = os.environ.get("POSTGRES_HOST", "localhost")
10+
11+
try:
12+
from cPickle import dumps, loads, HIGHEST_PROTOCOL as PICKLE_PROTOCOL
13+
except ImportError:
14+
from pickle import dumps, loads, HIGHEST_PROTOCOL as PICKLE_PROTOCOL
15+
16+
17+
logger = logging.getLogger(__name__)
18+
logger.setLevel(logging.DEBUG)
19+
20+
21+
def encode(obj):
22+
"""Serialize an object using pickle to a binary format accepted by SQLite."""
23+
return psycopg2.Binary(dumps(obj, protocol=PICKLE_PROTOCOL))
24+
25+
26+
def decode(obj):
27+
"""Deserialize objects retrieved from SQLite."""
28+
return loads(bytes(obj))
29+
30+
31+
class SQLDict(MutableMapping):
32+
"""Serialize an object using pickle to a binary format accepted by SQLite."""
33+
34+
def __init__(self, tablename='table'):
35+
self.encode = encode
36+
self.decode = decode
37+
self.tablename = tablename
38+
self.conn = psycopg2.connect(
39+
dbname=POSTGRES_DB,
40+
user=POSTGRES_USER,
41+
password=POSTGRES_PASSWORD,
42+
host=POSTGRES_HOST
43+
)
44+
self.conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
45+
cur = self.conn.cursor()
46+
cur.execute('''
47+
CREATE TABLE IF NOT EXISTS
48+
{} (
49+
key TEXT UNIQUE,
50+
value BYTEA
51+
)
52+
'''.format(self.tablename)
53+
)
54+
55+
def close(self):
56+
self.conn.close()
57+
58+
def __len__(self):
59+
cur = self.conn.cursor()
60+
cur.execute('SELECT COUNT(*) FROM {}'.format(self.tablename))
61+
rows = cur.fetchone()[0]
62+
return rows if rows is not None else 0
63+
64+
def iterkeys(self):
65+
cur = self.conn.cursor()
66+
cur.execute('SELECT key FROM {}'.format(self.tablename))
67+
rows = cur.fetchall()
68+
for row in rows:
69+
yield row[0]
70+
71+
def itervalues(self):
72+
cur = self.conn.cursor()
73+
cur.execute('SELECT value FROM {}'.format(self.tablename))
74+
rows = cur.fetchall()
75+
for value in rows:
76+
yield self.decode(value[0])
77+
78+
def iteritems(self):
79+
cur = self.conn.cursor()
80+
cur.execute('SELECT key, value FROM {}'.format(self.tablename))
81+
rows = cur.fetchall()
82+
for row in rows:
83+
yield row[0], self.decode(row[1])
84+
85+
def keys(self):
86+
return list(self.iterkeys())
87+
88+
def values(self):
89+
return list(self.itervalues())
90+
91+
def items(self):
92+
return list(self.iteritems())
93+
94+
def __contains__(self, key):
95+
cur = self.conn.cursor()
96+
cur.execute('SELECT 1 FROM {} WHERE key = %s'.format(self.tablename), (key,))
97+
return cur.fetchone() is not None
98+
99+
def __getitem__(self, key):
100+
cur = self.conn.cursor()
101+
cur.execute('SELECT value FROM {} WHERE key = %s'.format(self.tablename), (key,))
102+
item = cur.fetchone()
103+
if item is None:
104+
raise KeyError(key)
105+
return self.decode(item[0])
106+
107+
def __setitem__(self, key, value):
108+
cur = self.conn.cursor()
109+
cur.execute('''
110+
INSERT INTO {} (key, value)
111+
VALUES (%s,%s)
112+
ON CONFLICT (key)
113+
DO UPDATE SET
114+
value = EXCLUDED.value
115+
'''.format(self.tablename),
116+
(key, self.encode(value))
117+
)
118+
119+
def __delitem__(self, key):
120+
cur = self.conn.cursor()
121+
if key not in self:
122+
raise KeyError(key)
123+
cur.execute('DELETE FROM {} WHERE key = %s'.format(self.tablename), (key,))
124+
125+
def __iter__(self):
126+
return self.iterkeys()

proxy/indexer/utils.py

Lines changed: 24 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
import logging
55
import os
66
import rlp
7-
import sqlite3
7+
import psycopg2
88
import subprocess
99
from construct import Struct, Bytes, Int64ul
1010
from eth_utils import big_endian_to_int
@@ -169,10 +169,22 @@ def get_account_list(client, storage_account):
169169
return None
170170

171171

172+
173+
172174
class LogDB:
173-
def __init__(self, filename="local.db"):
174-
self.conn = sqlite3.connect(filename, check_same_thread=False) # multithread mode
175-
# self.conn.isolation_level = None # autocommit mode
175+
def __init__(self):
176+
POSTGRES_DB = os.environ.get("POSTGRES_DB", "neon-db")
177+
POSTGRES_USER = os.environ.get("POSTGRES_USER", "neon-proxy")
178+
POSTGRES_PASSWORD = os.environ.get("POSTGRES_PASSWORD", "neon-proxy")
179+
POSTGRES_HOST = os.environ.get("POSTGRES_HOST", "localhost")
180+
181+
self.conn = psycopg2.connect(
182+
dbname=POSTGRES_DB,
183+
user=POSTGRES_USER,
184+
password=POSTGRES_PASSWORD,
185+
host=POSTGRES_HOST
186+
)
187+
176188
cur = self.conn.cursor()
177189
cur.execute("""CREATE TABLE IF NOT EXISTS
178190
logs (
@@ -185,7 +197,7 @@ def __init__(self, filename="local.db"):
185197
transactionLogIndex INT,
186198
187199
json TEXT,
188-
UNIQUE(transactionLogIndex, transactionHash, topic) ON CONFLICT IGNORE
200+
UNIQUE(transactionLogIndex, transactionHash, topic)
189201
);""")
190202
self.conn.commit()
191203

@@ -208,7 +220,7 @@ def push_logs(self, logs):
208220
if len(rows):
209221
# logger.debug(rows)
210222
cur = self.conn.cursor()
211-
cur.executemany('INSERT INTO logs VALUES (?, ?, ?, ?, ?, ?, ?)', rows)
223+
cur.executemany('INSERT INTO logs VALUES (%s, %s, %s, %s, %s, %s, %s) ON CONFLICT DO NOTHING', rows)
212224
self.conn.commit()
213225
else:
214226
logger.debug("NO LOGS")
@@ -219,21 +231,21 @@ def get_logs(self, fromBlock = None, toBlock = None, address = None, topics = No
219231
params = []
220232

221233
if fromBlock is not None:
222-
queries.append("blockNumber >= ?")
234+
queries.append("blockNumber >= %s")
223235
params.append(fromBlock)
224236

225237
if toBlock is not None:
226-
queries.append("blockNumber <= ?")
238+
queries.append("blockNumber <= %s")
227239
params.append(toBlock)
228240

229241
if blockHash is not None:
230242
blockHash = blockHash.lower()
231-
queries.append("blockHash = ?")
243+
queries.append("blockHash = %s")
232244
params.append(blockHash)
233245

234246
if topics is not None:
235247
topics = [item.lower() for item in topics]
236-
query_placeholder = ", ".join("?" * len(topics))
248+
query_placeholder = ", ".join(["%s" for _ in range(len(topics))])
237249
topics_query = f"topic IN ({query_placeholder})"
238250

239251
queries.append(topics_query)
@@ -242,11 +254,11 @@ def get_logs(self, fromBlock = None, toBlock = None, address = None, topics = No
242254
if address is not None:
243255
if isinstance(address, str):
244256
address = address.lower()
245-
queries.append("address = ?")
257+
queries.append("address = %s")
246258
params.append(address)
247259
elif isinstance(address, list):
248260
address = [item.lower() for item in address]
249-
query_placeholder = ", ".join("?" * len(address))
261+
query_placeholder = ", ".join(["%s" for _ in range(len(address))])
250262
address_query = f"address IN ({query_placeholder})"
251263

252264
queries.append(address_query)

proxy/plugin/solana_rest_api.py

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@
3434
from ..core.acceptor.pool import proxy_id_glob
3535
import os
3636
from ..indexer.utils import get_trx_results, LogDB
37-
from sqlitedict import SqliteDict
37+
from ..indexer.sql_dict import SQLDict
3838
from proxy.environment import evm_loader_id, solana_cli, solana_url
3939

4040
logger = logging.getLogger(__name__)
@@ -87,11 +87,11 @@ def __init__(self):
8787

8888
self.client = SolanaClient(solana_url)
8989

90-
self.logs_db = LogDB(filename="local.db")
91-
self.blocks_by_hash = SqliteDict(filename="local.db", tablename="solana_blocks_by_hash", autocommit=True)
92-
self.ethereum_trx = SqliteDict(filename="local.db", tablename="ethereum_transactions", autocommit=True, encode=json.dumps, decode=json.loads)
93-
self.eth_sol_trx = SqliteDict(filename="local.db", tablename="ethereum_solana_transactions", autocommit=True, encode=json.dumps, decode=json.loads)
94-
self.sol_eth_trx = SqliteDict(filename="local.db", tablename="solana_ethereum_transactions", autocommit=True, encode=json.dumps, decode=json.loads)
90+
self.logs_db = LogDB()
91+
self.blocks_by_hash = SQLDict(tablename="solana_blocks_by_hash")
92+
self.ethereum_trx = SQLDict(tablename="ethereum_transactions")
93+
self.eth_sol_trx = SQLDict(tablename="ethereum_solana_transactions")
94+
self.sol_eth_trx = SQLDict(tablename="solana_ethereum_transactions")
9595

9696
with proxy_id_glob.get_lock():
9797
self.proxy_id = proxy_id_glob.value

proxy/run-proxy.sh

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ if [ "$CONFIG" == "ci" ]; then
1313
[[ -z "$USE_COMBINED_START_CONTINUE" ]] && export USE_COMBINED_START_CONTINUE="YES"
1414
[[ -z "$CONTINUE_COUNT_FACTOR" ]] && export CONTINUE_COUNT_FACTOR="3"
1515
[[ -z "$MINIMAL_GAS_PRICE" ]] && export MINIMAL_GAS_PRICE=0
16+
[[ -z "$POSTGRES_HOST" ]] && export POSTGRES_HOST="postgres"
1617
[[ -z "$CANCEL_TIMEOUT" ]] && export CANCEL_TIMEOUT=10
1718
elif [ "$CONFIG" == "local" ]; then
1819
[[ -z "$SOLANA_URL" ]] && export SOLANA_URL="http://localhost:8899"
@@ -21,6 +22,7 @@ elif [ "$CONFIG" == "local" ]; then
2122
[[ -z "$EXTRA_GAS" ]] && export EXTRA_GAS=0
2223
[[ -z "$NEON_CLI_TIMEOUT" ]] && export NEON_CLI_TIMEOUT="0.9"
2324
[[ -z "$MINIMAL_GAS_PRICE" ]] && export MINIMAL_GAS_PRICE=0
25+
[[ -z "$POSTGRES_HOST" ]] && export POSTGRES_HOST="localhost"
2426
[[ -z "$CANCEL_TIMEOUT" ]] && export CANCEL_TIMEOUT=10
2527
elif [ "$CONFIG" == "devnet" ]; then
2628
[[ -z "$SOLANA_URL" ]] && export SOLANA_URL="https://api.devnet.solana.com"
@@ -29,6 +31,7 @@ elif [ "$CONFIG" == "devnet" ]; then
2931
[[ -z "$EXTRA_GAS" ]] && export EXTRA_GAS=90000
3032
[[ -z "$NEON_CLI_TIMEOUT" ]] && export NEON_CLI_TIMEOUT="10"
3133
[[ -z "$MINIMAL_GAS_PRICE" ]] && export MINIMAL_GAS_PRICE=1
34+
[[ -z "$POSTGRES_HOST" ]] && export POSTGRES_HOST="localhost"
3235
[[ -z "$CANCEL_TIMEOUT" ]] && export CANCEL_TIMEOUT=60
3336
elif [ "$CONFIG" == "testnet" ]; then
3437
[[ -z "$SOLANA_URL" ]] && export SOLANA_URL="https://api.testnet.solana.com"
@@ -37,6 +40,7 @@ elif [ "$CONFIG" == "testnet" ]; then
3740
[[ -z "$EXTRA_GAS" ]] && export EXTRA_GAS=90000
3841
[[ -z "$NEON_CLI_TIMEOUT" ]] && export NEON_CLI_TIMEOUT="15"
3942
[[ -z "$MINIMAL_GAS_PRICE" ]] && export MINIMAL_GAS_PRICE="1"
43+
[[ -z "$POSTGRES_HOST" ]] && export POSTGRES_HOST="localhost"
4044
[[ -z "$CANCEL_TIMEOUT" ]] && export CANCEL_TIMEOUT=60
4145
elif [ "$CONFIG" != "custom" ]; then
4246
exit 1

requirements.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,6 @@ eth-keys==0.3.3
55
rlp
66
web3==5.22.0
77
solana==0.10.0
8-
sqlitedict
8+
psycopg2-binary
99
ethereum
1010
py-solc-x==1.1.0

0 commit comments

Comments
 (0)