Add a new ForEach core task to unify and simplify EachParallel and EachSequential #2137

Closed
anna-geller opened this issue Sep 18, 2023 · 0 comments · Fixed by #4343
Labels
enhancement New feature or request

anna-geller commented Sep 18, 2023

Feature description

Problem

Some users getting started with Kestra are confused that the list of tasks specified within an EachParallel task is not executed sequentially, one after the other, for each value; instead, those tasks also run in parallel. Many find it confusing that they need to wrap these tasks in a Sequential task so that each group of tasks runs in order while the groups themselves run in parallel.
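To make the difference concrete, here is a simplified Python model of how the work is split into independently schedulable units. It is an illustration only, not Kestra's scheduler, and the function names each_parallel_units and grouped_units are hypothetical.

```python
# Simplified, hypothetical model of work units; not Kestra's actual executor.
from typing import List, Tuple

Unit = List[Tuple[str, str]]  # an ordered list of (value, task_id) steps


def each_parallel_units(values: List[str], tasks: List[str]) -> List[Unit]:
    """EachParallel-style: every (value, task) pair is its own unit, so all
    child tasks may start at the same time, even for the same value."""
    return [[(value, task)] for value in values for task in tasks]


def grouped_units(values: List[str], tasks: List[str]) -> List[Unit]:
    """What users expect (today achieved by wrapping tasks in Sequential):
    one unit per value whose steps run in order; units run concurrently."""
    return [[(value, task) for task in tasks] for value in values]


values = ["a.parquet", "b.parquet"]
tasks = ["log_start", "partition", "log_end"]
print(len(each_parallel_units(values, tasks)))  # 6 independent single-step units
print(len(grouped_units(values, tasks)))        # 2 units of 3 ordered steps
print(grouped_units(values, tasks)[0])
# [('a.parquet', 'log_start'), ('a.parquet', 'partition'), ('a.parquet', 'log_end')]
```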

At the same time, the EachSequential task seems to be functionally equivalent to EachParallel with the concurrent property set to 1.
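The equivalence can be illustrated with a minimal asyncio sketch, assuming a for-each loop that caps the number of in-flight values with a semaphore. The names for_each and max_in_flight and the event strings are hypothetical, and this is not how Kestra implements either task.

```python
import asyncio
from typing import List


async def for_each(values: List[str], concurrent: int, events: List[str]) -> None:
    """Hypothetical sketch: process each value, with at most `concurrent` in flight."""
    semaphore = asyncio.Semaphore(concurrent)

    async def process(value: str) -> None:
        async with semaphore:  # waits while `concurrent` values are already running
            events.append(f"start {value}")
            await asyncio.sleep(0.01)  # stand-in for the real child tasks
            events.append(f"end {value}")

    await asyncio.gather(*(process(v) for v in values))


def max_in_flight(events: List[str]) -> int:
    """Replay the event log and return the peak number of values running at once."""
    running = peak = 0
    for event in events:
        running += 1 if event.startswith("start") else -1
        peak = max(peak, running)
    return peak


sequential: List[str] = []
asyncio.run(for_each(["v1", "v2", "v3"], concurrent=1, events=sequential))
print(sequential)
# ['start v1', 'end v1', 'start v2', 'end v2', 'start v3', 'end v3']

parallel: List[str] = []
asyncio.run(for_each(["v1", "v2", "v3"], concurrent=3, events=parallel))
print(max_in_flight(parallel))  # 3
```

With concurrent=1 the semaphore admits one value at a time, so the trace is exactly what a sequential loop would produce.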

Lastly, the naming of tasks and properties is confusing:

  1. The task is named EachParallel, yet instead of a parallel property it has a concurrent property. To be precise, it would make more sense either to name the task EachConcurrent or to rename the property to parallel.
  2. Both tasks accept a list of values. However, the property name (value) is singular, which is confusing given that it takes a list rather than a single value.
  3. The task implements a for loop. Users who search autocompletion for "for" may not find it. The name ForEach would be easy to find for users searching for either "each" or "for".

Solution

Before the 1.0 release, it's important to unify both tasks into a single ForEach task that:

  1. Allows both sequential and concurrent execution of tasks simply by setting the concurrent task property, rather than by choosing a dedicated task
  2. Names its input property in the plural: values, or items for consistency with ForEachItem
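A minimal sketch of these semantics, assuming a thread pool stands in for Kestra's executor: each item runs its child tasks in order, and at most concurrent items are processed at once. The for_each function and the Task type are hypothetical illustrations, not Kestra's API.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, List

# Hypothetical: a child task takes the current item and returns a log line.
Task = Callable[[str], str]


def for_each(items: List[str], tasks: List[Task], concurrent: int = 1) -> List[List[str]]:
    """Sketch of the proposed ForEach semantics (an assumption, not Kestra's code):
    every item runs all child tasks in order; up to `concurrent` items run at once."""
    if concurrent < 1:
        raise ValueError("concurrent must be >= 1")

    def run_group(item: str) -> List[str]:
        # Child tasks of one item execute sequentially, one after the other.
        return [task(item) for task in tasks]

    with ThreadPoolExecutor(max_workers=concurrent) as pool:
        # map() returns results in input order regardless of completion order.
        return list(pool.map(run_group, items))


child_tasks: List[Task] = [
    lambda item: f"starting processing {item}",
    lambda item: f"processing {item}",
    lambda item: f"finished processing {item}",
]
results = for_each(["file_1.parquet", "file_2.parquet"], child_tasks, concurrent=10)
print(results[0])
# ['starting processing file_1.parquet', 'processing file_1.parquet', 'finished processing file_1.parquet']
```

Setting concurrent=1 gives a single worker, which reproduces EachSequential-like processing without a separate task type.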

Proposed syntax:

id: pythonPartitionsMetrics
namespace: blueprint
description: Process partitions in parallel

tasks:
  - id: getPartitions
    type: io.kestra.plugin.scripts.python.Script
    runner: DOCKER
    docker:
      image: ghcr.io/kestra-io/pydata:latest
    script: |
      from kestra import Kestra
      partitions = [f"file_{nr}.parquet" for nr in range(1, 1000)]
      Kestra.outputs({'partitions': partitions})

  - id: processPartitions
    type: io.kestra.core.tasks.flows.ForEach
    items: '{{outputs.getPartitions.vars.partitions}}'
    concurrent: 10 # set to 1 to get EachSequential-like processing
    tasks:
      - id: log_start
        type: io.kestra.core.tasks.log.Log
        message: starting processing {{ taskrun.item }}

      - id: partition
        type: io.kestra.plugin.scripts.python.Script
        runner: DOCKER
        docker:
          image: ghcr.io/kestra-io/pydata:latest
        script: |
          import random
          import time
          from kestra import Kestra

          filename = '{{ taskrun.item }}'
          print(f"Reading and processing partition {filename}")
          nr_rows = random.randint(1, 1000)
          processing_time = random.randint(1, 20)
          time.sleep(processing_time)
          Kestra.counter('nr_rows', nr_rows, {'partition': filename})
          Kestra.timer('processing_time', processing_time, {'partition': filename})

      - id: log_end
        type: io.kestra.core.tasks.log.Log
        message: finished processing {{ taskrun.item }}

Allowing failure of parallel child tasks

This new task should allow all child tasks to run to completion without blocking, even if some child task fails. We were considering a continueOnError: boolean property. However, if allowFailure is added to the core tasks, there seems to be no need for an extra continueOnError property (see #2248).
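The difference between failing fast and letting every child finish can be sketched with asyncio.gather and its return_exceptions flag. Here allow_failure is a hypothetical parameter that only loosely mirrors the proposed allowFailure property; Kestra's actual behavior (for example, how the final execution state is reported) is not modeled.

```python
import asyncio
from typing import List, Union


async def child(item: str) -> str:
    """Hypothetical child task that fails for one specific item."""
    await asyncio.sleep(0)
    if item == "bad":
        raise RuntimeError(f"failed on {item}")
    return f"ok {item}"


async def run_all(items: List[str], allow_failure: bool) -> List[Union[str, BaseException]]:
    """With allow_failure=True every child runs to completion and exceptions are
    returned in place of results; otherwise the first exception is raised to the caller."""
    return await asyncio.gather(*(child(i) for i in items), return_exceptions=allow_failure)


def fails_fast(items: List[str]) -> bool:
    """Return True if running without allow_failure surfaces the child's error."""
    try:
        asyncio.run(run_all(items, allow_failure=False))
    except RuntimeError:
        return True
    return False


results = asyncio.run(run_all(["a", "bad", "c"], allow_failure=True))
print([r if isinstance(r, str) else f"error: {r}" for r in results])
# ['ok a', 'error: failed on bad', 'ok c']
print(fails_fast(["a", "bad", "c"]))  # True
```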
