Skip to content

Commit

Permalink
fix: data race in pbq manager. Closes #348 (#349)
Browse files Browse the repository at this point in the history
Signed-off-by: Vigith Maurice <vigith@gmail.com>
  • Loading branch information
vigith committed Nov 14, 2022
1 parent bc35945 commit f254c28
Showing 1 changed file with 16 additions and 4 deletions.
20 changes: 16 additions & 4 deletions pkg/pbq/pbqmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,16 @@ import (
"sync"
"time"

"go.uber.org/zap"
"k8s.io/apimachinery/pkg/util/wait"

dfv1 "github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1"
"github.com/numaproj/numaflow/pkg/isb"
"github.com/numaproj/numaflow/pkg/pbq/partition"
"github.com/numaproj/numaflow/pkg/pbq/store"
"github.com/numaproj/numaflow/pkg/pbq/store/memory"
"github.com/numaproj/numaflow/pkg/pbq/store/noop"
"github.com/numaproj/numaflow/pkg/shared/logging"
"go.uber.org/zap"
"k8s.io/apimachinery/pkg/util/wait"
)

// Manager helps in managing the lifecycle of PBQ instances
Expand Down Expand Up @@ -213,7 +214,7 @@ func (m *Manager) ShutDown(ctx context.Context) {
Jitter: 0.1,
}

for _, v := range m.pbqMap {
for _, v := range m.getPBQs() {
wg.Add(1)
go func(q *PBQ) {
defer wg.Done()
Expand Down Expand Up @@ -256,11 +257,22 @@ func (m *Manager) deregister(partitionID partition.ID) {
delete(m.pbqMap, partitionID.String())
}

func (m *Manager) getPBQs() []*PBQ {
m.RLock()
defer m.RUnlock()
var pbqs = make([]*PBQ, 0)
for _, pbq := range m.pbqMap {
pbqs = append(pbqs, pbq)
}

return pbqs
}

// Replay replays messages which are persisted in pbq store.
func (m *Manager) Replay(ctx context.Context) {
var wg sync.WaitGroup

for _, val := range m.pbqMap {
for _, val := range m.getPBQs() {
wg.Add(1)
m.log.Info("Replaying records from store", zap.Any("PBQ", val.PartitionID))
go func(ctx context.Context, p *PBQ) {
Expand Down

0 comments on commit f254c28

Please sign in to comment.