Skip to content

Commit

Permalink
Refactor Abacus Collection Replica to inherit from base Daemon class; r…
Browse files Browse the repository at this point in the history
  • Loading branch information
rdimaio committed Feb 6, 2024
1 parent 497f1ba commit 0f14a5a
Show file tree
Hide file tree
Showing 4 changed files with 51 additions and 101 deletions.
10 changes: 5 additions & 5 deletions bin/rucio-abacus-collection-replica
Expand Up @@ -21,7 +21,7 @@ Abacus collection replica is a daemon to update collection replicas.
import argparse
import signal

from rucio.daemons.abacus.collection_replica import run, stop
from rucio.daemons.abacus.collection_replica import AbacusCollectionReplica


def get_parser():
Expand All @@ -37,11 +37,11 @@ def get_parser():


if __name__ == "__main__":

signal.signal(signal.SIGTERM, stop)
parser = get_parser()
args = parser.parse_args()
abacus_collection_replica = AbacusCollectionReplica(once=args.run_once, threads=args.threads, sleep_time=args.sleep_time, limit=args.limit)
signal.signal(signal.SIGTERM, abacus_collection_replica.stop)
try:
run(once=args.run_once, threads=args.threads, sleep_time=args.sleep_time, limit=args.limit)
abacus_collection_replica.run()
except KeyboardInterrupt:
stop()
abacus_collection_replica.stop()
4 changes: 2 additions & 2 deletions lib/rucio/core/replica.py
Expand Up @@ -3040,12 +3040,12 @@ def list_datasets_per_rse(rse_id, filters=None, limit=None, *, session: "Session


@transactional_session
def get_cleaned_updated_collection_replicas(total_workers, worker_number, limit=None, *, session: "Session"):
def get_cleaned_updated_collection_replicas(total_workers: int, worker_number: int, limit: int = None, *, session: "Session") -> list[dict]:
"""
Get update request for collection replicas.
:param total_workers: Number of total workers.
:param worker_number: id of the executing worker.
:param limit: Maximum numberws to return.
:param limit: Maximum numbers to return.
:param session: Database session in use.
:returns: List of update requests for collection replicas.
"""
Expand Down
124 changes: 37 additions & 87 deletions lib/rucio/daemons/abacus/collection_replica.py
Expand Up @@ -16,98 +16,48 @@
"""
Abacus-Collection-Replica is a daemon to update collection replicas.
"""
import functools
import logging
import threading
import time
from typing import TYPE_CHECKING
from typing import Any

import rucio.db.sqla.util
from rucio.common import exception
from rucio.common.logging import setup_logging
from rucio.core.replica import get_cleaned_updated_collection_replicas, update_collection_replica
from rucio.daemons.common import run_daemon
from rucio.daemons.common import Daemon, HeartbeatHandler

if TYPE_CHECKING:
from types import FrameType
from typing import Optional

graceful_stop = threading.Event()
DAEMON_NAME = 'abacus-collection-replica'
class AbacusCollectionReplica(Daemon):
def __init__(self, limit: int = 1000, **_kwargs) -> None:
"""
:param limit: Amount of collection replicas to retrieve per chunk.
"""
super().__init__(daemon_name="abacus-collection-replica", **_kwargs)
self.limit = limit


def collection_replica_update(once=False, limit=1000, sleep_time=10):
"""
Main loop to check and update the collection replicas.
"""
run_daemon(
once=once,
graceful_stop=graceful_stop,
executable=DAEMON_NAME,
partition_wait_time=1,
sleep_time=sleep_time,
run_once_fnc=functools.partial(
run_once,
limit=limit,
),
)


def run_once(heartbeat_handler, limit, **_kwargs):
worker_number, total_workers, logger = heartbeat_handler.live()
# Select a bunch of collection replicas to update for this worker
start = time.time() # NOQA
replicas = get_cleaned_updated_collection_replicas(total_workers=total_workers - 1,
worker_number=worker_number,
limit=limit)

logger(logging.DEBUG, 'Index query time %f size=%d' % (time.time() - start, len(replicas)))
# If the list is empty, send the worker to sleep
if not replicas:
logger(logging.INFO, 'did not get any work')
must_sleep = True
return must_sleep

for replica in replicas:
def _run_once(self, heartbeat_handler: "HeartbeatHandler", **_kwargs) -> tuple[bool, Any]:
worker_number, total_workers, logger = heartbeat_handler.live()
if graceful_stop.is_set():
break
start_time = time.time()
update_collection_replica(replica)
logger(logging.DEBUG, 'update of collection replica "%s" took %f' % (replica['id'], time.time() - start_time))

must_sleep = False
if limit and len(replicas) < limit:
must_sleep = True
return must_sleep


def stop(signum: "Optional[int]" = None, frame: "Optional[FrameType]" = None) -> None:
"""
Graceful exit.
"""

graceful_stop.set()


def run(once=False, threads=1, sleep_time=10, limit=1000):
"""
Starts up the Abacus-Collection-Replica threads.
"""
setup_logging(process_name=DAEMON_NAME)

if rucio.db.sqla.util.is_old_db():
raise exception.DatabaseException('Database was not updated, daemon won\'t start')

if once:
logging.info('main: executing one iteration only')
collection_replica_update(once)
else:
logging.info('main: starting threads')
threads = [threading.Thread(target=collection_replica_update, kwargs={'once': once, 'sleep_time': sleep_time, 'limit': limit})
for _ in range(0, threads)]
[t.start() for t in threads]
logging.info('main: waiting for interrupts')
# Interruptible joins require a timeout.
while threads[0].is_alive():
[t.join(timeout=3.14) for t in threads]
must_sleep = False

# Select a bunch of collection replicas to update for this worker
start = time.time() # NOQA
replicas = get_cleaned_updated_collection_replicas(total_workers=total_workers - 1,
worker_number=worker_number,
limit=self.limit)

logger(logging.DEBUG, 'Index query time %f size=%d' % (time.time() - start, len(replicas)))
# If the list is empty, send the worker to sleep
if not replicas:
logger(logging.INFO, 'did not get any work')
must_sleep = True
return must_sleep, None

for replica in replicas:
worker_number, total_workers, logger = heartbeat_handler.live()
if self.graceful_stop.is_set():
break
start_time = time.time()
update_collection_replica(replica)
logger(logging.DEBUG, 'update of collection replica "%s" took %f' % (replica['id'], time.time() - start_time))

if self.limit and len(replicas) < self.limit:
must_sleep = True

return must_sleep, None
14 changes: 7 additions & 7 deletions tests/test_abacus_collection_replica.py
Expand Up @@ -21,7 +21,7 @@
from rucio.common.types import InternalAccount
from rucio.core.did import add_did, get_did
from rucio.core.replica import delete_replicas, get_cleaned_updated_collection_replicas
from rucio.daemons.abacus import collection_replica
from rucio.daemons.abacus.collection_replica import AbacusCollectionReplica
from rucio.daemons.judge import cleaner
import rucio.daemons.reaper.reaper
from rucio.daemons.reaper.reaper import Reaper
Expand All @@ -36,7 +36,7 @@ class TestAbacusCollectionReplica():

def test_abacus_collection_replica_cleanup(self, vo, mock_scope, rse_factory, did_client):
""" ABACUS (COLLECTION REPLICA): Test if the cleanup procedure works correctly. """
collection_replica.run(once=True)
AbacusCollectionReplica(once=True).run()
db_session = session.get_session()
rse1, rse_id1 = rse_factory.make_rse()
rse2, rse_id2 = rse_factory.make_rse()
Expand Down Expand Up @@ -91,7 +91,7 @@ def test_abacus_collection_replica(self, vo, mock_scope, rse_factory, did_factor
assert str(dataset_replica['state']) == 'UNAVAILABLE'

# Run Abacus
collection_replica.run(once=True)
AbacusCollectionReplica(once=True).run()

# Check dataset replica after abacus - abacus should update the collection_replica table from updated_col_rep
dataset_replica = [replica for replica in rucio_client.list_dataset_replicas(mock_scope.external, dataset)][0]
Expand All @@ -106,7 +106,7 @@ def test_abacus_collection_replica(self, vo, mock_scope, rse_factory, did_factor
delete_replicas(rse_id=rse_id, files=[{'name': files[0]['name'], 'scope': mock_scope}])
activity = get_schema_value('ACTIVITY')['enum'][0]
rucio_client.add_replication_rule([{'scope': mock_scope.external, 'name': dataset}], 1, rse, lifetime=-1, activity=activity)
collection_replica.run(once=True)
AbacusCollectionReplica(once=True).run()
dataset_replica = [replica for replica in rucio_client.list_dataset_replicas(mock_scope.external, dataset)][0]
assert dataset_replica['length'] == len(files)
assert dataset_replica['bytes'] == len(files) * file_sizes
Expand All @@ -126,7 +126,7 @@ def test_abacus_collection_replica(self, vo, mock_scope, rse_factory, did_factor
reaper.run()
activity = get_schema_value('ACTIVITY')['enum'][0]
rucio_client.add_replication_rule([{'scope': mock_scope.external, 'name': dataset}], 1, rse, lifetime=-1, activity=activity)
collection_replica.run(once=True)
AbacusCollectionReplica(once=True).run()
dataset_replica = [replica for replica in rucio_client.list_dataset_replicas(mock_scope.external, dataset)]
assert dataset_replica[0]['length'] == 0
assert dataset_replica[0]['available_length'] == 0
Expand Down Expand Up @@ -159,7 +159,7 @@ def test_abacus_collection_replica_new(self, vo, mock_scope, rse_factory, rucio_
assert str(dataset_replica['state']) == 'UNAVAILABLE'

# Run Abacus
collection_replica.run(once=True)
AbacusCollectionReplica(once=True).run()

# Check dataset replica after abacus - abacus should update the collection_replica table from updated_col_rep
dataset_replica = [replica for replica in rucio_client.list_dataset_replicas(mock_scope.external, dataset)][0]
Expand All @@ -174,7 +174,7 @@ def test_abacus_collection_replica_new(self, vo, mock_scope, rse_factory, rucio_
delete_replicas(rse_id=rse_id, files=[{'name': files[0]['name'], 'scope': mock_scope}])
activity = get_schema_value('ACTIVITY')['enum'][0]
rucio_client.add_replication_rule([{'scope': mock_scope.external, 'name': dataset}], 1, rse, lifetime=-1, activity=activity)
collection_replica.run(once=True)
AbacusCollectionReplica(once=True).run()
dataset_replica = [replica for replica in rucio_client.list_dataset_replicas(mock_scope.external, dataset)][0]
assert dataset_replica['length'] == len(files)
assert dataset_replica['bytes'] == len(files) * file_sizes
Expand Down

0 comments on commit 0f14a5a

Please sign in to comment.