Skip to content

Commit

Permalink
Clients: Validate already downloaded file by filesize only Fix rucio#…
Browse files Browse the repository at this point in the history
…5008

The checksum verification is slow compared to the filesize verification. This
commits adds the option to verify already downloded files by filesize only.
  • Loading branch information
Joel Dierkes committed Nov 26, 2021
1 parent 5d428e0 commit de52c72
Show file tree
Hide file tree
Showing 3 changed files with 80 additions and 34 deletions.
7 changes: 7 additions & 0 deletions bin/rucio
Expand Up @@ -53,6 +53,7 @@
# - KosKyr <90753277+KosKyr@users.noreply.github.com>, 2021
# - Joel Dierkes <joel.dierkes@cern.ch>, 2021
# - jdierkes <joel.dierkes@cern.ch>, 2021
# - Igor Mandrichenko <ivm@fnal.gov>, 2021
# - Rob Barnsley <robbarnsley@users.noreply.github.com>, 2021

from __future__ import print_function
Expand Down Expand Up @@ -1087,6 +1088,10 @@ def download(args):
logger.error('Arguments dids and metalink cannot be used together.')
return FAILURE

if args.ignore_checksum and args.check_local_with_filesize_only:
logger.error('Arguments ignore-checksum and check-local-with-filesize-only cannot be used together.')
return FAILURE

