Merge pull request #1399 from cserf/patch-1394-Add_a_activity__dest_rse_policy_in_bulk_group_transfer

Conveyor: Add an activity, dest_rse policy in bulk_group_transfer: Closes #1394
bari12 committed Aug 6, 2018
2 parents 3d15994 + 66a255e commit c59f381
Showing 1 changed file with 9 additions and 6 deletions.
lib/rucio/daemons/conveyor/common.py (15 changes: 9 additions & 6 deletions)
@@ -78,24 +78,24 @@ def submit_transfer(external_host, job, submitter='submitter', process=0, thread
         logging.debug("%s:%s start to prepare transfer" % (process, thread))
         transfer_core.prepare_sources_for_transfers(xfers_ret)
         logging.debug("%s:%s finished to prepare transfer" % (process, thread))
-    except:
+    except Exception:
         logging.error("%s:%s Failed to prepare requests %s state to SUBMITTING(Will not submit jobs but return directly) with error: %s" % (process, thread, list(xfers_ret.keys()), traceback.format_exc()))
         return
 
     # submit the job
     eid = None
     try:
-        ts = time.time()
+        start_time = time.time()
         logging.info("%s:%s About to submit job to %s with timeout %s" % (process, thread, external_host, timeout))
         eid = transfer_core.submit_bulk_transfers(external_host,
                                                   files=job['files'],
                                                   transfertool='fts3',
                                                   job_params=job['job_params'],
                                                   timeout=timeout,
                                                   user_transfer_job=user_transfer_job)
-        duration = time.time() - ts
+        duration = time.time() - start_time
         logging.info("%s:%s Submit job %s to %s in %s seconds" % (process, thread, eid, external_host, duration))
-        record_timer('daemons.conveyor.%s.submit_bulk_transfer.per_file' % submitter, (time.time() - ts) * 1000 / len(job['files']))
+        record_timer('daemons.conveyor.%s.submit_bulk_transfer.per_file' % submitter, (time.time() - start_time) * 1000 / len(job['files']))
         record_counter('daemons.conveyor.%s.submit_bulk_transfer' % submitter, len(job['files']))
         record_timer('daemons.conveyor.%s.submit_bulk_transfer.files' % submitter, len(job['files']))
     except Exception as error:
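
Aside: narrowing the bare except: clauses in this hunk is a behavioural fix, not just a style one. A bare except also traps BaseException subclasses such as KeyboardInterrupt and SystemExit, which a long-running daemon like the conveyor should normally let propagate. A minimal, standalone sketch of the difference; this is illustrative code, not part of the Rucio codebase:

    def swallow_all():
        # A bare "except:" catches BaseException, so even an interrupt is trapped.
        try:
            raise KeyboardInterrupt
        except:  # noqa: E722
            return "swallowed"

    def swallow_exceptions_only():
        # "except Exception:" lets KeyboardInterrupt and SystemExit propagate.
        try:
            raise KeyboardInterrupt
        except Exception:
            return "swallowed"

    print(swallow_all())                   # prints "swallowed"
    try:
        swallow_exceptions_only()
    except KeyboardInterrupt:
        print("interrupt propagated")      # this branch is reached
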
@@ -137,13 +137,13 @@ def submit_transfer(external_host, job, submitter='submitter', process=0, thread
         logging.debug("%s:%s start to register transfer state" % (process, thread))
         transfer_core.set_transfers_state(xfers_ret, datetime.datetime.utcnow())
         logging.debug("%s:%s finished to register transfer state" % (process, thread))
-    except:
+    except Exception:
         logging.error("%s:%s Failed to register transfer state with error: %s" % (process, thread, traceback.format_exc()))
         try:
             if eid:
                 logging.info("%s:%s Cancel transfer %s on %s" % (process, thread, eid, external_host))
                 request.cancel_request_external_id(eid, external_host)
-        except:
+        except Exception:
             logging.error("%s:%s Failed to cancel transfers %s on %s with error: %s" % (process, thread, eid, external_host, traceback.format_exc()))


@@ -246,6 +246,9 @@ def bulk_group_transfer(transfers, policy='rule', group_bulk=200, fts_source_str
             policy_key = '%s,%s' % (file['metadata']['src_rse'], file['metadata']['dst_rse'])
         if policy == 'rule_src_dest':
             policy_key = '%s,%s,%s' % (transfer['rule_id'], file['metadata']['src_rse'], file['metadata']['dst_rse'])
+        if policy == 'activity_dest':
+            policy_key = '%s %s' % (activity, file['metadata']['dst_rse'])
+            policy_key = "_".join(policy_key.split(' '))
         # maybe here we need to hash the key if it's too long
 
         if USER_TRANSFERS not in ['cms'] or activity not in USER_ACTIVITY:
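
For context, the new branch groups transfers that share an activity and a destination RSE into the same bulk FTS job, with spaces in the key normalised to underscores. Below is a standalone sketch of the grouping-key logic; the function name is made up, the if-guard for the first assignment above is not visible in the hunk (assumed to be a 'src_dest' policy), and only the branches shown in the diff are reproduced:

    def build_policy_key(policy, rule_id, activity, src_rse, dst_rse):
        # The real function supports further policies (the signature defaults to 'rule').
        policy_key = rule_id                              # assumed 'rule' fallback
        if policy == 'src_dest':                          # assumed guard for the first shown assignment
            policy_key = '%s,%s' % (src_rse, dst_rse)
        if policy == 'rule_src_dest':
            policy_key = '%s,%s,%s' % (rule_id, src_rse, dst_rse)
        if policy == 'activity_dest':
            policy_key = '%s %s' % (activity, dst_rse)
            policy_key = "_".join(policy_key.split(' '))  # spaces become underscores
        return policy_key

    # Transfers sharing an activity and destination RSE now fall into one bucket:
    print(build_policy_key('activity_dest', None, 'Data Consolidation', 'SRC_RSE', 'DST_RSE'))
    # -> 'Data_Consolidation_DST_RSE'
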
