Skip to content

Commit

Permalink
Transfers: remove useless mock argument in submitter. rucio#4491
Browse files Browse the repository at this point in the history
Tests work correctly without this argument.
Is it used by somebody in production? why would it be?
  • Loading branch information
rcarpa committed Apr 27, 2021
1 parent 6a1049f commit 56144b5
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 30 deletions.
34 changes: 5 additions & 29 deletions lib/rucio/daemons/conveyor/submitter.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@
GET_TRANSFERS_COUNTER = Counter('rucio_daemons_conveyor_submitter_get_transfers', 'Number of transfers retrieved')


def submitter(once=False, rses=None, mock=False,
def submitter(once=False, rses=None, partition_wait_time=10,
bulk=100, group_bulk=1, group_policy='rule', source_strategy=None,
activities=None, sleep_time=600, max_sources=4, retry_other_fts=False,
filter_transfertool=FILTER_TRANSFERTOOL, transfertool=TRANSFER_TOOL, transfertype=TRANSFER_TYPE):
Expand Down Expand Up @@ -133,8 +133,8 @@ def submitter(once=False, rses=None, mock=False,
logger = formatted_logger(logging.log, prefix + '%s')
logger(logging.INFO, 'Submitter starting with timeout %s', timeout)

if not mock:
time.sleep(10) # To prevent running on the same partition if all the poller restart at the same time
if partition_wait_time:
time.sleep(partition_wait_time) # To prevent running on the same partition if all the poller restart at the same time
heart_beat = heartbeat.live(executable, hostname, pid, hb_thread)
prefix = 'conveyor-submitter[%i/%i] : ' % (heart_beat['assign_thread'], heart_beat['nr_threads'])
logger = formatted_logger(logging.log, prefix + '%s')
Expand Down Expand Up @@ -172,7 +172,6 @@ def submitter(once=False, rses=None, mock=False,
activity=activity,
rses=rse_ids,
schemes=scheme,
mock=mock,
max_sources=max_sources,
bring_online=bring_online,
retry_other_fts=retry_other_fts,
Expand Down Expand Up @@ -267,9 +266,6 @@ def run(once=False, group_bulk=1, group_policy='rule', mock=False,
if rucio.db.sqla.util.is_old_db():
raise exception.DatabaseException('Database was not updated, daemon won\'t start')

if mock:
logging.info('mock source replicas: enabled')

multi_vo = config_get_bool('common', 'multi_vo', raise_exception=False, default=False)
working_rses = None
if rses or include_rses or exclude_rses:
Expand Down Expand Up @@ -306,7 +302,6 @@ def run(once=False, group_bulk=1, group_policy='rule', mock=False,
'group_bulk': group_bulk,
'group_policy': group_policy,
'activities': activities,
'mock': mock,
'sleep_time': sleep_time,
'max_sources': max_sources,
'source_strategy': source_strategy,
Expand All @@ -322,7 +317,7 @@ def run(once=False, group_bulk=1, group_policy='rule', mock=False,


def __get_transfers(total_workers=0, worker_number=0, failover_schemes=None, limit=None, activity=None, older_than=None,
rses=None, schemes=None, mock=False, max_sources=4, bring_online=43200,
rses=None, schemes=None, max_sources=4, bring_online=43200,
retry_other_fts=False, transfertool=None, logger=logging.log):
"""
Get transfers to process
Expand All @@ -335,7 +330,6 @@ def __get_transfers(total_workers=0, worker_number=0, failover_schemes=None, lim
:param older_than: Only select requests older than this DateTime.
:param rses: List of rse_id to select requests.
:param schemes: Schemes to process.
:param mock: Mock testing.
:param max_sources: Max sources.
:param bring_online: Bring online timeout.
:param logger: Optional decorated logger that can be passed from the calling daemons or servers.
Expand Down Expand Up @@ -364,10 +358,7 @@ def __get_transfers(total_workers=0, worker_number=0, failover_schemes=None, lim
sources = __sort_ranking(sources, logger=logger)
if len(sources) > max_sources:
sources = sources[:max_sources]
if not mock:
transfers[request_id]['sources'] = sources
else:
transfers[request_id]['sources'] = __mock_sources(sources)
transfers[request_id]['sources'] = sources

# remove link_ranking in the final sources
sources = transfers[request_id]['sources']
Expand Down Expand Up @@ -434,18 +425,3 @@ def __sort_ranking(sources, logger=logging.log):
ret_sources = ret_sources + sources_list
logger(logging.DEBUG, "Sources after sorting: %s", str(ret_sources))
return ret_sources


def __mock_sources(sources):
"""
Create mock sources
:param sources: List of sources
:return: List of mock sources
"""

tmp_sources = []
for source in sources:
tmp_sources.append((source[0], ':'.join(['mock'] + source[1].split(':')[1:]), source[2], source[3], source[4]))
sources = tmp_sources
return tmp_sources
2 changes: 1 addition & 1 deletion lib/rucio/tests/test_conveyor_submitter.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ def _forge_requests_creation_time(session=None):
# Record the order of requests passed to MockTranfertool.submit()
mock_transfertool_submit.side_effect = lambda jobs, _: requests_id_in_submission_order.extend([j['metadata']['request_id'] for j in jobs])

submitter(once=True, rses=[{'id': rse_id} for _, rse_id in dst_rses], mock=True, transfertool='mock', transfertype='single', filter_transfertool=None)
submitter(once=True, rses=[{'id': rse_id} for _, rse_id in dst_rses], partition_wait_time=None, transfertool='mock', transfertype='single', filter_transfertool=None)

for request in requests:
assert request_core.get_request(request_id=request['id'])['state'] == RequestState.SUBMITTED
Expand Down

0 comments on commit 56144b5

Please sign in to comment.