Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 20 additions & 3 deletions proxy/docker-compose-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ services:
container_name: solana
image: neonlabsorg/solana:${SOLANA_REVISION:-v1.7.9-resources}
environment:
- SOLANA_URL=http://solana:8899
- RUST_LOG=solana_runtime::system_instruction_processor=trace,solana_runtime::message_processor=debug,solana_bpf_loader=debug,solana_rbpf=debug
SOLANA_URL: http://solana:8899
RUST_LOG: solana_runtime::system_instruction_processor=trace,solana_runtime::message_processor=debug,solana_bpf_loader=debug,solana_rbpf=debug
hostname: solana
expose:
- "8899"
Expand All @@ -17,11 +17,28 @@ services:
networks:
- net

postgres:
container_name: postgres
image: postgres:14.0
command: postgres -c 'max_connections=1000'
environment:
POSTGRES_DB: neon-db
POSTGRES_USER: neon-proxy
POSTGRES_PASSWORD: neon-proxy-pass
hostname: postgres
expose:
- "5432"
networks:
- net

proxy:
container_name: proxy
image: neonlabsorg/proxy:${REVISION}
environment:
- CONFIG=ci
POSTGRES_DB: neon-db
POSTGRES_USER: neon-proxy
POSTGRES_PASSWORD: neon-proxy-pass
CONFIG: ci
hostname: proxy
ports:
- 127.0.0.1:9090:9090
Expand Down
18 changes: 9 additions & 9 deletions proxy/indexer/solana_receipts_update.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,16 @@
import logging
from solana.rpc.api import Client
from multiprocessing.dummy import Pool as ThreadPool
from sqlitedict import SqliteDict
from typing import Dict, Union
from proxy.environment import solana_url, evm_loader_id


try:
from utils import check_error, get_trx_results, get_trx_receipts, LogDB, Canceller
from sql_dict import SQLDict
except ImportError:
from .utils import check_error, get_trx_results, get_trx_receipts, LogDB, Canceller
from .sql_dict import SQLDict


PARALLEL_REQUESTS = int(os.environ.get("PARALLEL_REQUESTS", "2"))
Expand Down Expand Up @@ -60,13 +61,13 @@ class Indexer:
def __init__(self):
self.client = Client(solana_url)
self.canceller = Canceller()
self.logs_db = LogDB(filename="local.db")
self.blocks_by_hash = SqliteDict(filename="local.db", tablename="solana_blocks_by_hash", autocommit=True)
self.transaction_receipts = SqliteDict(filename="local.db", tablename="known_transactions", autocommit=True, encode=json.dumps, decode=json.loads)
self.ethereum_trx = SqliteDict(filename="local.db", tablename="ethereum_transactions", autocommit=True, encode=json.dumps, decode=json.loads)
self.eth_sol_trx = SqliteDict(filename="local.db", tablename="ethereum_solana_transactions", autocommit=True, encode=json.dumps, decode=json.loads)
self.sol_eth_trx = SqliteDict(filename="local.db", tablename="solana_ethereum_transactions", autocommit=True, encode=json.dumps, decode=json.loads)
self.constants = SqliteDict(filename="local.db", tablename="constants", autocommit=True)
self.logs_db = LogDB()
self.blocks_by_hash = SQLDict(tablename="solana_blocks_by_hash")
self.transaction_receipts = SQLDict(tablename="known_transactions")
self.ethereum_trx = SQLDict(tablename="ethereum_transactions")
self.eth_sol_trx = SQLDict(tablename="ethereum_solana_transactions")
self.sol_eth_trx = SQLDict(tablename="solana_ethereum_transactions")
self.constants = SQLDict(tablename="constants")
self.last_slot = 0
self.current_slot = 0
self.transaction_order = []
Expand Down Expand Up @@ -429,7 +430,6 @@ def process_receipts(self):
else:
continue_table[storage_account] = ContinueStruct(signature, None, blocked_accounts)
holder_table[holder_account] = HolderStruct(storage_account)
# self.add_hunged_storage(trx, storage_account)


