Skip to content

Commit

Permalink
Deletion: handle archives in reaper. #1431 #2213 (#4487)
Browse files Browse the repository at this point in the history
* Deletion: handle archives in reaper. #1431 #2213

Ensure that replicas are removed from datasets only if they don't exists
in an archive.

When removing an archive, handle the removal of its constituents.
Only if:
- they don't exist in another archive
- they don't have replicas outside any archive

This is achieved by recursively calling the cleanup function after
removing the archive (and thus the constituent replicas).

* Deletion: tolerate RSEs removal during rse-to-hostname mapping #1431

The case happens in tests now that we have concurrent tests which
create and remove RSEs. There is probably no harm to simply ignore the
RSEs which were removed between the list_rses call and the
get_rse_protocols call.
  • Loading branch information
rcarpa authored and bari12 committed Apr 22, 2021
1 parent baa08d2 commit 9ba3fe0
Show file tree
Hide file tree
Showing 4 changed files with 175 additions and 5 deletions.
28 changes: 27 additions & 1 deletion lib/rucio/core/replica.py
Expand Up @@ -1623,7 +1623,10 @@ def __cleanup_after_replica_deletion(rse_id, files, session=None):
models.DataIdentifier.availability == DIDAvailability.LOST)),
~exists(select([1]).prefix_with("/*+ INDEX(REPLICAS REPLICAS_PK) */", dialect='oracle')).where(
and_(models.RSEFileAssociation.scope == file['scope'],
models.RSEFileAssociation.name == file['name']))))
models.RSEFileAssociation.name == file['name'])),
~exists(select([1]).prefix_with("/*+ INDEX(ARCHIVE_CONTENTS ARCH_CONTENTS_PK) */", dialect='oracle')).where(
and_(models.ConstituentAssociation.child_scope == file['scope'],
models.ConstituentAssociation.child_name == file['name']))))

# 2) schedule removal of this file from the DID table
did_condition.append(
Expand Down Expand Up @@ -1793,11 +1796,34 @@ def __cleanup_after_replica_deletion(rse_id, files, session=None):
models.DidMeta.name == name))

# Remove Archive Constituents
removed_constituents = []
for chunk in chunks(archive_contents_condition, 30):
query = session.query(models.ConstituentAssociation). \
with_hint(models.ConstituentAssociation, "INDEX(ARCHIVE_CONTENTS ARCH_CONTENTS_CHILD_IDX)", 'oracle'). \
filter(or_(*chunk))
for constituent in query:
removed_constituents.append({'scope': constituent.child_scope, 'name': constituent.child_name})

models.ConstituentAssociationHistory(
child_scope=constituent.child_scope,
child_name=constituent.child_name,
scope=constituent.scope,
name=constituent.name,
bytes=constituent.bytes,
adler32=constituent.adler32,
md5=constituent.md5,
guid=constituent.guid,
length=constituent.length,
updated_at=constituent.updated_at,
created_at=constituent.created_at,
).save(session=session, flush=False)

session.query(models.ConstituentAssociation).\
with_hint(models.ConstituentAssociation, "INDEX(ARCHIVE_CONTENTS ARCH_CONTENTS_CHILD_IDX)", 'oracle').\
filter(or_(*chunk)).\
delete(synchronize_session=False)
for chunk in chunks(removed_constituents, 200):
__cleanup_after_replica_deletion(rse_id=rse_id, files=chunk, session=session)

# Remove rules in Waiting for approval or Suspended
for chunk in chunks(deleted_rules, 100):
Expand Down
7 changes: 6 additions & 1 deletion lib/rucio/daemons/reaper/reaper2.py
Expand Up @@ -246,7 +246,12 @@ def get_rses_to_hostname_mapping():
result = {}
all_rses = list_rses()
for rse in all_rses:
rse_protocol = get_rse_protocols(rse_id=rse['id'])
try:
rse_protocol = get_rse_protocols(rse_id=rse['id'])
except RSENotFound:
logging.log(logging.WARNING, 'RSE deleted while constructing rse-to-hostname mapping. Skipping %s', rse['rse'])
continue

for prot in rse_protocol['protocols']:
if prot['domains']['wan']['delete'] == 1:
result[rse['id']] = (prot['hostname'], rse_protocol)
Expand Down
6 changes: 6 additions & 0 deletions lib/rucio/tests/temp_factories.py
Expand Up @@ -228,6 +228,12 @@ def __cleanup_replicas(self, session=None):
models.BadReplicas.name == did['name'])
for did in self.created_dids)).delete(synchronize_session=False)

def register_dids(self, dids):
"""
Register the provided dids to be cleaned up on teardown
"""
self.created_dids.extend(dids)

