Skip to content
This repository has been archived by the owner on Jan 14, 2024. It is now read-only.

Commit

Permalink
#66: Improve coverage
Browse files Browse the repository at this point in the history
  • Loading branch information
blackandred committed Aug 20, 2021
1 parent ee42d69 commit d40595f
Show file tree
Hide file tree
Showing 3 changed files with 145 additions and 23 deletions.
15 changes: 10 additions & 5 deletions src/core/rkd/core/api/testing.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

import os
import sys
from typing import Tuple, Dict, List
from typing import Tuple, Dict, List, Union
from unittest import TestCase
from io import StringIO
from copy import deepcopy
Expand Down Expand Up @@ -37,16 +37,20 @@ class OutputCapturingSafeTestCase(TestCase):

_stdout = None
_stderr = None
backup_stdout = True

def setUp(self) -> None:
os.environ['RKD_DEPTH'] = '0'
self._stdout = sys.stdout
self._stderr = sys.stderr

if self.backup_stdout:
self._stdout = sys.stdout
self._stderr = sys.stderr

super().setUp()

def tearDown(self) -> None:
self._restore_standard_out()
if self.backup_stdout:
self._restore_standard_out()

super().tearDown()

Expand Down Expand Up @@ -143,7 +147,8 @@ def satisfy_task_dependencies(task: TaskInterface, io: IO = None) -> TaskInterfa
return task

@staticmethod
def mock_execution_context(task: TaskInterface, args: Dict[str, str] = None, env: Dict[str, str] = None,
def mock_execution_context(task: TaskInterface, args: Dict[str, Union[str, bool]] = None,
env: Dict[str, str] = None,
defined_args: Dict[str, dict] = None) -> ExecutionContext:

"""
Expand Down
47 changes: 29 additions & 18 deletions src/core/rkd/core/standardlib/io.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,32 +84,43 @@ def add(self, src_path: str, target_path: str = None):
include_src_last_dir = not src_path.endswith('/')
src_path = os.path.abspath(src_path)

if os.path.isfile(src_path):
include_src_last_dir = False
src_dir = os.path.dirname(src_path)
self._add(src_dir, os.path.basename(src_path), target_path, src_path, include_src_last_dir)

for root, d_names, f_names in os.walk(src_path):
for f in f_names:
current_file_path = os.path.abspath(os.path.join(root, f))
self._add(root, f, target_path, src_path, include_src_last_dir)

def _add(self, root, f, target_path, src_path, include_src_last_dir):
current_file_path = os.path.abspath(os.path.join(root, f))

try:
if not self._can_be_added(current_file_path):
self.io().info(f'Ignoring "{current_file_path}"')
return

try:
if not self._can_be_added(current_file_path):
self.io().info(f'Ignoring "{current_file_path}"')
continue
except ValueError:
# ValueError: 'X' is not in the sub-path of 'Y'
pass

except ValueError:
# ValueError: 'X' is not in the subpath of 'Y'
pass
if not target_path:
current_file_target_path = current_file_path
else:
current_file_target_path = target_path

if not target_path:
current_file_target_path = current_file_path
else:
current_file_target_path = target_path + '/' + current_file_path[len(src_path) + 1:]
if "." not in os.path.basename(target_path):
current_file_target_path += '/' + current_file_path[len(src_path) + 1:]

# fix up paths like `.//vendor/composer/installed.json`
current_file_target_path = current_file_target_path.replace('.//', '')
# fix up paths like `.//vendor/composer/installed.json`
current_file_target_path = current_file_target_path.replace('.//', '')

# include last directory from source path if it did not end with "/"
if include_src_last_dir:
current_file_target_path = os.path.basename(src_path) + '/' + current_file_target_path
# include last directory from source path if it did not end with "/"
if include_src_last_dir:
current_file_target_path = os.path.basename(src_path) + '/' + current_file_target_path

self.sources[current_file_target_path] = current_file_path
self.sources[current_file_target_path] = current_file_path

def consider_gitignore(self, path: str = '.gitignore'):
"""
Expand Down
106 changes: 106 additions & 0 deletions src/core/tests/test_standardlib_io_archivepackagingtask.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
import os
import pytest
from rkd.core.api.inputoutput import BufferedSystemIO
from rkd.core.api.temp import TempManager
from rkd.core.api.testing import FunctionalTestingCase
from rkd.core.standardlib.io import ArchivePackagingBaseTask

TEST_PATH = os.path.realpath(__file__)


@pytest.mark.e2e
class ArchivePackagingTaskTest(FunctionalTestingCase):
backup_stdout = False

def test_dry_run_just_prints_messages(self):
"""
Check two things:
- Dry run mode should not produce zip file
- Messages should be printed
:return:
"""

io = BufferedSystemIO()
task = ArchivePackagingBaseTask()
self.satisfy_task_dependencies(task, io=io)

# configure
task.archive_path = '/tmp/something.zip'
task.archive_type = 'zip'
task.add('./tests/internal-samples')
task.add(TEST_PATH, 'test.py')

# execute
task.execute(self.mock_execution_context(
task,
{
"--dry-run": True,
"--allow-overwrite": False
},
{}
))

self.assertIn('test_standardlib_io_archivepackagingtask.py" -> "test.py"', io.get_value())
self.assertIn('subprojects/testsubproject1/.rkd/makefile.yaml" -> "', io.get_value())
self.assertFalse(os.path.isfile('/tmp/something.zip'))

def test_archive_overwrite_is_not_allowed_when_switch_not_used(self):
"""
Create archive TWICE without --allow-overwrite set
Expected behavior:
1st time) Just write the archive
2nd time) Raise an exception that the archive already exists
3rd time) Overwrite the archive, don't raise an error
:return:
"""

tasks = []
temp = TempManager()
archive_path = temp.create_tmp_file_path()[0] # same for two tasks

try:
for i in range(0, 3):
io = BufferedSystemIO()
task = ArchivePackagingBaseTask()
self.satisfy_task_dependencies(task, io=io)

task.archive_path = archive_path
task.archive_type = 'zip'
task.add(TEST_PATH, 'test.py')
tasks.append(task)

tasks[0].execute(self.mock_execution_context(
tasks[0],
{
"--dry-run": False,
"--allow-overwrite": False
},
{}
))

with self.assertRaises(FileExistsError) as exc:
tasks[1].execute(self.mock_execution_context(
tasks[1],
{
"--dry-run": False,
"--allow-overwrite": False
},
{}
))

tasks[2].execute(self.mock_execution_context(
tasks[2],
{
"--dry-run": False,
"--allow-overwrite": True
},
{}
))

self.assertIn('already exists, use --allow-overwrite to enforce recreation', str(exc.exception))
self.assertIn('-> "test.py"', tasks[2].io().get_value())

finally:
temp.finally_clean_up()

0 comments on commit d40595f

Please sign in to comment.