Skip to content

Commit

Permalink
Monitoring & Logging: add datatype to deletion events. rucio#4557
Browse files: browse the repository at this point in the history
  • Loading branch information
rcarpa committed Jul 1, 2022
1 parent e389bf1 commit 962ce32
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 10 deletions.
23 changes: 16 additions & 7 deletions lib/rucio/core/replica.py
Expand Up @@ -2564,6 +2564,7 @@ def list_and_mark_unlocked_replicas(limit, bytes_=None, rse_id=None, delay_secon
models.RSEFileAssociation.bytes,
models.RSEFileAssociation.tombstone,
models.RSEFileAssociation.state,
models.DataIdentifier.datatype,
).join_from(
temp_table_cls,
models.RSEFileAssociation,
Expand All @@ -2583,8 +2584,13 @@ def list_and_mark_unlocked_replicas(limit, bytes_=None, rse_id=None, delay_secon
models.Request,
and_(models.RSEFileAssociation.scope == models.Request.scope,
models.RSEFileAssociation.name == models.Request.name)
).join(
models.DataIdentifier,
and_(models.RSEFileAssociation.scope == models.DataIdentifier.scope,
models.RSEFileAssociation.name == models.DataIdentifier.name)
).group_by(
models.RSEFileAssociation
models.RSEFileAssociation,
models.DataIdentifier.datatype,
).having(
case([(func.count(replicas_alias.scope) > 0, True), # Can delete this replica if it's not the last replica
(func.count(models.Request.scope) == 0, True)], # If it's the last replica, only can delete if there are no requests using it
Expand All @@ -2595,15 +2601,15 @@ def list_and_mark_unlocked_replicas(limit, bytes_=None, rse_id=None, delay_secon
limit - len(rows)
)

for scope, name, path, bytes_, tombstone, state in session.execute(stmt):
for scope, name, path, bytes_, tombstone, state, datatype in session.execute(stmt):
if len(rows) >= limit or (needed_space is not None and total_bytes > needed_space):
break
if state != ReplicaState.UNAVAILABLE:
total_bytes += bytes_

rows.append({'scope': scope, 'name': name, 'path': path,
'bytes': bytes_, 'tombstone': tombstone,
'state': state})
'state': state, 'datatype': datatype})
if len(rows) >= limit or (needed_space is not None and total_bytes > needed_space):
break

Expand Down Expand Up @@ -2650,8 +2656,11 @@ def list_and_mark_unlocked_replicas_no_temp_table(limit, bytes_=None, rse_id=Non
models.RSEFileAssociation.path,
models.RSEFileAssociation.bytes,
models.RSEFileAssociation.tombstone,
models.RSEFileAssociation.state).\
models.RSEFileAssociation.state,
models.DataIdentifier.datatype).\
with_hint(models.RSEFileAssociation, "INDEX_RS_ASC(replicas REPLICAS_TOMBSTONE_IDX) NO_INDEX_FFS(replicas REPLICAS_TOMBSTONE_IDX)", 'oracle').\
join(models.DataIdentifier, and_(models.RSEFileAssociation.scope == models.DataIdentifier.scope,
models.RSEFileAssociation.name == models.DataIdentifier.name)).\
filter(models.RSEFileAssociation.tombstone < datetime.utcnow()).\
filter(models.RSEFileAssociation.lock_cnt == 0).\
filter(case([(models.RSEFileAssociation.tombstone != none_value, models.RSEFileAssociation.rse_id), ]) == rse_id).\
Expand All @@ -2668,7 +2677,7 @@ def list_and_mark_unlocked_replicas_no_temp_table(limit, bytes_=None, rse_id=Non
total_bytes, total_files = 0, 0
rows = []
replica_clause = []
for (scope, name, path, bytes_, tombstone, state) in query.yield_per(1000):
for (scope, name, path, bytes_, tombstone, state, datatype) in query.yield_per(1000):
# Check if more than one replica is available
replica_cnt = session.query(func.count(models.RSEFileAssociation.scope)).\
with_hint(models.RSEFileAssociation, "index(REPLICAS REPLICAS_PK)", 'oracle').\
Expand All @@ -2690,7 +2699,7 @@ def list_and_mark_unlocked_replicas_no_temp_table(limit, bytes_=None, rse_id=Non

rows.append({'scope': scope, 'name': name, 'path': path,
'bytes': bytes_, 'tombstone': tombstone,
'state': state})
'state': state, 'datatype': datatype})
replica_clause.append(and_(models.RSEFileAssociation.scope == scope,
models.RSEFileAssociation.name == name,
models.RSEFileAssociation.rse_id == rse_id))
Expand All @@ -2717,7 +2726,7 @@ def list_and_mark_unlocked_replicas_no_temp_table(limit, bytes_=None, rse_id=Non

rows.append({'scope': scope, 'name': name, 'path': path,
'bytes': bytes_, 'tombstone': tombstone,
'state': state})
'state': state, 'datatype': datatype})