if args.rse: # TODO:remove-deprecated
args.rses = args.rse
option_deprecation_message("--rse", "--rses")
Expand Down Expand Up @@ -1122,6 +1127,7 @@ def download(args):
item_defaults['transfer_speed_timeout'] = args.transfer_speed_timeout
item_defaults['no_resolve_archives'] = args.no_resolve_archives
item_defaults['ignore_checksum'] = args.ignore_checksum
item_defaults['check_local_with_filesize_only'] = args.check_local_with_filesize_only
archive_did = args.archive_did
if archive_did:
logger.warning("Archives are treated transparently. --archive-did option is being obsoleted.") # TODO
Expand Down Expand Up @@ -2317,6 +2323,7 @@ You can filter by key/value, e.g.::
selected_parser.add_argument('--archive-did', action='store', dest='archive_did', help="Download from archive is transparent. This option is obsolete.")
selected_parser.add_argument('--no-resolve-archives', action='store_true', default=False, help="If set archives will not be considered for download.")
selected_parser.add_argument('--ignore-checksum', action='store_true', default=False, help="Don't validate checksum for downloaded files.")
selected_parser.add_argument('--check-local-with-filesize-only', action='store_true', default=False, help="Don't use checksum verification for already downloaded files, use filesize instead.")
selected_parser.add_argument('--transfer-timeout', dest='transfer_timeout', type=float, action='store', default=config_get('download', 'transfer_timeout', False, None), help='Transfer timeout (in seconds). Default: computed dynamically from --transfer-speed-timeout. If set to any value >= 0, --transfer-speed-timeout is ignored.') # NOQA: E501
selected_parser.add_argument('--transfer-speed-timeout', dest='transfer_speed_timeout', type=float, action='store', default=config_get('download', 'transfer_speed_timeout', False, 500), help='Minimum allowed average transfer speed (in KBps). Default: 500. Used to dynamically compute the timeout if --transfer-timeout not set. Is not supported for --pfn.') # NOQA: E501
selected_parser.add_argument('--aria', action='store_true', default=False, help="Use aria2c utility if possible. (EXPERIMENTAL)")
Expand Down
80 changes: 46 additions & 34 deletions lib/rucio/client/downloadclient.py
Expand Up @@ -192,14 +192,15 @@ def download_pfns(self, items, num_threads=2, trace_custom_fields={}, traces_cop
Download items with a given PFN. This function can only download files, no datasets.
:param items: List of dictionaries. Each dictionary describing a file to download. Keys:
pfn - PFN string of this file
did - DID string of this file (e.g. 'scope:file.name'). Wildcards are not allowed
rse - rse name (e.g. 'CERN-PROD_DATADISK'). RSE Expressions are not allowed
base_dir - Optional: Base directory where the downloaded files will be stored. (Default: '.')
no_subdir - Optional: If true, files are written directly into base_dir. (Default: False)
adler32 - Optional: The adler32 checmsum to compare the downloaded files adler32 checksum with
md5 - Optional: The md5 checksum to compare the downloaded files md5 checksum with
transfer_timeout - Optional: Timeout time for the download protocols. (Default: None)
pfn - PFN string of this file
did - DID string of this file (e.g. 'scope:file.name'). Wildcards are not allowed
rse - rse name (e.g. 'CERN-PROD_DATADISK'). RSE Expressions are not allowed
base_dir - Optional: Base directory where the downloaded files will be stored. (Default: '.')
no_subdir - Optional: If true, files are written directly into base_dir. (Default: False)
adler32 - Optional: The adler32 checmsum to compare the downloaded files adler32 checksum with
md5 - Optional: The md5 checksum to compare the downloaded files md5 checksum with
transfer_timeout - Optional: Timeout time for the download protocols. (Default: None)
check_local_with_filesize_only - Optional: If true, already downloaded files will not be validated by checksum.
:param num_threads: Suggestion of number of threads to use for the download. It will be lowered if it's too high.
:param trace_custom_fields: Custom key value pairs to send with the traces
:param traces_copy_out: reference to an external list, where the traces should be uploaded
Expand Down Expand Up @@ -266,19 +267,20 @@ def download_dids(self, items, num_threads=2, trace_custom_fields={}, traces_cop
Download items with given DIDs. This function can also download datasets and wildcarded DIDs.
:param items: List of dictionaries. Each dictionary describing an item to download. Keys:
did - DID string of this file (e.g. 'scope:file.name')
filters - Filter to select DIDs for download. Optional if DID is given
rse - Optional: rse name (e.g. 'CERN-PROD_DATADISK') or rse expression from where to download
impl - Optional: name of the protocol implementation to be used to download this item.
no_resolve_archives - Optional: bool indicating whether archives should not be considered for download (Default: False)
resolve_archives - Deprecated: Use no_resolve_archives instead
force_scheme - Optional: force a specific scheme to download this item. (Default: None)
base_dir - Optional: base directory where the downloaded files will be stored. (Default: '.')
no_subdir - Optional: If true, files are written directly into base_dir. (Default: False)
nrandom - Optional: if the DID addresses a dataset, nrandom files will be randomly choosen for download from the dataset
ignore_checksum - Optional: If true, skips the checksum validation between the downloaded file and the rucio catalouge. (Default: False)
transfer_timeout - Optional: Timeout time for the download protocols. (Default: None)
transfer_speed_timeout - Optional: Minimum allowed transfer speed (in KBps). Ignored if transfer_timeout set. Otherwise, used to compute default timeout (Default: 500)
did - DID string of this file (e.g. 'scope:file.name')
filters - Filter to select DIDs for download. Optional if DID is given
rse - Optional: rse name (e.g. 'CERN-PROD_DATADISK') or rse expression from where to download
impl - Optional: name of the protocol implementation to be used to download this item.
no_resolve_archives - Optional: bool indicating whether archives should not be considered for download (Default: False)
resolve_archives - Deprecated: Use no_resolve_archives instead
force_scheme - Optional: force a specific scheme to download this item. (Default: None)
base_dir - Optional: base directory where the downloaded files will be stored. (Default: '.')
no_subdir - Optional: If true, files are written directly into base_dir. (Default: False)
nrandom - Optional: if the DID addresses a dataset, nrandom files will be randomly choosen for download from the dataset
ignore_checksum - Optional: If true, skips the checksum validation between the downloaded file and the rucio catalouge. (Default: False)
transfer_timeout - Optional: Timeout time for the download protocols. (Default: None)
transfer_speed_timeout - Optional: Minimum allowed transfer speed (in KBps). Ignored if transfer_timeout set. Otherwise, used to compute default timeout (Default: 500)
check_local_with_filesize_only - Optional: If true, already downloaded files will not be validated by checksum.
:param num_threads: Suggestion of number of threads to use for the download. It will be lowered if it's too high.
:param trace_custom_fields: Custom key value pairs to send with the traces.
:param traces_copy_out: reference to an external list, where the traces should be uploaded
Expand Down Expand Up @@ -314,10 +316,12 @@ def download_from_metalink_file(self, item, metalink_file_path, num_threads=2, t
Download items using a given metalink file.
:param item: dictionary describing an item to download. Keys:
base_dir - Optional: base directory where the downloaded files will be stored. (Default: '.')
no_subdir - Optional: If true, files are written directly into base_dir. (Default: False)
ignore_checksum - Optional: If true, skips the checksum validation between the downloaded file and the rucio catalouge. (Default: False)
transfer_timeout - Optional: Timeout time for the download protocols. (Default: None)
base_dir - Optional: base directory where the downloaded files will be stored. (Default: '.')
no_subdir - Optional: If true, files are written directly into base_dir. (Default: False)
ignore_checksum - Optional: If true, skips the checksum validation between the downloaded file and the rucio catalouge. (Default: False)
transfer_timeout - Optional: Timeout time for the download protocols. (Default: None)
check_local_with_filesize_only - Optional: If true, already downloaded files will not be validated by checksum.
:param num_threads: Suggestion of number of threads to use for the download. It will be lowered if it's too high.
:param trace_custom_fields: Custom key value pairs to send with the traces.
:param traces_copy_out: reference to an external list, where the traces should be uploaded
Expand Down Expand Up @@ -508,7 +512,13 @@ def _download_item(self, item, trace, traces_copy_out, log_prefix=''):
# if file already exists make sure it exists at all destination paths, set state, send trace, and return
for dest_file_path in dest_file_paths:
if os.path.isfile(dest_file_path):
if not item.get('merged_options', {}).get('ignore_checksum', False):
if item.get('merged_options', {}).get('check_local_with_filesize_only', False):
local_filesize = os.stat(dest_file_path).st_size
if item.get('bytes') != local_filesize:
logger(logging.INFO, '%sFile with same name exists locally, but filesize mismatches: %s' % (log_prefix, did_str))
logger(logging.DEBUG, '%slocal filesize: %d bytes, expected filesize: %d bytes' % (log_prefix, local_filesize, item.get('bytes')))
continue
elif not item.get('merged_options', {}).get('ignore_checksum', False):
verified, _, _ = _verify_checksum(item, dest_file_path)
if not verified:
logger(logging.INFO, '%sFile with same name exists locally, but checksum mismatches: %s' % (log_prefix, did_str))
Expand Down Expand Up @@ -732,12 +742,14 @@ def download_aria2c(self, items, trace_custom_fields={}, filters={}, deactivate_
Aria2c needs to be installed and X509_USER_PROXY needs to be set!
:param items: List of dictionaries. Each dictionary describing an item to download. Keys:
did - DID string of this file (e.g. 'scope:file.name'). Wildcards are not allowed
rse - Optional: rse name (e.g. 'CERN-PROD_DATADISK') or rse expression from where to download
base_dir - Optional: base directory where the downloaded files will be stored. (Default: '.')
no_subdir - Optional: If true, files are written directly into base_dir. (Default: False)
nrandom - Optional: if the DID addresses a dataset, nrandom files will be randomly choosen for download from the dataset
ignore_checksum - Optional: If true, skips the checksum validation between the downloaded file and the rucio catalouge. (Default: False)
did - DID string of this file (e.g. 'scope:file.name'). Wildcards are not allowed
rse - Optional: rse name (e.g. 'CERN-PROD_DATADISK') or rse expression from where to download
base_dir - Optional: base directory where the downloaded files will be stored. (Default: '.')
no_subdir - Optional: If true, files are written directly into base_dir. (Default: False)
nrandom - Optional: if the DID addresses a dataset, nrandom files will be randomly choosen for download from the dataset
ignore_checksum - Optional: If true, skips the checksum validation between the downloaded file and the rucio catalouge. (Default: False)
check_local_with_filesize_only - Optional: If true, already downloaded files will not be validated by checksum.
:param trace_custom_fields: Custom key value pairs to send with the traces
:param filters: dictionary containing filter options
:param deactivate_file_download_exceptions: Boolean, if file download exceptions shouldn't be raised
Expand Down Expand Up @@ -1227,15 +1239,15 @@ def _options_from_input_items(self, input_items):
for item in input_items:
base_dir = item.get('base_dir', '.')
no_subdir = item.get('no_subdir', False)
ignore_checksum = item.get('ignore_checksum', False)
new_transfer_timeout = item.get('transfer_timeout', None)
new_transfer_speed_timeout = item.get('transfer_speed_timeout', None)

options.setdefault('destinations', set()).add((base_dir, no_subdir))

# Merge some options
# The other options of this DID will be inherited from the first item that contained the DID
options['ignore_checksum'] = (options.get('ignore_checksum') or ignore_checksum)
options['ignore_checksum'] = options.get('ignore_checksum') or item.get('ignore_checksum', False)
options['check_local_with_filesize_only'] = options.get('check_local_with_filesize_only') or item.get('check_local_with_filesize_only', False)

# if one item wants to resolve archives we enable it for all items
options['resolve_archives'] = (options.get('resolve_archives') or not item.get('no_resolve_archives'))
Expand Down
27 changes: 27 additions & 0 deletions lib/rucio/tests/test_bin_rucio.py
Expand Up @@ -1007,6 +1007,33 @@ def test_download_dataset(self):
search = '{0} successfully downloaded'.format(tmp_file1[5:]) # triming '/tmp/' from filename
assert re.search(search, err) is not None

def test_download_file_check_by_size(self):
"""CLIENT(USER): Rucio download files"""
tmp_file1 = file_generator()
# add files
cmd = 'rucio upload --rse {0} --scope {1} {2}'.format(self.def_rse, self.user, tmp_file1)
print(self.marker + cmd)
exitcode, out, err = execute(cmd)
print(out, err)
# download files
cmd = 'rucio -v download --dir /tmp {0}:{1}'.format(self.user, tmp_file1[5:]) # triming '/tmp/' from filename
print(self.marker + cmd)
exitcode, out, err = execute(cmd)
print(out, err)
# Alter downloaded file
cmd = 'echo "dummy" >> /tmp/{}/{}'.format(self.user, tmp_file1[5:]) # triming '/tmp/' from filename
print(self.marker + cmd)
exitcode, out, err = execute(cmd)
print(out, err)
assert exitcode == 0
# Download file again and check for mismatch
cmd = 'rucio -v download --check-local-with-filesize-only --dir /tmp {0}:{1}'.format(self.user, tmp_file1[5:]) # triming '/tmp/' from filename
print(self.marker + cmd)
exitcode, out, err = execute(cmd)
print(out, err)
assert exitcode == 0
assert "File with same name exists locally, but filesize mismatches" in err

def test_list_blocklisted_replicas(self):
"""CLIENT(USER): Rucio list replicas"""
# add rse
Expand Down

0 comments on commit de52c72

Please sign in to comment.