Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Sample to limit concurrent activities in parallel #241

Closed
wants to merge 4 commits into from

Conversation

emanuelef
Copy link
Contributor

What was changed

Added a new sample to show how to run a limited number of concurrent activities per workflow.

Why?

There may be scenarios where the work performed by a single activity in a workflow is requiring too much resources, in that case it would be helpful to have an example on how to limit the number of concurrent activities per workflow and not per worker.
A discussion on the similar use case was found on https://community.temporal.io/t/limit-concurrent-activities-in-parallel/2066 but not a solutions with sample is currently available in this repo.

Checklist

  1. Closes

  2. How was this tested:
    Manually, through the sample with a local temporalite.

  3. Any docs updates needed?

@CLAassistant
Copy link

CLA assistant check
Thank you for your submission! We really appreciate it. Like many open source projects, we ask that you sign our Contributor License Agreement before we can accept your contribution.
You have signed the CLA already but the status is still pending? Let us recheck it.

@cretz
Copy link
Member

cretz commented Dec 29, 2022

To confirm, this example is like splitmerge-selector but just adds a "max concurrent" setting? There are lots of ways you can customize those other examples when you apply to your code, but I am unsure they are deserving of a completely separate sample. Also, I think this algorithm could be simplified. Here's a diff for the splitmerge-selector that does the same thing:

diff --git a/splitmerge-selector/splitmerge_workflow.go b/splitmerge-selector/splitmerge_workflow.go
index 943c981..6d23980 100644
--- a/splitmerge-selector/splitmerge_workflow.go
+++ b/splitmerge-selector/splitmerge_workflow.go
@@ -21,15 +21,22 @@ type ChunkResult struct {
 }

 // SampleSplitMergeSelectorWorkflow workflow definition
-func SampleSplitMergeSelectorWorkflow(ctx workflow.Context, workerCount int) (result ChunkResult, err error) {
+func SampleSplitMergeSelectorWorkflow(ctx workflow.Context, workerCount, maxConcurrent int) (result ChunkResult, err error) {
        ao := workflow.ActivityOptions{
                StartToCloseTimeout: 10 * time.Second,
        }
        ctx = workflow.WithActivityOptions(ctx, ao)

        selector := workflow.NewSelector(ctx)
-       var totalItemCount, totalSum int
+       var totalItemCount, totalSum, futuresHandledEarly int
        for i := 0; i < workerCount; i++ {
+               // If we're at max concurrent, select one before adding another
+               if maxConcurrent > 0 && i >= maxConcurrent {
+                       if selector.Select(ctx); err != nil {
+                               return result, err
+                       }
+                       futuresHandledEarly++
+               }
                // ExecuteActivity returns Future that doesn't need to be awaited immediately.
                future := workflow.ExecuteActivity(ctx, ChunkProcessingActivity, i+1)
                selector.AddFuture(future, func(f workflow.Future) {
@@ -44,7 +51,7 @@ func SampleSplitMergeSelectorWorkflow(ctx workflow.Context, workerCount int) (re
                })
        }

-       for i := 0; i < workerCount; i++ {
+       for i := futuresHandledEarly; i < workerCount; i++ {
                // Each call to Select matches a single ready Future.
                // Each Future is matched only once independently on the number of Select calls.
                selector.Select(ctx)

@emanuelef
Copy link
Contributor Author

@cretz Thanks for reviewing the code and providing a simplified version. Yes, the idea is to have something like splitmerge-selector but with a limited number of concurrent activities.
I've seen the same requested in a post on the community forum and I thought it would be helpful to have an example. If not I'll just maybe add a comment to the same post with a possible solution ?

@cretz
Copy link
Member

cretz commented Dec 30, 2022

👍 Yeah just posting that patch on the forum may make sense. Some people also may prefer to wait for entire batches before starting another, or may prefer want to add activities/futures in others ways like from a signal handler, or any one of many other alterations to the existing sample.

When it comes to limiting concurrent actions with futures or goroutines or otherwise, Temporal is not unique. This is the same way you might handle it in pure Go (though replace future with a 1-buffer read channel). In fact, arguably for people that want more control (e.g. executing two activities), we should have a splitmerge-channels example that uses workflow.Go and workflow.NewBufferedChannel that would also serve as a clear max-concurrent limiter. Also, you could use workflow.Await with a condition to check a counter before submitting the next future to make sure there aren't too many. There are so many ways.

@emanuelef emanuelef closed this Jan 3, 2023
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

4 participants