elif instruction_data[0] == 0x0c or instruction_data[0] == 0x15: # Cancel
Expand Down
126 changes: 126 additions & 0 deletions proxy/indexer/sql_dict.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
import psycopg2
import os
import logging
from collections.abc import MutableMapping

POSTGRES_DB = os.environ.get("POSTGRES_DB", "neon-db")
POSTGRES_USER = os.environ.get("POSTGRES_USER", "neon-proxy")
POSTGRES_PASSWORD = os.environ.get("POSTGRES_PASSWORD", "neon-proxy")
POSTGRES_HOST = os.environ.get("POSTGRES_HOST", "localhost")

try:
from cPickle import dumps, loads, HIGHEST_PROTOCOL as PICKLE_PROTOCOL
except ImportError:
from pickle import dumps, loads, HIGHEST_PROTOCOL as PICKLE_PROTOCOL


logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)


def encode(obj):
"""Serialize an object using pickle to a binary format accepted by SQLite."""
return psycopg2.Binary(dumps(obj, protocol=PICKLE_PROTOCOL))


def decode(obj):
"""Deserialize objects retrieved from SQLite."""
return loads(bytes(obj))


class SQLDict(MutableMapping):
"""Serialize an object using pickle to a binary format accepted by SQLite."""

def __init__(self, tablename='table'):
self.encode = encode
self.decode = decode
self.tablename = tablename
self.conn = psycopg2.connect(
dbname=POSTGRES_DB,
user=POSTGRES_USER,
password=POSTGRES_PASSWORD,
host=POSTGRES_HOST
)
self.conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
cur = self.conn.cursor()
cur.execute('''
CREATE TABLE IF NOT EXISTS
{} (
key TEXT UNIQUE,
value BYTEA
)
'''.format(self.tablename)
)

def close(self):
self.conn.close()

def __len__(self):
cur = self.conn.cursor()
cur.execute('SELECT COUNT(*) FROM {}'.format(self.tablename))
rows = cur.fetchone()[0]
return rows if rows is not None else 0

def iterkeys(self):
cur = self.conn.cursor()
cur.execute('SELECT key FROM {}'.format(self.tablename))
rows = cur.fetchall()
for row in rows:
yield row[0]

def itervalues(self):
cur = self.conn.cursor()
cur.execute('SELECT value FROM {}'.format(self.tablename))
rows = cur.fetchall()
for value in rows:
yield self.decode(value[0])

def iteritems(self):
cur = self.conn.cursor()
cur.execute('SELECT key, value FROM {}'.format(self.tablename))
rows = cur.fetchall()
for row in rows:
yield row[0], self.decode(row[1])

def keys(self):
return list(self.iterkeys())

def values(self):
return list(self.itervalues())

def items(self):
return list(self.iteritems())

def __contains__(self, key):
cur = self.conn.cursor()
cur.execute('SELECT 1 FROM {} WHERE key = %s'.format(self.tablename), (key,))
return cur.fetchone() is not None

def __getitem__(self, key):
cur = self.conn.cursor()
cur.execute('SELECT value FROM {} WHERE key = %s'.format(self.tablename), (key,))
item = cur.fetchone()
if item is None:
raise KeyError(key)
return self.decode(item[0])

def __setitem__(self, key, value):
cur = self.conn.cursor()
cur.execute('''
INSERT INTO {} (key, value)
VALUES (%s,%s)
ON CONFLICT (key)
DO UPDATE SET
value = EXCLUDED.value
'''.format(self.tablename),
(key, self.encode(value))
)

def __delitem__(self, key):
cur = self.conn.cursor()
if key not in self:
raise KeyError(key)
cur.execute('DELETE FROM {} WHERE key = %s'.format(self.tablename), (key,))

def __iter__(self):
return self.iterkeys()
36 changes: 24 additions & 12 deletions proxy/indexer/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import logging
import os
import rlp
import sqlite3
import psycopg2
import subprocess
from construct import Struct, Bytes, Int64ul
from eth_utils import big_endian_to_int
Expand Down Expand Up @@ -169,10 +169,22 @@ def get_account_list(client, storage_account):
return None




