diff --git a/lib/rucio/core/replica.py b/lib/rucio/core/replica.py index 12cda12ebb..607b2c7d2a 100644 --- a/lib/rucio/core/replica.py +++ b/lib/rucio/core/replica.py @@ -1623,7 +1623,10 @@ def __cleanup_after_replica_deletion(rse_id, files, session=None): models.DataIdentifier.availability == DIDAvailability.LOST)), ~exists(select([1]).prefix_with("/*+ INDEX(REPLICAS REPLICAS_PK) */", dialect='oracle')).where( and_(models.RSEFileAssociation.scope == file['scope'], - models.RSEFileAssociation.name == file['name'])))) + models.RSEFileAssociation.name == file['name'])), + ~exists(select([1]).prefix_with("/*+ INDEX(ARCHIVE_CONTENTS ARCH_CONTENTS_PK) */", dialect='oracle')).where( + and_(models.ConstituentAssociation.child_scope == file['scope'], + models.ConstituentAssociation.child_name == file['name'])))) # 2) schedule removal of this file from the DID table did_condition.append( @@ -1793,11 +1796,34 @@ def __cleanup_after_replica_deletion(rse_id, files, session=None): models.DidMeta.name == name)) # Remove Archive Constituents + removed_constituents = [] for chunk in chunks(archive_contents_condition, 30): + query = session.query(models.ConstituentAssociation). \ + with_hint(models.ConstituentAssociation, "INDEX(ARCHIVE_CONTENTS ARCH_CONTENTS_CHILD_IDX)", 'oracle'). \ + filter(or_(*chunk)) + for constituent in query: + removed_constituents.append({'scope': constituent.child_scope, 'name': constituent.child_name}) + + models.ConstituentAssociationHistory( + child_scope=constituent.child_scope, + child_name=constituent.child_name, + scope=constituent.scope, + name=constituent.name, + bytes=constituent.bytes, + adler32=constituent.adler32, + md5=constituent.md5, + guid=constituent.guid, + length=constituent.length, + updated_at=constituent.updated_at, + created_at=constituent.created_at, + ).save(session=session, flush=False) + session.query(models.ConstituentAssociation).\ with_hint(models.ConstituentAssociation, "INDEX(ARCHIVE_CONTENTS ARCH_CONTENTS_CHILD_IDX)", 'oracle').\ filter(or_(*chunk)).\ delete(synchronize_session=False) + for chunk in chunks(removed_constituents, 200): + __cleanup_after_replica_deletion(rse_id=rse_id, files=chunk, session=session) # Remove rules in Waiting for approval or Suspended for chunk in chunks(deleted_rules, 100): diff --git a/lib/rucio/daemons/reaper/reaper2.py b/lib/rucio/daemons/reaper/reaper2.py index 57e3301628..4d58d6eacb 100644 --- a/lib/rucio/daemons/reaper/reaper2.py +++ b/lib/rucio/daemons/reaper/reaper2.py @@ -246,7 +246,12 @@ def get_rses_to_hostname_mapping(): result = {} all_rses = list_rses() for rse in all_rses: - rse_protocol = get_rse_protocols(rse_id=rse['id']) + try: + rse_protocol = get_rse_protocols(rse_id=rse['id']) + except RSENotFound: + logging.log(logging.WARNING, 'RSE deleted while constructing rse-to-hostname mapping. Skipping %s', rse['rse']) + continue + for prot in rse_protocol['protocols']: if prot['domains']['wan']['delete'] == 1: result[rse['id']] = (prot['hostname'], rse_protocol) diff --git a/lib/rucio/tests/temp_factories.py b/lib/rucio/tests/temp_factories.py index 4c861f1bac..1d13e4b830 100644 --- a/lib/rucio/tests/temp_factories.py +++ b/lib/rucio/tests/temp_factories.py @@ -228,6 +228,12 @@ def __cleanup_replicas(self, session=None): models.BadReplicas.name == did['name']) for did in self.created_dids)).delete(synchronize_session=False) + def register_dids(self, dids): + """ + Register the provided dids to be cleaned up on teardown + """ + self.created_dids.extend(dids) + def _sanitize_or_set_scope(self, scope): if not scope: scope = self.default_scope diff --git a/lib/rucio/tests/test_reaper2.py b/lib/rucio/tests/test_reaper2.py index e5cf4da010..18fb4e6a14 100644 --- a/lib/rucio/tests/test_reaper2.py +++ b/lib/rucio/tests/test_reaper2.py @@ -24,16 +24,22 @@ import pytest +from rucio.api import replica as replica_api +from rucio.api import rse as rse_api from rucio.common.config import config_get_bool +from rucio.common.exception import ReplicaNotFound, DataIdentifierNotFound from rucio.common.types import InternalAccount, InternalScope from rucio.common.utils import generate_uuid -from rucio.api import rse as rse_api -from rucio.api import replica as replica_api +from rucio.core import did as did_core from rucio.core import replica as replica_core from rucio.core import rse as rse_core +from rucio.core import rule as rule_core from rucio.core import scope as scope_core from rucio.core import vo as vo_core -from rucio.daemons.reaper.reaper2 import reaper, REGION, run as run_reaper +from rucio.daemons.reaper.reaper2 import reaper, REGION +from rucio.daemons.reaper.reaper2 import run as run_reaper +from rucio.db.sqla.models import ConstituentAssociationHistory +from rucio.db.sqla.session import read_session from rucio.tests.common import rse_name_generator __mock_protocol = {'scheme': 'MOCK', @@ -203,3 +209,130 @@ def test_reaper_multi_vo(vo): reaper(once=True, rses=[], include_rses=both_rses, exclude_rses=None) assert len(list(replica_core.list_replicas(dids=dids1, rse_expression=both_rses))) == 200 assert len(list(replica_core.list_replicas(dids=dids2, rse_expression=both_rses))) == 200 + + +def test_archive_removal_impact_on_constituents(rse_factory, file_factory, mock_scope, root_account): + rse_name, rse_id = rse_factory.make_mock_rse() + scope = mock_scope + account = root_account + + # Create an 2 archives and 4 files: + # - One only exists in the first archive + # - One in both, plus another replica, which is not in an archive + # - One in both, plus another replica, which is not in an archive; and this replica has expired + # - One in both, plus another replica, which is not in an archive; and this replica has expired; but a replication rule exists on this second replica + # Also add these files to datasets, one of which will be removed at the end + nb_constituents = 4 + nb_c_outside_archive = nb_constituents - 1 + constituent_size = 2000 + archive_size = 1000 + uuid = str(generate_uuid()) + constituents = [{'scope': scope, 'name': 'lfn.%s.%d' % (uuid, i)} for i in range(nb_constituents)] + file_factory.register_dids(constituents) + c_first_archive_only, c_with_replica, c_with_expired_replica, c_with_replica_and_rule = constituents + + replica_core.add_replica(rse_id=rse_id, account=account, bytes=constituent_size, **c_with_replica) + + replica_core.add_replica(rse_id=rse_id, account=account, bytes=constituent_size, + tombstone=datetime.utcnow() - timedelta(days=1), **c_with_expired_replica) + + replica_core.add_replica(rse_id=rse_id, account=account, bytes=constituent_size, + tombstone=datetime.utcnow() - timedelta(days=1), **c_with_replica_and_rule) + rule_core.add_rule(dids=[c_with_replica_and_rule], account=account, copies=1, rse_expression=rse_name, grouping='NONE', + weight=None, lifetime=None, locked=False, subscription_id=None) + + archive1, archive2 = [{'scope': scope, 'name': 'archive_%s.%d.zip' % (uuid, i)} for i in range(2)] + replica_core.add_replica(rse_id=rse_id, bytes=archive_size, account=account, **archive1) + replica_core.add_replica(rse_id=rse_id, bytes=archive_size, account=account, **archive2) + did_core.attach_dids(dids=[{'scope': c['scope'], 'name': c['name'], 'bytes': constituent_size} for c in constituents], + account=account, **archive1) + did_core.attach_dids(dids=[{'scope': c['scope'], 'name': c['name'], 'bytes': constituent_size} for c in [c_with_replica, c_with_expired_replica, c_with_replica_and_rule]], + account=account, **archive2) + + dataset1, dataset2 = [{'scope': scope, 'name': 'dataset_%s.%i' % (uuid, i)} for i in range(2)] + did_core.add_did(type='DATASET', account=account, **dataset1) + did_core.attach_dids(dids=constituents, account=account, **dataset1) + did_core.add_did(type='DATASET', account=account, **dataset2) + did_core.attach_dids(dids=[c_first_archive_only, c_with_expired_replica], account=account, **dataset2) + + @read_session + def __get_archive_contents_history_count(archive, session=None): + return session.query(ConstituentAssociationHistory).filter_by(**archive).count() + + # Run reaper the first time. + # the expired non-archive replica of c_with_expired_replica must be removed, + # but the did must not be remove and it must still remain in the dataset because + # it still has the replica from inside the archive + assert replica_core.get_replica(rse_id=rse_id, **c_with_expired_replica) + REGION.invalidate() + rse_core.set_rse_limits(rse_id=rse_id, name='MinFreeSpace', value=2 * archive_size + nb_c_outside_archive * constituent_size) + rse_core.set_rse_usage(rse_id=rse_id, source='storage', used=2 * archive_size + nb_c_outside_archive * constituent_size, free=1) + reaper(once=True, rses=[], include_rses=rse_name, exclude_rses=None) + for did in constituents + [archive1, archive2]: + assert did_core.get_did(**did) + for did in [archive1, archive2, c_with_replica, c_with_replica_and_rule]: + assert replica_core.get_replica(rse_id=rse_id, **did) + with pytest.raises(ReplicaNotFound): + # The replica is only on the archive, not on the constituent + replica_core.get_replica(rse_id=rse_id, **c_first_archive_only) + with pytest.raises(ReplicaNotFound): + # The replica outside the archive was removed by reaper + nb_c_outside_archive -= 1 + replica_core.get_replica(rse_id=rse_id, **c_with_expired_replica) + # Compared to get_replica, list_replicas resolves archives, must return replicas for all files + assert len(list(replica_core.list_replicas(dids=constituents))) == 4 + assert len(list(did_core.list_content(**dataset1))) == 4 + assert len(list(did_core.list_archive_content(**archive1))) == 4 + assert len(list(did_core.list_archive_content(**archive2))) == 3 + assert __get_archive_contents_history_count(archive1) == 0 + assert __get_archive_contents_history_count(archive2) == 0 + + # Expire the first archive and run reaper again + # the archive will be removed; and c_first_archive_only must be removed from datasets + # and from the did table. + replica_core.set_tombstone(rse_id=rse_id, tombstone=datetime.utcnow() - timedelta(days=1), **archive1) + REGION.invalidate() + rse_core.set_rse_limits(rse_id=rse_id, name='MinFreeSpace', value=2 * archive_size + nb_c_outside_archive * constituent_size) + rse_core.set_rse_usage(rse_id=rse_id, source='storage', used=2 * archive_size + nb_c_outside_archive * constituent_size, free=1) + reaper(once=True, rses=[], include_rses=rse_name, exclude_rses=None) + with pytest.raises(DataIdentifierNotFound): + assert did_core.get_did(**archive1) + with pytest.raises(DataIdentifierNotFound): + assert did_core.get_did(**c_first_archive_only) + assert len(list(replica_core.list_replicas(dids=constituents))) == 3 + assert len(list(did_core.list_content(**dataset1))) == 3 + assert len(list(did_core.list_archive_content(**archive1))) == 0 + assert len(list(did_core.list_archive_content(**archive2))) == 3 + assert __get_archive_contents_history_count(archive1) == 4 + assert __get_archive_contents_history_count(archive2) == 0 + + # Expire the second archive replica and run reaper another time + # c_with_expired_replica is removed because its external replica got removed at previous step + # and it exist only inside the archive now. + # If not open, Dataset2 will be removed because it will be empty. + did_core.set_status(open=False, **dataset2) + replica_core.set_tombstone(rse_id=rse_id, tombstone=datetime.utcnow() - timedelta(days=1), **archive2) + REGION.invalidate() + rse_core.set_rse_limits(rse_id=rse_id, name='MinFreeSpace', value=archive_size + nb_c_outside_archive * constituent_size) + rse_core.set_rse_usage(rse_id=rse_id, source='storage', used=archive_size + nb_c_outside_archive * constituent_size, free=1) + reaper(once=True, rses=[], include_rses=rse_name, exclude_rses=None) + # The archive must be removed + with pytest.raises(DataIdentifierNotFound): + assert did_core.get_did(**archive2) + # The DIDs which only existed in the archive are also removed + with pytest.raises(DataIdentifierNotFound): + assert did_core.get_did(**c_first_archive_only) + with pytest.raises(DataIdentifierNotFound): + assert did_core.get_did(**c_with_expired_replica) + # If the DID has a non-expired replica outside the archive without rules on it, the DID is not removed + assert did_core.get_did(**c_with_replica) + # If the DID has an expired replica outside the archive, but has rules on that replica, the DID is not removed + assert did_core.get_did(**c_with_replica_and_rule) + assert len(list(replica_core.list_replicas(dids=constituents))) == 2 + assert len(list(did_core.list_content(**dataset1))) == 2 + with pytest.raises(DataIdentifierNotFound): + did_core.get_did(**dataset2) + assert len(list(did_core.list_content(**dataset2))) == 0 + assert len(list(did_core.list_archive_content(**archive2))) == 0 + assert __get_archive_contents_history_count(archive1) == 4 + assert __get_archive_contents_history_count(archive2) == 3