diff --git a/workflows/progress.mdx b/workflows/progress.mdx
index cfb0a60..1b60277 100644
--- a/workflows/progress.mdx
+++ b/workflows/progress.mdx
@@ -19,26 +19,59 @@ Progress indicators in Tilebox use a `done` / `total` model. Tasks can increase
Progress tracking is always done at a task level. Each task can report its progress updates, as increases in `done` and `total` independently, and the job's total progress is the sum of all tasks' progress.
-
- Progress tracking is currently only available in the Tilebox Python SDK. Go support is coming soon.
-
-
```python Python
from tilebox.workflows import Task, ExecutionContext
-class MyTask(Task):
+class ProgressRootTask(Task):
+ n: int
+
def execute(self, context: ExecutionContext) -> None:
# report that 10 units of work need to be done
- context.progress().add(10)
+ context.progress().add(self.n) # [!code ++]
- for _ in range(10):
- context.submit_subtask(MySubTask())
+ for _ in range(self.n):
+ context.submit_subtask(ProgressSubTask())
-class MySubTask(Task):
+class ProgressSubTask(Task):
def execute(self, context: ExecutionContext) -> None:
# report that one unit of work has been completed
- context.progress().done(1)
+ context.progress().done(1) # [!code ++]
+```
+
+```go Go
+type ProgressRootTask struct {
+ N int
+}
+
+func (t *ProgressRootTask) Execute(ctx context.Context) error {
+ // report that 10 units of work need to be done
+ err := workflows.DefaultProgress().Add(ctx, uint64(t.N)) // [!code ++]
+ if err != nil {
+ return err
+ }
+
+ for range t.N {
+ _, err := workflows.SubmitSubtask(ctx, &ProgressSubTask{})
+ if err != nil {
+ return err
+ }
+ }
+
+ return nil
+}
+
+type ProgressSubTask struct{}
+
+func (t *ProgressSubTask) Execute(ctx context.Context) error {
+ // report that one unit of work has been completed
+ err := workflows.DefaultProgress().Done(ctx, 1) // [!code ++]
+ if err != nil {
+ return err
+ }
+
+ return nil
+}
```
@@ -47,60 +80,145 @@ class MySubTask(Task):
A job can have multiple independent progress indicators. This is useful when a job consists of multiple steps, that each benefits from having its own progress indicator.
To create a new progress indicator, call `context.progress(name)` with a unique `name` for the indicator.
-
-```python Python lines focus={14-15,35,51}
-from io import BytesIO
-import httpx # pip install httpx
-from PIL import Image # pip install pillow
-from tilebox.workflows import Task, ExecutionContext
-class DownloadImages(Task):
- image_urls: list[str]
+
+```python Python lines focus={5-6,15-18,27-28,33-34,38-39}
+class MultiProgressWorkflowRoot(Task):
+ n: int
def execute(self, context: ExecutionContext) -> None:
- # download and process images from a list of URLs
- n = len(self.image_urls)
+ # initialize a progress indicator for the finalize task
+ context.progress("finalize").add(1)
- context.progress("download").add(n)
- context.progress("process").add(n)
+ process = context.submit_subtask(Process(self.n))
+ context.submit_subtask(Cleanup(), depends_on=[process])
- for i, url in enumerate(self.image_urls):
- image_name = f"image_{i:04d}.png"
- grayscale_name = f"image_gray_{i:04d}.png"
- download = context.submit_subtask(DownloadImage(url, image_name))
- process = context.submit_subtask(
- ToGrayscale(image_name, grayscale_name),
- depends_on=[download],
- )
+class Process(Task):
+ n: int
-class DownloadImage(Task):
- url: str
- image_name: str
+ def execute(self, context: ExecutionContext) -> None:
+ # initialize two progress indicators for the two steps,
+ # with a total work of n for each
+ context.progress("step1").add(self.n)
+ context.progress("step2").add(self.n)
+
+ # now submit N subtasks for the two steps
+ for _ in range(self.n):
+ step1 = context.submit_subtask(Step1())
+ context.submit_subtask(Step2(), depends_on=[step1])
+class Step1(Task):
def execute(self, context: ExecutionContext) -> None:
- response = httpx.get(self.url, follow_redirects=True)
- context.job_cache[self.image_name] = response.read()
-
- # report that one image has been downloaded
- context.progress("download").done(1)
+ # 1 unit of work for step1 is done
+ context.progress("step1").done(1)
-class ToGrayscale(Task):
- input_image: str
- output_name: str
+class Step2(Task):
+ def execute(self, context: ExecutionContext) -> None:
+ # 1 unit of work for step2 is done
+ context.progress("step2").done(1)
+class Finalize(Task):
def execute(self, context: ExecutionContext) -> None:
- image = Image.open(BytesIO(context.job_cache[self.input_image]))
- image = image.convert("L") # convert the image to grayscale
-
- buffer = BytesIO()
- image.save(buffer, format="png")
-
- context.job_cache[self.output_name] = buffer.getvalue()
-
- context.progress("process").done(1)
+ # finalize is done
+ context.progress("finalize").done(1)
+```
+```go Go lines expandable focus={7,32,37,62,74,86}
+type MultiProgressWorkflowRoot struct {
+ N int
+}
+
+func (t *MultiProgressWorkflowRoot) Execute(ctx context.Context) error {
+ // initialize a progress indicator for the finalize task
+ err := workflows.Progress("finalize").Add(ctx, 1)
+ if err != nil {
+ return err
+ }
+
+ process, err := workflows.SubmitSubtask(ctx, &Process{N: t.N})
+ if err != nil {
+ return err
+ }
+
+ _, err = workflows.SubmitSubtask(ctx, &Cleanup{}, subtask.WithDependencies(process))
+ if err != nil {
+ return err
+ }
+
+ return nil
+}
+
+type Process struct {
+ N int
+}
+
+func (t *Process) Execute(ctx context.Context) error {
+ // initialize two progress indicators for the two steps,
+ // with a total work of n for each
+ err := workflows.Progress("step1").Add(ctx, uint64(t.N))
+ if err != nil {
+ return err
+ }
+
+ err = workflows.Progress("step2").Add(ctx, uint64(t.N))
+ if err != nil {
+ return err
+ }
+
+ // now submit N subtasks for the two steps
+ for range t.N {
+ step1, err := workflows.SubmitSubtask(ctx, &Step1{})
+ if err != nil {
+ return err
+ }
+
+ _, err = workflows.SubmitSubtask(ctx, &Step2{}, subtask.WithDependencies(step1))
+ if err != nil {
+ return err
+ }
+ }
+
+ return nil
+}
+
+type Step1 struct{}
+
+func (t *Step1) Execute(ctx context.Context) error {
+ // 1 unit of work for step1 is done
+ err := workflows.Progress("step1").Done(ctx, 1)
+ if err != nil {
+ return err
+ }
+
+ return nil
+}
+
+type Step2 struct{}
+
+func (t *Step2) Execute(ctx context.Context) error {
+ // 1 unit of work for step2 is done
+ err := workflows.Progress("step2").Done(ctx, 1)
+ if err != nil {
+ return err
+ }
+
+ return nil
+}
+
+type Cleanup struct{}
+
+func (t *Cleanup) Execute(ctx context.Context) error {
+ // finalize is done
+ err := workflows.Progress("finalize").Done(ctx, 1)
+ if err != nil {
+ return err
+ }
+
+ return nil
+}
```
+
## Querying Progress
@@ -108,32 +226,28 @@ class ToGrayscale(Task):
At any time during a job's execution, you can query the current progress of a job using the `find` method on the job client. The returned job object contains a `progress` field that contains the current progress of the job.
-```python
-from tilebox.workflows import Client
-
-job_client = Client().jobs()
-job = job_client.submit("download-images", DownloadImages(
- [
- "https://picsum.photos/id/123/500/500",
- "https://picsum.photos/id/155/500/500",
- ],
-))
-
-job = job_client.find(job.id) # refresh the job object
-print(job)
+```python Python
+job = job_client.find(job_id)
+for indicator in job.progress:
+ print(f"{indicator.label}: {indicator.done}/{indicator.total}")
+```
+```go Go
+job, err := client.Jobs.Get(ctx, jobID)
+if err != nil {
+ slog.Error("Failed to get job", slog.Any("error", err))
+ return
+}
+
+for _, indicator := range job.Progress {
+ fmt.Printf("%s: %d/%d\n", indicator.Label, indicator.Done, indicator.Total)
+}
```
```plaintext Output
-Job(
- id=UUID('019952b8-a5dc-f4c0-e428-724ccc587d83'),
- name='download-images',
- ...,
- progress=[
- ProgressIndicator(label='download', total=2, done=1),
- ProgressIndicator(label='process', total=2, done=0),
- ]
-)
+finalize: 1/1
+step1: 4/4
+step2: 4/4
```
### Progress Display in interactive environments