def _sanitize_or_set_scope(self, scope):
if not scope:
scope = self.default_scope
Expand Down
139 changes: 136 additions & 3 deletions lib/rucio/tests/test_reaper2.py
Expand Up @@ -24,16 +24,22 @@

import pytest

from rucio.api import replica as replica_api
from rucio.api import rse as rse_api
from rucio.common.config import config_get_bool
from rucio.common.exception import ReplicaNotFound, DataIdentifierNotFound
from rucio.common.types import InternalAccount, InternalScope
from rucio.common.utils import generate_uuid
from rucio.api import rse as rse_api
from rucio.api import replica as replica_api
from rucio.core import did as did_core
from rucio.core import replica as replica_core
from rucio.core import rse as rse_core
from rucio.core import rule as rule_core
from rucio.core import scope as scope_core
from rucio.core import vo as vo_core
from rucio.daemons.reaper.reaper2 import reaper, REGION, run as run_reaper
from rucio.daemons.reaper.reaper2 import reaper, REGION
from rucio.daemons.reaper.reaper2 import run as run_reaper
from rucio.db.sqla.models import ConstituentAssociationHistory
from rucio.db.sqla.session import read_session
from rucio.tests.common import rse_name_generator

__mock_protocol = {'scheme': 'MOCK',
Expand Down Expand Up @@ -203,3 +209,130 @@ def test_reaper_multi_vo(vo):
reaper(once=True, rses=[], include_rses=both_rses, exclude_rses=None)
assert len(list(replica_core.list_replicas(dids=dids1, rse_expression=both_rses))) == 200
assert len(list(replica_core.list_replicas(dids=dids2, rse_expression=both_rses))) == 200


def test_archive_removal_impact_on_constituents(rse_factory, file_factory, mock_scope, root_account):
rse_name, rse_id = rse_factory.make_mock_rse()
scope = mock_scope
account = root_account

# Create an 2 archives and 4 files:
# - One only exists in the first archive
# - One in both, plus another replica, which is not in an archive
# - One in both, plus another replica, which is not in an archive; and this replica has expired
# - One in both, plus another replica, which is not in an archive; and this replica has expired; but a replication rule exists on this second replica
# Also add these files to datasets, one of which will be removed at the end
nb_constituents = 4
nb_c_outside_archive = nb_constituents - 1
constituent_size = 2000
archive_size = 1000
uuid = str(generate_uuid())
constituents = [{'scope': scope, 'name': 'lfn.%s.%d' % (uuid, i)} for i in range(nb_constituents)]
file_factory.register_dids(constituents)
c_first_archive_only, c_with_replica, c_with_expired_replica, c_with_replica_and_rule = constituents

replica_core.add_replica(rse_id=rse_id, account=account, bytes=constituent_size, **c_with_replica)

replica_core.add_replica(rse_id=rse_id, account=account, bytes=constituent_size,
tombstone=datetime.utcnow() - timedelta(days=1), **c_with_expired_replica)

replica_core.add_replica(rse_id=rse_id, account=account, bytes=constituent_size,
tombstone=datetime.utcnow() - timedelta(days=1), **c_with_replica_and_rule)
rule_core.add_rule(dids=[c_with_replica_and_rule], account=account, copies=1, rse_expression=rse_name, grouping='NONE',
weight=None, lifetime=None, locked=False, subscription_id=None)

archive1, archive2 = [{'scope': scope, 'name': 'archive_%s.%d.zip' % (uuid, i)} for i in range(2)]
replica_core.add_replica(rse_id=rse_id, bytes=archive_size, account=account, **archive1)
replica_core.add_replica(rse_id=rse_id, bytes=archive_size, account=account, **archive2)
did_core.attach_dids(dids=[{'scope': c['scope'], 'name': c['name'], 'bytes': constituent_size} for c in constituents],
account=account, **archive1)
did_core.attach_dids(dids=[{'scope': c['scope'], 'name': c['name'], 'bytes': constituent_size} for c in [c_with_replica, c_with_expired_replica, c_with_replica_and_rule]],
account=account, **archive2)

dataset1, dataset2 = [{'scope': scope, 'name': 'dataset_%s.%i' % (uuid, i)} for i in range(2)]
did_core.add_did(type='DATASET', account=account, **dataset1)
did_core.attach_dids(dids=constituents, account=account, **dataset1)
did_core.add_did(type='DATASET', account=account, **dataset2)
did_core.attach_dids(dids=[c_first_archive_only, c_with_expired_replica], account=account, **dataset2)

@read_session
def __get_archive_contents_history_count(archive, session=None):
return session.query(ConstituentAssociationHistory).filter_by(**archive).count()

# Run reaper the first time.
# the expired non-archive replica of c_with_expired_replica must be removed,
# but the did must not be remove and it must still remain in the dataset because
# it still has the replica from inside the archive
assert replica_core.get_replica(rse_id=rse_id, **c_with_expired_replica)
REGION.invalidate()
rse_core.set_rse_limits(rse_id=rse_id, name='MinFreeSpace', value=2 * archive_size + nb_c_outside_archive * constituent_size)
rse_core.set_rse_usage(rse_id=rse_id, source='storage', used=2 * archive_size + nb_c_outside_archive * constituent_size, free=1)
reaper(once=True, rses=[], include_rses=rse_name, exclude_rses=None)
for did in constituents + [archive1, archive2]:
assert did_core.get_did(**did)
for did in [archive1, archive2, c_with_replica, c_with_replica_and_rule]:
assert replica_core.get_replica(rse_id=rse_id, **did)
with pytest.raises(ReplicaNotFound):
# The replica is only on the archive, not on the constituent
replica_core.get_replica(rse_id=rse_id, **c_first_archive_only)
with pytest.raises(ReplicaNotFound):
# The replica outside the archive was removed by reaper
nb_c_outside_archive -= 1
replica_core.get_replica(rse_id=rse_id, **c_with_expired_replica)
# Compared to get_replica, list_replicas resolves archives, must return replicas for all files
assert len(list(replica_core.list_replicas(dids=constituents))) == 4
assert len(list(did_core.list_content(**dataset1))) == 4
assert len(list(did_core.list_archive_content(**archive1))) == 4
assert len(list(did_core.list_archive_content(**archive2))) == 3
assert __get_archive_contents_history_count(archive1) == 0
assert __get_archive_contents_history_count(archive2) == 0

# Expire the first archive and run reaper again
# the archive will be removed; and c_first_archive_only must be removed from datasets
# and from the did table.
replica_core.set_tombstone(rse_id=rse_id, tombstone=datetime.utcnow() - timedelta(days=1), **archive1)
REGION.invalidate()
rse_core.set_rse_limits(rse_id=rse_id, name='MinFreeSpace', value=2 * archive_size + nb_c_outside_archive * constituent_size)
rse_core.set_rse_usage(rse_id=rse_id, source='storage', used=2 * archive_size + nb_c_outside_archive * constituent_size, free=1)
reaper(once=True, rses=[], include_rses=rse_name, exclude_rses=None)
with pytest.raises(DataIdentifierNotFound):
assert did_core.get_did(**archive1)
with pytest.raises(DataIdentifierNotFound):
assert did_core.get_did(**c_first_archive_only)
assert len(list(replica_core.list_replicas(dids=constituents))) == 3
assert len(list(did_core.list_content(**dataset1))) == 3
assert len(list(did_core.list_archive_content(**archive1))) == 0
assert len(list(did_core.list_archive_content(**archive2))) == 3
assert __get_archive_contents_history_count(archive1) == 4
assert __get_archive_contents_history_count(archive2) == 0

# Expire the second archive replica and run reaper another time
# c_with_expired_replica is removed because its external replica got removed at previous step
# and it exist only inside the archive now.
# If not open, Dataset2 will be removed because it will be empty.
did_core.set_status(open=False, **dataset2)
replica_core.set_tombstone(rse_id=rse_id, tombstone=datetime.utcnow() - timedelta(days=1), **archive2)
REGION.invalidate()
rse_core.set_rse_limits(rse_id=rse_id, name='MinFreeSpace', value=archive_size + nb_c_outside_archive * constituent_size)
rse_core.set_rse_usage(rse_id=rse_id, source='storage', used=archive_size + nb_c_outside_archive * constituent_size, free=1)
reaper(once=True, rses=[], include_rses=rse_name, exclude_rses=None)
# The archive must be removed
with pytest.raises(DataIdentifierNotFound):
assert did_core.get_did(**archive2)
# The DIDs which only existed in the archive are also removed
with pytest.raises(DataIdentifierNotFound):
assert did_core.get_did(**c_first_archive_only)
with pytest.raises(DataIdentifierNotFound):
assert did_core.get_did(**c_with_expired_replica)
# If the DID has a non-expired replica outside the archive without rules on it, the DID is not removed
assert did_core.get_did(**c_with_replica)
# If the DID has an expired replica outside the archive, but has rules on that replica, the DID is not removed
assert did_core.get_did(**c_with_replica_and_rule)
assert len(list(replica_core.list_replicas(dids=constituents))) == 2
assert len(list(did_core.list_content(**dataset1))) == 2
with pytest.raises(DataIdentifierNotFound):
did_core.get_did(**dataset2)
assert len(list(did_core.list_content(**dataset2))) == 0
assert len(list(did_core.list_archive_content(**archive2))) == 0
assert __get_archive_contents_history_count(archive1) == 4
assert __get_archive_contents_history_count(archive2) == 3

0 comments on commit 9ba3fe0

Please sign in to comment.