Skip to content

Commit

Permalink
Transfers: rework heartbeat handling. Closes rucio#5252
Browse files Browse the repository at this point in the history
Right now heartbeats are updated in the database at each iteration
(each activity for multi-activity daemons). This forces us to use
a big "older_than" value to avoid the race condition in which a big bulk
size and slow access to the transfertool make heartbeats expire.
At the same time, this puts strain on the database with unneeded heartbeat
updates when the daemons are lightly loaded.

Rework this behavior to call heartbeat_handler.live() more frequently
(on each submission, for example), but modify live() to only perform a
database update if enough time has passed since the last update.
  • Loading branch information
rcarpa committed Mar 1, 2022
1 parent 65fb816 commit 6ead54d
Show file tree
Hide file tree
Showing 8 changed files with 46 additions and 29 deletions.
39 changes: 22 additions & 17 deletions lib/rucio/daemons/conveyor/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,10 @@ class HeartbeatHandler:
Simple contextmanager which sets a heartbeat and associated logger on entry and cleans up the heartbeat on exit.
"""

def __init__(self, executable, logger_prefix=None):
def __init__(self, executable, renewal_interval, logger_prefix=None):
self.executable = executable
self.renewal_interval = renewal_interval
self.older_than = renewal_interval * 10 if renewal_interval and renewal_interval > 0 else None # 10 was chosen without any particular reason
self.logger_prefix = logger_prefix or executable

self.hostname = socket.getfqdn()
Expand All @@ -78,6 +80,7 @@ def __init__(self, executable, logger_prefix=None):

self.logger = None
self.last_heart_beat = None
self.last_time = None

def __enter__(self):
heartbeat.sanity_check(executable=self.executable, hostname=self.hostname)
Expand All @@ -90,26 +93,28 @@ def __exit__(self, exc_type, exc_val, exc_tb):
if self.logger:
self.logger(logging.INFO, 'Heartbeat cleaned up')

def live(self, older_than=None):
if older_than:
self.last_heart_beat = heartbeat.live(self.executable, self.hostname, self.pid, self.hb_thread, older_than=older_than)
else:
self.last_heart_beat = heartbeat.live(self.executable, self.hostname, self.pid, self.hb_thread)
def live(self):
if not self.last_time or self.last_time < datetime.datetime.now() - datetime.timedelta(seconds=self.renewal_interval):
if self.older_than:
self.last_heart_beat = heartbeat.live(self.executable, self.hostname, self.pid, self.hb_thread, older_than=self.older_than)
else:
self.last_heart_beat = heartbeat.live(self.executable, self.hostname, self.pid, self.hb_thread)

prefix = '%s[%i/%i]: ' % (self.logger_prefix, self.last_heart_beat['assign_thread'], self.last_heart_beat['nr_threads'])
self.logger = formatted_logger(logging.log, prefix + '%s')
prefix = '%s[%i/%i]: ' % (self.logger_prefix, self.last_heart_beat['assign_thread'], self.last_heart_beat['nr_threads'])
self.logger = formatted_logger(logging.log, prefix + '%s')

if not self.last_heart_beat:
self.logger(logging.DEBUG, 'First heartbeat set')
else:
self.logger(logging.DEBUG, 'Heartbeat renewed')
if not self.last_time:
self.logger(logging.DEBUG, 'First heartbeat set')
else:
self.logger(logging.DEBUG, 'Heartbeat renewed')
self.last_time = datetime.datetime.now()

return self.last_heart_beat, self.logger
return self.last_heart_beat['assign_thread'], self.last_heart_beat['nr_threads'], self.logger


def run_conveyor_daemon(once, graceful_stop, executable, logger_prefix, partition_wait_time, sleep_time, run_once_fnc, activities=None, heart_beat_older_than=None):
def run_conveyor_daemon(once, graceful_stop, executable, logger_prefix, partition_wait_time, sleep_time, run_once_fnc, activities=None):

with HeartbeatHandler(executable=executable, logger_prefix=logger_prefix) as heartbeat_handler:
with HeartbeatHandler(executable=executable, renewal_interval=sleep_time - 1, logger_prefix=logger_prefix) as heartbeat_handler:
logger = heartbeat_handler.logger
logger(logging.INFO, 'started')

Expand Down Expand Up @@ -140,11 +145,11 @@ def run_conveyor_daemon(once, graceful_stop, executable, logger_prefix, partitio
else:
logger(logging.DEBUG, 'Starting next iteration')

heart_beat, logger = heartbeat_handler.live(older_than=heart_beat_older_than)
_, _, logger = heartbeat_handler.live()

must_sleep = True
try:
must_sleep = run_once_fnc(activity=activity, total_workers=heart_beat['nr_threads'], worker_number=heart_beat['assign_thread'], logger=logger)
must_sleep = run_once_fnc(activity=activity, heartbeat_handler=heartbeat_handler)
except Exception:
logger(logging.CRITICAL, "Exception", exc_info=True)
if once:
Expand Down
6 changes: 4 additions & 2 deletions lib/rucio/daemons/conveyor/finisher.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,9 @@
region = make_region_memcached(expiration_time=3600)


def run_once(bulk, db_bulk, suspicious_patterns, retry_protocol_mismatches, total_workers, worker_number, logger, activity):
def run_once(bulk, db_bulk, suspicious_patterns, retry_protocol_mismatches, heartbeat_handler, activity):
worker_number, total_workers, logger = heartbeat_handler.live()

try:
logger(logging.DEBUG, 'Working on activity %s', activity)
time1 = time.time()
Expand All @@ -94,6 +96,7 @@ def run_once(bulk, db_bulk, suspicious_patterns, retry_protocol_mismatches, tota

for chunk in chunks(reqs, bulk):
try:
worker_number, total_workers, logger = heartbeat_handler.live()
time3 = time.time()
__handle_requests(chunk, suspicious_patterns, retry_protocol_mismatches, logger=logger)
record_timer('daemons.conveyor.finisher.handle_requests_time', (time.time() - time3) * 1000 / (len(chunk) if chunk else 1))
Expand Down Expand Up @@ -151,7 +154,6 @@ def finisher(once=False, sleep_time=60, activities=None, bulk=100, db_bulk=1000,
retry_protocol_mismatches=retry_protocol_mismatches,
),
activities=activities,
heart_beat_older_than=3600,
)


Expand Down
6 changes: 4 additions & 2 deletions lib/rucio/daemons/conveyor/poller.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,9 @@
FILTER_TRANSFERTOOL = config_get('conveyor', 'filter_transfertool', False, None) # NOTE: TRANSFERTOOL to filter requests on


def run_once(fts_bulk, db_bulk, older_than, activity_shares, multi_vo, timeout, activity, total_workers, worker_number, logger):
def run_once(fts_bulk, db_bulk, older_than, activity_shares, multi_vo, timeout, activity, heartbeat_handler):
worker_number, total_workers, logger = heartbeat_handler.live()

start_time = time.time()
logger(logging.DEBUG, 'Start to poll transfers older than %i seconds for activity %s using transfer tool: %s' % (older_than, activity, FILTER_TRANSFERTOOL))
transfs = request_core.get_next(request_type=[RequestType.TRANSFER, RequestType.STAGEIN, RequestType.STAGEOUT],
Expand Down Expand Up @@ -103,6 +105,7 @@ def run_once(fts_bulk, db_bulk, older_than, activity_shares, multi_vo, timeout,

for chunk in dict_chunks(transfers_by_eid, fts_bulk):
try:
worker_number, total_workers, logger = heartbeat_handler.live()
poll_transfers(external_host=external_host, transfers_by_eid=chunk, vo=vo, timeout=timeout, logger=logger)
except Exception:
logger(logging.ERROR, 'Exception', exc_info=True)
Expand Down Expand Up @@ -155,7 +158,6 @@ def poller(once=False, activities=None, sleep_time=60,
timeout=timeout,
),
activities=activities,
heart_beat_older_than=3600,
)


Expand Down
10 changes: 8 additions & 2 deletions lib/rucio/daemons/conveyor/preparer.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
if TYPE_CHECKING:
from typing import Optional
from sqlalchemy.orm import Session
from rucio.daemons.conveyor.common import HeartbeatHandler

graceful_stop = threading.Event()

Expand Down Expand Up @@ -103,11 +104,16 @@ def preparer(once, sleep_time, bulk, partition_wait_time=10):
bulk=bulk
),
activities=None,
heart_beat_older_than=None,
)


def run_once(bulk: int = 100, total_workers: int = 0, worker_number: int = 0, limit: "Optional[int]" = None, logger=logging.log, session: "Optional[Session]" = None, **kwargs) -> bool:
def run_once(bulk: int = 100, heartbeat_handler: "Optional[HeartbeatHandler]" = None, limit: "Optional[int]" = None, session: "Optional[Session]" = None, **kwargs) -> bool:
if heartbeat_handler:
worker_number, total_workers, logger = heartbeat_handler.live()
else:
# This is used in tests
worker_number, total_workers, logger = 0, 0, logging.log

start_time = time()
try:
req_sources = __list_transfer_requests_and_source_replicas(
Expand Down
2 changes: 1 addition & 1 deletion lib/rucio/daemons/conveyor/receiver.py
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ def receiver(id_, total_threads=1, full_mode=False, all_vos=False):

logging.info('receiver started')

with HeartbeatHandler(executable=executable, logger_prefix=logger_prefix) as heartbeat_handler:
with HeartbeatHandler(executable=executable, renewal_interval=30, logger_prefix=logger_prefix) as heartbeat_handler:

while not graceful_stop.is_set():

Expand Down
6 changes: 4 additions & 2 deletions lib/rucio/daemons/conveyor/stager.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,9 @@
graceful_stop = threading.Event()


def run_once(bulk, group_bulk, rse_ids, scheme, failover_scheme, transfertool_kwargs, total_workers, worker_number, logger, activity):
def run_once(bulk, group_bulk, rse_ids, scheme, failover_scheme, transfertool_kwargs, heartbeat_handler, activity):
worker_number, total_workers, logger = heartbeat_handler.live()

start_time = time.time()
transfers = transfer_core.next_transfers_to_submit(
total_workers=total_workers,
Expand Down Expand Up @@ -81,6 +83,7 @@ def run_once(bulk, group_bulk, rse_ids, scheme, failover_scheme, transfertool_kw

logger(logging.INFO, 'Starting to submit transfers for %s (%s)' % (activity, transfertool_obj))
for job in grouped_jobs:
worker_number, total_workers, logger = heartbeat_handler.live()
submit_transfer(transfertool_obj=transfertool_obj, transfers=job['transfers'], job_params=job['job_params'], submitter='transfer_submitter', logger=logger)

queue_empty = False
Expand Down Expand Up @@ -162,7 +165,6 @@ def stager(once=False, rses=None, bulk=100, group_bulk=1, group_policy='rule',
transfertool_kwargs=transfertool_kwargs,
),
activities=activities,
heart_beat_older_than=None,
)


Expand Down
5 changes: 3 additions & 2 deletions lib/rucio/daemons/conveyor/submitter.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,8 @@

def run_once(bulk, group_bulk, filter_transfertool, transfertool, ignore_availability, rse_ids,
scheme, failover_scheme, partition_hash_var, timeout, transfertool_kwargs,
total_workers, worker_number, logger, activity):
heartbeat_handler, activity):
worker_number, total_workers, logger = heartbeat_handler.live()

start_time = time.time()
transfers = transfer_core.next_transfers_to_submit(
Expand Down Expand Up @@ -112,6 +113,7 @@ def run_once(bulk, group_bulk, filter_transfertool, transfertool, ignore_availab

logger(logging.DEBUG, 'Starting to submit transfers for %s (%s)', activity, transfertool_obj)
for job in grouped_jobs:
worker_number, total_workers, logger = heartbeat_handler.live()
logger(logging.DEBUG, 'submitjob: transfers=%s, job_params=%s' % ([str(t) for t in job['transfers']], job['job_params']))
submit_transfer(transfertool_obj=transfertool_obj, transfers=job['transfers'], job_params=job['job_params'], submitter='transfer_submitter',
timeout=timeout, logger=logger)
Expand Down Expand Up @@ -218,7 +220,6 @@ def submitter(once=False, rses=None, partition_wait_time=10,
transfertool_kwargs=transfertool_kwargs,
),
activities=activities,
heart_beat_older_than=3600,
)


Expand Down
1 change: 0 additions & 1 deletion lib/rucio/daemons/conveyor/throttler.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,6 @@ def throttler(once=False, sleep_time=600, partition_wait_time=10):
sleep_time=sleep_time,
run_once_fnc=run_once,
activities=None,
heart_beat_older_than=3600,
)


Expand Down

0 comments on commit 6ead54d

Please sign in to comment.