replica_clause.append(and_(models.RSEFileAssociation.scope == scope,
models.RSEFileAssociation.name == name,
Expand Down
3 changes: 2 additions & 1 deletion lib/rucio/daemons/reaper/reaper.py
Expand Up @@ -150,7 +150,8 @@ def delete_from_storage(heartbeat_handler, hb_payload, replicas, prot, rse_info,
'file-size': replica['bytes'],
'bytes': replica['bytes'],
'url': replica['pfn'],
'protocol': prot.attributes['scheme']}
'protocol': prot.attributes['scheme'],
'datatype': replica['datatype']}
if replica['scope'].vo != 'def':
deletion_dict['vo'] = replica['scope'].vo
logger(logging.DEBUG, 'Deletion ATTEMPT of %s:%s as %s on %s', replica['scope'], replica['name'], replica['pfn'], rse_name)
Expand Down
9 changes: 7 additions & 2 deletions lib/rucio/tests/test_reaper.py
Expand Up @@ -26,6 +26,7 @@
from rucio.common.types import InternalAccount, InternalScope
from rucio.common.utils import generate_uuid
from rucio.core import did as did_core
from rucio.core import message as message_core
from rucio.core import replica as replica_core
from rucio.core import rse as rse_core
from rucio.core import rule as rule_core
Expand Down Expand Up @@ -62,7 +63,7 @@ def __add_test_rse_and_replicas(vo, scope, rse_name, names, file_size, epoch_tom
dids.append({'scope': scope, 'name': file_name})
replica_core.add_replica(rse_id=rse_id, scope=scope,
name=file_name, bytes_=file_size,
tombstone=tombstone,
tombstone=tombstone, meta={'datatype': 'SOME_DATATYPE'},
account=InternalAccount('root', vo=vo), adler32=None, md5=None)
return rse_name, rse_id, dids

Expand All @@ -75,7 +76,7 @@ def __add_test_rse_and_replicas(vo, scope, rse_name, names, file_size, epoch_tom
@pytest.mark.parametrize("caches_mock", [{"caches_to_mock": [
'rucio.daemons.reaper.reaper.REGION'
]}], indirect=True)
def test_reaper(vo, caches_mock, file_config_mock):
def test_reaper(vo, caches_mock, file_config_mock, message_mock):
""" REAPER (DAEMON): Test the reaper daemon."""
[cache_region] = caches_mock
scope = InternalScope('data13_hip', vo=vo)
Expand All @@ -101,6 +102,10 @@ def test_reaper(vo, caches_mock, file_config_mock):
reaper(once=True, rses=[], include_rses=rse_name, exclude_rses=None)
assert len(list(replica_core.list_replicas(dids, rse_expression=rse_name))) == 200

msgs = message_core.retrieve_messages()
assert len(msgs) == 50 # one for each deleted file
assert all(msg['payload']['datatype'] == 'SOME_DATATYPE' for msg in msgs)


@pytest.mark.parametrize("file_config_mock", [
# Run test twice: with, and without, temp tables
Expand Down

0 comments on commit 962ce32

Please sign in to comment.