<img src="https://storage.googleapis.com/tbx-web-assets-2bad228/logos/tilebox-horizontal-color.svg" alt="Tilebox" width="200"/>

# Workflow Orchestrator

This is a demo showcasing a minimal workflow orchestrator using the [Tilebox](https://tilebox.com) SDKs.

## Prerequisites

✅ Fill in the `.env` file with your API key and cluster slug, as described in the [README](README.md).  
✅ Install the required dependencies with `uv sync`.

In [1]:
from tilebox.workflows import Client, ExecutionContext, Task
from tilebox.workflows.interceptors import print_executions

---

## Your first Task

A task is a class that derives from `tilebox.workflows.Task` and implements an `execute` method, which is called when the task runs.

Deriving from `Task` also makes the class a `dataclass`, which is why we can specify the task arguments just as we do for `dataclasses`.
This is the mechanism that allows us to serialize and deserialize tasks, and therefore transmit them between different parallel task runners.
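
To make the serialization point concrete, here is a minimal sketch that uses only the standard library. `GreetingTask`, `serialize` and `deserialize` are hypothetical names, and JSON is an assumed format chosen for illustration; it is not necessarily how Tilebox encodes tasks.

```python
import json
from dataclasses import asdict, dataclass


@dataclass
class GreetingTask:  # hypothetical stand-in, not a tilebox.workflows.Task
    greeting: str


def serialize(task: GreetingTask) -> bytes:
    # the declared dataclass fields are all that needs to be transmitted
    return json.dumps(asdict(task)).encode()


def deserialize(payload: bytes) -> GreetingTask:
    # rebuild an equal task instance from the transmitted fields
    return GreetingTask(**json.loads(payload))


payload = serialize(GreetingTask("Hello Tilebox"))
restored = deserialize(payload)
print(restored)
```

Because the fields fully describe the task, the receiving side only needs the class definition and the payload to reconstruct it.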

In [2]:
class MyFirstTask(Task):
    greeting: str

    def execute(self, context: ExecutionContext) -> None:
        print(f"{self.greeting} from my first task!")

### Running as Job

#### Submitting a job

In order to run a Task, we need to submit it as part of a job:

📒 Jobs are submitted to logical clusters, like the one you created at [https://console.tilebox.com/workflows/clusters](https://console.tilebox.com/workflows/clusters) and saved to the .env file.

In [3]:
import dotenv
dotenv.load_dotenv()

In [4]:
import os
cluster = os.environ['TILEBOX_CLUSTER']

client = Client()  # a workflow client for https://api.tilebox.com
job_client = client.jobs()

In [5]:
job = job_client.submit("my-first-job", MyFirstTask("Hello Tilebox"), cluster=cluster)

#### Actually running it

So far, nothing has happened: the task has been submitted, but no task runner capable of executing tasks of type `MyFirstTask` is running yet.
We can change that by instantiating a runner and letting it run until no more tasks are available:

In [6]:
runner = client.runner(cluster, tasks=[MyFirstTask])  # this runner can run tasks of type MyFirstTask

In [7]:
runner.run_all()  # query for runnable tasks, run them, then return

Hello Tilebox from my first task!
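
As a conceptual illustration only, the following sketch mimics why a submitted task waits until a runner that supports its type comes along. `ToyRunner`, `Greet` and `Unrelated` are hypothetical names, and an in-memory `deque` stands in for the cluster; none of this reflects how the Tilebox runner is actually implemented.

```python
from collections import deque
from dataclasses import dataclass


@dataclass
class Greet:  # hypothetical task type for this sketch
    greeting: str

    def execute(self) -> str:
        return f"{self.greeting} from a toy task!"


class Unrelated:  # hypothetical task type that is never submitted
    pass


class ToyRunner:
    """Hypothetical in-memory runner; a deque stands in for the cluster."""

    def __init__(self, queue: deque, tasks: list[type]) -> None:
        self.queue = queue
        self.tasks = tuple(tasks)  # task types this runner may execute

    def run_all(self) -> list[str]:
        results = []
        skipped = deque()
        while self.queue:
            task = self.queue.popleft()
            if isinstance(task, self.tasks):
                results.append(task.execute())
            else:
                skipped.append(task)  # leave it for a runner that supports it
        self.queue.extend(skipped)
        return results


queue = deque([Greet("Hello Tilebox")])  # submitted, but nothing has run yet
no_match = ToyRunner(queue, tasks=[Unrelated]).run_all()  # cannot run Greet
results = ToyRunner(queue, tasks=[Greet]).run_all()  # picks the task up
print(no_match, results)
```

The first runner returns without doing anything and the task stays queued; only the runner registered for `Greet` executes it.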


---

## Workflows with Subtasks

So far, `MyFirstTask` has only been mildly interesting. Let's make things more interesting by adding **subtasks**: we'll implement another workflow, `DownloadImages`, that downloads a set of images.

We'll also:
- configure [logging](https://docs.tilebox.com/workflows/observability/logging#logging)
- [visualize](https://docs.tilebox.com/workflows/concepts/jobs#visualization) the dynamically generated DAG
- use a [Tilebox Cache](https://docs.tilebox.com/workflows/caches#caches) to store the downloaded images

In [16]:
import logging
import httpx

from tilebox.workflows.observability.logging import configure_console_logging, get_logger

configure_console_logging()
logger = get_logger(level=logging.INFO)

class DownloadImages(Task):
    images: dict[str, str]

    def execute(self, context: ExecutionContext) -> None:
        """Download a set of named images and save them to a cache"""

        logger.info("Starting an image download workflow.", extra={"n_images": len(self.images)})

        for image_name, url in self.images.items():
            # instead of calling the download function directly here,
            # we schedule it as a subtask:
            context.submit_subtask(DownloadImage(image_name, url))

        # this is how the task will be displayed in visualizations:
        context.current_task.display = f"DownloadImages(n={len(self.images)})"


class DownloadImage(Task):
    name: str
    url: str

    def execute(self, context: ExecutionContext) -> None:
        """Download an image and save it to a cache"""

        logger.info("Downloading an image.", extra={"image_name": self.name, "url": self.url})
        img = httpx.get(self.url, follow_redirects=True).content
        context.job_cache[self.name] = img

        context.current_task.display = f"DownloadImage('{self.name}')"
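
The key point is that submitting a subtask only queues it; the parent task returns before any subtask runs. The sketch below illustrates that with plain Python. `ToyContext`, `FetchAll` and `FetchOne` are hypothetical stand-ins, the "download" is a placeholder, and the real `ExecutionContext` may behave differently in detail.

```python
from dataclasses import dataclass, field


@dataclass
class ToyContext:  # hypothetical stand-in for ExecutionContext
    pending: list = field(default_factory=list)
    job_cache: dict = field(default_factory=dict)

    def submit_subtask(self, task) -> None:
        self.pending.append(task)  # only queued; execution happens later


@dataclass
class FetchAll:
    items: dict[str, str]

    def execute(self, context: ToyContext) -> None:
        for name, source in self.items.items():
            context.submit_subtask(FetchOne(name, source))


@dataclass
class FetchOne:
    name: str
    source: str

    def execute(self, context: ToyContext) -> None:
        # placeholder for a real download: store bytes under the item name
        context.job_cache[self.name] = self.source.encode()


context = ToyContext()
FetchAll({"a.jpg": "source-a", "b.jpg": "source-b"}).execute(context)
queued = len(context.pending)  # the parent returned; two subtasks are waiting

while context.pending:  # drain the queue, as a runner would
    context.pending.pop(0).execute(context)

print(queued, sorted(context.job_cache))
```

Since each subtask is an independent unit of work, nothing in this model prevents different runners from picking them up in parallel.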

### Let's start a download job

In [17]:
images = {
    "man_with_camera.jpg": "https://picsum.photos/id/91/1200/760",
    "library.jpg": "https://picsum.photos/id/444/1200/760",
    "office_desk.jpg": "https://picsum.photos/id/445/1200/760",
}

In [18]:
job = job_client.submit("download-images", DownloadImages(images), cluster=cluster)

and run it

In [19]:

from tilebox.workflows.cache import LocalFileSystemCache

runner = client.runner(
    cluster, tasks=[DownloadImages, DownloadImage],
    cache=LocalFileSystemCache()
)  # this runner can run tasks of type DownloadImages and DownloadImage
runner.add_interceptor(print_executions)
runner.run_all()

Executing DownloadImages(images={'man_with_camera.jpg': 'https://picsum.photos/id/91/1200/760', 'library.jpg': 'https://picsum.photos/id/444/1200/760', 'office_desk.jpg': 'https://picsum.photos/id/445/1200/760'})
2025-03-13 14:54:59,100 INFO Starting an image download workflow.
Executing DownloadImage(name='man_with_camera.jpg', url='https://picsum.photos/id/91/1200/760')
2025-03-13 14:55:00,034 INFO Downloading an image.
Executing DownloadImage(name='library.jpg', url='https://picsum.photos/id/444/1200/760')
2025-03-13 14:55:00,589 INFO Downloading an image.
Executing DownloadImage(name='office_desk.jpg', url='https://picsum.photos/id/445/1200/760')
2025-03-13 14:55:00,979 INFO Downloading an image.
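
The tasks above write to the cache with dictionary-style assignment. As a rough mental model, a file-system-backed cache with that interface could look like the sketch below. `ToyFileCache` is a hypothetical class written for this illustration; it is not the implementation of `LocalFileSystemCache`, whose storage layout and methods may differ.

```python
import tempfile
from pathlib import Path


class ToyFileCache:
    """Hypothetical dict-style cache that stores each value as a file."""

    def __init__(self, root: Path) -> None:
        self.root = root
        self.root.mkdir(parents=True, exist_ok=True)

    def __setitem__(self, key: str, value: bytes) -> None:
        (self.root / key).write_bytes(value)

    def __getitem__(self, key: str) -> bytes:
        return (self.root / key).read_bytes()

    def __contains__(self, key: str) -> bool:
        return (self.root / key).is_file()


with tempfile.TemporaryDirectory() as tmp:
    cache = ToyFileCache(Path(tmp) / "job-cache")
    cache["library.jpg"] = b"fake image bytes"  # placeholder payload
    stored = cache["library.jpg"]
    present = "library.jpg" in cache
    missing = "office_desk.jpg" in cache

print(stored, present, missing)
```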


### Job visualization

In [22]:
job_client.display(job)

We see that `DownloadImages` has three subtasks with no dependencies among them. Green nodes are tasks that have been executed successfully; light red nodes are scheduled.
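
To see why subtasks without mutual dependencies can run side by side, the same graph can be modelled with the standard library's `graphlib`. This is a sketch under one assumption: each subtask is treated as depending only on the parent that submitted it. The node names are copied from the display names used above.

```python
from graphlib import TopologicalSorter

# each key maps a task to the set of tasks that must finish before it can start
dag = {
    "DownloadImages": set(),
    "DownloadImage('man_with_camera.jpg')": {"DownloadImages"},
    "DownloadImage('library.jpg')": {"DownloadImages"},
    "DownloadImage('office_desk.jpg')": {"DownloadImages"},
}

sorter = TopologicalSorter(dag)
sorter.prepare()

waves = []  # tasks in the same wave do not depend on each other
while sorter.is_active():
    ready = sorted(sorter.get_ready())
    waves.append(ready)
    sorter.done(*ready)

print(waves)
```

The parent forms the first wave on its own, and all three subtasks become ready together in the second wave.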

For more advanced examples, take a look at the sibling folders of this one. Advanced topics include:

- Task dependencies
- Automatic parallelization and load-balancing
- Task and job retries
- Automations – jobs that get triggered by events
- Multi-cluster workflows
- Multi-language workflows