From 8375727e012f9c9165c7e9d1ab99d227b154d1cd Mon Sep 17 00:00:00 2001 From: Lukas Bindreiter Date: Fri, 19 Sep 2025 14:57:34 +0200 Subject: [PATCH] Dcoument progress tracking in go --- workflows/progress.mdx | 258 +++++++++++++++++++++++++++++------------ 1 file changed, 186 insertions(+), 72 deletions(-) diff --git a/workflows/progress.mdx b/workflows/progress.mdx index cfb0a60..1b60277 100644 --- a/workflows/progress.mdx +++ b/workflows/progress.mdx @@ -19,26 +19,59 @@ Progress indicators in Tilebox use a `done` / `total` model. Tasks can increase Progress tracking is always done at a task level. Each task can report its progress updates, as increases in `done` and `total` independently, and the job's total progress is the sum of all tasks' progress. - - Progress tracking is currently only available in the Tilebox Python SDK. Go support is coming soon. - - ```python Python from tilebox.workflows import Task, ExecutionContext -class MyTask(Task): +class ProgressRootTask(Task): + n: int + def execute(self, context: ExecutionContext) -> None: # report that 10 units of work need to be done - context.progress().add(10) + context.progress().add(self.n) # [!code ++] - for _ in range(10): - context.submit_subtask(MySubTask()) + for _ in range(self.n): + context.submit_subtask(ProgressSubTask()) -class MySubTask(Task): +class ProgressSubTask(Task): def execute(self, context: ExecutionContext) -> None: # report that one unit of work has been completed - context.progress().done(1) + context.progress().done(1) # [!code ++] +``` + +```go Go +type ProgressRootTask struct { + N int +} + +func (t *ProgressRootTask) Execute(ctx context.Context) error { + // report that 10 units of work need to be done + err := workflows.DefaultProgress().Add(ctx, uint64(t.N)) // [!code ++] + if err != nil { + return err + } + + for range t.N { + _, err := workflows.SubmitSubtask(ctx, &ProgressSubTask{}) + if err != nil { + return err + } + } + + return nil +} + +type ProgressSubTask struct{} + +func (t *ProgressSubTask) Execute(ctx context.Context) error { + // report that one unit of work has been completed + err := workflows.DefaultProgress().Done(ctx, 1) // [!code ++] + if err != nil { + return err + } + + return nil +} ``` @@ -47,60 +80,145 @@ class MySubTask(Task): A job can have multiple independent progress indicators. This is useful when a job consists of multiple steps, that each benefits from having its own progress indicator. To create a new progress indicator, call `context.progress(name)` with a unique `name` for the indicator. - -```python Python lines focus={14-15,35,51} -from io import BytesIO -import httpx # pip install httpx -from PIL import Image # pip install pillow -from tilebox.workflows import Task, ExecutionContext -class DownloadImages(Task): - image_urls: list[str] + +```python Python lines focus={5-6,15-18,27-28,33-34,38-39} +class MultiProgressWorkflowRoot(Task): + n: int def execute(self, context: ExecutionContext) -> None: - # download and process images from a list of URLs - n = len(self.image_urls) + # initialize a progress indicator for the finalize task + context.progress("finalize").add(1) - context.progress("download").add(n) - context.progress("process").add(n) + process = context.submit_subtask(Process(self.n)) + context.submit_subtask(Cleanup(), depends_on=[process]) - for i, url in enumerate(self.image_urls): - image_name = f"image_{i:04d}.png" - grayscale_name = f"image_gray_{i:04d}.png" - download = context.submit_subtask(DownloadImage(url, image_name)) - process = context.submit_subtask( - ToGrayscale(image_name, grayscale_name), - depends_on=[download], - ) +class Process(Task): + n: int -class DownloadImage(Task): - url: str - image_name: str + def execute(self, context: ExecutionContext) -> None: + # initialize two progress indicators for the two steps, + # with a total work of n for each + context.progress("step1").add(self.n) + context.progress("step2").add(self.n) + + # now submit N subtasks for the two steps + for _ in range(self.n): + step1 = context.submit_subtask(Step1()) + context.submit_subtask(Step2(), depends_on=[step1]) +class Step1(Task): def execute(self, context: ExecutionContext) -> None: - response = httpx.get(self.url, follow_redirects=True) - context.job_cache[self.image_name] = response.read() - - # report that one image has been downloaded - context.progress("download").done(1) + # 1 unit of work for step1 is done + context.progress("step1").done(1) -class ToGrayscale(Task): - input_image: str - output_name: str +class Step2(Task): + def execute(self, context: ExecutionContext) -> None: + # 1 unit of work for step2 is done + context.progress("step2").done(1) +class Finalize(Task): def execute(self, context: ExecutionContext) -> None: - image = Image.open(BytesIO(context.job_cache[self.input_image])) - image = image.convert("L") # convert the image to grayscale - - buffer = BytesIO() - image.save(buffer, format="png") - - context.job_cache[self.output_name] = buffer.getvalue() - - context.progress("process").done(1) + # finalize is done + context.progress("finalize").done(1) +``` +```go Go lines expandable focus={7,32,37,62,74,86} +type MultiProgressWorkflowRoot struct { + N int +} + +func (t *MultiProgressWorkflowRoot) Execute(ctx context.Context) error { + // initialize a progress indicator for the finalize task + err := workflows.Progress("finalize").Add(ctx, 1) + if err != nil { + return err + } + + process, err := workflows.SubmitSubtask(ctx, &Process{N: t.N}) + if err != nil { + return err + } + + _, err = workflows.SubmitSubtask(ctx, &Cleanup{}, subtask.WithDependencies(process)) + if err != nil { + return err + } + + return nil +} + +type Process struct { + N int +} + +func (t *Process) Execute(ctx context.Context) error { + // initialize two progress indicators for the two steps, + // with a total work of n for each + err := workflows.Progress("step1").Add(ctx, uint64(t.N)) + if err != nil { + return err + } + + err = workflows.Progress("step2").Add(ctx, uint64(t.N)) + if err != nil { + return err + } + + // now submit N subtasks for the two steps + for range t.N { + step1, err := workflows.SubmitSubtask(ctx, &Step1{}) + if err != nil { + return err + } + + _, err = workflows.SubmitSubtask(ctx, &Step2{}, subtask.WithDependencies(step1)) + if err != nil { + return err + } + } + + return nil +} + +type Step1 struct{} + +func (t *Step1) Execute(ctx context.Context) error { + // 1 unit of work for step1 is done + err := workflows.Progress("step1").Done(ctx, 1) + if err != nil { + return err + } + + return nil +} + +type Step2 struct{} + +func (t *Step2) Execute(ctx context.Context) error { + // 1 unit of work for step2 is done + err := workflows.Progress("step2").Done(ctx, 1) + if err != nil { + return err + } + + return nil +} + +type Cleanup struct{} + +func (t *Cleanup) Execute(ctx context.Context) error { + // finalize is done + err := workflows.Progress("finalize").Done(ctx, 1) + if err != nil { + return err + } + + return nil +} ``` + ## Querying Progress @@ -108,32 +226,28 @@ class ToGrayscale(Task): At any time during a job's execution, you can query the current progress of a job using the `find` method on the job client. The returned job object contains a `progress` field that contains the current progress of the job. -```python -from tilebox.workflows import Client - -job_client = Client().jobs() -job = job_client.submit("download-images", DownloadImages( - [ - "https://picsum.photos/id/123/500/500", - "https://picsum.photos/id/155/500/500", - ], -)) - -job = job_client.find(job.id) # refresh the job object -print(job) +```python Python +job = job_client.find(job_id) +for indicator in job.progress: + print(f"{indicator.label}: {indicator.done}/{indicator.total}") +``` +```go Go +job, err := client.Jobs.Get(ctx, jobID) +if err != nil { + slog.Error("Failed to get job", slog.Any("error", err)) + return +} + +for _, indicator := range job.Progress { + fmt.Printf("%s: %d/%d\n", indicator.Label, indicator.Done, indicator.Total) +} ``` ```plaintext Output -Job( - id=UUID('019952b8-a5dc-f4c0-e428-724ccc587d83'), - name='download-images', - ..., - progress=[ - ProgressIndicator(label='download', total=2, done=1), - ProgressIndicator(label='process', total=2, done=0), - ] -) +finalize: 1/1 +step1: 4/4 +step2: 4/4 ``` ### Progress Display in interactive environments