diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index e07b791..dae88d6 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -58,5 +58,5 @@ jobs: BUS_URL: amqp://guest:guest@${{ github.server_url != 'https://github.com' && 'rabbitmq' || 'localhost' }} run: | pip install coverage - coverage run -m unittest tests/**/*.py + coverage run -m unittest tests/*.py coverage report -m --fail-under=60 \ No newline at end of file diff --git a/servc/svc/com/storage/lake.py b/servc/svc/com/storage/lake.py index 9058380..254f37b 100644 --- a/servc/svc/com/storage/lake.py +++ b/servc/svc/com/storage/lake.py @@ -45,7 +45,7 @@ def _get_table_name(self) -> str: name_w_medallion = self._table else: name_w_medallion = "".join( - [self._table["medallion"].value, "-", self._table["name"]] + [self._table["medallion"].value, "_", self._table["name"]] ) return ".".join([schema, name_w_medallion]) diff --git a/servc/svc/com/worker/hooks/parallelize.py b/servc/svc/com/worker/hooks/parallelize.py index 9845463..20f3195 100644 --- a/servc/svc/com/worker/hooks/parallelize.py +++ b/servc/svc/com/worker/hooks/parallelize.py @@ -57,8 +57,7 @@ def evaluate_part_pre_hook( jobs = resolvers[part_method](message["id"], artifact, context) if not isinstance(jobs, list): - print(f"Resolver {part_method} did not return a list") - return True + raise Exception(f"Resolver {part_method} did not return a list") # formulate on complete hook complete_hook: List[OnCompleteHook] = [] @@ -84,8 +83,9 @@ def evaluate_part_pre_hook( complete_hook.append(newHook) # create task queue - task_queue = f"part.{route}-{method}-{message['id']}" - bus.create_queue(task_queue, False) + if len(jobs): + task_queue = f"part.{route}-{method}-{message['id']}" + bus.create_queue(task_queue, False) # publish messages to part queue payload_template: InputPayload = { diff --git a/tests/__init__.py b/tests/__init__.py index e69de29..6ea93ed 100644 --- a/tests/__init__.py +++ b/tests/__init__.py @@ -0,0 +1,23 @@ +import json + + +def get_route_message(channel, cache, route, deleteRoute=False): + queue = channel.queue_declare( + queue=route, + passive=True, + durable=True, + exclusive=False, + auto_delete=False, + ) + count = queue.method.message_count + body = None + + if count: + _m, _h, body = channel.basic_get(route) + if deleteRoute: + channel.queue_delete(queue=route) + if body: + body = json.loads(body.decode("utf-8")) + if "argumentId" in body: + body["argument"] = cache.getKey(body["argumentId"]) + return body, count diff --git a/tests/hooks/__init__.py b/tests/hooks/__init__.py deleted file mode 100644 index 6ea93ed..0000000 --- a/tests/hooks/__init__.py +++ /dev/null @@ -1,23 +0,0 @@ -import json - - -def get_route_message(channel, cache, route, deleteRoute=False): - queue = channel.queue_declare( - queue=route, - passive=True, - durable=True, - exclusive=False, - auto_delete=False, - ) - count = queue.method.message_count - body = None - - if count: - _m, _h, body = channel.basic_get(route) - if deleteRoute: - channel.queue_delete(queue=route) - if body: - body = json.loads(body.decode("utf-8")) - if "argumentId" in body: - body["argument"] = cache.getKey(body["argumentId"]) - return body, count diff --git a/tests/hooks/test_complete.py b/tests/test_complete.py similarity index 98% rename from tests/hooks/test_complete.py rename to tests/test_complete.py index 23b2bca..5a3ba7a 100644 --- a/tests/hooks/test_complete.py +++ b/tests/test_complete.py @@ -8,7 +8,7 @@ from servc.svc.config import Config from servc.svc.io.hooks import CompleteHookType from servc.svc.io.input import ArgumentArtifact, InputPayload, InputType -from tests.hooks import get_route_message +from tests import get_route_message message: InputPayload = { "id": "123", diff --git a/tests/svc/test_config.py b/tests/test_config.py similarity index 100% rename from tests/svc/test_config.py rename to tests/test_config.py diff --git a/tests/lake/test_iceberg.py b/tests/test_iceberg.py similarity index 98% rename from tests/lake/test_iceberg.py rename to tests/test_iceberg.py index e94796b..63c25f1 100644 --- a/tests/lake/test_iceberg.py +++ b/tests/test_iceberg.py @@ -48,7 +48,7 @@ def test_connect(self): self.assertTrue(self.iceberg.isOpen) def test_name(self): - self.assertEqual(self.iceberg.tablename, "default.bronze-test") + self.assertEqual(self.iceberg.tablename, "default.bronze_test") def test_insert(self): self.iceberg.overwrite([]) @@ -114,7 +114,7 @@ def test_load_from_catalog(self): self.iceberg.insert([{"date": "2021-01-01", "some_int": 1}]) orig_data = self.iceberg.read(["date"]).to_pylist() - iceberg = IceBerg(config, "default.bronze-test") + iceberg = IceBerg(config, "default.bronze_test") iceberg._connect() self.assertTrue(iceberg.isOpen) diff --git a/tests/svc/test_idgen.py b/tests/test_idgen.py similarity index 100% rename from tests/svc/test_idgen.py rename to tests/test_idgen.py diff --git a/tests/svc/test_io_response.py b/tests/test_io_response.py similarity index 100% rename from tests/svc/test_io_response.py rename to tests/test_io_response.py diff --git a/tests/hooks/test_parallelize.py b/tests/test_parallelize.py similarity index 96% rename from tests/hooks/test_parallelize.py rename to tests/test_parallelize.py index 8a16599..8a526da 100644 --- a/tests/hooks/test_parallelize.py +++ b/tests/test_parallelize.py @@ -123,8 +123,10 @@ def test_non_list_partifier(self): } # true because resolver did not return a list - res = evaluate_part_pre_hook(new_mapping, message, art2, self.context) - self.assertTrue(res) + self.assertRaises( + Exception, + lambda: evaluate_part_pre_hook(new_mapping, message, art2, self.context), + ) def test_w_on_complete_hook(self): self.bus._route = "test" diff --git a/tests/svc/test_rabbitmq.py b/tests/test_rabbitmq.py similarity index 100% rename from tests/svc/test_rabbitmq.py rename to tests/test_rabbitmq.py diff --git a/tests/svc/test_redis.py b/tests/test_redis.py similarity index 100% rename from tests/svc/test_redis.py rename to tests/test_redis.py