Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
258 changes: 186 additions & 72 deletions workflows/progress.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -19,26 +19,59 @@ Progress indicators in Tilebox use a `done` / `total` model. Tasks can increase

Progress tracking is always done at a task level. Each task can report its progress updates, as increases in `done` and `total` independently, and the job's total progress is the sum of all tasks' progress.

<Info>
Progress tracking is currently only available in the Tilebox Python SDK. Go support is coming soon.
</Info>

<CodeGroup>
```python Python
from tilebox.workflows import Task, ExecutionContext

class MyTask(Task):
class ProgressRootTask(Task):
n: int

def execute(self, context: ExecutionContext) -> None:
# report that 10 units of work need to be done
context.progress().add(10)
context.progress().add(self.n) # [!code ++]

for _ in range(10):
context.submit_subtask(MySubTask())
for _ in range(self.n):
context.submit_subtask(ProgressSubTask())

class MySubTask(Task):
class ProgressSubTask(Task):
def execute(self, context: ExecutionContext) -> None:
# report that one unit of work has been completed
context.progress().done(1)
context.progress().done(1) # [!code ++]
```

```go Go
type ProgressRootTask struct {
N int
}

func (t *ProgressRootTask) Execute(ctx context.Context) error {
// report that 10 units of work need to be done
err := workflows.DefaultProgress().Add(ctx, uint64(t.N)) // [!code ++]
if err != nil {
return err
}

for range t.N {
_, err := workflows.SubmitSubtask(ctx, &ProgressSubTask{})
if err != nil {
return err
}
}

return nil
}

type ProgressSubTask struct{}

func (t *ProgressSubTask) Execute(ctx context.Context) error {
// report that one unit of work has been completed
err := workflows.DefaultProgress().Done(ctx, 1) // [!code ++]
if err != nil {
return err
}

return nil
}
```
</CodeGroup>

Expand All @@ -47,93 +80,174 @@ class MySubTask(Task):
A job can have multiple independent progress indicators. This is useful when a job consists of multiple steps, that each benefits from having its own progress indicator.
To create a new progress indicator, call `context.progress(name)` with a unique `name` for the indicator.

<CodeGroup>
```python Python lines focus={14-15,35,51}
from io import BytesIO

import httpx # pip install httpx
from PIL import Image # pip install pillow
from tilebox.workflows import Task, ExecutionContext

class DownloadImages(Task):
image_urls: list[str]
<CodeGroup>
```python Python lines focus={5-6,15-18,27-28,33-34,38-39}
class MultiProgressWorkflowRoot(Task):
n: int

def execute(self, context: ExecutionContext) -> None:
# download and process images from a list of URLs
n = len(self.image_urls)
# initialize a progress indicator for the finalize task
context.progress("finalize").add(1)

context.progress("download").add(n)
context.progress("process").add(n)
process = context.submit_subtask(Process(self.n))
context.submit_subtask(Cleanup(), depends_on=[process])

for i, url in enumerate(self.image_urls):
image_name = f"image_{i:04d}.png"
grayscale_name = f"image_gray_{i:04d}.png"
download = context.submit_subtask(DownloadImage(url, image_name))
process = context.submit_subtask(
ToGrayscale(image_name, grayscale_name),
depends_on=[download],
)
class Process(Task):
n: int

class DownloadImage(Task):
url: str
image_name: str
def execute(self, context: ExecutionContext) -> None:
# initialize two progress indicators for the two steps,
# with a total work of n for each
context.progress("step1").add(self.n)
context.progress("step2").add(self.n)

# now submit N subtasks for the two steps
for _ in range(self.n):
step1 = context.submit_subtask(Step1())
context.submit_subtask(Step2(), depends_on=[step1])

class Step1(Task):
def execute(self, context: ExecutionContext) -> None:
response = httpx.get(self.url, follow_redirects=True)
context.job_cache[self.image_name] = response.read()

# report that one image has been downloaded
context.progress("download").done(1)
# 1 unit of work for step1 is done
context.progress("step1").done(1)


