Skip to content

Commit

Permalink
Fix timeout option in gfal download + Introduce tests for gfal implem…
Browse files Browse the repository at this point in the history
…entation : Closes #6134
  • Loading branch information
cserf authored and bari12 committed Mar 17, 2023
1 parent adf7108 commit 38f5e99
Show file tree
Hide file tree
Showing 4 changed files with 131 additions and 3 deletions.
2 changes: 2 additions & 0 deletions .github/workflows/integration_tests.yml
Expand Up @@ -131,6 +131,8 @@ jobs:
run: docker exec -t dev_rucio_1 tools/pytest.sh -v --tb=short test_upload.py
- name: File Upload/Download Test using 'impl' parameter
run: docker exec -t dev_rucio_1 tools/pytest.sh -v --tb=short test_impl_upload_download.py
- name: Test gfal2 implementation on xrootd protocol
run: docker exec -t dev_rucio_1 tools/pytest.sh -v --tb=short test_rse_protocol_gfal2_impl.py
- name: Test Protocol XrootD
run: docker exec -t dev_rucio_1 tools/pytest.sh -v --tb=short test_rse_protocol_xrootd.py
- name: Test Protocol SSH (scp)
Expand Down
6 changes: 3 additions & 3 deletions lib/rucio/rse/protocols/gfal.py
Expand Up @@ -437,9 +437,9 @@ def __gfal2_copy(self, src, dest, src_spacetoken=None, dest_spacetoken=None, tra
"""
ctx = self.__ctx
if transfer_timeout:
ctx.set_opt_integer("HTTP PLUGIN", "OPERATION_TIMEOUT", transfer_timeout)
ctx.set_opt_integer("SRM PLUGIN", "OPERATION_TIMEOUT", transfer_timeout)
ctx.set_opt_integer("GRIDFTP PLUGIN", "OPERATION_TIMEOUT", transfer_timeout)
ctx.set_opt_integer("HTTP PLUGIN", "OPERATION_TIMEOUT", int(transfer_timeout))
ctx.set_opt_integer("SRM PLUGIN", "OPERATION_TIMEOUT", int(transfer_timeout))
ctx.set_opt_integer("GRIDFTP PLUGIN", "OPERATION_TIMEOUT", int(transfer_timeout))
watchdog = Timer(int(transfer_timeout) + 60, self.__gfal2_cancel)
params = ctx.transfer_parameters()
if src_spacetoken:
Expand Down
27 changes: 27 additions & 0 deletions lib/rucio/tests/rsemgr_api_test.py
Expand Up @@ -16,6 +16,7 @@
import itertools
import os
import shutil
import tempfile
import os.path
from uuid import uuid4 as uuid

Expand Down Expand Up @@ -335,3 +336,29 @@ def test_change_scope_mgr_ok_single_pfn(self):
pfn = list(mgr.lfns2pfns(self.rse_settings, {'name': '2_rse_remote_change_scope.raw', 'scope': 'user.%s' % self.user}, impl=self.impl).values())[0]
pfn_new = list(mgr.lfns2pfns(self.rse_settings, {'name': '2_rse_remote_change_scope.raw', 'scope': 'group.%s' % self.user}, impl=self.impl).values())[0]
mgr.rename(self.rse_settings, {'name': pfn, 'new_name': pfn_new}, impl=self.impl)

def test_download_protocol_ok_single_pfn(self):
"""(RSE/PROTOCOLS): Check a single file download using PFN (Success)"""
filename = '1_rse_remote_get.raw'
pfn = list(mgr.lfns2pfns(self.rse_settings, {'name': filename, 'scope': 'user.%s' % self.user}, impl=self.impl).values())[0]
protocol = mgr.create_protocol(self.rse_settings, 'write', impl=self.impl)
protocol.connect()
with tempfile.TemporaryDirectory() as tmpdirname:
protocol.get(pfn, dest='%s/%s' % (tmpdirname, filename), transfer_timeout=None)
assert filename in os.listdir(tmpdirname)
assert os.path.isfile('%s/%s' % (tmpdirname, filename))
size = os.stat('%s/%s' % (tmpdirname, filename)).st_size
assert size == 1048576

def test_download_protocol_ok_single_pfn_timeout(self):
"""(RSE/PROTOCOLS): Check a single file download using PFN and timeout parameter (Success)"""
filename = '1_rse_remote_get.raw'
pfn = list(mgr.lfns2pfns(self.rse_settings, {'name': filename, 'scope': 'user.%s' % self.user}, impl=self.impl).values())[0]
protocol = mgr.create_protocol(self.rse_settings, 'write', impl=self.impl)
protocol.connect()
with tempfile.TemporaryDirectory() as tmpdirname:
protocol.get(pfn, dest='%s/%s' % (tmpdirname, filename), transfer_timeout='10')
assert filename in os.listdir(tmpdirname)
assert os.path.isfile('%s/%s' % (tmpdirname, filename))
size = os.stat('%s/%s' % (tmpdirname, filename)).st_size
assert size == 1048576
99 changes: 99 additions & 0 deletions lib/rucio/tests/test_rse_protocol_gfal2_impl.py
@@ -0,0 +1,99 @@
# -*- coding: utf-8 -*-
# Copyright European Organization for Nuclear Research (CERN) since 2012
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import os

import pytest

from rucio.common.utils import execute
from rucio.rse import rsemanager
from rucio.tests.common import skip_rse_tests_with_accounts, load_test_conf_file
from rucio.tests.rsemgr_api_test import MgrTestCases


@pytest.mark.noparallel(reason='creates and removes a test directory with a fixed name')
@skip_rse_tests_with_accounts
class TestRseGFAL2Impl(MgrTestCases):

@classmethod
@pytest.fixture(scope='class')
def setup_rse_and_files(cls, vo, tmp_path_factory):
"""GFAL2 (RSE/PROTOCOLS): Creating necessary directories and files """

cmd = "rucio list-rses --rses 'test_container_xrd=True'"
exitcode, out, err = execute(cmd)
rses = out.split()

data = load_test_conf_file('rse_repository.json')
prefix = data['WJ-XROOTD']['protocols']['supported']['xroot']['prefix']

if len(rses) == 0:
rse_name = 'WJ-XROOTD'
hostname = data['WJ-XROOTD']['protocols']['supported']['xroot']['hostname']
else:
rse_name = 'XRD1'
hostname = 'xrd1'
prefix = '//rucio/'

try:
os.mkdir(prefix)
except Exception as e:
print(e)

rse_settings, tmpdir, user = cls.setup_common_test_env(rse_name, vo, tmp_path_factory)
rse_settings['protocols'][0]['impl'] = 'rucio.rse.protocols.gfal.Default'

protocol = rsemanager.create_protocol(rse_settings, 'write')
protocol.connect()

os.system('dd if=/dev/urandom of=%s/data.raw bs=1024 count=1024' % prefix)

for f in cls.files_remote:
path = protocol.path2pfn(prefix + protocol._get_path('user.%s' % user, f))
cmd = 'xrdcp %s/data.raw %s' % (prefix, path)
execute(cmd)

for f in MgrTestCases.files_local_and_remote:
path = protocol.path2pfn(prefix + protocol._get_path('user.%s' % user, f))
cmd = 'xrdcp %s/%s %s' % (tmpdir, f, path)
execute(cmd)

yield rse_settings, tmpdir, user

clean_raw = '%s/data.raw' % prefix
list_files_cmd_user = 'xrdfs %s ls %s/user.%s' % (hostname, prefix, user)
clean_files = str(execute(list_files_cmd_user)[1]).split('\n')
list_files_cmd_group = 'xrdfs %s ls %s/group.%s' % (hostname, prefix, user)
clean_files += str(execute(list_files_cmd_group)[1]).split('\n')
clean_files.append(clean_raw)
for files in clean_files:
clean_cmd = 'xrdfs %s rm %s' % (hostname, files)
execute(clean_cmd)

clean_prefix = '%s' % prefix
list_directory = 'xrdfs %s ls %s' % (hostname, prefix)
clean_directory = str(execute(list_directory)[1]).split('\n')
clean_directory.append(clean_prefix)
for directory in clean_directory:
clean_cmd = 'xrdfs %s rmdir %s' % (hostname, directory)
execute(clean_cmd)

@pytest.fixture(autouse=True)
def setup_obj(self, setup_rse_and_files, vo):
rse_settings, tmpdir, user = setup_rse_and_files
self.init(tmpdir=tmpdir, rse_settings=rse_settings, user=user, vo=vo)

def test_delete_mgr_ok_dir(self):
raise pytest.skip("Not implemented")

0 comments on commit 38f5e99

Please sign in to comment.