<img src="https://uploads-ssl.webflow.com/64b58e06654493327b4a59ee/6508a6973bb25aebe94d4fde_tilebox-color.svg" alt="Tilebox" width="200"/>

# Automations

This is a starter notebook showcasing Tilebox Workflows automations.

In [1]:
import os

import dotenv
from tilebox.workflows import Client as WorkflowsClient, ExecutionContext
from tilebox.workflows.cache import LocalFileSystemCache
from tilebox.workflows.automations import CronTask

Verify that all the environment variables are set and that the cluster exists. Clusters can be created on the [Tilebox Console](https://console.tilebox.com/workflows/clusters).

In [3]:
# Pull variables from a local .env file (if any) into the process environment
dotenv.load_dotenv()

# Fail early with a clear message when a required setting is missing
if "TILEBOX_API_KEY" not in os.environ:
    raise ValueError("TILEBOX_API_KEY environment variable is not set")

cluster = os.environ.get("TILEBOX_CLUSTER")
if cluster is None:
    raise ValueError("TILEBOX_CLUSTER environment variable is not set")

In [4]:
workflowsClient = WorkflowsClient()

# Keep the configured slug under its own name. Previously `cluster` was
# overwritten by the lookup result before being used in the error message, so
# a failed lookup reported "Cluster None not found". Reading the slug from the
# environment here also keeps this cell safe to re-run.
cluster_slug = os.getenv("TILEBOX_CLUSTER")

clusters = workflowsClient.clusters()
cluster = clusters.find(cluster_slug)
if cluster is None:
    raise ValueError(f"Cluster {cluster_slug} not found")

### Creating a service that's running on a schedule

Define a cron task by extending the `CronTask` class.

In [5]:
class MyCronTask(CronTask):
    """Cron task that prints a greeting and the time of the trigger that started it."""

    message: str  # text interpolated into the greeting

    def execute(self, context: ExecutionContext) -> None:
        """Print the greeting, followed by the time of the triggering event."""
        greeting = f"Hello {self.message} from a Cron Task!"
        print(greeting)

        # self.trigger is an attribute of the CronTask class; it holds
        # information about the trigger event that caused this task to be
        # submitted as part of a job.
        triggered_at = self.trigger.time
        print(f"This task was triggered at {triggered_at}")


Create an automation that runs every minute and prints "Hello World from a Cron Task!" along with the time it was triggered.

In [6]:
# Handle for managing automations; used below to register the cron automation
automations = workflowsClient.automations()

In [7]:
# The task instance (including its input parameters) that is run repeatedly
task = MyCronTask(message="World")

# Cron expression for "every minute"; https://crontab.guru/ is a useful
# reference for writing cron expressions
every_minute = "* * * * *"

cron_automation = automations.create_cron_automation(
    name="my-cron-automation",
    task=task,
    cron_schedules=[every_minute],
    cluster=cluster,  # cluster to submit jobs to
)

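Start a workflow runner right here in the notebook. `run_forever()` keeps this cell running, so interrupt the kernel before moving on to the next cells.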

In [None]:
# Task classes this runner is able to execute
runnable_tasks = [MyCronTask]

runner = workflowsClient.runner(
    cluster,
    tasks=runnable_tasks,
    cache=LocalFileSystemCache(),
)

# NOTE(review): run_forever() presumably blocks until the kernel is
# interrupted; the cells below reuse `runner` afterwards.
runner.run_forever()

Submit a cron task manually

In [12]:
# Handle used to submit jobs
jobs = workflowsClient.jobs()

# The task instance and its input parameters
task = MyCronTask(message="Hello")

# NOTE(review): once() presumably prepares the cron task for a single manual
# run instead of a scheduled one - confirm against the CronTask documentation.
one_off_task = task.once()

# Submit a job with the task; the returned job is shown as the cell output
jobs.submit("manual-cron-job", one_off_task, cluster=cluster)

Job(id=UUID('0196da04-6553-9341-ae7d-ad35ac392055'), name='manual-cron-job', trace_parent='00-66bbf5847a28607d6923feaa42bb6a76-9468412114b35a34-01', state=<JobState.QUEUED: 1>, canceled=False)

In [13]:
# Reuse the runner created earlier to execute the job submitted above;
# the task's print output appears below this cell.
runner.run_all()

Hello Hello from a Cron Task!
This task was triggered at 2025-05-16 16:54:03.295816+00:00


### Creating an on-demand, customer-triggered service

In [32]:
from shapely import MultiPolygon
from tilebox.datasets import Client as DatasetsClient
from tilebox.workflows import Task
from shapely.geometry import shape
import json

# Datasets client created once in the notebook and reused by OnDemandTask below
dc = DatasetsClient()


class OnDemandTask(Task):
    """Print the average cloud cover of Sentinel-2 data for an area and time range.

    All inputs are plain strings; the area of interest is decoded from JSON
    inside ``execute``.
    """

    aoi: str  # JSON-encoded geometry mapping (as produced by `__geo_interface__`)
    start_date: str  # start of the queried time range
    end_date: str  # end of the queried time range

    def execute(self, context: ExecutionContext) -> None:
        """Query the S2A_S2MSI2A collection and print the mean cloud cover."""
        area = shape(json.loads(self.aoi))

        sentinel2_msi = dc.datasets().open_data.copernicus.sentinel2_msi
        data = sentinel2_msi.collection("S2A_S2MSI2A").query(
            temporal_extent=(self.start_date, self.end_date),
            spatial_extent=area,
            show_progress=True,
        )

        # A query that matches nothing yields a result without a `cloud_cover`
        # variable; report that instead of failing with an AttributeError.
        if "cloud_cover" not in data:
            print("No data found for the requested area and time range")
            return

        print("Average cloud cover:", data.cloud_cover.mean().values)

In [33]:
# Closed exterior ring of the area of interest (first and last point coincide)
exterior_ring = (
    (11.26, 47.39),
    (11.12, 47.17),
    (11.37, 47.09),
    (11.69, 47.21),
    (11.64, 47.38),
    (11.26, 47.39),
)
area = MultiPolygon([(exterior_ring,)])

# Task inputs are strings, so the geometry is passed as JSON
on_demand_task = OnDemandTask(
    aoi=json.dumps(area.__geo_interface__),
    start_date="2025-04-01",
    end_date="2025-05-01",
)

# Submit the job; the returned job is shown as the cell output
jobs.submit("manual-on-demand-job", on_demand_task, cluster=cluster)

Job(id=UUID('0196da1e-e158-8e9e-74f5-48b442633e01'), name='manual-on-demand-job', trace_parent='00-362f628d4ae110f9480612c527273a11-30044f25f840516a-01', state=<JobState.QUEUED: 1>, canceled=False)

In [34]:
# Task classes this runner is able to execute
on_demand_tasks = [OnDemandTask]

# Replaces the earlier `runner` with one registered for OnDemandTask
runner = workflowsClient.runner(
    cluster,
    tasks=on_demand_tasks,
    cache=LocalFileSystemCache(),
)

# Execute the job submitted above; the task's print output appears below
runner.run_all()

Average cloud cover: 53.10338208333334
