Skip to content

Commit

Permalink
Merge pull request #11 from usedatabrew/fix/DAT-254/proc-invoke
Browse files Browse the repository at this point in the history
fix(stream): updated stages creation index to avoid incorrect proc ex…
  • Loading branch information
le-vlad committed Dec 16, 2023
2 parents 530af34 + f09de5e commit 1e74de1
Showing 1 changed file with 7 additions and 3 deletions.
10 changes: 7 additions & 3 deletions public/stream/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,6 @@ func (s *Stream) Start() error {
if err := s.validateAndInit(); err != nil {
return err
}
go s.source.Start()
var messagesProcessed = 0

go func() {
Expand All @@ -132,7 +131,11 @@ func (s *Stream) Start() error {
dataStream := tango.NewTango()

var dataStreamStages []tango.Stage
for _, proc := range s.processors {
for idx, _ := range s.processors {
// this is required as idx is a ref,
// so we will end up with the case when only the last processor will be executed
// as many times as we have processors
procIndex := idx
stage := tango.Stage{
Channel: make(chan interface{}),
Function: func(i interface{}) (interface{}, error) {
Expand All @@ -141,7 +144,7 @@ func (s *Stream) Start() error {
if i.(*message.Message) == nil {
return nil, nil
}
return proc.Process(i.(*message.Message))
return s.processors[procIndex].Process(i.(*message.Message))

}
return nil, nil
Expand Down Expand Up @@ -186,6 +189,7 @@ func (s *Stream) Start() error {
}
}()

go s.source.Start()
s.registry.SetState(service_registry.Started)

return dataStream.Start()
Expand Down

0 comments on commit 1e74de1

Please sign in to comment.