class LogDB:
def __init__(self, filename="local.db"):
self.conn = sqlite3.connect(filename, check_same_thread=False) # multithread mode
# self.conn.isolation_level = None # autocommit mode
def __init__(self):
POSTGRES_DB = os.environ.get("POSTGRES_DB", "neon-db")
POSTGRES_USER = os.environ.get("POSTGRES_USER", "neon-proxy")
POSTGRES_PASSWORD = os.environ.get("POSTGRES_PASSWORD", "neon-proxy")
POSTGRES_HOST = os.environ.get("POSTGRES_HOST", "localhost")

self.conn = psycopg2.connect(
dbname=POSTGRES_DB,
user=POSTGRES_USER,
password=POSTGRES_PASSWORD,
host=POSTGRES_HOST
)

cur = self.conn.cursor()
cur.execute("""CREATE TABLE IF NOT EXISTS
logs (
Expand All @@ -185,7 +197,7 @@ def __init__(self, filename="local.db"):
transactionLogIndex INT,

json TEXT,
UNIQUE(transactionLogIndex, transactionHash, topic) ON CONFLICT IGNORE
UNIQUE(transactionLogIndex, transactionHash, topic)
);""")
self.conn.commit()

Expand All @@ -208,7 +220,7 @@ def push_logs(self, logs):
if len(rows):
# logger.debug(rows)
cur = self.conn.cursor()
cur.executemany('INSERT INTO logs VALUES (?, ?, ?, ?, ?, ?, ?)', rows)
cur.executemany('INSERT INTO logs VALUES (%s, %s, %s, %s, %s, %s, %s) ON CONFLICT DO NOTHING', rows)
self.conn.commit()
else:
logger.debug("NO LOGS")
Expand All @@ -219,21 +231,21 @@ def get_logs(self, fromBlock = None, toBlock = None, address = None, topics = No
params = []

if fromBlock is not None:
queries.append("blockNumber >= ?")
queries.append("blockNumber >= %s")
params.append(fromBlock)

if toBlock is not None:
queries.append("blockNumber <= ?")
queries.append("blockNumber <= %s")
params.append(toBlock)

if blockHash is not None:
blockHash = blockHash.lower()
queries.append("blockHash = ?")
queries.append("blockHash = %s")
params.append(blockHash)

if topics is not None:
topics = [item.lower() for item in topics]
query_placeholder = ", ".join("?" * len(topics))
query_placeholder = ", ".join(["%s" for _ in range(len(topics))])
topics_query = f"topic IN ({query_placeholder})"

queries.append(topics_query)
Expand All @@ -242,11 +254,11 @@ def get_logs(self, fromBlock = None, toBlock = None, address = None, topics = No
if address is not None:
if isinstance(address, str):
address = address.lower()
queries.append("address = ?")
queries.append("address = %s")
params.append(address)
elif isinstance(address, list):
address = [item.lower() for item in address]
query_placeholder = ", ".join("?" * len(address))
query_placeholder = ", ".join(["%s" for _ in range(len(address))])
address_query = f"address IN ({query_placeholder})"

queries.append(address_query)
Expand Down
12 changes: 6 additions & 6 deletions proxy/plugin/solana_rest_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
from ..core.acceptor.pool import proxy_id_glob
import os
from ..indexer.utils import get_trx_results, LogDB
from sqlitedict import SqliteDict
from ..indexer.sql_dict import SQLDict
from proxy.environment import evm_loader_id, solana_cli, solana_url

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -87,11 +87,11 @@ def __init__(self):

self.client = SolanaClient(solana_url)

self.logs_db = LogDB(filename="local.db")
self.blocks_by_hash = SqliteDict(filename="local.db", tablename="solana_blocks_by_hash", autocommit=True)
self.ethereum_trx = SqliteDict(filename="local.db", tablename="ethereum_transactions", autocommit=True, encode=json.dumps, decode=json.loads)
self.eth_sol_trx = SqliteDict(filename="local.db", tablename="ethereum_solana_transactions", autocommit=True, encode=json.dumps, decode=json.loads)
self.sol_eth_trx = SqliteDict(filename="local.db", tablename="solana_ethereum_transactions", autocommit=True, encode=json.dumps, decode=json.loads)
self.logs_db = LogDB()
self.blocks_by_hash = SQLDict(tablename="solana_blocks_by_hash")
self.ethereum_trx = SQLDict(tablename="ethereum_transactions")
self.eth_sol_trx = SQLDict(tablename="ethereum_solana_transactions")
self.sol_eth_trx = SQLDict(tablename="solana_ethereum_transactions")

with proxy_id_glob.get_lock():
self.proxy_id = proxy_id_glob.value
Expand Down
4 changes: 4 additions & 0 deletions proxy/run-proxy.sh
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ if [ "$CONFIG" == "ci" ]; then
[[ -z "$USE_COMBINED_START_CONTINUE" ]] && export USE_COMBINED_START_CONTINUE="YES"
[[ -z "$CONTINUE_COUNT_FACTOR" ]] && export CONTINUE_COUNT_FACTOR="3"
[[ -z "$MINIMAL_GAS_PRICE" ]] && export MINIMAL_GAS_PRICE=0
[[ -z "$POSTGRES_HOST" ]] && export POSTGRES_HOST="postgres"
[[ -z "$CANCEL_TIMEOUT" ]] && export CANCEL_TIMEOUT=10
elif [ "$CONFIG" == "local" ]; then
[[ -z "$SOLANA_URL" ]] && export SOLANA_URL="http://localhost:8899"
Expand All @@ -21,6 +22,7 @@ elif [ "$CONFIG" == "local" ]; then
[[ -z "$EXTRA_GAS" ]] && export EXTRA_GAS=0
[[ -z "$NEON_CLI_TIMEOUT" ]] && export NEON_CLI_TIMEOUT="0.9"
[[ -z "$MINIMAL_GAS_PRICE" ]] && export MINIMAL_GAS_PRICE=0
[[ -z "$POSTGRES_HOST" ]] && export POSTGRES_HOST="localhost"
[[ -z "$CANCEL_TIMEOUT" ]] && export CANCEL_TIMEOUT=10
elif [ "$CONFIG" == "devnet" ]; then
[[ -z "$SOLANA_URL" ]] && export SOLANA_URL="https://api.devnet.solana.com"
Expand All @@ -29,6 +31,7 @@ elif [ "$CONFIG" == "devnet" ]; then
[[ -z "$EXTRA_GAS" ]] && export EXTRA_GAS=90000
[[ -z "$NEON_CLI_TIMEOUT" ]] && export NEON_CLI_TIMEOUT="10"
[[ -z "$MINIMAL_GAS_PRICE" ]] && export MINIMAL_GAS_PRICE=1
[[ -z "$POSTGRES_HOST" ]] && export POSTGRES_HOST="localhost"
[[ -z "$CANCEL_TIMEOUT" ]] && export CANCEL_TIMEOUT=60
elif [ "$CONFIG" == "testnet" ]; then
[[ -z "$SOLANA_URL" ]] && export SOLANA_URL="https://api.testnet.solana.com"
Expand All @@ -37,6 +40,7 @@ elif [ "$CONFIG" == "testnet" ]; then
[[ -z "$EXTRA_GAS" ]] && export EXTRA_GAS=90000
[[ -z "$NEON_CLI_TIMEOUT" ]] && export NEON_CLI_TIMEOUT="15"
[[ -z "$MINIMAL_GAS_PRICE" ]] && export MINIMAL_GAS_PRICE="1"
[[ -z "$POSTGRES_HOST" ]] && export POSTGRES_HOST="localhost"
[[ -z "$CANCEL_TIMEOUT" ]] && export CANCEL_TIMEOUT=60
elif [ "$CONFIG" != "custom" ]; then
exit 1
Expand Down
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,6 @@ eth-keys==0.3.3
rlp
web3==5.22.0
solana==0.10.0
sqlitedict
psycopg2-binary
ethereum
py-solc-x==1.1.0