Skip to content

Commit

Permalink
Adding test case for parallel gcs copy script.
Browse files Browse the repository at this point in the history
  • Loading branch information
AK committed Jul 30, 2018
1 parent cb4a36e commit 921e4ed
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 1 deletion.
1 change: 1 addition & 0 deletions bqsqoop/utils/gcloud/storage.py
Expand Up @@ -40,6 +40,7 @@ def parallel_copy_files_to_gcs(files, gcs_bucket_path, project_id,
Same as copy_files_to_gcs, except this method does a parallel upload
one file per process.
"""
_validate_gcs_path(gcs_bucket_path)
_async_worker = async_worker.AsyncWorker(len(files))
if use_new_tmp_folder:
# Add tmp folder outside and make sure all files are in the same path
Expand Down
59 changes: 58 additions & 1 deletion tests/utils/gcloud/test_storage.py
Expand Up @@ -4,7 +4,7 @@
from unittest.mock import patch, MagicMock, call
from bqsqoop.utils.gcloud.storage import (
copy_files_to_gcs, _get_details_from_gcs_path, delete_files_in,
download_file_as_string
download_file_as_string, parallel_copy_files_to_gcs
)


Expand Down Expand Up @@ -85,6 +85,63 @@ def test_file_uploads(self, storage_client, mock_uuid):
call_args, [call(filename='file1'), call(filename='file2')])


class TestParallelCopyFiles(unittest.TestCase):
    """Tests for ``parallel_copy_files_to_gcs`` with ``AsyncWorker`` mocked."""

    @patch('bqsqoop.utils.async_worker.AsyncWorker')
    def test_valid_gcs_path(self, async_worker):
        """A ``gs://<bucket>`` path is accepted without raising."""
        parallel_copy_files_to_gcs(
            ["file1", "file2"], "gs://gcs_bucket", "gcs_project_1")

    @patch('bqsqoop.utils.async_worker.AsyncWorker')
    def test_invalid_gcs_path(self, async_worker):
        """A path missing the scheme, or missing the bucket, is rejected."""
        files = ["file1", "file2"]
        gcs_project = "gcs_project_1"
        # First entry has no "gs://" scheme; second has no bucket name.
        for invalid_path in ("gcs_bucket", "gs://"):
            with pytest.raises(
                    Exception, match=r'Not a valid GCS tmp path.'):
                parallel_copy_files_to_gcs(files, invalid_path, gcs_project)

    @patch('uuid.uuid4', return_value="F43C2651-18C8-4EB0-82D2-10E3C7226015")
    @patch('bqsqoop.utils.async_worker.AsyncWorker')
    def test_file_uploads(self, async_worker, mock_uuid):
        """Each file is handed to the worker separately, in input order.

        Checks that the worker is constructed with the number of files, that
        the returned path ends with the (mocked) uuid folder, and that every
        ``send_data_to_worker`` call carries one file plus the shared
        destination path, project id and ``copy_files_to_gcs`` callback.
        """
        expected_gcs_path = (
            "gs://gcs_bucket/tmp_path/F43C2651-18C8-4EB0-82D2-10E3C7226015/")
        worker = MagicMock()
        worker.send_data_to_worker = MagicMock()
        async_worker.return_value = worker

        returned_path = parallel_copy_files_to_gcs(
            ["file1", "file2"], "gs://gcs_bucket/tmp_path/",
            "gcs_project_1", True)

        async_worker.assert_called_with(2)
        self.assertEqual(returned_path, expected_gcs_path)

        recorded_calls = worker.send_data_to_worker.call_args_list
        for position, expected_file in enumerate(["file1", "file2"]):
            # Index explicitly so a missing call raises IndexError instead
            # of being silently skipped.
            _, kwargs = recorded_calls[position]
            self.assertEqual(kwargs['files'], [expected_file])
            self.assertEqual(kwargs['gcs_bucket_path'], expected_gcs_path)
            self.assertEqual(kwargs['project_id'], 'gcs_project_1')
            self.assertEqual(kwargs['use_new_tmp_folder'], False)
            self.assertEqual(
                kwargs['worker_callback'].__name__, "copy_files_to_gcs")


class TestDeleteFilesIn(unittest.TestCase):
@patch('google.cloud.storage.Client')
def test_valid_gcs_path(self, storage_client):
Expand Down

0 comments on commit 921e4ed

Please sign in to comment.