Merged
8 changes: 3 additions & 5 deletions api-reference/go/workflows/Jobs.Submit.mdx
@@ -8,7 +8,6 @@ icon: diagram-project
func (*JobClient) Submit(
ctx context.Context,
jobName string,
- cluster *workflows.Cluster,
tasks []workflows.Task,
options ...job.SubmitOption
) (*workflows.Job, error)
@@ -21,9 +20,6 @@ Submit a job.
<ParamField path="jobName" type="string">
The name of the job
</ParamField>
- <ParamField path="cluster" type="*Cluster">
-   The [cluster](/workflows/concepts/clusters#managing-clusters) to run the root task on
- </ParamField>
<ParamField path="tasks" type="[]Task">
The root task for the job. This task is executed first and can submit subtasks to manage the entire workflow. A job can optionally consist of multiple root tasks.
</ParamField>
@@ -36,6 +32,9 @@ Submit a job.
<ParamField path="WithMaxRetries(maxRetries int64)" default="0">
Set the maximum number of [retries](/workflows/concepts/tasks#retry-handling) for the subtask in case it fails
</ParamField>
+ <ParamField path="WithClusterSlug(clusterSlug string)" default="">
+   The [cluster](/workflows/concepts/clusters#managing-clusters) to run the root task on. If not provided, the default cluster is used.
+ </ParamField>

## Returns

@@ -45,7 +44,6 @@ A job object.
```go Go
job, err := client.Jobs.Submit(ctx,
"My job",
- cluster,
[]workflows.Task{rootTask},
)
```
10 changes: 5 additions & 5 deletions api-reference/go/workflows/NewTaskRunner.mdx
@@ -6,7 +6,7 @@ icon: gear-code

```go
func (*Client) NewTaskRunner(
- cluster *workflows.Cluster,
+ ctx context.Context,
options ...runner.Option,
) (*workflows.TaskRunner, error)
```
@@ -15,15 +15,15 @@ Initialize a task runner.

## Parameters

- <ParamField path="cluster" type="*Cluster">
-   The [cluster](/workflows/concepts/clusters#managing-clusters) to connect to
- </ParamField>
<ParamField path="options" type="[]runner.Option">
Options for initializing the task runner
</ParamField>

## Options

+ <ParamField path="WithClusterSlug(clusterSlug string)" default="">
+   The [cluster](/workflows/concepts/clusters#managing-clusters) to connect to. If not provided, the default cluster is used.
+ </ParamField>
<ParamField path="WithRunnerLogger(logger *slog.Logger)" default="slog.Default()">
Set the logger to use for the task runner
</ParamField>
@@ -37,6 +37,6 @@ The created task runner object.

<RequestExample>
```go Go
- runner, err := client.NewTaskRunner()
+ runner, err := client.NewTaskRunner(ctx)
```
</RequestExample>
2 changes: 1 addition & 1 deletion api-reference/python/tilebox.workflows/Client.mdx
@@ -68,6 +68,6 @@ cluster_client = client.clusters()
automation_client = client.automations()

# or instantiate a task runner
- runner = client.runner("dev-cluster", tasks=[...])
+ runner = client.runner(tasks=[...])
```
</RequestExample>
8 changes: 4 additions & 4 deletions api-reference/python/tilebox.workflows/Client.runner.mdx
@@ -5,7 +5,7 @@ icon: laptop-code

```python
def Client.runner(
- cluster: ClusterSlugLike,
+ cluster: ClusterSlugLike | None = None,
tasks: list[type[Task]],
cache: JobCache | None = None
) -> TaskRunner
@@ -15,8 +15,9 @@ Initialize a task runner.

## Parameters

- <ParamField path="cluster" type="str">
+ <ParamField path="cluster" type="str | None">
The [cluster slug](/workflows/concepts/clusters#managing-clusters) for the cluster associated with this task runner.
+ If not provided, the default cluster is used.
</ParamField>

<ParamField path="tasks" type="list[type[Task]]">
@@ -34,8 +35,7 @@ from tilebox.workflows.cache import LocalFileSystemCache

client = Client()
runner = client.runner(
- "my-cluster-EdsdUozYprBJDL",
- [MyFirstTask, MySubtask],
+ tasks=[MyFirstTask, MySubtask],
# optional:
cache=LocalFileSystemCache("cache_directory"),
)
7 changes: 3 additions & 4 deletions api-reference/python/tilebox.workflows/JobClient.submit.mdx
@@ -7,7 +7,7 @@ icon: diagram-project
def JobClient.submit(
job_name: str,
root_task_or_tasks: Task | Iterable[Task],
- cluster: str | Cluster | Iterable[str | Cluster],
+ cluster: str | Cluster | Iterable[str | Cluster] | None = None,
max_retries: int = 0
) -> Job
```
@@ -24,8 +24,9 @@ Submit a job.
The root task for the job. This task is executed first and can submit subtasks to manage the entire workflow. A job can optionally consist of multiple root tasks.
</ParamField>

- <ParamField path="cluster" type="str">
+ <ParamField path="cluster" type="str | Cluster | Iterable[str | Cluster] | None">
The [cluster slug](/workflows/concepts/clusters#managing-clusters) for the cluster to run the root task on. In case of multiple root tasks, a list of cluster slugs can be provided.
+ If not provided, the default cluster is used.
</ParamField>

<ParamField path="max_retries" type="int">
@@ -43,8 +44,6 @@ job = job_client.submit(
value=42,
data={"key": "value"}
),
- "my-cluster-EdsdUozYprBJDL",
- max_retries=0,
)
```
</RequestExample>
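With `cluster` now optional, both call styles work side by side. A minimal sketch (not from the diff itself; it assumes the `tilebox-workflows` package, a `TILEBOX_API_KEY` environment variable, and a hypothetical `MyTask` root task defined for illustration):

```python
from tilebox.workflows import Client, Task

# Hypothetical root task, for illustration only
class MyTask(Task):
    message: str

    def execute(self, context) -> None:
        print(self.message)

client = Client()  # reads TILEBOX_API_KEY from the environment
job_client = client.jobs()

# Submit to the team's default cluster (no cluster argument)
job = job_client.submit("my-job", MyTask(message="hello"))

# Or pin the root task to a specific cluster via its slug
job = job_client.submit(
    "my-job",
    MyTask(message="hello"),
    cluster="my-cluster-EdsdUozYprBJDL",
)
```

The keyword form `cluster=...` matches the existing examples elsewhere in this PR (for example in `workflows/caches.mdx`).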
24 changes: 6 additions & 18 deletions guides/workflows/multi-language.mdx
@@ -96,22 +96,15 @@ Write a simple HTTP tasking server in Go with a `/submit` endpoint that accepts

```go Go
func main() {
- ctx := context.Background()
- client := workflows.NewClient()
-
- cluster, err := client.Clusters.Get(ctx, "test-cluster-tZD9Ca2qsqt4V")
- if err != nil {
- 	log.Fatal(err)
- }
-
- http.HandleFunc("/submit", submitHandler(client, cluster))

log.Println("Server starting on http://localhost:8080")

+ client := workflows.NewClient()
+ http.HandleFunc("/submit", submitHandler(client))
log.Fatal(http.ListenAndServe(":8080", nil))
}

// Submit a job based on some query parameters
- func submitHandler(client *workflows.Client, cluster *workflows.Cluster) http.HandlerFunc {
+ func submitHandler(client *workflows.Client) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
latArg := r.URL.Query().Get("lat")
lonArg := r.URL.Query().Get("lon")
@@ -145,7 +138,7 @@ func submitHandler(client *workflows.Client, cluster *workflows.Cluster) http.Ha
spectralBands = append(spectralBands, band)
}

- job, err := client.Jobs.Submit(r.Context(), "Schedule Image capture", cluster,
+ job, err := client.Jobs.Submit(r.Context(), "Schedule Image capture",
[]workflows.Task{
&ScheduleImageCapture{
Location: [2]float64{latFloat, lonFloat},
@@ -177,12 +170,7 @@ from tilebox.workflows import Client

def main():
client = Client()
- runner = client.runner(
- 	"test-cluster-tZD9Ca2qsqt4V",
- 	tasks=[
- 		ScheduleImageCapture,
- 	],
- )
+ runner = client.runner(tasks=[ScheduleImageCapture])
runner.run_forever()

if __name__ == "__main__":
38 changes: 7 additions & 31 deletions quickstart.mdx
@@ -49,16 +49,6 @@ If you prefer to work locally, follow these steps to get started.

<Tip>Copy the API key and keep it somewhere safe. You will need it to authenticate your requests.</Tip>
</Step>
- <Step title="Create a Cluster">
-   Create a cluster by logging into the [Tilebox Console](https://console.tilebox.com), navigating to [Workflows -> Clusters](https://console.tilebox.com/workflows/clusters), and clicking the "Create cluster" button.
-
-   <Frame>
-     <img src="/assets/console/cluster-light.png" alt="Tilebox Console" className="dark:hidden" />
-     <img src="/assets/console/cluster-dark.png" alt="Tilebox Console" className="hidden dark:block" />
-   </Frame>
-
-   <Tip>Copy the cluster slug, you will need it to run your workflows.</Tip>
- </Step>
<Step title="Query Data">
Use the datasets client to query data from a dataset.

@@ -82,8 +72,7 @@ If you prefer to work locally, follow these steps to get started.
```python Python
from tilebox.workflows import Client, Task

- # Replace with your actual cluster and token
- cluster = "YOUR_COMPUTE_CLUSTER"
+ # Replace with your actual token
client = Client(token="YOUR_TILEBOX_API_KEY")

class HelloWorldTask(Task):
@@ -102,10 +91,10 @@

# Initiate the job
jobs = client.jobs()
- jobs.submit("parameterized-hello-world", HelloWorldTask(greeting="Greetings", name="Universe"), cluster)
+ jobs.submit("parameterized-hello-world", HelloWorldTask(greeting="Greetings", name="Universe"))

# Run the tasks
- runner = client.runner(cluster, tasks=[HelloWorldTask, HelloSubtask])
+ runner = client.runner(tasks=[HelloWorldTask, HelloSubtask])
runner.run_all()
```
</Step>
@@ -158,16 +147,6 @@ If you prefer to work locally, follow these steps to get started.

<Tip>Copy the API key and keep it somewhere safe. You will need it to authenticate your requests.</Tip>
</Step>
- <Step title="Create a Cluster">
-   Create a cluster by logging into the [Tilebox Console](https://console.tilebox.com), navigating to [Workflows -> Clusters](https://console.tilebox.com/workflows/clusters), and clicking the "Create cluster" button.
-
-   <Frame>
-     <img src="/assets/console/cluster-light.png" alt="Tilebox Console" className="dark:hidden" />
-     <img src="/assets/console/cluster-dark.png" alt="Tilebox Console" className="hidden dark:block" />
-   </Frame>
-
-   <Tip>Copy the cluster slug, you will need it to run your workflows.</Tip>
- </Step>
<Step title="Query Data">
Run [tilebox-generate](https://github.com/tilebox/tilebox-generate) in the root directory of your Go project.
It generates the dataset type for the Sentinel-2 MSI dataset, producing a `./protogen/tilebox/v1/sentinel2_msi.pb.go` file.
@@ -285,11 +264,10 @@ If you prefer to work locally, follow these steps to get started.
func main() {
ctx := context.Background()

- // Replace with your actual cluster and token
- clusterSlug := "YOUR_COMPUTE_CLUSTER"
+ // Replace with your actual token
client := workflows.NewClient()

- job, err := client.Jobs.Submit(ctx, "hello-world", clusterSlug,
+ job, err := client.Jobs.Submit(ctx, "hello-world",
[]workflows.Task{
&HelloTask{
Greeting: "Greetings",
Expand All @@ -304,9 +282,7 @@ If you prefer to work locally, follow these steps to get started.

slog.InfoContext(ctx, "Job submitted", slog.String("job_id", job.ID.String()))

- runner, err := client.NewTaskRunner(
- 	workflows.WithCluster(clusterSlug),
- )
+ runner, err := client.NewTaskRunner(ctx)
if err != nil {
slog.Error("failed to create task runner", slog.Any("error", err))
return
@@ -321,7 +297,7 @@
return
}

- runner.Run(context.Background())
+ runner.Run(ctx)
}
```
</Step>
11 changes: 2 additions & 9 deletions workflows/caches.mdx
@@ -25,7 +25,6 @@ You can configure a cache while creating a task runner by passing a cache instan

client = Client()
runner = client.runner(
- "dev-cluster",
tasks=[...],
cache=InMemoryCache(),
)
@@ -63,7 +62,6 @@ bucket = storage_client.bucket("cache-bucket")

client = Client()
runner = client.runner(
- "dev-cluster",
tasks=[...],
cache=GoogleStorageCache(bucket, prefix="jobs"),
)
@@ -87,7 +85,6 @@ from tilebox.workflows.cache import AmazonS3Cache

client = Client()
runner = client.runner(
- "dev-cluster",
tasks=[...],
cache=AmazonS3Cache("my-bucket-name", prefix="jobs")
)
@@ -109,7 +106,6 @@ from tilebox.workflows.cache import LocalFileSystemCache

client = Client()
runner = client.runner(
- "dev-cluster",
tasks=[...],
cache=LocalFileSystemCache("/path/to/cache/directory"),
)
@@ -131,7 +127,6 @@ from tilebox.workflows.cache import InMemoryCache

client = Client()
runner = client.runner(
- "dev-cluster",
tasks=[...],
cache=InMemoryCache(),
)
@@ -179,11 +174,10 @@ To test the workflow, you can start a local task runner using the `InMemoryCache
```python Python
# submit a job to test our workflow
job_client = client.jobs()
- job_client.submit("testing-cache-access", ProducerTask(), cluster="dev-cluster")
+ job_client.submit("testing-cache-access", ProducerTask())

# start a runner to execute it
runner = client.runner(
- "dev-cluster",
tasks=[ProducerTask, ConsumerTask],
cache=LocalFileSystemCache("/path/to/cache/directory"),
)
@@ -260,11 +254,10 @@ Submitting a job of the `CacheGroupDemo` and running it with a task runner can b
```python Python
# submit a job to test our workflow
job_client = client.jobs()
- job_client.submit("cache-groups", CacheGroupDemo(5), cluster="dev-cluster")
+ job_client.submit("cache-groups", CacheGroupDemo(5))

# start a runner to execute it
runner = client.runner(
- "dev-cluster",
tasks=[CacheGroupDemo, ProduceRandomNumbers, ProduceRandomNumber, PrintSum],
cache=LocalFileSystemCache("/path/to/cache/directory"),
)
8 changes: 7 additions & 1 deletion workflows/concepts/clusters.mdx
@@ -25,6 +25,12 @@ If multiple task runners have the same set of registered tasks, you can assign t
You can add task runners to a cluster by specifying the [cluster's slug](#cluster-slug) when [registering a task runner](/workflows/concepts/task-runners).
Each task runner must always be assigned to a cluster.

+ ## Default Cluster
+
+ Each team has a default cluster that is automatically created for them.
+ This cluster is used when no cluster is specified while [registering a task runner](/workflows/concepts/task-runners) or [submitting a job](/workflows/concepts/jobs).
+ This is useful when you are just getting started and don't need to create any custom clusters yet.
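With a default cluster in place, the simplest setup needs no cluster configuration at all. A hedged Python sketch (the task class and job name are illustrative, not from these docs; assumes the `tilebox-workflows` package and a `TILEBOX_API_KEY` environment variable):

```python
from tilebox.workflows import Client, Task

# Illustrative task, for this sketch only
class HelloTask(Task):
    def execute(self, context) -> None:
        print("Hello from the default cluster!")

client = Client()

# No cluster slug anywhere: both the job and the
# task runner fall back to the team's default cluster.
client.jobs().submit("hello-default-cluster", HelloTask())

runner = client.runner(tasks=[HelloTask])
runner.run_all()
```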

## Managing Clusters

Before registering a task runner or submitting a job, you must create a cluster. You can also list, fetch, and delete clusters as needed. The following sections explain how to do this.
@@ -231,10 +237,10 @@ func main() {
_, _ = client.Jobs.Submit(
ctx,
"my-job",
- "testing-CvufcSxcC9SKfe",
[]workflows.Task{
&MultiCluster{},
},
+ job.WithClusterSlug("testing-CvufcSxcC9SKfe"),
)
}
```