From 0edb437683c97fe5e507f763173973b6f88cd227 Mon Sep 17 00:00:00 2001 From: aboutphilippe Date: Wed, 11 Jun 2025 11:28:35 +0200 Subject: [PATCH 1/4] bump production demo --- production_demo/pyproject.toml | 2 +- production_demo/src/functions/evaluate.py | 4 ++-- production_demo/src/functions/function.py | 4 ++-- production_demo/src/functions/generate.py | 4 ++-- production_demo/src/workflows/child.py | 2 +- 5 files changed, 8 insertions(+), 8 deletions(-) diff --git a/production_demo/pyproject.toml b/production_demo/pyproject.toml index e800375c..9f9cee47 100644 --- a/production_demo/pyproject.toml +++ b/production_demo/pyproject.toml @@ -7,7 +7,7 @@ requires-python = ">=3.10,<3.14" readme = "README.md" dependencies = [ "pydantic>=2.10.6", - "restack-ai==0.0.91", + "restack-ai==0.0.92", "watchfiles>=1.0.4", "python-dotenv==1.0.1", "openai>=1.61.0", diff --git a/production_demo/src/functions/evaluate.py b/production_demo/src/functions/evaluate.py index 0b308f85..cd50f86c 100644 --- a/production_demo/src/functions/evaluate.py +++ b/production_demo/src/functions/evaluate.py @@ -1,4 +1,4 @@ -from restack_ai.function import function, FunctionFailure, log +from restack_ai.function import function, NonRetryableError, log from openai import OpenAI from pydantic import BaseModel @@ -11,7 +11,7 @@ async def llm_evaluate(input: EvaluateInput) -> str: client = OpenAI(base_url="http://192.168.205.1:1234/v1/",api_key="llmstudio") except Exception as e: log.error(f"Failed to create LLM client {e}") - raise FunctionFailure(f"Failed to create OpenAI client {e}", non_retryable=True) from e + raise NonRetryableError(message=f"Failed to create OpenAI client {e}") from e prompt = ( f"Evaluate the following joke for humor, creativity, and originality. " diff --git a/production_demo/src/functions/function.py b/production_demo/src/functions/function.py index f06ee0a8..1137959c 100644 --- a/production_demo/src/functions/function.py +++ b/production_demo/src/functions/function.py @@ -1,4 +1,4 @@ -from restack_ai.function import function, log, FunctionFailure +from restack_ai.function import function, log, NonRetryableError tries = 0 @@ -14,7 +14,7 @@ async def example_function(input: ExampleFunctionInput) -> str: if tries == 0: tries += 1 - raise FunctionFailure(message="Simulated failure", non_retryable=False) + raise NonRetryableError(message="Simulated failure") log.info("example function started", input=input) return f"Hello, {input.name}!" diff --git a/production_demo/src/functions/generate.py b/production_demo/src/functions/generate.py index d305a331..6af8f23c 100644 --- a/production_demo/src/functions/generate.py +++ b/production_demo/src/functions/generate.py @@ -1,4 +1,4 @@ -from restack_ai.function import function, FunctionFailure, log +from restack_ai.function import function, NonRetryableError, log from openai import OpenAI from pydantic import BaseModel @@ -13,7 +13,7 @@ async def llm_generate(input: GenerateInput) -> str: client = OpenAI(base_url="http://192.168.205.1:1234/v1/",api_key="llmstudio") except Exception as e: log.error(f"Failed to create LLM client {e}") - raise FunctionFailure(f"Failed to create OpenAI client {e}", non_retryable=True) from e + raise NonRetryableError(message=f"Failed to create OpenAI client {e}") from e try: response = client.chat.completions.create( diff --git a/production_demo/src/workflows/child.py b/production_demo/src/workflows/child.py index 6e553473..2bb1aca3 100644 --- a/production_demo/src/workflows/child.py +++ b/production_demo/src/workflows/child.py @@ -15,7 +15,7 @@ class ChildWorkflow: @workflow.run async def run(self, input: ChildWorkflowInput): log.info("ChildWorkflow started") - await workflow.step(example_function, input=ExampleFunctionInput(name='John Doe'), start_to_close_timeout=timedelta(minutes=2)) + await workflow.step(function=example_function, function_input=ExampleFunctionInput(name='John Doe'), start_to_close_timeout=timedelta(minutes=2)) await workflow.sleep(1) From 0428857bf6729757a633df124406f134e07f1951 Mon Sep 17 00:00:00 2001 From: aboutphilippe Date: Wed, 11 Jun 2025 11:35:17 +0200 Subject: [PATCH 2/4] test --- production_demo/src/workflows/workflow.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/production_demo/src/workflows/workflow.py b/production_demo/src/workflows/workflow.py index 30a5199f..8811dacb 100644 --- a/production_demo/src/workflows/workflow.py +++ b/production_demo/src/workflows/workflow.py @@ -23,7 +23,7 @@ async def run(self, input: ExampleWorkflowInput): task = workflow.child_execute( workflow=ChildWorkflow, workflow_id=f"{parent_workflow_id}-child-execute-{i+1}", - input=ChildWorkflowInput(name=f"child workflow {i+1}") + input=ChildWorkflowInput() ) tasks.append(task) From 6b87d91f3521f96ce71e33d54437319ccbac7397 Mon Sep 17 00:00:00 2001 From: aboutphilippe Date: Wed, 11 Jun 2025 15:24:31 +0200 Subject: [PATCH 3/4] fix 94 --- production_demo/src/functions/function.py | 4 +- production_demo/src/functions/generate.py | 2 +- production_demo/src/services.py | 6 +-- production_demo/src/workflows/child.py | 52 ++++++++++--------- production_demo/src/workflows/workflow.py | 63 ++++++++++++----------- 5 files changed, 68 insertions(+), 59 deletions(-) diff --git a/production_demo/src/functions/function.py b/production_demo/src/functions/function.py index 1137959c..cc9dc42b 100644 --- a/production_demo/src/functions/function.py +++ b/production_demo/src/functions/function.py @@ -1,4 +1,4 @@ -from restack_ai.function import function, log, NonRetryableError +from restack_ai.function import function, log, RetryableError tries = 0 @@ -14,7 +14,7 @@ async def example_function(input: ExampleFunctionInput) -> str: if tries == 0: tries += 1 - raise NonRetryableError(message="Simulated failure") + raise RetryableError(message="Simulated failure") log.info("example function started", input=input) return f"Hello, {input.name}!" diff --git a/production_demo/src/functions/generate.py b/production_demo/src/functions/generate.py index 6af8f23c..c0b1c4d4 100644 --- a/production_demo/src/functions/generate.py +++ b/production_demo/src/functions/generate.py @@ -10,7 +10,7 @@ class GenerateInput(BaseModel): async def llm_generate(input: GenerateInput) -> str: try: - client = OpenAI(base_url="http://192.168.205.1:1234/v1/",api_key="llmstudio") + client = OpenAI(base_url="http://192.168.178.57:1234/v1/",api_key="llmstudio") except Exception as e: log.error(f"Failed to create LLM client {e}") raise NonRetryableError(message=f"Failed to create OpenAI client {e}") from e diff --git a/production_demo/src/services.py b/production_demo/src/services.py index 17ef8d9e..7ee723ab 100644 --- a/production_demo/src/services.py +++ b/production_demo/src/services.py @@ -8,13 +8,12 @@ from src.functions.function import example_function from src.functions.generate import llm_generate from src.functions.evaluate import llm_evaluate - from src.workflows.workflow import ExampleWorkflow, ChildWorkflow + import webbrowser async def main(): - await asyncio.gather( client.start_service( workflows=[ExampleWorkflow, ChildWorkflow], @@ -22,7 +21,6 @@ async def main(): options=ServiceOptions( max_concurrent_workflow_runs=1000 ) - ), client.start_service( task_queue="llm", @@ -31,7 +29,7 @@ async def main(): rate_limit=1, max_concurrent_function_runs=1 ) - ) + ), ) def run_services(): diff --git a/production_demo/src/workflows/child.py b/production_demo/src/workflows/child.py index 2bb1aca3..780a2e6d 100644 --- a/production_demo/src/workflows/child.py +++ b/production_demo/src/workflows/child.py @@ -1,6 +1,6 @@ from datetime import timedelta from pydantic import BaseModel, Field -from restack_ai.workflow import workflow, import_functions, log +from restack_ai.workflow import workflow, import_functions, log, NonRetryableError, RetryPolicy with import_functions(): from src.functions.function import example_function, ExampleFunctionInput @@ -14,28 +14,34 @@ class ChildWorkflowInput(BaseModel): class ChildWorkflow: @workflow.run async def run(self, input: ChildWorkflowInput): + log.info("ChildWorkflow started") - await workflow.step(function=example_function, function_input=ExampleFunctionInput(name='John Doe'), start_to_close_timeout=timedelta(minutes=2)) - - await workflow.sleep(1) - - generated_text = await workflow.step( - function=llm_generate, - function_input=GenerateInput(prompt=input.prompt), - task_queue="llm", - start_to_close_timeout=timedelta(minutes=2) - ) - - evaluation = await workflow.step( - function=llm_evaluate, - function_input=EvaluateInput(generated_text=generated_text), - task_queue="llm", - start_to_close_timeout=timedelta(minutes=5) - ) - - return { - "generated_text": generated_text, - "evaluation": evaluation - } + + try: + await workflow.step(function=example_function, function_input=ExampleFunctionInput(name='John Doe'), start_to_close_timeout=timedelta(minutes=2), retry_policy=RetryPolicy(maximum_attempts=3)) + + await workflow.sleep(1) + + generated_text = await workflow.step( + function=llm_generate, + function_input=GenerateInput(prompt=input.prompt), + task_queue="llm", + start_to_close_timeout=timedelta(minutes=2) + ) + + evaluation = await workflow.step( + function=llm_evaluate, + function_input=EvaluateInput(generated_text=generated_text), + task_queue="llm", + start_to_close_timeout=timedelta(minutes=5) + ) + + return { + "generated_text": generated_text, + "evaluation": evaluation + } + except Exception as e: + log.error(f"ChildWorkflow failed {e}") + raise NonRetryableError(message=f"ChildWorkflow failed {e}") from e diff --git a/production_demo/src/workflows/workflow.py b/production_demo/src/workflows/workflow.py index 8811dacb..45e41e9a 100644 --- a/production_demo/src/workflows/workflow.py +++ b/production_demo/src/workflows/workflow.py @@ -1,7 +1,7 @@ import asyncio from datetime import timedelta from pydantic import BaseModel, Field -from restack_ai.workflow import workflow, log, workflow_info, import_functions +from restack_ai.workflow import workflow, log, workflow_info, import_functions, NonRetryableError from .child import ChildWorkflow, ChildWorkflowInput with import_functions(): @@ -14,34 +14,39 @@ class ExampleWorkflowInput(BaseModel): class ExampleWorkflow: @workflow.run async def run(self, input: ExampleWorkflowInput): - # use the parent run id to create child workflow ids - parent_workflow_id = workflow_info().workflow_id - - tasks = [] - for i in range(input.amount): - log.info(f"Queue ChildWorkflow {i+1} for execution") - task = workflow.child_execute( - workflow=ChildWorkflow, - workflow_id=f"{parent_workflow_id}-child-execute-{i+1}", - input=ChildWorkflowInput() + + try: + # use the parent run id to create child workflow ids + parent_workflow_id = workflow_info().workflow_id + + tasks = [] + for i in range(input.amount): + log.info(f"Queue ChildWorkflow {i+1} for execution") + task = workflow.child_execute( + workflow=ChildWorkflow, + workflow_id=f"{parent_workflow_id}-child-execute-{i+1}", + workflow_input=ChildWorkflowInput(prompt="Generate a random joke in max 20 words."), + ) + tasks.append(task) + + # Run all child workflows in parallel and wait for their results + results = await asyncio.gather(*tasks) + + for i, result in enumerate(results, start=1): + log.info(f"ChildWorkflow {i} completed", result=result) + + generated_text = await workflow.step( + function=llm_generate, + function_input=GenerateInput(prompt=f"Give me the top 3 unique jokes according to the results. {results}"), + task_queue="llm", + start_to_close_timeout=timedelta(minutes=2) ) - tasks.append(task) - # Run all child workflows in parallel and wait for their results - results = await asyncio.gather(*tasks) - - for i, result in enumerate(results, start=1): - log.info(f"ChildWorkflow {i} completed", result=result) - - generated_text = await workflow.step( - function=llm_generate, - function_input=GenerateInput(prompt=f"Give me the top 3 unique jokes according to the results. {results}"), - task_queue="llm", - start_to_close_timeout=timedelta(minutes=2) - ) - - return { - "top_jokes": generated_text, - "results": results - } + return { + "top_jokes": generated_text, + "results": results + } + except Exception as e: + log.error(f"ExampleWorkflow failed {e}") + raise NonRetryableError(message=f"ExampleWorkflow failed {e}") from e \ No newline at end of file From 80287077a2dce4adacd00616654f89ca4283d544 Mon Sep 17 00:00:00 2001 From: aboutphilippe Date: Wed, 11 Jun 2025 15:24:38 +0200 Subject: [PATCH 4/4] bump 94 --- production_demo/pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/production_demo/pyproject.toml b/production_demo/pyproject.toml index 9f9cee47..041ac134 100644 --- a/production_demo/pyproject.toml +++ b/production_demo/pyproject.toml @@ -7,7 +7,7 @@ requires-python = ">=3.10,<3.14" readme = "README.md" dependencies = [ "pydantic>=2.10.6", - "restack-ai==0.0.92", + "restack-ai==0.0.94", "watchfiles>=1.0.4", "python-dotenv==1.0.1", "openai>=1.61.0",