diff --git a/.github/workflows/pull_request.yml b/.github/workflows/pull_request.yml
index 71f62dcd..28f871fa 100644
--- a/.github/workflows/pull_request.yml
+++ b/.github/workflows/pull_request.yml
@@ -7,7 +7,7 @@ jobs:
       - uses: actions/checkout@v3
       - uses: psf/black@stable
         with:
-          version: 23.12.0
+          version: 24.4.2
 
   flake8-lint:
     runs-on: ubuntu-latest
@@ -61,4 +61,4 @@ jobs:
         poetry install --no-interaction
     - name: test
       run: |
-        poetry run pytest
+        PYTHONASYNCIODEBUG=1 poetry run pytest
diff --git a/examples/audio-to-text/whisper-large/new_pipeline.py b/examples/audio-to-text/whisper-large/new_pipeline.py
index 9a9cee28..4da0c42e 100644
--- a/examples/audio-to-text/whisper-large/new_pipeline.py
+++ b/examples/audio-to-text/whisper-large/new_pipeline.py
@@ -24,8 +24,6 @@ class ModelKwargs(InputSchema):
 
 @entity
 class WhisperModel:
-    def __init__(self):
-        ...
 
     @pipe(on_startup=True, run_once=True)
     def load(self):
diff --git a/examples/image-to-image/t2i-adapter-sketch/new_pipeline.py b/examples/image-to-image/t2i-adapter-sketch/new_pipeline.py
index a9b8118a..ccb89123 100644
--- a/examples/image-to-image/t2i-adapter-sketch/new_pipeline.py
+++ b/examples/image-to-image/t2i-adapter-sketch/new_pipeline.py
@@ -108,8 +108,6 @@ class ModelKwargs(InputSchema):
 
 @entity
 class DiffusionWithAdapter:
-    def __init__(self) -> None:
-        ...
 
     def apply_style(
         self, style_name: str, positive: str, negative: str = ""
diff --git a/examples/text-to-audio/musicgen-large/new_pipeline.py b/examples/text-to-audio/musicgen-large/new_pipeline.py
index 63877a51..568aafb7 100644
--- a/examples/text-to-audio/musicgen-large/new_pipeline.py
+++ b/examples/text-to-audio/musicgen-large/new_pipeline.py
@@ -4,8 +4,6 @@
 
 @entity
 class MusicgenModel:
-    def __init__(self):
-        ...
 
     @pipe(on_startup=True, run_once=True)
     def load(self):
diff --git a/pipeline/console/cluster.py b/pipeline/console/cluster.py
index 426b00ee..b159fa70 100644
--- a/pipeline/console/cluster.py
+++ b/pipeline/console/cluster.py
@@ -52,7 +52,7 @@ def use_parser(command_parser: "_SubParsersAction[ArgumentParser]") -> None:
 
 
 def remove_parser(command_parser: "_SubParsersAction[ArgumentParser]") -> None:
-    ...
+    pass
 
 
 def get_parser(command_parser: "_SubParsersAction[ArgumentParser]") -> None:
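Note: two CI changes ride along with the stub cleanups above. Black is bumped from 23.12.0 to 24.4.2 (the 2024 stable style), and the test suite now runs under `PYTHONASYNCIODEBUG=1`, which switches on asyncio's debug mode for every `pytest` invocation. Debug mode makes the new background-task code (see `pipeline/container/startup.py` further down) fail loudly: it attaches creation tracebacks to coroutines that are never awaited and logs callbacks that block the event loop. A minimal illustration of what it catches, not part of the patch:

```python
import asyncio


async def forgotten():
    return 42


async def main():
    forgotten()  # coroutine created but never awaited


# Equivalent to running under PYTHONASYNCIODEBUG=1: debug mode reports
# "coroutine 'forgotten' was never awaited" with its creation traceback.
asyncio.run(main(), debug=True)
```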
diff --git a/pipeline/console/commands.py b/pipeline/console/commands.py
index b5dabad5..45673b77 100644
--- a/pipeline/console/commands.py
+++ b/pipeline/console/commands.py
@@ -2,7 +2,6 @@
 
 from pipeline.console import cluster, container, logs
 from pipeline.console.targets import (
-    environments,
     files,
     pipelines,
     pointers,
@@ -23,7 +22,6 @@ def create_parser(command_parser: "_SubParsersAction[ArgumentParser]") -> None:
         dest="target",
     )
 
-    environments.create_parser(create_sub_parser)
     pointers.create_parser(create_sub_parser)
     resources.create_parser(create_sub_parser)
     files.create_parser(create_sub_parser)
@@ -42,7 +40,6 @@ def edit_parser(command_parser: "_SubParsersAction[ArgumentParser]") -> None:
         dest="target",
     )
 
-    environments.edit_parser(edit_sub_parser)
     pipelines.edit_parser(edit_sub_parser)
     pointers.edit_parser(edit_sub_parser)
     files.edit_parser(edit_sub_parser)
@@ -61,7 +58,6 @@ def get_parser(command_parser: "_SubParsersAction[ArgumentParser]") -> None:
         dest="target",
     )
 
-    environments.get_parser(get_sub_parser)
     pipelines.get_parser(get_sub_parser)
     pointers.get_parser(get_sub_parser)
     resources.get_parser(get_sub_parser)
@@ -82,7 +78,6 @@ def delete_parser(command_parser: "_SubParsersAction[ArgumentParser]") -> None:
         dest="target",
     )
 
-    environments.delete_parser(delete_sub_parser)
     pipelines.delete_parser(delete_sub_parser)
     pointers.delete_parser(delete_sub_parser)
     resources.delete_parser(delete_sub_parser)
diff --git a/pipeline/console/targets/environments.py b/pipeline/console/targets/environments.py
deleted file mode 100644
index 8711c8ae..00000000
--- a/pipeline/console/targets/environments.py
+++ /dev/null
@@ -1,17 +0,0 @@
-from argparse import ArgumentParser, _SubParsersAction
-
-
-def create_parser(command_parser: "_SubParsersAction[ArgumentParser]") -> None:
-    ...
-
-
-def edit_parser(command_parser: "_SubParsersAction[ArgumentParser]") -> None:
-    ...
-
-
-def get_parser(command_parser: "_SubParsersAction[ArgumentParser]") -> None:
-    ...
-
-
-def delete_parser(command_parser: "_SubParsersAction[ArgumentParser]") -> None:
-    ...
diff --git a/pipeline/console/targets/files.py b/pipeline/console/targets/files.py
index 2f9f824a..f8eb8319 100644
--- a/pipeline/console/targets/files.py
+++ b/pipeline/console/targets/files.py
@@ -41,7 +41,7 @@ def create_parser(command_parser: "_SubParsersAction[ArgumentParser]") -> None:
 
 
 def edit_parser(command_parser: "_SubParsersAction[ArgumentParser]") -> None:
-    ...
+    pass
 
 
 def get_parser(command_parser: "_SubParsersAction[ArgumentParser]") -> None:
@@ -73,7 +73,7 @@ def get_parser(command_parser: "_SubParsersAction[ArgumentParser]") -> None:
 
 
 def delete_parser(command_parser: "_SubParsersAction[ArgumentParser]") -> None:
-    ...
+    pass
 
 
 def _create_file(args: Namespace) -> None:
diff --git a/pipeline/console/targets/resources.py b/pipeline/console/targets/resources.py
index 71cfe90a..8097a860 100644
--- a/pipeline/console/targets/resources.py
+++ b/pipeline/console/targets/resources.py
@@ -10,7 +10,7 @@
 
 
 def create_parser(command_parser: "_SubParsersAction[ArgumentParser]") -> None:
-    ...
+    pass
 
 
 def get_parser(command_parser: "_SubParsersAction[ArgumentParser]") -> None:
@@ -21,7 +21,7 @@ def get_parser(command_parser: "_SubParsersAction[ArgumentParser]") -> None:
 
 
 def delete_parser(command_parser: "_SubParsersAction[ArgumentParser]") -> None:
-    ...
+    pass
 
 
 # Functions
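Note: the remaining `...` stub bodies become `pass`, and the empty `environments` target is deleted outright rather than left as dead code. `...` and `pass` are both valid no-op function bodies; the switch is stylistic, plausibly to dodge Black 24's stable style collapsing `...`-only definitions onto one line (an assumption, not stated in the patch):

```python
# Both define a parser hook that intentionally registers nothing;
# only the idiom differs.
def edit_parser(command_parser) -> None:
    ...  # Ellipsis: an expression statement, often read as "stub/TODO"


def edit_parser_explicit(command_parser) -> None:
    pass  # pass: an explicit "does nothing by design"
```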
diff --git a/pipeline/container/startup.py b/pipeline/container/startup.py
index 01f9ccc9..8dbc1f87 100644
--- a/pipeline/container/startup.py
+++ b/pipeline/container/startup.py
@@ -2,6 +2,7 @@
 import os
 import traceback
 import uuid
+from contextlib import asynccontextmanager
 
 import pkg_resources
 from fastapi import FastAPI, Request
@@ -20,24 +21,13 @@
 
 
 def create_app() -> FastAPI:
-    app = FastAPI(
-        title="pipeline-container",
-    )
+    app = FastAPI(title="pipeline-container", lifespan=lifespan)
 
     setup_logging()
     setup_oapi(app)
     setup_middlewares(app)
 
-    app.state.execution_queue = asyncio.Queue()
-    app.state.manager = Manager(
-        pipeline_path=os.environ.get(
-            "PIPELINE_PATH",
-            "",
-        )
-    )
-    asyncio.create_task(execution_handler(app.state.execution_queue, app.state.manager))
-
     app.include_router(router)
     app.include_router(status_router)
     static_dir = pkg_resources.resource_filename(
@@ -53,6 +43,22 @@ def create_app() -> FastAPI:
     return app
 
 
+@asynccontextmanager
+async def lifespan(app: FastAPI):
+    app.state.execution_queue = asyncio.Queue()
+    app.state.manager = Manager(
+        pipeline_path=os.environ.get(
+            "PIPELINE_PATH",
+            "",
+        )
+    )
+    task = asyncio.create_task(
+        execution_handler(app.state.execution_queue, app.state.manager)
+    )
+    yield
+    task.cancel()
+
+
 def setup_middlewares(app: FastAPI) -> None:
     @app.middleware("http")
     async def _(request: Request, call_next):
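Note: this is the substantive change in the patch. Construction of the execution queue and `Manager` moves out of `create_app()` into a `lifespan` async context manager, so the `execution_handler` task is started once the event loop is actually running and is cancelled again on shutdown. This is FastAPI's replacement for the deprecated `on_event` hooks, and it is what lets the new route tests drive startup through `TestClient`. A stripped-down sketch of the pattern with illustrative names; the patch leaves `task.cancel()` un-awaited, while waiting for the task to unwind is a common extra hardening step:

```python
import asyncio
from contextlib import asynccontextmanager, suppress

from fastapi import FastAPI


async def worker(queue: asyncio.Queue) -> None:
    while True:
        job = await queue.get()  # consume jobs forever
        print("processing", job)


@asynccontextmanager
async def lifespan(app: FastAPI):
    app.state.queue = asyncio.Queue()
    task = asyncio.create_task(worker(app.state.queue))
    yield  # the app serves requests while the worker runs
    task.cancel()
    with suppress(asyncio.CancelledError):
        await task  # not done in the patch; ensures a clean unwind


app = FastAPI(lifespan=lifespan)
```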
diff --git a/pipeline/objects/pipeline.py b/pipeline/objects/pipeline.py
index ae8dff25..f67a5cbe 100644
--- a/pipeline/objects/pipeline.py
+++ b/pipeline/objects/pipeline.py
@@ -6,11 +6,6 @@ class Pipeline:
     _current_pipeline: Graph
     _pipeline_context_active: bool = False
 
-    def __init__(
-        self,
-    ):
-        ...
-
     def __enter__(self):
         Pipeline._pipeline_context_active = True
diff --git a/poetry.lock b/poetry.lock
index c3436383..cc25bcb7 100644
--- a/poetry.lock
+++ b/poetry.lock
@@ -1,4 +1,4 @@
-# This file is automatically @generated by Poetry 1.7.1 and should not be changed by hand.
+# This file is automatically @generated by Poetry 1.8.3 and should not be changed by hand.
 
 [[package]]
 name = "anyio"
@@ -23,33 +23,33 @@ trio = ["trio (<0.22)"]
 
 [[package]]
 name = "black"
-version = "23.12.1"
+version = "24.4.2"
 description = "The uncompromising code formatter."
 optional = false
 python-versions = ">=3.8"
 files = [
-    {file = "black-23.12.1-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:e0aaf6041986767a5e0ce663c7a2f0e9eaf21e6ff87a5f95cbf3675bfd4c41d2"},
-    {file = "black-23.12.1-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:c88b3711d12905b74206227109272673edce0cb29f27e1385f33b0163c414bba"},
-    {file = "black-23.12.1-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:a920b569dc6b3472513ba6ddea21f440d4b4c699494d2e972a1753cdc25df7b0"},
-    {file = "black-23.12.1-cp310-cp310-win_amd64.whl", hash = "sha256:3fa4be75ef2a6b96ea8d92b1587dd8cb3a35c7e3d51f0738ced0781c3aa3a5a3"},
-    {file = "black-23.12.1-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:8d4df77958a622f9b5a4c96edb4b8c0034f8434032ab11077ec6c56ae9f384ba"},
-    {file = "black-23.12.1-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:602cfb1196dc692424c70b6507593a2b29aac0547c1be9a1d1365f0d964c353b"},
-    {file = "black-23.12.1-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:9c4352800f14be5b4864016882cdba10755bd50805c95f728011bcb47a4afd59"},
-    {file = "black-23.12.1-cp311-cp311-win_amd64.whl", hash = "sha256:0808494f2b2df923ffc5723ed3c7b096bd76341f6213989759287611e9837d50"},
-    {file = "black-23.12.1-cp312-cp312-macosx_10_9_x86_64.whl", hash = "sha256:25e57fd232a6d6ff3f4478a6fd0580838e47c93c83eaf1ccc92d4faf27112c4e"},
-    {file = "black-23.12.1-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:2d9e13db441c509a3763a7a3d9a49ccc1b4e974a47be4e08ade2a228876500ec"},
-    {file = "black-23.12.1-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:6d1bd9c210f8b109b1762ec9fd36592fdd528485aadb3f5849b2740ef17e674e"},
-    {file = "black-23.12.1-cp312-cp312-win_amd64.whl", hash = "sha256:ae76c22bde5cbb6bfd211ec343ded2163bba7883c7bc77f6b756a1049436fbb9"},
-    {file = "black-23.12.1-cp38-cp38-macosx_10_9_x86_64.whl", hash = "sha256:1fa88a0f74e50e4487477bc0bb900c6781dbddfdfa32691e780bf854c3b4a47f"},
-    {file = "black-23.12.1-cp38-cp38-macosx_11_0_arm64.whl", hash = "sha256:a4d6a9668e45ad99d2f8ec70d5c8c04ef4f32f648ef39048d010b0689832ec6d"},
-    {file = "black-23.12.1-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:b18fb2ae6c4bb63eebe5be6bd869ba2f14fd0259bda7d18a46b764d8fb86298a"},
-    {file = "black-23.12.1-cp38-cp38-win_amd64.whl", hash = "sha256:c04b6d9d20e9c13f43eee8ea87d44156b8505ca8a3c878773f68b4e4812a421e"},
-    {file = "black-23.12.1-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:3e1b38b3135fd4c025c28c55ddfc236b05af657828a8a6abe5deec419a0b7055"},
-    {file = "black-23.12.1-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:4f0031eaa7b921db76decd73636ef3a12c942ed367d8c3841a0739412b260a54"},
-    {file = "black-23.12.1-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:97e56155c6b737854e60a9ab1c598ff2533d57e7506d97af5481141671abf3ea"},
-    {file = "black-23.12.1-cp39-cp39-win_amd64.whl", hash = "sha256:dd15245c8b68fe2b6bd0f32c1556509d11bb33aec9b5d0866dd8e2ed3dba09c2"},
-    {file = "black-23.12.1-py3-none-any.whl", hash = "sha256:78baad24af0f033958cad29731e27363183e140962595def56423e626f4bee3e"},
-    {file = "black-23.12.1.tar.gz", hash = "sha256:4ce3ef14ebe8d9509188014d96af1c456a910d5b5cbf434a09fef7e024b3d0d5"},
+    {file = "black-24.4.2-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:dd1b5a14e417189db4c7b64a6540f31730713d173f0b63e55fabd52d61d8fdce"},
+    {file = "black-24.4.2-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:8e537d281831ad0e71007dcdcbe50a71470b978c453fa41ce77186bbe0ed6021"},
+    {file = "black-24.4.2-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:eaea3008c281f1038edb473c1aa8ed8143a5535ff18f978a318f10302b254063"},
+    {file = "black-24.4.2-cp310-cp310-win_amd64.whl", hash = "sha256:7768a0dbf16a39aa5e9a3ded568bb545c8c2727396d063bbaf847df05b08cd96"},
+    {file = "black-24.4.2-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:257d724c2c9b1660f353b36c802ccece186a30accc7742c176d29c146df6e474"},
+    {file = "black-24.4.2-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:bdde6f877a18f24844e381d45e9947a49e97933573ac9d4345399be37621e26c"},
+    {file = "black-24.4.2-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:e151054aa00bad1f4e1f04919542885f89f5f7d086b8a59e5000e6c616896ffb"},
+    {file = "black-24.4.2-cp311-cp311-win_amd64.whl", hash = "sha256:7e122b1c4fb252fd85df3ca93578732b4749d9be076593076ef4d07a0233c3e1"},
+    {file = "black-24.4.2-cp312-cp312-macosx_10_9_x86_64.whl", hash = "sha256:accf49e151c8ed2c0cdc528691838afd217c50412534e876a19270fea1e28e2d"},
+    {file = "black-24.4.2-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:88c57dc656038f1ab9f92b3eb5335ee9b021412feaa46330d5eba4e51fe49b04"},
+    {file = "black-24.4.2-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:be8bef99eb46d5021bf053114442914baeb3649a89dc5f3a555c88737e5e98fc"},
+    {file = "black-24.4.2-cp312-cp312-win_amd64.whl", hash = "sha256:415e686e87dbbe6f4cd5ef0fbf764af7b89f9057b97c908742b6008cc554b9c0"},
+    {file = "black-24.4.2-cp38-cp38-macosx_10_9_x86_64.whl", hash = "sha256:bf10f7310db693bb62692609b397e8d67257c55f949abde4c67f9cc574492cc7"},
+    {file = "black-24.4.2-cp38-cp38-macosx_11_0_arm64.whl", hash = "sha256:98e123f1d5cfd42f886624d84464f7756f60ff6eab89ae845210631714f6db94"},
+    {file = "black-24.4.2-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:48a85f2cb5e6799a9ef05347b476cce6c182d6c71ee36925a6c194d074336ef8"},
+    {file = "black-24.4.2-cp38-cp38-win_amd64.whl", hash = "sha256:b1530ae42e9d6d5b670a34db49a94115a64596bc77710b1d05e9801e62ca0a7c"},
+    {file = "black-24.4.2-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:37aae07b029fa0174d39daf02748b379399b909652a806e5708199bd93899da1"},
+    {file = "black-24.4.2-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:da33a1a5e49c4122ccdfd56cd021ff1ebc4a1ec4e2d01594fef9b6f267a9e741"},
+    {file = "black-24.4.2-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:ef703f83fc32e131e9bcc0a5094cfe85599e7109f896fe8bc96cc402f3eb4b6e"},
+    {file = "black-24.4.2-cp39-cp39-win_amd64.whl", hash = "sha256:b9176b9832e84308818a99a561e90aa479e73c523b3f77afd07913380ae2eab7"},
+    {file = "black-24.4.2-py3-none-any.whl", hash = "sha256:d36ed1124bb81b32f8614555b34cc4259c3fbc7eec17870e8ff8ded335b58d8c"},
+    {file = "black-24.4.2.tar.gz", hash = "sha256:c872b53057f000085da66a19c55d68f6f8ddcac2642392ad3a355878406fbd4d"},
 ]
 
 [package.dependencies]
@@ -1255,4 +1255,4 @@ dev = ["black (>=19.3b0)", "pytest (>=4.6.2)"]
 [metadata]
 lock-version = "2.0"
 python-versions = "^3.10"
-content-hash = "c476e84ff28d3dafdf010261e99dc32107677b2d5a2c57e9fffa9fc48c97f120"
+content-hash = "59cf5f8d69a76b5bd4df3c7f0573a575ef08aadac8f80ca4ba4d17ef1556cd5e"
"black-24.4.2-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:eaea3008c281f1038edb473c1aa8ed8143a5535ff18f978a318f10302b254063"}, + {file = "black-24.4.2-cp310-cp310-win_amd64.whl", hash = "sha256:7768a0dbf16a39aa5e9a3ded568bb545c8c2727396d063bbaf847df05b08cd96"}, + {file = "black-24.4.2-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:257d724c2c9b1660f353b36c802ccece186a30accc7742c176d29c146df6e474"}, + {file = "black-24.4.2-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:bdde6f877a18f24844e381d45e9947a49e97933573ac9d4345399be37621e26c"}, + {file = "black-24.4.2-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:e151054aa00bad1f4e1f04919542885f89f5f7d086b8a59e5000e6c616896ffb"}, + {file = "black-24.4.2-cp311-cp311-win_amd64.whl", hash = "sha256:7e122b1c4fb252fd85df3ca93578732b4749d9be076593076ef4d07a0233c3e1"}, + {file = "black-24.4.2-cp312-cp312-macosx_10_9_x86_64.whl", hash = "sha256:accf49e151c8ed2c0cdc528691838afd217c50412534e876a19270fea1e28e2d"}, + {file = "black-24.4.2-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:88c57dc656038f1ab9f92b3eb5335ee9b021412feaa46330d5eba4e51fe49b04"}, + {file = "black-24.4.2-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:be8bef99eb46d5021bf053114442914baeb3649a89dc5f3a555c88737e5e98fc"}, + {file = "black-24.4.2-cp312-cp312-win_amd64.whl", hash = "sha256:415e686e87dbbe6f4cd5ef0fbf764af7b89f9057b97c908742b6008cc554b9c0"}, + {file = "black-24.4.2-cp38-cp38-macosx_10_9_x86_64.whl", hash = "sha256:bf10f7310db693bb62692609b397e8d67257c55f949abde4c67f9cc574492cc7"}, + {file = "black-24.4.2-cp38-cp38-macosx_11_0_arm64.whl", hash = "sha256:98e123f1d5cfd42f886624d84464f7756f60ff6eab89ae845210631714f6db94"}, + {file = "black-24.4.2-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:48a85f2cb5e6799a9ef05347b476cce6c182d6c71ee36925a6c194d074336ef8"}, + {file = "black-24.4.2-cp38-cp38-win_amd64.whl", hash = "sha256:b1530ae42e9d6d5b670a34db49a94115a64596bc77710b1d05e9801e62ca0a7c"}, + {file = "black-24.4.2-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:37aae07b029fa0174d39daf02748b379399b909652a806e5708199bd93899da1"}, + {file = "black-24.4.2-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:da33a1a5e49c4122ccdfd56cd021ff1ebc4a1ec4e2d01594fef9b6f267a9e741"}, + {file = "black-24.4.2-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:ef703f83fc32e131e9bcc0a5094cfe85599e7109f896fe8bc96cc402f3eb4b6e"}, + {file = "black-24.4.2-cp39-cp39-win_amd64.whl", hash = "sha256:b9176b9832e84308818a99a561e90aa479e73c523b3f77afd07913380ae2eab7"}, + {file = "black-24.4.2-py3-none-any.whl", hash = "sha256:d36ed1124bb81b32f8614555b34cc4259c3fbc7eec17870e8ff8ded335b58d8c"}, + {file = "black-24.4.2.tar.gz", hash = "sha256:c872b53057f000085da66a19c55d68f6f8ddcac2642392ad3a355878406fbd4d"}, ] [package.dependencies] @@ -1255,4 +1255,4 @@ dev = ["black (>=19.3b0)", "pytest (>=4.6.2)"] [metadata] lock-version = "2.0" python-versions = "^3.10" -content-hash = "c476e84ff28d3dafdf010261e99dc32107677b2d5a2c57e9fffa9fc48c97f120" +content-hash = "59cf5f8d69a76b5bd4df3c7f0573a575ef08aadac8f80ca4ba4d17ef1556cd5e" diff --git a/pyproject.toml b/pyproject.toml index ce747818..9d98187e 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "pipeline-ai" -version = "2.4.6" +version = "2.4.7" description = "Pipelines for machine learning workloads." 
diff --git a/tests/container/routes/__init__.py b/tests/container/routes/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/tests/container/routes/conftest.py b/tests/container/routes/conftest.py
new file mode 100644
index 00000000..0cabfcfc
--- /dev/null
+++ b/tests/container/routes/conftest.py
@@ -0,0 +1,16 @@
+import pytest
+from fastapi.testclient import TestClient
+
+from pipeline.container.startup import create_app
+
+
+@pytest.fixture
+async def app():
+    app = create_app()
+    return app
+
+
+@pytest.fixture
+async def client(app):
+    with TestClient(app) as client:
+        yield client
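Note: both fixtures yield the client from inside `with TestClient(app)`. The context manager is what drives the app's lifespan, so the execution queue, `Manager`, and `execution_handler` task set up in `startup.py` actually exist while a test runs; a bare `TestClient(app)` would skip startup entirely. Illustrative usage:

```python
from fastapi.testclient import TestClient

from pipeline.container.startup import create_app

app = create_app()
with TestClient(app) as client:  # lifespan startup runs on entry
    assert client.get("/status").status_code == 200
# lifespan shutdown (task.cancel()) runs on exit
```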
- """ - stream_output_one = run_schemas.RunOutput( - type=run_schemas.RunIOType.stream, value=Stream(iter([1, 2, 3, 4])), file=None - ) - stream_output_two = run_schemas.RunOutput( - type=run_schemas.RunIOType.stream, - value=Stream(iter(["hello", "world"])), - file=None, - ) - static_output = run_schemas.RunOutput( - type=run_schemas.RunIOType.string, value="static output", file=None - ) - container_run_result = run_schemas.ContainerRunResult( - inputs=None, - outputs=[stream_output_one, static_output, stream_output_two], - error=None, - ) + def test_success(self, client): - results = [ - result - async for result in _stream_run_outputs(container_run_result, DummyRequest()) - ] - - output_values = [] - for result, status_code in results: - assert status_code == 200 - outputs = json.loads(result)["outputs"] - values = [o["value"] for o in outputs] - output_values.append(values) - - assert output_values == [ - [ - 1, - "static output", - "hello", - ], - [ - 2, - "static output", - "world", - ], - [ - 3, - "static output", - None, - ], - [ - 4, - "static output", - None, - ], - ] - - -async def test_stream_run_outputs_when_exception_raised(): - """Test streaming outputs when pipeline raises an exception. - - Error should be reported back to the user. - """ - - def error_stream(): - yield 1 - raise Exception("dummy error") - - stream_output_one = run_schemas.RunOutput( - type=run_schemas.RunIOType.stream, value=Stream(error_stream()), file=None - ) - stream_output_two = run_schemas.RunOutput( - type=run_schemas.RunIOType.stream, - value=Stream(iter(["hello", "world"])), - file=None, - ) - static_output = run_schemas.RunOutput( - type=run_schemas.RunIOType.string, value="static output", file=None - ) - container_run_result = run_schemas.ContainerRunResult( - inputs=None, - outputs=[stream_output_one, static_output, stream_output_two], - error=None, - ) - - results = [ - (result, status_code) - async for result, status_code in _stream_run_outputs( - container_run_result, DummyRequest() + payload = run_schemas.ContainerRunCreate( + run_id="run_123", + inputs=[ + run_schemas.RunInput(type="integer", value=5), + run_schemas.RunInput(type="integer", value=4), + ], + ) + response = client.post("/v4/runs", json=payload.dict()) + + assert response.status_code == status.HTTP_200_OK + result = run_schemas.ContainerRunResult.parse_obj(response.json()) + assert result.error is None + assert result.outputs == [run_schemas.RunOutput(type="integer", value=9)] + + # make another run to ensure execution handler is still working as expected + payload = run_schemas.ContainerRunCreate( + run_id="run_124", + inputs=[ + run_schemas.RunInput(type="integer", value=5), + run_schemas.RunInput(type="integer", value=10), + ], + ) + response = client.post("/v4/runs", json=payload.dict()) + + assert response.status_code == status.HTTP_200_OK + result = run_schemas.ContainerRunResult.parse_obj(response.json()) + assert result.error is None + assert result.outputs == [run_schemas.RunOutput(type="integer", value=15)] + + def test_when_pipeline_failed_to_load(self, app, monkeypatch): + # use invalid path to simulate pipeline failed error + monkeypatch.setenv("PIPELINE_PATH", "tests.container.fixtures.oops:my_pipeline") + with TestClient(app) as client: + + payload = run_schemas.ContainerRunCreate( + run_id="run_123", + inputs=[ + run_schemas.RunInput(type="integer", value=5), + run_schemas.RunInput(type="integer", value=4), + ], + ) + response = client.post("/v4/runs", json=payload.dict()) + + assert response.status_code == 
diff --git a/tests/container/routes/test_runs_streaming.py b/tests/container/routes/test_runs_streaming.py
new file mode 100644
index 00000000..c7af202b
--- /dev/null
+++ b/tests/container/routes/test_runs_streaming.py
@@ -0,0 +1,124 @@
+import json
+
+from pipeline.cloud.schemas import runs as run_schemas
+from pipeline.container.routes.v4.runs import _stream_run_outputs
+from pipeline.objects.graph import Stream
+
+
+class DummyRequest:
+    """A dummy request object for use in this test"""
+
+    async def is_disconnected(self):
+        return False
+
+
+async def test_stream_run_outputs():
+    """Test that the order of outputs is as expected when we have a combination
+    of static and stream outputs. Note that the stream outputs return different
+    amounts of data.
+    """
+    stream_output_one = run_schemas.RunOutput(
+        type=run_schemas.RunIOType.stream, value=Stream(iter([1, 2, 3, 4])), file=None
+    )
+    stream_output_two = run_schemas.RunOutput(
+        type=run_schemas.RunIOType.stream,
+        value=Stream(iter(["hello", "world"])),
+        file=None,
+    )
+    static_output = run_schemas.RunOutput(
+        type=run_schemas.RunIOType.string, value="static output", file=None
+    )
+    container_run_result = run_schemas.ContainerRunResult(
+        inputs=None,
+        outputs=[stream_output_one, static_output, stream_output_two],
+        error=None,
+    )
+
+    results = [
+        result
+        async for result in _stream_run_outputs(container_run_result, DummyRequest())
+    ]
+
+    output_values = []
+    for result, status_code in results:
+        assert status_code == 200
+        outputs = json.loads(result)["outputs"]
+        values = [o["value"] for o in outputs]
+        output_values.append(values)
+
+    assert output_values == [
+        [
+            1,
+            "static output",
+            "hello",
+        ],
+        [
+            2,
+            "static output",
+            "world",
+        ],
+        [
+            3,
+            "static output",
+            None,
+        ],
+        [
+            4,
+            "static output",
+            None,
+        ],
+    ]
+
+
+async def test_stream_run_outputs_when_exception_raised():
+    """Test streaming outputs when pipeline raises an exception.
+
+    Error should be reported back to the user.
+    """
+
+    def error_stream():
+        yield 1
+        raise Exception("dummy error")
+
+    stream_output_one = run_schemas.RunOutput(
+        type=run_schemas.RunIOType.stream, value=Stream(error_stream()), file=None
+    )
+    stream_output_two = run_schemas.RunOutput(
+        type=run_schemas.RunIOType.stream,
+        value=Stream(iter(["hello", "world"])),
+        file=None,
+    )
+    static_output = run_schemas.RunOutput(
+        type=run_schemas.RunIOType.string, value="static output", file=None
+    )
+    container_run_result = run_schemas.ContainerRunResult(
+        inputs=None,
+        outputs=[stream_output_one, static_output, stream_output_two],
+        error=None,
+    )
+
+    results = [
+        (result, status_code)
+        async for result, status_code in _stream_run_outputs(
+            container_run_result, DummyRequest()
+        )
+    ]
+    data = [json.loads(result) for result, _ in results]
+    status_codes = [status_code for _, status_code in results]
+    # even if pipeline_error, status code should be 200
+    assert all(status_code == 200 for status_code in status_codes)
+
+    # exception was raised on 2nd iteration, so we expect there to be a valid
+    # output followed by an error
+    assert len(results) == 2
+
+    assert data[0]["outputs"] == [
+        {"type": "integer", "value": 1, "file": None},
+        {"type": "string", "value": "static output", "file": None},
+        {"type": "string", "value": "hello", "file": None},
+    ]
+
+    error = data[1].get("error")
+    assert error is not None
+    assert error["message"] == "Exception('dummy error')"
+    assert error["type"] == "pipeline_error"
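Note: the streaming tests move here unchanged from the old `test_runs.py`. The expected frames follow zip-with-padding semantics: each frame takes the next item from every stream output (`None` once a stream is exhausted) and repeats every static output. A standalone restatement of that contract, not the `_stream_run_outputs` implementation:

```python
from itertools import zip_longest

stream_one = [1, 2, 3, 4]
stream_two = ["hello", "world"]
static = "static output"

frames = [[a, static, b] for a, b in zip_longest(stream_one, stream_two)]
assert frames == [
    [1, "static output", "hello"],
    [2, "static output", "world"],
    [3, "static output", None],
    [4, "static output", None],
]
```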
diff --git a/tests/container/routes/test_status.py b/tests/container/routes/test_status.py
new file mode 100644
index 00000000..0947ea19
--- /dev/null
+++ b/tests/container/routes/test_status.py
@@ -0,0 +1,6 @@
+from fastapi import status
+
+
+async def test_status(client):
+    response = client.get("/status")
+    assert response.status_code == status.HTTP_200_OK
diff --git a/tests/test_pipeline_function.py b/tests/test_pipeline_function.py
index 32689acb..b5bb9c42 100644
--- a/tests/test_pipeline_function.py
+++ b/tests/test_pipeline_function.py
@@ -52,7 +52,7 @@ def test_function() -> Tuple[str, int]:
 
     @pipe
     def test_function_2(input_1: str):
-        ...
+        pass
 
     with pytest.raises(Exception):
         with Pipeline() as builder: