diff --git a/pkg/pbq/pbqmanager.go b/pkg/pbq/pbqmanager.go index 4490888b3..dd7a27720 100644 --- a/pkg/pbq/pbqmanager.go +++ b/pkg/pbq/pbqmanager.go @@ -24,6 +24,9 @@ import ( "sync" "time" + "go.uber.org/zap" + "k8s.io/apimachinery/pkg/util/wait" + dfv1 "github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1" "github.com/numaproj/numaflow/pkg/isb" "github.com/numaproj/numaflow/pkg/pbq/partition" @@ -31,8 +34,6 @@ import ( "github.com/numaproj/numaflow/pkg/pbq/store/memory" "github.com/numaproj/numaflow/pkg/pbq/store/noop" "github.com/numaproj/numaflow/pkg/shared/logging" - "go.uber.org/zap" - "k8s.io/apimachinery/pkg/util/wait" ) // Manager helps in managing the lifecycle of PBQ instances @@ -213,7 +214,7 @@ func (m *Manager) ShutDown(ctx context.Context) { Jitter: 0.1, } - for _, v := range m.pbqMap { + for _, v := range m.getPBQs() { wg.Add(1) go func(q *PBQ) { defer wg.Done() @@ -256,11 +257,22 @@ func (m *Manager) deregister(partitionID partition.ID) { delete(m.pbqMap, partitionID.String()) } +func (m *Manager) getPBQs() []*PBQ { + m.RLock() + defer m.RUnlock() + var pbqs = make([]*PBQ, 0) + for _, pbq := range m.pbqMap { + pbqs = append(pbqs, pbq) + } + + return pbqs +} + // Replay replays messages which are persisted in pbq store. func (m *Manager) Replay(ctx context.Context) { var wg sync.WaitGroup - for _, val := range m.pbqMap { + for _, val := range m.getPBQs() { wg.Add(1) m.log.Info("Replaying records from store", zap.Any("PBQ", val.PartitionID)) go func(ctx context.Context, p *PBQ) {