-
Notifications
You must be signed in to change notification settings - Fork 113
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
chore: fix watermark components ownership #963
Conversation
Signed-off-by: Yashash H L <yashashhl25@gmail.com>
Signed-off-by: Yashash H L <yashashhl25@gmail.com>
pkg/sinks/sink.go
Outdated
// close the wm stores, since the publisher and fetcher are closed | ||
// since we created the stores, we can close them | ||
for _, wmStore := range wmStores { | ||
wmStore.HeartbeatStore().Close() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add a Close()
function to WatermarkStore
interface ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
pkg/sinks/sink.go
Outdated
names := u.VertexInstance.Vertex.GetToBuffers() | ||
if u.VertexInstance.Vertex.IsASink() { | ||
// Sink has no to buffers, we use the vertex name as the buffer writer name. | ||
names = append(names, u.VertexInstance.Vertex.Spec.Name) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Already know it's a sink, then the line above is not needed?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
pkg/udf/reduce_udf.go
Outdated
log.Info("SIGTERM, exiting...") | ||
wg.Wait() | ||
|
||
// close the watermark fetcher and publisher since we created them | ||
err = fetchWatermark.Close() | ||
if err != nil { | ||
log.Error("Failed to close the watermark fetcher", zap.Error(err)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
log.Errorw()
pkg/udf/map_udf.go
Outdated
@@ -214,18 +245,29 @@ func (u *MapUDFProcessor) Start(ctx context.Context) error { | |||
// wait for all the forwarders to exit | |||
finalWg.Wait() | |||
|
|||
// Close the watermark fetcher and publisher | |||
// close the watermark fetcher and publisher since we created them | |||
err = fetchWatermark.Close() | |||
if err != nil { | |||
log.Error("Failed to close the watermark fetcher", zap.Error(err)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Errorw()
Signed-off-by: Yashash H L <yashashhl25@gmail.com>
Signed-off-by: Yashash H L <yashashhl25@gmail.com>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you very much for making this change. The ownership among fetcher, publisher, processor manger and kv stores are much clearer now. +1
pkg/sinks/sink.go
Outdated
for _, publisher := range publishWatermark { | ||
err = publisher.Close() | ||
if err != nil { | ||
log.Error("Failed to close the watermark publisher", zap.Error(err)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
log.Error("Failed to close the watermark publisher", zap.Error(err)) | |
log.Errorw("Failed to close the watermark publisher", zap.Error(err)) |
pkg/udf/map_udf.go
Outdated
return err | ||
if u.VertexInstance.Vertex.Spec.Watermark.Disabled { | ||
names := u.VertexInstance.Vertex.GetToBuffers() | ||
if u.VertexInstance.Vertex.IsASink() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Remove it.
pkg/udf/reduce_udf.go
Outdated
return err | ||
if u.VertexInstance.Vertex.Spec.Watermark.Disabled { | ||
names := u.VertexInstance.Vertex.GetToBuffers() | ||
if u.VertexInstance.Vertex.IsASink() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Remove it.
pkg/sinks/sink.go
Outdated
if u.VertexInstance.Vertex.Spec.Watermark.Disabled { | ||
names := u.VertexInstance.Vertex.GetToBuffers() | ||
// sink has no to buffers, so we use the vertex name to publish the watermark | ||
names = append(names, u.VertexInstance.Vertex.Spec.Name) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
names := []string{u.VertexInstance.Vertex.Spec.Name}
if err != nil { | ||
return err | ||
} | ||
|
||
// build publisher stores for source (we publish twice for source) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
could you add a comment here to explain why we need two sets of publisher stores?..
Signed-off-by: Derek Wang <whynowy@gmail.com>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We need to rename these, using BuildProcessorManagers
to build fetchers
, and BuildToVertexWatermarkStores
to build publishers
does not make sense.
The code should be self-documenting, as of now, it is quite difficult for a casual reader to understand the code.
Fetch
processorManagers, err = jetstream.BuildProcessorManagers(ctx, u.VertexInstance, natsClientPool.NextAvailableClient())
// create watermark fetcher using processor managers
fetchWatermark = fetch.NewEdgeFetcherSet(ctx, u.VertexInstance, processorManagers)
Publish
// create watermark stores
wmStores, err = jetstream.BuildToVertexWatermarkStores(ctx, u.VertexInstance, natsClientPool.NextAvailableClient())
// create watermark publisher using watermark stores
publishWatermark = jetstream.BuildPublishersFromStores(ctx, u.VertexInstance, wmStores)
readers, writers, err = buildJetStreamBufferIO(ctx, u.VertexInstance, natsClientPool)
related to #900
BuildProcessorManagers
,BuildToVertexWatermarkStores
andBuildPublishersFromStores
helper functionsio.Closer
fromFetcher
andUXFetcher
interface.also fixes #962