diff --git a/pkg/pbq/store/memory/stores.go b/pkg/pbq/store/memory/stores.go index c9d30a24a..8487376a6 100644 --- a/pkg/pbq/store/memory/stores.go +++ b/pkg/pbq/store/memory/stores.go @@ -66,6 +66,8 @@ func (ms *memoryStores) CreateStore(ctx context.Context, partitionID partition.I } func (ms *memoryStores) DiscoverPartitions(ctx context.Context) ([]partition.ID, error) { + ms.RLock() + defer ms.RUnlock() if ms.discoverFunc == nil { partitionsIds := make([]partition.ID, 0) for key := range ms.partitions { diff --git a/pkg/reduce/reduce_test.go b/pkg/reduce/reduce_test.go index 7457884b6..2085228d8 100644 --- a/pkg/reduce/reduce_test.go +++ b/pkg/reduce/reduce_test.go @@ -700,7 +700,7 @@ func TestDataForward_WithContextClose(t *testing.T) { // wait for the partitions to be created for { partitionsList := pbqManager.ListPartitions() - if len(partitionsList) > 0 { + if len(partitionsList) > 1 { childCancel() break } @@ -717,7 +717,7 @@ func TestDataForward_WithContextClose(t *testing.T) { for { discoveredPartitions, _ = storeProvider.DiscoverPartitions(ctx) - if len(discoveredPartitions) > 0 { + if len(discoveredPartitions) > 1 { break } select {