class ToGrayscale(Task):
input_image: str
output_name: str
class Step2(Task):
def execute(self, context: ExecutionContext) -> None:
# 1 unit of work for step2 is done
context.progress("step2").done(1)

class Finalize(Task):
def execute(self, context: ExecutionContext) -> None:
image = Image.open(BytesIO(context.job_cache[self.input_image]))
image = image.convert("L") # convert the image to grayscale

buffer = BytesIO()
image.save(buffer, format="png")

context.job_cache[self.output_name] = buffer.getvalue()

context.progress("process").done(1)
# finalize is done
context.progress("finalize").done(1)
```
```go Go lines expandable focus={7,32,37,62,74,86}
type MultiProgressWorkflowRoot struct {
N int
}

func (t *MultiProgressWorkflowRoot) Execute(ctx context.Context) error {
// initialize a progress indicator for the finalize task
err := workflows.Progress("finalize").Add(ctx, 1)
if err != nil {
return err
}

process, err := workflows.SubmitSubtask(ctx, &Process{N: t.N})
if err != nil {
return err
}

_, err = workflows.SubmitSubtask(ctx, &Cleanup{}, subtask.WithDependencies(process))
if err != nil {
return err
}

return nil
}

type Process struct {
N int
}

func (t *Process) Execute(ctx context.Context) error {
// initialize two progress indicators for the two steps,
// with a total work of n for each
err := workflows.Progress("step1").Add(ctx, uint64(t.N))
if err != nil {
return err
}

err = workflows.Progress("step2").Add(ctx, uint64(t.N))
if err != nil {
return err
}

// now submit N subtasks for the two steps
for range t.N {
step1, err := workflows.SubmitSubtask(ctx, &Step1{})
if err != nil {
return err
}

_, err = workflows.SubmitSubtask(ctx, &Step2{}, subtask.WithDependencies(step1))
if err != nil {
return err
}
}

return nil
}

type Step1 struct{}

func (t *Step1) Execute(ctx context.Context) error {
// 1 unit of work for step1 is done
err := workflows.Progress("step1").Done(ctx, 1)
if err != nil {
return err
}

return nil
}

type Step2 struct{}

func (t *Step2) Execute(ctx context.Context) error {
// 1 unit of work for step2 is done
err := workflows.Progress("step2").Done(ctx, 1)
if err != nil {
return err
}

return nil
}

type Cleanup struct{}

func (t *Cleanup) Execute(ctx context.Context) error {
// finalize is done
err := workflows.Progress("finalize").Done(ctx, 1)
if err != nil {
return err
}

return nil
}
```

</CodeGroup>

## Querying Progress

At any time during a job's execution, you can query the current progress of a job using the `find` method on the job client. The returned job object contains a `progress` field that contains the current progress of the job.

<CodeGroup>
```python
from tilebox.workflows import Client

job_client = Client().jobs()
job = job_client.submit("download-images", DownloadImages(
[
"https://picsum.photos/id/123/500/500",
"https://picsum.photos/id/155/500/500",
],
))

job = job_client.find(job.id) # refresh the job object
print(job)
```python Python
job = job_client.find(job_id)
for indicator in job.progress:
print(f"{indicator.label}: {indicator.done}/{indicator.total}")
```
```go Go
job, err := client.Jobs.Get(ctx, jobID)
if err != nil {
slog.Error("Failed to get job", slog.Any("error", err))
return
}

for _, indicator := range job.Progress {
fmt.Printf("%s: %d/%d\n", indicator.Label, indicator.Done, indicator.Total)
}
```
</CodeGroup>

```plaintext Output
Job(
id=UUID('019952b8-a5dc-f4c0-e428-724ccc587d83'),
name='download-images',
...,
progress=[
ProgressIndicator(label='download', total=2, done=1),
ProgressIndicator(label='process', total=2, done=0),
]
)
finalize: 1/1
step1: 4/4
step2: 4/4
```

### Progress Display in interactive environments
Expand Down