Skip to content

Commit

Permalink
refactor: build wmstore and wmstorewatcher directly, and remove some …
Browse files Browse the repository at this point in the history
…unnecessary fields (#970)

Signed-off-by: Derek Wang <whynowy@gmail.com>
  • Loading branch information
whynowy committed Aug 21, 2023
1 parent ed53342 commit 1f33bf8
Show file tree
Hide file tree
Showing 43 changed files with 325 additions and 402 deletions.
4 changes: 1 addition & 3 deletions cmd/commands/daemon_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,11 @@ func NewDaemonServerCommand() *cobra.Command {
Use: "daemon-server",
Short: "Start the daemon server",
RunE: func(cmd *cobra.Command, args []string) error {
logger := logging.NewLogger().Named("daemon-server")

pl, err := decodePipeline()
if err != nil {
return fmt.Errorf("failed to decode the pipeline spec: %v", err)
}

logger := logging.NewLogger().Named("daemon-server").With("pipeline", pl.Name)
ctx := logging.WithLogger(signals.SetupSignalHandler(), logger)
server := server.NewDaemonServer(pl, v1alpha1.ISBSvcType(isbSvcType))
return server.Run(ctx)
Expand Down
2 changes: 1 addition & 1 deletion cmd/commands/isbsvc_create.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,11 @@ func NewISBSvcCreateCommand() *cobra.Command {
Use: "isbsvc-create",
Short: "Create buffers, buckets and side inputs store",
RunE: func(cmd *cobra.Command, args []string) error {
logger := logging.NewLogger().Named("isbsvc-create")
pipelineName, defined := os.LookupEnv(v1alpha1.EnvPipelineName)
if !defined {
return fmt.Errorf("required environment variable '%s' not defined", v1alpha1.EnvPipelineName)
}
logger := logging.NewLogger().Named("isbsvc-create").With("pipeline", pipelineName)
isbSvcConfig := &v1alpha1.BufferServiceConfig{}
encodedBufferServiceConfig := os.Getenv(v1alpha1.EnvISBSvcConfig)
if len(encodedBufferServiceConfig) > 0 {
Expand Down
2 changes: 1 addition & 1 deletion cmd/commands/isbsvc_delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,11 @@ func NewISBSvcDeleteCommand() *cobra.Command {
Use: "isbsvc-delete",
Short: "Delete ISB Service buffers, buckets and side inputs store",
RunE: func(cmd *cobra.Command, args []string) error {
logger := logging.NewLogger().Named("isbsvc-delete")
pipelineName, defined := os.LookupEnv(v1alpha1.EnvPipelineName)
if !defined {
return fmt.Errorf("required environment variable '%s' not defined", v1alpha1.EnvPipelineName)
}
logger := logging.NewLogger().Named("isbsvc-delete").With("pipeline", pipelineName)
var isbsClient isbsvc.ISBService
var err error
ctx := logging.WithLogger(context.Background(), logger)
Expand Down
2 changes: 1 addition & 1 deletion cmd/commands/isbsvc_validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,11 @@ func NewISBSvcValidateCommand() *cobra.Command {
Use: "isbsvc-validate",
Short: "Validate ISB Service buffers, buckets and side inputs store",
RunE: func(cmd *cobra.Command, args []string) error {
logger := logging.NewLogger().Named("isbsvc-validate")
pipelineName, existing := os.LookupEnv(v1alpha1.EnvPipelineName)
if !existing {
return fmt.Errorf("environment variable %q not existing", v1alpha1.EnvPipelineName)
}
logger := logging.NewLogger().Named("isbsvc-validate").With("pipeline", pipelineName)
var isbsClient isbsvc.ISBService
var err error
ctx := logging.WithLogger(context.Background(), logger)
Expand Down
2 changes: 1 addition & 1 deletion cmd/commands/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func NewProcessorCommand() *cobra.Command {
if err != nil {
return fmt.Errorf("invalid replica %q", replicaStr)
}
log = log.With("vertex", vertex.Name)
log = log.With("pipeline", vertex.Spec.PipelineName).With("vertex", vertex.Spec.Name)
vertexInstance := &dfv1.VertexInstance{
Vertex: vertex,
Hostname: hostname,
Expand Down
3 changes: 1 addition & 2 deletions cmd/commands/side_inputs_init.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ func NewSideInputsInitCommand() *cobra.Command {
Use: "side-inputs-init",
Short: "Start the Side Inputs init service",
RunE: func(cmd *cobra.Command, args []string) error {
logger := logging.NewLogger().Named("side-inputs-init")

pipelineName, defined := os.LookupEnv(dfv1.EnvPipelineName)
if !defined {
Expand All @@ -47,7 +46,7 @@ func NewSideInputsInitCommand() *cobra.Command {
if len(sideInputs) == 0 {
return fmt.Errorf("no side inputs are defined for this vertex")
}

logger := logging.NewLogger().Named("side-inputs-init").With("pipeline", pipelineName)
ctx := logging.WithLogger(context.Background(), logger)
sideInputsInitializer := initializer.NewSideInputsInitializer(dfv1.ISBSvcType(isbSvcType), pipelineName, sideInputsStore, sideInputs)
return sideInputsInitializer.Run(ctx)
Expand Down
4 changes: 2 additions & 2 deletions cmd/commands/side_inputs_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,6 @@ func NewSideInputsManagerCommand() *cobra.Command {
Use: "side-inputs-manager",
Short: "Start a Side Inputs Manager",
RunE: func(cmd *cobra.Command, args []string) error {
logger := logging.NewLogger().Named("side-inputs-manager")

encodedSiceInputSpec, defined := os.LookupEnv(dfv1.EnvSideInputObject)
if !defined {
return fmt.Errorf("environment %q is not defined", dfv1.EnvSideInputObject)
Expand All @@ -59,6 +57,8 @@ func NewSideInputsManagerCommand() *cobra.Command {
return fmt.Errorf("environment %q is not defined", dfv1.EnvPipelineName)
}

logger := logging.NewLogger().Named("side-inputs-manager").With("pipeline", pipelineName)

ctx := logging.WithLogger(signals.SetupSignalHandler(), logger)
sideInputManager := manager.NewSideInputsManager(dfv1.ISBSvcType(isbSvcType), pipelineName, sideInputsStore, sideInput)
return sideInputManager.Start(ctx)
Expand Down
3 changes: 1 addition & 2 deletions cmd/commands/side_inputs_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,6 @@ func NewSideInputsWatcherCommand() *cobra.Command {
Use: "side-inputs-watcher",
Short: "Start the Side Inputs Watcher",
RunE: func(cmd *cobra.Command, args []string) error {
logger := logging.NewLogger().Named("side-inputs-watcher")

pipelineName, defined := os.LookupEnv(dfv1.EnvPipelineName)
if !defined {
return fmt.Errorf("environment %q is not defined", dfv1.EnvPipelineName)
Expand All @@ -49,6 +47,7 @@ func NewSideInputsWatcherCommand() *cobra.Command {
return fmt.Errorf("no side inputs are defined for this vertex")
}

logger := logging.NewLogger().Named("side-inputs-watcher").With("pipeline", pipelineName)
ctx := logging.WithLogger(signals.SetupSignalHandler(), logger)
sideInputsWatcher := synchronizer.NewSideInputsSynchronizer(dfv1.ISBSvcType(isbSvcType), pipelineName, sideInputsStore, sideInputs)
return sideInputsWatcher.Start(ctx)
Expand Down
11 changes: 4 additions & 7 deletions pkg/forward/forward_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ import (
"github.com/numaproj/numaflow/pkg/isb/stores/simplebuffer"
"github.com/numaproj/numaflow/pkg/isb/testutils"
"github.com/numaproj/numaflow/pkg/shared/kvs"
"github.com/numaproj/numaflow/pkg/shared/kvs/inmem"
"github.com/numaproj/numaflow/pkg/shared/logging"
udfapplier "github.com/numaproj/numaflow/pkg/udf/function"
"github.com/numaproj/numaflow/pkg/watermark/generic"
Expand All @@ -47,8 +46,7 @@ import (
const (
testPipelineName = "testPipeline"
testProcessorEntity = "publisherTestPod"
publisherHBKeyspace = testPipelineName + "_" + testProcessorEntity + "_%s_" + "PROCESSORS"
publisherOTKeyspace = testPipelineName + "_" + testProcessorEntity + "_%s_" + "OT"
publisherKeyspace = testPipelineName + "_" + testProcessorEntity + "_%s"
)

var (
Expand Down Expand Up @@ -1603,10 +1601,9 @@ func buildPublisherMapAndOTStore(toBuffers map[string][]isb.BufferWriter) (map[s
publishers := make(map[string]publish.Publisher)
otStores := make(map[string]kvs.KVStorer)
for key, partitionedBuffers := range toBuffers {
heartbeatKV, _, _ := inmem.NewKVInMemKVStore(ctx, testPipelineName, fmt.Sprintf(publisherHBKeyspace, key))
otKV, _, _ := inmem.NewKVInMemKVStore(ctx, testPipelineName, fmt.Sprintf(publisherOTKeyspace, key))
otStores[key] = otKV
p := publish.NewPublish(ctx, processorEntity, wmstore.BuildWatermarkStore(heartbeatKV, otKV), int32(len(partitionedBuffers)), publish.WithAutoRefreshHeartbeatDisabled(), publish.WithPodHeartbeatRate(1))
store, _, _, _ := wmstore.BuildInmemWatermarkStore(ctx, fmt.Sprintf(publisherKeyspace, key))
otStores[key] = store.OffsetTimelineStore()
p := publish.NewPublish(ctx, processorEntity, store, int32(len(partitionedBuffers)), publish.WithAutoRefreshHeartbeatDisabled(), publish.WithPodHeartbeatRate(1))
publishers[key] = p
}
return publishers, otStores
Expand Down
40 changes: 13 additions & 27 deletions pkg/isbsvc/jetstream_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,9 @@ import (
"go.uber.org/zap"

jsclient "github.com/numaproj/numaflow/pkg/shared/clients/nats"
"github.com/numaproj/numaflow/pkg/shared/kvs/jetstream"
"github.com/numaproj/numaflow/pkg/shared/logging"
"github.com/numaproj/numaflow/pkg/watermark/processor"
"github.com/numaproj/numaflow/pkg/watermark/store"
wmstore "github.com/numaproj/numaflow/pkg/watermark/store"
)

type jetStreamSvc struct {
Expand Down Expand Up @@ -146,7 +145,7 @@ func (jss *jetStreamSvc) CreateBuffersAndBuckets(ctx context.Context, buffers, b

for _, bucket := range buckets {
// Create offset-timeline KV
otKVName := JetStreamOTKVName(bucket)
otKVName := wmstore.JetStreamOTKVName(bucket)
if _, err := js.KeyValue(otKVName); err != nil {
if !errors.Is(err, nats.ErrBucketNotFound) && !errors.Is(err, nats.ErrStreamNotFound) {
return fmt.Errorf("failed to query information of bucket %q during buffer creating, %w", otKVName, err)
Expand All @@ -165,7 +164,7 @@ func (jss *jetStreamSvc) CreateBuffersAndBuckets(ctx context.Context, buffers, b
}
}
// Create processor KV
procKVName := JetStreamProcessorKVName(bucket)
procKVName := wmstore.JetStreamProcessorKVName(bucket)
if _, err := js.KeyValue(procKVName); err != nil {
if !errors.Is(err, nats.ErrBucketNotFound) && !errors.Is(err, nats.ErrStreamNotFound) {
return fmt.Errorf("failed to query information of bucket %q during buffer creating, %w", procKVName, err)
Expand Down Expand Up @@ -209,12 +208,12 @@ func (jss *jetStreamSvc) DeleteBuffersAndBuckets(ctx context.Context, buffers, b
log.Infow("Succeeded to delete a stream", zap.String("stream", streamName))
}
for _, bucket := range buckets {
otKVName := JetStreamOTKVName(bucket)
otKVName := wmstore.JetStreamOTKVName(bucket)
if err := js.DeleteKeyValue(otKVName); err != nil && !errors.Is(err, nats.ErrBucketNotFound) && !errors.Is(err, nats.ErrStreamNotFound) {
return fmt.Errorf("failed to delete offset timeline KV %q, %w", otKVName, err)
}
log.Infow("Succeeded to delete an offset timeline KV", zap.String("kvName", otKVName))
procKVName := JetStreamProcessorKVName(bucket)
procKVName := wmstore.JetStreamProcessorKVName(bucket)
if err := js.DeleteKeyValue(procKVName); err != nil && !errors.Is(err, nats.ErrBucketNotFound) && !errors.Is(err, nats.ErrStreamNotFound) {
return fmt.Errorf("failed to delete processor KV %q, %w", procKVName, err)
}
Expand Down Expand Up @@ -251,12 +250,12 @@ func (jss *jetStreamSvc) ValidateBuffersAndBuckets(ctx context.Context, buffers,
}
}
for _, bucket := range buckets {
otKVName := JetStreamOTKVName(bucket)
otKVName := wmstore.JetStreamOTKVName(bucket)
if _, err := js.KeyValue(otKVName); err != nil {
return fmt.Errorf("failed to query OT KV %q, %w", otKVName, err)
}

procKVName := JetStreamProcessorKVName(bucket)
procKVName := wmstore.JetStreamProcessorKVName(bucket)
if _, err := js.KeyValue(procKVName); err != nil {
return fmt.Errorf("failed to query processor KV %q, %w", procKVName, err)
}
Expand Down Expand Up @@ -317,29 +316,24 @@ func (jss *jetStreamSvc) GetBufferInfo(ctx context.Context, buffer string) (*Buf

// CreateProcessorManagers is used to create processor manager for the given bucket.
func (jss *jetStreamSvc) CreateProcessorManagers(ctx context.Context, bucketName string, fromBufferPartitionCount int, isReduce bool) ([]*processor.ProcessorManager, error) {
log := logging.FromContext(ctx).With("bucket", bucketName)
ctx = logging.WithLogger(ctx, log)
var processorManagers []*processor.ProcessorManager
fetchers := 1
if isReduce {
fetchers = fromBufferPartitionCount
}
// if it's not a reduce vertex, we don't need multiple watermark fetchers. We use common fetcher among all partitions.
for i := 0; i < fetchers; i++ {
hbKVName := JetStreamProcessorKVName(bucketName)
hbWatch, err := jetstream.NewKVJetStreamKVWatch(ctx, jss.pipelineName, hbKVName, jss.jsClient)
storeWatcher, err := wmstore.BuildJetStreamWatermarkStoreWatcher(ctx, bucketName, jss.jsClient)
if err != nil {
return nil, err
}
otKVName := JetStreamOTKVName(bucketName)
otWatch, err := jetstream.NewKVJetStreamKVWatch(ctx, jss.pipelineName, otKVName, jss.jsClient)
if err != nil {
return nil, err
return nil, fmt.Errorf("failed at new JetStream watermark store watcher, %w", err)
}
storeWatcher := store.BuildWatermarkStoreWatcher(hbWatch, otWatch)
var pm *processor.ProcessorManager
if isReduce {
pm = processor.NewProcessorManager(ctx, storeWatcher, bucketName, int32(fromBufferPartitionCount), processor.WithVertexReplica(int32(i)), processor.WithIsReduce(isReduce))
pm = processor.NewProcessorManager(ctx, storeWatcher, int32(fromBufferPartitionCount), processor.WithVertexReplica(int32(i)), processor.WithIsReduce(isReduce))
} else {
pm = processor.NewProcessorManager(ctx, storeWatcher, bucketName, int32(fromBufferPartitionCount))
pm = processor.NewProcessorManager(ctx, storeWatcher, int32(fromBufferPartitionCount))
}
processorManagers = append(processorManagers, pm)
}
Expand All @@ -350,14 +344,6 @@ func JetStreamName(bufferName string) string {
return bufferName
}

func JetStreamOTKVName(bucketName string) string {
return fmt.Sprintf("%s_OT", bucketName)
}

func JetStreamProcessorKVName(bucketName string) string {
return fmt.Sprintf("%s_PROCESSORS", bucketName)
}

func JetStreamSideInputsStoreKVName(sideInputStoreName string) string {
return fmt.Sprintf("%s_SIDE_INPUTS", sideInputStoreName)
}
16 changes: 7 additions & 9 deletions pkg/isbsvc/redis_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,10 @@ import (
"go.uber.org/zap"

redis2 "github.com/numaproj/numaflow/pkg/isb/stores/redis"
"github.com/numaproj/numaflow/pkg/shared/kvs/noop"
"github.com/numaproj/numaflow/pkg/watermark/processor"
"github.com/numaproj/numaflow/pkg/watermark/store"

redisclient "github.com/numaproj/numaflow/pkg/shared/clients/redis"
"github.com/numaproj/numaflow/pkg/shared/logging"
"github.com/numaproj/numaflow/pkg/watermark/processor"
"github.com/numaproj/numaflow/pkg/watermark/store"
)

type isbsRedisSvc struct {
Expand Down Expand Up @@ -141,21 +139,21 @@ func (r *isbsRedisSvc) GetBufferInfo(ctx context.Context, buffer string) (*Buffe

// CreateProcessorManagers is used to create the processor managers for the given bucket.
func (r *isbsRedisSvc) CreateProcessorManagers(ctx context.Context, bucketName string, fromBufferPartitionCount int, isReduce bool) ([]*processor.ProcessorManager, error) {
log := logging.FromContext(ctx).With("bucket", bucketName)
ctx = logging.WithLogger(ctx, log)
// Watermark fetching is not supported for Redis ATM. Creating noop watermark fetcher.
var processorManagers []*processor.ProcessorManager
fetchers := 1
if isReduce {
fetchers = fromBufferPartitionCount
}
for i := 0; i < fetchers; i++ {
hbWatcher := noop.NewKVOpWatch()
otWatcher := noop.NewKVOpWatch()
storeWatcher := store.BuildWatermarkStoreWatcher(hbWatcher, otWatcher)
storeWatcher, _ := store.BuildNoOpWatermarkStoreWatcher()
var pm *processor.ProcessorManager
if isReduce {
pm = processor.NewProcessorManager(ctx, storeWatcher, bucketName, int32(fromBufferPartitionCount), processor.WithVertexReplica(int32(i)), processor.WithIsReduce(isReduce))
pm = processor.NewProcessorManager(ctx, storeWatcher, int32(fromBufferPartitionCount), processor.WithVertexReplica(int32(i)), processor.WithIsReduce(isReduce))
} else {
pm = processor.NewProcessorManager(ctx, storeWatcher, bucketName, int32(fromBufferPartitionCount))
pm = processor.NewProcessorManager(ctx, storeWatcher, int32(fromBufferPartitionCount))
}
processorManagers = append(processorManagers, pm)
}
Expand Down
25 changes: 9 additions & 16 deletions pkg/reduce/data_forward_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ import (
"github.com/numaproj/numaflow/pkg/reduce/pbq/store/memory"
"github.com/numaproj/numaflow/pkg/reduce/pnf"
"github.com/numaproj/numaflow/pkg/shared/kvs"
"github.com/numaproj/numaflow/pkg/shared/kvs/inmem"
"github.com/numaproj/numaflow/pkg/watermark/fetch"
"github.com/numaproj/numaflow/pkg/watermark/processor"
"github.com/numaproj/numaflow/pkg/watermark/publish"
Expand Down Expand Up @@ -1281,17 +1280,14 @@ func TestReduceDataForward_SumMultiPartitions(t *testing.T) {
func fetcherAndPublisher(ctx context.Context, fromBuffer *simplebuffer.InMemoryBuffer, key string) (fetch.Fetcher, publish.Publisher) {

var (
keyspace = key
hbBucketName = keyspace + "_PROCESSORS"
otBucketName = keyspace + "_OT"
keyspace = key
)

sourcePublishEntity := processor.NewProcessorEntity(fromBuffer.GetName())
hb, hbWatcherCh, _ := inmem.NewKVInMemKVStore(ctx, pipelineName, hbBucketName)
ot, otWatcherCh, _ := inmem.NewKVInMemKVStore(ctx, pipelineName, otBucketName)
store, hbWatcherCh, otWatcherCh, _ := wmstore.BuildInmemWatermarkStore(ctx, keyspace)

// publisher for source
sourcePublisher := publish.NewPublish(ctx, sourcePublishEntity, wmstore.BuildWatermarkStore(hb, ot), 1, publish.WithAutoRefreshHeartbeatDisabled())
sourcePublisher := publish.NewPublish(ctx, sourcePublishEntity, store, 1, publish.WithAutoRefreshHeartbeatDisabled())

// publish heartbeat manually for the processor
go func() {
Expand All @@ -1300,16 +1296,14 @@ func fetcherAndPublisher(ctx context.Context, fromBuffer *simplebuffer.InMemoryB
case <-ctx.Done():
return
default:
_ = hb.PutKV(ctx, fromBuffer.GetName(), []byte(fmt.Sprintf("%d", time.Now().Unix())))
_ = store.HeartbeatStore().PutKV(ctx, fromBuffer.GetName(), []byte(fmt.Sprintf("%d", time.Now().Unix())))
time.Sleep(time.Duration(1) * time.Second)
}
}
}()

hbWatcher, _ := inmem.NewInMemWatch(ctx, pipelineName, keyspace+"_PROCESSORS", hbWatcherCh)
otWatcher, _ := inmem.NewInMemWatch(ctx, pipelineName, keyspace+"_OT", otWatcherCh)
storeWatcher := wmstore.BuildWatermarkStoreWatcher(hbWatcher, otWatcher)
pm := processor.NewProcessorManager(ctx, storeWatcher, "test-bucket", 1, processor.WithIsReduce(true))
storeWatcher, _ := wmstore.BuildInmemWatermarkStoreWatcher(ctx, keyspace, hbWatcherCh, otWatcherCh)
pm := processor.NewProcessorManager(ctx, storeWatcher, 1, processor.WithIsReduce(true))
for waitForReadyP := pm.GetProcessor(fromBuffer.GetName()); waitForReadyP == nil; waitForReadyP = pm.GetProcessor(fromBuffer.GetName()) {
// wait until the test processor has been added to the processor list
time.Sleep(time.Millisecond * 100)
Expand All @@ -1336,10 +1330,9 @@ func buildPublisherMapAndOTStore(ctx context.Context, toBuffers map[string][]isb
index := int32(0)
for key, partitionedBuffers := range toBuffers {
publishEntity := processor.NewProcessorEntity(key)
hb, hbKVEntry, _ := inmem.NewKVInMemKVStore(ctx, pipelineName, key+"_PROCESSORS")
ot, otKVEntry, _ := inmem.NewKVInMemKVStore(ctx, pipelineName, key+"_OT")
otStores[key] = ot
p := publish.NewPublish(ctx, publishEntity, wmstore.BuildWatermarkStore(hb, ot), int32(len(partitionedBuffers)), publish.WithAutoRefreshHeartbeatDisabled(), publish.WithPodHeartbeatRate(1))
store, hbKVEntry, otKVEntry, _ := wmstore.BuildInmemWatermarkStore(ctx, key)
otStores[key] = store.OffsetTimelineStore()
p := publish.NewPublish(ctx, publishEntity, store, int32(len(partitionedBuffers)), publish.WithAutoRefreshHeartbeatDisabled(), publish.WithPodHeartbeatRate(1))
publishers[key] = p

go func() {
Expand Down

0 comments on commit 1f33bf8

Please sign in to comment.