Skip to content

Commit

Permalink
Monitoring & Logging: add datatype to transfer events. #4557
Browse files Browse the repository at this point in the history
The commit takes quite a dirty approach of retrieving the `datatype`
directly via a sql query when preparing the message. There is
no easy way to improve that:
- In all code path, there is a lot of logic between the moment
when we retrieve the work queue from the database and the moment
when we sent the message. Forwarding the datatype through all the
call stack will make the code more complicated.
- We cannot import from core.dids here, because it creates a circular
import problem. So using existing get_metadata calls is not easily
achievable to avoid a raw database call.
  • Loading branch information
rcarpa authored and bari12 committed Jul 1, 2022
1 parent 0b819cf commit 4e22e8a
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 3 deletions.
11 changes: 10 additions & 1 deletion lib/rucio/core/request.py
Expand Up @@ -1534,6 +1534,14 @@ def add_monitor_message(new_state, request, additional_fields, session=None):
transfer_status = 'transfer-%s' % new_state.name
transfer_status = transfer_status.lower()

stmt = select(
models.DataIdentifier.datatype
).where(
models.DataIdentifier.scope == request['scope'],
models.DataIdentifier.name == request['name'],
)
datatype = session.execute(stmt).scalar_one_or_none()

# Start by filling up fields from database request or with defaults.
message = {'activity': request.get('activity', None),
'request-id': request['id'],
Expand Down Expand Up @@ -1562,7 +1570,8 @@ def add_monitor_message(new_state, request, additional_fields, session=None):
'started_at': request.get('started_at', None),
'transferred_at': request.get('transferred_at', None),
'tool-id': 'rucio-conveyor',
'account': request.get('account', None)}
'account': request.get('account', None),
'datatype': datatype}

# Add (or override) existing fields
message.update(additional_fields)
Expand Down
13 changes: 11 additions & 2 deletions lib/rucio/core/transfer.py
Expand Up @@ -26,7 +26,7 @@

from dogpile.cache import make_region
from dogpile.cache.api import NoValue
from sqlalchemy import update
from sqlalchemy import select, update
from sqlalchemy.exc import IntegrityError
from sqlalchemy.sql.expression import false

Expand Down Expand Up @@ -441,6 +441,14 @@ def set_transfers_state(transfers, state, submitted_at, external_host, external_
if rowcount == 0:
raise RucioException("%s: failed to set transfer state: request doesn't exist or is not in SUBMITTING state" % rws)

stmt = select(
models.DataIdentifier.datatype
).where(
models.DataIdentifier.scope == rws.scope,
models.DataIdentifier.name == rws.name,
)
datatype = session.execute(stmt).scalar_one_or_none()

msg = {'request-id': rws.request_id,
'request-type': rws.request_type,
'scope': rws.scope.external,
Expand All @@ -457,7 +465,8 @@ def set_transfers_state(transfers, state, submitted_at, external_host, external_
'checksum-adler': rws.adler32,
'external-id': external_id,
'external-host': external_host,
'queued_at': str(submitted_at)}
'queued_at': str(submitted_at),
'datatype': datatype}
if rws.scope.vo != 'def':
msg['vo'] = rws.scope.vo

Expand Down
4 changes: 4 additions & 0 deletions lib/rucio/tests/test_conveyor.py
Expand Up @@ -26,6 +26,7 @@
from rucio.common.utils import generate_uuid
from rucio.common.exception import ReplicaNotFound, RequestNotFound
from rucio.core import config as core_config
from rucio.core import did as did_core
from rucio.core import distance as distance_core
from rucio.core import message as message_core
from rucio.core import replica as replica_core
Expand Down Expand Up @@ -439,6 +440,7 @@ def test_multisource_receiver(vo, did_factory, replica_client, root_account, met
# Add non-existing replica which will fail during multisource transfers on the RSE with lower cost (will be the preferred source)
replica_client.add_replicas(rse=src_rse2, files=[{'scope': did['scope'].external, 'name': did['name'], 'bytes': 1, 'adler32': 'aaaaaaaa'}])

did_core.set_metadata(did['scope'], did['name'], 'datatype', 'RAW')
rule_core.add_rule(dids=[did], account=root_account, copies=1, rse_expression=dst_rse, grouping='ALL', weight=None, lifetime=None, locked=False, subscription_id=None)
submitter(once=True, rses=[{'id': rse_id} for rse_id in all_rses], group_bulk=2, partition_wait_time=None, transfertype='single', filter_transfertool=None)

Expand Down Expand Up @@ -467,8 +469,10 @@ def test_multisource_receiver(vo, did_factory, replica_client, root_account, met
msgs = message_core.retrieve_messages()
msg_submitted = next(msg for msg in msgs if msg['event_type'] == 'transfer-submitted')
assert msg_submitted['payload']['request-id'] == request['id']
assert msg_submitted['payload']['datatype'] == 'RAW'
msg_done = next(msg for msg in msgs if msg['event_type'] == 'transfer-done')
assert msg_done['payload']['request-id'] == request['id']
assert msg_done['payload']['datatype'] == 'RAW'
finally:
receiver_graceful_stop.set()
receiver_thread.join(timeout=5)
Expand Down

0 comments on commit 4e22e8a

Please sign in to comment.