diff --git a/production_demo/pyproject.toml b/production_demo/pyproject.toml index e800375c..041ac134 100644 --- a/production_demo/pyproject.toml +++ b/production_demo/pyproject.toml @@ -7,7 +7,7 @@ requires-python = ">=3.10,<3.14" readme = "README.md" dependencies = [ "pydantic>=2.10.6", - "restack-ai==0.0.91", + "restack-ai==0.0.94", "watchfiles>=1.0.4", "python-dotenv==1.0.1", "openai>=1.61.0", diff --git a/production_demo/src/functions/evaluate.py b/production_demo/src/functions/evaluate.py index 0b308f85..cd50f86c 100644 --- a/production_demo/src/functions/evaluate.py +++ b/production_demo/src/functions/evaluate.py @@ -1,4 +1,4 @@ -from restack_ai.function import function, FunctionFailure, log +from restack_ai.function import function, NonRetryableError, log from openai import OpenAI from pydantic import BaseModel @@ -11,7 +11,7 @@ async def llm_evaluate(input: EvaluateInput) -> str: client = OpenAI(base_url="http://192.168.205.1:1234/v1/",api_key="llmstudio") except Exception as e: log.error(f"Failed to create LLM client {e}") - raise FunctionFailure(f"Failed to create OpenAI client {e}", non_retryable=True) from e + raise NonRetryableError(message=f"Failed to create OpenAI client {e}") from e prompt = ( f"Evaluate the following joke for humor, creativity, and originality. " diff --git a/production_demo/src/functions/function.py b/production_demo/src/functions/function.py index f06ee0a8..cc9dc42b 100644 --- a/production_demo/src/functions/function.py +++ b/production_demo/src/functions/function.py @@ -1,4 +1,4 @@ -from restack_ai.function import function, log, FunctionFailure +from restack_ai.function import function, log, RetryableError tries = 0 @@ -14,7 +14,7 @@ async def example_function(input: ExampleFunctionInput) -> str: if tries == 0: tries += 1 - raise FunctionFailure(message="Simulated failure", non_retryable=False) + raise RetryableError(message="Simulated failure") log.info("example function started", input=input) return f"Hello, {input.name}!" diff --git a/production_demo/src/functions/generate.py b/production_demo/src/functions/generate.py index d305a331..c0b1c4d4 100644 --- a/production_demo/src/functions/generate.py +++ b/production_demo/src/functions/generate.py @@ -1,4 +1,4 @@ -from restack_ai.function import function, FunctionFailure, log +from restack_ai.function import function, NonRetryableError, log from openai import OpenAI from pydantic import BaseModel @@ -10,10 +10,10 @@ class GenerateInput(BaseModel): async def llm_generate(input: GenerateInput) -> str: try: - client = OpenAI(base_url="http://192.168.205.1:1234/v1/",api_key="llmstudio") + client = OpenAI(base_url="http://192.168.178.57:1234/v1/",api_key="llmstudio") except Exception as e: log.error(f"Failed to create LLM client {e}") - raise FunctionFailure(f"Failed to create OpenAI client {e}", non_retryable=True) from e + raise NonRetryableError(message=f"Failed to create OpenAI client {e}") from e try: response = client.chat.completions.create( diff --git a/production_demo/src/services.py b/production_demo/src/services.py index 17ef8d9e..7ee723ab 100644 --- a/production_demo/src/services.py +++ b/production_demo/src/services.py @@ -8,13 +8,12 @@ from src.functions.function import example_function from src.functions.generate import llm_generate from src.functions.evaluate import llm_evaluate - from src.workflows.workflow import ExampleWorkflow, ChildWorkflow + import webbrowser async def main(): - await asyncio.gather( client.start_service( workflows=[ExampleWorkflow, ChildWorkflow], @@ -22,7 +21,6 @@ async def main(): options=ServiceOptions( max_concurrent_workflow_runs=1000 ) - ), client.start_service( task_queue="llm", @@ -31,7 +29,7 @@ async def main(): rate_limit=1, max_concurrent_function_runs=1 ) - ) + ), ) def run_services(): diff --git a/production_demo/src/workflows/child.py b/production_demo/src/workflows/child.py index 6e553473..780a2e6d 100644 --- a/production_demo/src/workflows/child.py +++ b/production_demo/src/workflows/child.py @@ -1,6 +1,6 @@ from datetime import timedelta from pydantic import BaseModel, Field -from restack_ai.workflow import workflow, import_functions, log +from restack_ai.workflow import workflow, import_functions, log, NonRetryableError, RetryPolicy with import_functions(): from src.functions.function import example_function, ExampleFunctionInput @@ -14,28 +14,34 @@ class ChildWorkflowInput(BaseModel): class ChildWorkflow: @workflow.run async def run(self, input: ChildWorkflowInput): + log.info("ChildWorkflow started") - await workflow.step(example_function, input=ExampleFunctionInput(name='John Doe'), start_to_close_timeout=timedelta(minutes=2)) - - await workflow.sleep(1) - - generated_text = await workflow.step( - function=llm_generate, - function_input=GenerateInput(prompt=input.prompt), - task_queue="llm", - start_to_close_timeout=timedelta(minutes=2) - ) - - evaluation = await workflow.step( - function=llm_evaluate, - function_input=EvaluateInput(generated_text=generated_text), - task_queue="llm", - start_to_close_timeout=timedelta(minutes=5) - ) - - return { - "generated_text": generated_text, - "evaluation": evaluation - } + + try: + await workflow.step(function=example_function, function_input=ExampleFunctionInput(name='John Doe'), start_to_close_timeout=timedelta(minutes=2), retry_policy=RetryPolicy(maximum_attempts=3)) + + await workflow.sleep(1) + + generated_text = await workflow.step( + function=llm_generate, + function_input=GenerateInput(prompt=input.prompt), + task_queue="llm", + start_to_close_timeout=timedelta(minutes=2) + ) + + evaluation = await workflow.step( + function=llm_evaluate, + function_input=EvaluateInput(generated_text=generated_text), + task_queue="llm", + start_to_close_timeout=timedelta(minutes=5) + ) + + return { + "generated_text": generated_text, + "evaluation": evaluation + } + except Exception as e: + log.error(f"ChildWorkflow failed {e}") + raise NonRetryableError(message=f"ChildWorkflow failed {e}") from e diff --git a/production_demo/src/workflows/workflow.py b/production_demo/src/workflows/workflow.py index 30a5199f..45e41e9a 100644 --- a/production_demo/src/workflows/workflow.py +++ b/production_demo/src/workflows/workflow.py @@ -1,7 +1,7 @@ import asyncio from datetime import timedelta from pydantic import BaseModel, Field -from restack_ai.workflow import workflow, log, workflow_info, import_functions +from restack_ai.workflow import workflow, log, workflow_info, import_functions, NonRetryableError from .child import ChildWorkflow, ChildWorkflowInput with import_functions(): @@ -14,34 +14,39 @@ class ExampleWorkflowInput(BaseModel): class ExampleWorkflow: @workflow.run async def run(self, input: ExampleWorkflowInput): - # use the parent run id to create child workflow ids - parent_workflow_id = workflow_info().workflow_id - - tasks = [] - for i in range(input.amount): - log.info(f"Queue ChildWorkflow {i+1} for execution") - task = workflow.child_execute( - workflow=ChildWorkflow, - workflow_id=f"{parent_workflow_id}-child-execute-{i+1}", - input=ChildWorkflowInput(name=f"child workflow {i+1}") + + try: + # use the parent run id to create child workflow ids + parent_workflow_id = workflow_info().workflow_id + + tasks = [] + for i in range(input.amount): + log.info(f"Queue ChildWorkflow {i+1} for execution") + task = workflow.child_execute( + workflow=ChildWorkflow, + workflow_id=f"{parent_workflow_id}-child-execute-{i+1}", + workflow_input=ChildWorkflowInput(prompt="Generate a random joke in max 20 words."), + ) + tasks.append(task) + + # Run all child workflows in parallel and wait for their results + results = await asyncio.gather(*tasks) + + for i, result in enumerate(results, start=1): + log.info(f"ChildWorkflow {i} completed", result=result) + + generated_text = await workflow.step( + function=llm_generate, + function_input=GenerateInput(prompt=f"Give me the top 3 unique jokes according to the results. {results}"), + task_queue="llm", + start_to_close_timeout=timedelta(minutes=2) ) - tasks.append(task) - # Run all child workflows in parallel and wait for their results - results = await asyncio.gather(*tasks) - - for i, result in enumerate(results, start=1): - log.info(f"ChildWorkflow {i} completed", result=result) - - generated_text = await workflow.step( - function=llm_generate, - function_input=GenerateInput(prompt=f"Give me the top 3 unique jokes according to the results. {results}"), - task_queue="llm", - start_to_close_timeout=timedelta(minutes=2) - ) - - return { - "top_jokes": generated_text, - "results": results - } + return { + "top_jokes": generated_text, + "results": results + } + except Exception as e: + log.error(f"ExampleWorkflow failed {e}") + raise NonRetryableError(message=f"ExampleWorkflow failed {e}") from e \ No newline at end of file