Skip to content

Commit

Permalink
Clients: re-download existing files if checksum is wrong. Fixes rucio…
Browse files Browse the repository at this point in the history
…#4323

Also update the "no-subdir" option documentation.

In the past, rucio was changing its behavior of overwriting existing
files depending on the 'no-subdir' option. If this option was set,
local files were always overwritten. If the option was not set, the
files were always let intact. This was counter-intuitive and was fixed
in a recent commit as being a bug. Leaving an incorrect documentation.

The discussion in the linked issue suggested that it will be the most
intuitive to check the checksum of existing files. If their checksum
is correct: leave them intact, but overwrite files in case of a
miss-match.
  • Loading branch information
rcarpa committed Feb 17, 2021
1 parent 92ca838 commit 7cfe619
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 10 deletions.
2 changes: 1 addition & 1 deletion bin/rucio
Expand Up @@ -2113,7 +2113,7 @@ You can filter by key/value, e.g.::
selected_parser.add_argument('--protocol', action='store', help='Force the protocol to use.')
selected_parser.add_argument('--nrandom', type=int, action='store', help='Download N random files from the DID.')
selected_parser.add_argument('--ndownloader', type=int, default=3, action='store', help='Choose the number of parallel processes for download.')
selected_parser.add_argument('--no-subdir', action='store_true', default=False, help="Don't create a subdirectory for the scope of the files. Existing files in the directory will be overwritten.")
selected_parser.add_argument('--no-subdir', action='store_true', default=False, help="Don't create a subdirectory for the scope of the files.")
selected_parser.add_argument('--pfn', dest='pfn', action='store', help="Specify the exact PFN for the download.")
selected_parser.add_argument('--archive-did', action='store', dest='archive_did', help="Download from archive is transparent. This option is obsolete.")
selected_parser.add_argument('--no-resolve-archives', action='store_true', default=False, help="If set archives will not be considered for download.")
Expand Down
14 changes: 10 additions & 4 deletions lib/rucio/client/downloadclient.py
Expand Up @@ -180,7 +180,7 @@ def download_pfns(self, items, num_threads=2, trace_custom_fields={}, traces_cop
did - DID string of this file (e.g. 'scope:file.name'). Wildcards are not allowed
rse - rse name (e.g. 'CERN-PROD_DATADISK'). RSE Expressions are not allowed
base_dir - Optional: Base directory where the downloaded files will be stored. (Default: '.')
no_subdir - Optional: If true, files are written directly into base_dir and existing files are overwritten. (Default: False)
no_subdir - Optional: If true, files are written directly into base_dir. (Default: False)
adler32 - Optional: The adler32 checmsum to compare the downloaded files adler32 checksum with
md5 - Optional: The md5 checksum to compare the downloaded files md5 checksum with
transfer_timeout - Optional: Timeout time for the download protocols. (Default: None)
Expand Down Expand Up @@ -252,7 +252,7 @@ def download_dids(self, items, num_threads=2, trace_custom_fields={}, traces_cop
resolve_archives - Deprecated: Use no_resolve_archives instead
force_scheme - Optional: force a specific scheme to download this item. (Default: None)
base_dir - Optional: base directory where the downloaded files will be stored. (Default: '.')
no_subdir - Optional: If true, files are written directly into base_dir and existing files are overwritten. (Default: False)
no_subdir - Optional: If true, files are written directly into base_dir. (Default: False)
nrandom - Optional: if the DID addresses a dataset, nrandom files will be randomly choosen for download from the dataset
ignore_checksum - Optional: If true, skips the checksum validation between the downloaded file and the rucio catalouge. (Default: False)
transfer_timeout - Optional: Timeout time for the download protocols. (Default: None)
Expand Down Expand Up @@ -298,7 +298,7 @@ def download_from_metalink_file(self, item, metalink_file_path, num_threads=2, t
:param item: dictionary describing an item to download. Keys:
base_dir - Optional: base directory where the downloaded files will be stored. (Default: '.')
no_subdir - Optional: If true, files are written directly into base_dir and existing files are overwritten. (Default: False)
no_subdir - Optional: If true, files are written directly into base_dir. (Default: False)
ignore_checksum - Optional: If true, skips the checksum validation between the downloaded file and the rucio catalouge. (Default: False)
transfer_timeout - Optional: Timeout time for the download protocols. (Default: None)
:param num_threads: Suggestion of number of threads to use for the download. It will be lowered if it's too high.
Expand Down Expand Up @@ -460,6 +460,12 @@ def _download_item(self, item, trace, traces_copy_out, log_prefix=''):
# if file already exists make sure it exists at all destination paths, set state, send trace, and return
for dest_file_path in dest_file_paths:
if os.path.isfile(dest_file_path):
if not item.get('merged_options', {}).get('ignore_checksum', False):
verified, _, _ = _verify_checksum(item, dest_file_path)
if not verified:
logger(logging.INFO, '%sFile with same name exists locally, but checksum mismatches: %s' % (log_prefix, did_str))
continue

logger(logging.INFO, '%sFile exists already locally: %s' % (log_prefix, did_str))
for missing_file_path in dest_file_paths:
if not os.path.isfile(missing_file_path):
Expand Down Expand Up @@ -671,7 +677,7 @@ def download_aria2c(self, items, trace_custom_fields={}, filters={}):
did - DID string of this file (e.g. 'scope:file.name'). Wildcards are not allowed
rse - Optional: rse name (e.g. 'CERN-PROD_DATADISK') or rse expression from where to download
base_dir - Optional: base directory where the downloaded files will be stored. (Default: '.')
no_subdir - Optional: If true, files are written directly into base_dir and existing files are overwritten. (Default: False)
no_subdir - Optional: If true, files are written directly into base_dir. (Default: False)
nrandom - Optional: if the DID addresses a dataset, nrandom files will be randomly choosen for download from the dataset
ignore_checksum - Optional: If true, skips the checksum validation between the downloaded file and the rucio catalouge. (Default: False)
:param trace_custom_fields: Custom key value pairs to send with the traces
Expand Down
43 changes: 38 additions & 5 deletions lib/rucio/tests/test_download.py
Expand Up @@ -200,7 +200,7 @@ def test_download_multiple(self):
expected_result=[
{
'did': '%s:%s' % (scope, item100['did_name']),
'clientState': 'ALREADY_DONE', # TODO: fix #4323 and change this to 'DONE' if decided to overwrite
'clientState': 'ALREADY_DONE',
'dest_file_paths': ['%s/%s' % (tmp_dir, item100['did_name'])],
}
],
Expand All @@ -215,8 +215,10 @@ def test_download_from_archive_on_xrd(self):
# Create a zip archive with two files and upload it
name000 = base_name + '.000'
data000 = '000'
adler000 = '01230091'
name001 = base_name + '.001'
data001 = '001'
adler001 = '01240092'
zip_name = base_name + '.zip'
zip_path = '%s/%s' % (tmp_dir, zip_name)
with ZipFile(zip_path, 'w') as myzip:
Expand All @@ -227,13 +229,13 @@ def test_download_from_archive_on_xrd(self):
scope,
zip_name,
[
{'scope': scope, 'name': name000, 'bytes': len(data000), 'type': 'FILE', 'meta': {'guid': str(generate_uuid())}},
{'scope': scope, 'name': name001, 'bytes': len(data001), 'type': 'FILE', 'meta': {'guid': str(generate_uuid())}},
{'scope': scope, 'name': name000, 'bytes': len(data000), 'type': 'FILE', 'adler32': adler000, 'meta': {'guid': str(generate_uuid())}},
{'scope': scope, 'name': name001, 'bytes': len(data001), 'type': 'FILE', 'adler32': adler001, 'meta': {'guid': str(generate_uuid())}},
],
)

# Download one file from the archive
result = self.download_client.download_dids([{'did': '%s:%s' % (scope, name000), 'base_dir': tmp_dir, 'ignore_checksum': True}])
result = self.download_client.download_dids([{'did': '%s:%s' % (scope, name000), 'base_dir': tmp_dir}])
self._check_download_result(
actual_result=result,
expected_result=[
Expand All @@ -247,7 +249,7 @@ def test_download_from_archive_on_xrd(self):
assert file.read() == data000

# Download both files from the archive
result = self.download_client.download_dids([{'did': '%s:%s.00*' % (scope, base_name), 'base_dir': tmp_dir, 'ignore_checksum': True}])
result = self.download_client.download_dids([{'did': '%s:%s.00*' % (scope, base_name), 'base_dir': tmp_dir}])
self._check_download_result(
actual_result=result,
expected_result=[
Expand All @@ -264,6 +266,19 @@ def test_download_from_archive_on_xrd(self):
with open('%s/%s/%s' % (tmp_dir, scope, name001), 'r') as file:
assert file.read() == data001

pfn = next(filter(lambda r: name001 in r['did'], result))['sources'][0]['pfn']
# Download by pfn from the archive
result = self.download_client.download_pfns([{'did': '%s:%s' % (scope, name001), 'pfn': pfn, 'rse': rse, 'base_dir': tmp_dir, 'no_subdir': True}])
self._check_download_result(
actual_result=result,
expected_result=[
{
'did': '%s:%s' % (scope, name001),
'clientState': 'DONE',
},
],
)

def test_trace_copy_out_and_checksum_validation(self):
rse = 'MOCK4'
scope = 'mock'
Expand All @@ -287,6 +302,13 @@ def test_trace_copy_out_and_checksum_validation(self):
result = self.download_client.download_dids([{'did': '%s:%s' % (scope, name), 'base_dir': tmp_dir}], traces_copy_out=traces)
assert len(traces) == 1 and traces[0]['clientState'] == 'ALREADY_DONE'

# Change the local file and download the same file again. Checksum validation should fail and it must be re-downloaded
with open(result[0]['dest_file_paths'][0], 'a') as f:
f.write("more data")
traces = []
result = self.download_client.download_dids([{'did': '%s:%s' % (scope, name), 'base_dir': tmp_dir}], traces_copy_out=traces)
assert len(traces) == 1 and traces[0]['clientState'] == 'DONE'

pfn = result[0]['sources'][0]['pfn']

# Switch to a new empty directory
Expand All @@ -302,6 +324,17 @@ def test_trace_copy_out_and_checksum_validation(self):
self.download_client.download_pfns([{'did': '%s:%s' % (scope, name), 'pfn': pfn, 'rse': rse, 'base_dir': tmp_dir}], traces_copy_out=traces)
assert len(traces) == 1 and traces[0]['clientState'] == 'DONE'

# Same pfn. Local file already present. Shouldn't be overwritten.
traces = []
self.download_client.download_pfns([{'did': '%s:%s' % (scope, name), 'pfn': pfn, 'rse': rse, 'base_dir': tmp_dir}], traces_copy_out=traces)
assert len(traces) == 1 and traces[0]['clientState'] == 'ALREADY_DONE'

# Provide wrong checksum for validation, the file will be re-downloaded but checksum validation fails
traces = []
with pytest.raises(NoFilesDownloaded):
self.download_client.download_pfns([{'did': '%s:%s' % (scope, name), 'pfn': pfn, 'rse': rse, 'adler32': 'wrong', 'base_dir': tmp_dir}], traces_copy_out=traces)
assert len(traces) == 1 and traces[0]['clientState'] == 'FAIL_VALIDATE'

# Switch to a new empty directory
with TemporaryDirectory() as tmp_dir:
# Simulate checksum corruption by changing the source file. We rely on the particularity
Expand Down

0 comments on commit 7cfe619

Please sign in to comment.