Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Selector.HasPending #301

Closed
mfateev opened this issue Nov 29, 2020 · 0 comments · Fixed by #319
Closed

Add Selector.HasPending #301

mfateev opened this issue Nov 29, 2020 · 0 comments · Fixed by #319
Labels
enhancement New feature or request

Comments

@mfateev
Copy link
Member

mfateev commented Nov 29, 2020

Is your feature request related to a problem? Please describe.
Currently to guarantee that every signal is delivered all channels have to be drained using ReadAsync before completing a workflow. This leads to code that looks like:

type channelConfig struct {
  channel workflow.ReceiveChannel
  val interface{}
  handler func()  // do something with val
}

channels := []*channelConfig{
  {
    channel: workflow.GetSignalChannel(...)
    val: "",
    handler func() {
      // do something with val
    },
  },
  // etc
}

s := workflow.NewSelector(ctx)
for _, cc := range channels {
  s.AddReceive(cc.channel, func (c workflow.ReceiveChannel, _ bool) {
    c.Receive(ctx, &cc.val)
    cc.handler()
  })
}

for keepGoing {
  s.Select(ctx)
  // not safe to return here
}

// Now I want to return but first I need to make sure that no signals are pending
canReturn := false
for !canReturn {
  canReturn = true
  for _, cc := range channels {
    if ok := c.ReceiveAsync(&cc.val); ok {
      canReturn = false
      cc.handler()
    }
  }
}

Describe the solution you'd like
Add Selector.HasPending method that returns true if Selector.Select is not going to block if called. This would simplify the above code to:

type channelConfig struct {
  channel workflow.ReceiveChannel
  val interface{}
  handler func()  // do something with val
}

channels := []*channelConfig{
  {
    channel: workflow.GetSignalChannel(...)
    val: "",
    handler func() {
      // do something with val
    },
  },
  // etc
}

s := workflow.NewSelector(ctx)
for _, cc := range channels {
  s.AddReceive(cc.channel, func (c workflow.ReceiveChannel, _ bool) {
    c.Receive(ctx, &cc.val)
    cc.handler()
  })
}

for keepGoing || s.HasPending {
  s.Select(ctx)
}

// safe complete workflow here

Describe alternatives you've considered
Add Channel.HasPending or ReceiveChannel.Peek. But it wouldn't help with Select and still require code complications.

Additional context
https://community.temporal.io/t/continueasnew-signals/1008/25

@mfateev mfateev added the enhancement New feature or request label Nov 29, 2020
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
enhancement New feature or request
Projects
None yet
Development

Successfully merging a pull request may close this issue.

1 participant