1 change: 0 additions & 1 deletion scraper_service/requirements.txt
@@ -27,7 +27,6 @@ requests==2.32.3
scalecodec==1.2.10
six==1.16.0
substrate-interface==1.7.10
tenacity==9.0.0
toolz==0.12.1
tqdm==4.66.4
typing_extensions==4.12.2
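Note: tenacity is dropped from requirements because retries are now handled explicitly in ShovelBaseClass.start() (see below) via the MAX_RETRIES and RETRY_DELAY constants. For reference only, a roughly equivalent policy expressed with tenacity might look like the sketch below; the decorated function is hypothetical and is not part of this PR.

from tenacity import retry, retry_if_exception_type, stop_after_attempt, wait_fixed

from shared.exceptions import DatabaseConnectionError


# Sketch only: retry DatabaseConnectionError up to 3 attempts, waiting 5 seconds
# between tries, then re-raise. This mirrors MAX_RETRIES = 3 and RETRY_DELAY = 5
# in the base class below.
@retry(
    retry=retry_if_exception_type(DatabaseConnectionError),
    stop=stop_after_attempt(3),
    wait=wait_fixed(5),
    reraise=True,
)
def sync_blocks():
    ...  # hypothetical unit of work that may raise DatabaseConnectionError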
11 changes: 11 additions & 0 deletions scraper_service/shared/exceptions.py
@@ -0,0 +1,11 @@
class ShovelException(Exception):
"""Base exception for all shovel-related errors"""
pass

class ShovelProcessingError(ShovelException):
"""Fatal error that should crash the process"""
pass

class DatabaseConnectionError(ShovelException):
"""Retryable error for database connection issues"""
pass
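The docstrings encode a two-tier policy: DatabaseConnectionError marks transient failures worth retrying, while ShovelProcessingError marks fatal ones that should terminate the process. Below is a minimal sketch of how calling code is expected to classify low-level errors; the fetch_checkpoint helper and its client argument are hypothetical and only illustrate the intended split.

from shared.exceptions import DatabaseConnectionError, ShovelProcessingError


def fetch_checkpoint(client, shovel_name):
    # Hypothetical helper: wrap low-level failures in the shovel exception types
    # so the caller can decide between retrying and crashing.
    query = f"""
        SELECT max(block_number)
        FROM shovel_checkpoints
        WHERE shovel_name = '{shovel_name}'
    """
    try:
        rows = client.execute(query)
    except ConnectionError as e:
        # Transient: callers treat this as retryable.
        raise DatabaseConnectionError(f"ClickHouse unreachable: {e}")
    except Exception as e:
        # Anything else is fatal for the shovel.
        raise ShovelProcessingError(f"Checkpoint query failed: {e}")
    return rows[0][0] if rows else None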
136 changes: 91 additions & 45 deletions scraper_service/shared/shovel_base_class.py
@@ -1,21 +1,25 @@
from shared.clickhouse.batch_insert import buffer_insert, flush_buffer, batch_insert_into_clickhouse_table
from shared.substrate import get_substrate_client
from shared.substrate import get_substrate_client, reconnect_substrate
from time import sleep
from shared.clickhouse.utils import (
get_clickhouse_client,
table_exists,
)
from shared.exceptions import DatabaseConnectionError, ShovelProcessingError
from tqdm import tqdm
import logging
import threading
from concurrent.futures import ThreadPoolExecutor
import sys


class ShovelBaseClass:
checkpoint_block_number = 0
last_buffer_flush_call_block_number = 0
name = None
skip_interval = 1
MAX_RETRIES = 3
RETRY_DELAY = 5

def __init__(self, name, skip_interval=1):
"""
@@ -26,49 +30,80 @@ def __init__(self, name, skip_interval=1):
self.starting_block = 0 # Default value, can be overridden by subclasses

def start(self):
print("Initialising Substrate client")
substrate = get_substrate_client()

print("Fetching the finalized block")
finalized_block_hash = substrate.get_chain_finalised_head()
finalized_block_number = substrate.get_block_number(
finalized_block_hash)

# Start the clickhouse buffer
print("Starting Clickhouse buffer")
executor = ThreadPoolExecutor(max_workers=1)
threading.Thread(
target=flush_buffer,
args=(executor, self._buffer_flush_started, self._buffer_flush_done),
).start()

last_scraped_block_number = self.get_checkpoint()
logging.info(f"Last scraped block is {last_scraped_block_number}")

# Create a list of block numbers to scrape
retry_count = 0
while True:
block_numbers = tqdm(
range(last_scraped_block_number +
1, finalized_block_number + 1, self.skip_interval)
)

if len(block_numbers) > 0:
logging.info(
f"Catching up {len(block_numbers)} blocks")
for block_number in block_numbers:
self.process_block(block_number)
self.checkpoint_block_number = block_number
else:
logging.info(
"Already up to latest finalized block, checking again in 12s...")

# Make sure to sleep so buffer with checkpoint update is flushed to Clickhouse
# before trying again
sleep(12)
last_scraped_block_number = self.get_checkpoint()
finalized_block_hash = substrate.get_chain_finalised_head()
finalized_block_number = substrate.get_block_number(
finalized_block_hash)
try:
print("Initialising Substrate client")
substrate = get_substrate_client()

print("Fetching the finalized block")
finalized_block_hash = substrate.get_chain_finalised_head()
finalized_block_number = substrate.get_block_number(finalized_block_hash)

# Start the clickhouse buffer
print("Starting Clickhouse buffer")
executor = ThreadPoolExecutor(max_workers=1)
buffer_thread = threading.Thread(
target=flush_buffer,
args=(executor, self._buffer_flush_started, self._buffer_flush_done),
daemon=True # Make it a daemon thread so it exits with the main thread
)
buffer_thread.start()

last_scraped_block_number = self.get_checkpoint()
logging.info(f"Last scraped block is {last_scraped_block_number}")

# Create a list of block numbers to scrape
while True:
try:
block_numbers = list(range(
last_scraped_block_number + 1,
finalized_block_number + 1,
self.skip_interval
))

if len(block_numbers) > 0:
logging.info(f"Catching up {len(block_numbers)} blocks")
for block_number in tqdm(block_numbers):
try:
self.process_block(block_number)
self.checkpoint_block_number = block_number
except DatabaseConnectionError as e:
logging.error(f"Database connection error while processing block {block_number}: {str(e)}")
raise # Re-raise to be caught by outer try-except
except Exception as e:
logging.error(f"Fatal error while processing block {block_number}: {str(e)}")
raise ShovelProcessingError(f"Failed to process block {block_number}: {str(e)}")
else:
logging.info("Already up to latest finalized block, checking again in 12s...")

# Reset retry count on successful iteration
retry_count = 0

# Make sure to sleep so buffer with checkpoint update is flushed to Clickhouse
sleep(12)
last_scraped_block_number = self.get_checkpoint()
finalized_block_hash = substrate.get_chain_finalised_head()
finalized_block_number = substrate.get_block_number(finalized_block_hash)

except DatabaseConnectionError as e:
retry_count += 1
if retry_count > self.MAX_RETRIES:
logging.error(f"Max retries ({self.MAX_RETRIES}) exceeded for database connection. Exiting.")
raise ShovelProcessingError("Max database connection retries exceeded")

logging.warning(f"Database connection error (attempt {retry_count}/{self.MAX_RETRIES}): {str(e)}")
logging.info(f"Retrying in {self.RETRY_DELAY} seconds...")
sleep(self.RETRY_DELAY)
reconnect_substrate() # Try to reconnect to substrate
continue

except ShovelProcessingError as e:
logging.error(f"Fatal shovel error: {str(e)}")
sys.exit(1)
except Exception as e:
logging.error(f"Unexpected error: {str(e)}")
sys.exit(1)

def process_block(self, n):
raise NotImplementedError(
@@ -106,7 +141,18 @@ def _buffer_flush_done(self, tables, rows):

def get_checkpoint(self):
if not table_exists("shovel_checkpoints"):
return self.starting_block - 1
return max(0, self.starting_block - 1)

# First check if our shovel has any entries
query = f"""
SELECT count(*)
FROM shovel_checkpoints
WHERE shovel_name = '{self.name}'
"""
count = get_clickhouse_client().execute(query)[0][0]
if count == 0:
return max(0, self.starting_block - 1)

query = f"""
SELECT block_number
FROM shovel_checkpoints
@@ -118,4 +164,4 @@ def get_checkpoint(self):
if res:
return res[0][0]
else:
return self.starting_block - 1
return max(0, self.starting_block - 1) # This case shouldn't happen due to count check above
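With these changes a subclass only implements process_block() and signals failures with the two exception types: DatabaseConnectionError is retried by start() up to MAX_RETRIES times with RETRY_DELAY seconds between attempts, while ShovelProcessingError (or any unexpected exception) is logged and exits the process. A minimal subclass sketch, with a hypothetical shovel name and placeholder per-block work:

from shared.shovel_base_class import ShovelBaseClass
from shared.exceptions import DatabaseConnectionError, ShovelProcessingError


class ExampleShovel(ShovelBaseClass):
    # Hypothetical shovel illustrating the process_block() error contract.

    def process_block(self, n):
        try:
            self.do_work(n)  # placeholder for real scraping + buffer_insert calls
        except ConnectionError as e:
            # Retryable: start() sleeps RETRY_DELAY seconds, reconnects, and retries,
            # giving up after MAX_RETRIES attempts.
            raise DatabaseConnectionError(str(e))
        except Exception as e:
            # Fatal: start() logs the error and exits with status 1.
            raise ShovelProcessingError(str(e))

    def do_work(self, n):
        pass  # no-op in this sketch


if __name__ == "__main__":
    ExampleShovel(name="example_shovel").start()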
70 changes: 45 additions & 25 deletions scraper_service/shovel_block_timestamp/main.py
@@ -5,6 +5,7 @@
get_clickhouse_client,
table_exists,
)
from shared.exceptions import DatabaseConnectionError, ShovelProcessingError
import logging


@@ -20,31 +21,50 @@ def process_block(self, n):


def do_process_block(self, n):
substrate = get_substrate_client()

# Create table if it doesn't exist
if not table_exists(self.table_name):
query = f"""
CREATE TABLE IF NOT EXISTS {self.table_name} (
block_number UInt64 CODEC(Delta, ZSTD),
timestamp DateTime CODEC(Delta, ZSTD),
) ENGINE = ReplacingMergeTree()
PARTITION BY toYYYYMM(timestamp)
ORDER BY block_number
"""
get_clickhouse_client().execute(query)

block_hash = substrate.get_block_hash(n)
block_timestamp = int(
substrate.query(
"Timestamp",
"Now",
block_hash=block_hash,
).serialize()
/ 1000
)

buffer_insert(self.table_name, [n, block_timestamp])
try:
substrate = get_substrate_client()

try:
if not table_exists(self.table_name):
query = f"""
CREATE TABLE IF NOT EXISTS {self.table_name} (
block_number UInt64 CODEC(Delta, ZSTD),
timestamp DateTime CODEC(Delta, ZSTD),
) ENGINE = ReplacingMergeTree()
PARTITION BY toYYYYMM(timestamp)
ORDER BY block_number
"""
get_clickhouse_client().execute(query)
except Exception as e:
raise DatabaseConnectionError(f"Failed to create/check table: {str(e)}")

try:
block_hash = substrate.get_block_hash(n)
block_timestamp = int(
substrate.query(
"Timestamp",
"Now",
block_hash=block_hash,
).serialize()
/ 1000
)
except Exception as e:
raise ShovelProcessingError(f"Failed to get block timestamp from substrate: {str(e)}")

if block_timestamp == 0 and n != 0:
raise ShovelProcessingError(f"Invalid block timestamp (0) for block {n}")

try:
buffer_insert(self.table_name, [n, block_timestamp])
except Exception as e:
raise DatabaseConnectionError(f"Failed to insert data into buffer: {str(e)}")

except (DatabaseConnectionError, ShovelProcessingError):
# Re-raise these exceptions to be handled by the base class
raise
except Exception as e:
# Convert unexpected exceptions to ShovelProcessingError
raise ShovelProcessingError(f"Unexpected error processing block {n}: {str(e)}")


def main():