-
Notifications
You must be signed in to change notification settings - Fork 98
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
refactor: build wmstore and wmstorewatcher directly, and remove some unnecessary fields #970
Conversation
Signed-off-by: Derek Wang <whynowy@gmail.com>
Signed-off-by: Derek Wang <whynowy@gmail.com>
Signed-off-by: Derek Wang <whynowy@gmail.com>
Signed-off-by: Derek Wang <whynowy@gmail.com>
bucketName: bucketName, | ||
kvEntryCh: kvEntryCh, | ||
kvHistory: []kvs.KVEntry{}, | ||
updatesChMap: make(map[string]chan kvs.KVEntry), | ||
doneCh: make(chan struct{}), | ||
log: logging.FromContext(ctx).With("pipeline", pipelineName).With("bucketName", bucketName), | ||
log: logging.FromContext(ctx).With("bucketName", bucketName), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Only for logging, removed.
var err error | ||
var jsStore = &jetStreamStore{ | ||
client: client, | ||
log: logging.FromContext(ctx).With("pipeline", pipelineName).With("kvName", kvName), | ||
log: logging.FromContext(ctx).With("kvName", kvName), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The log in ctx
comes with pipeline
.
@@ -66,7 +66,7 @@ func NewKVJetStreamKVWatch(ctx context.Context, pipelineName string, kvName stri | |||
kvwTimer: time.NewTimer(kvOpts.watcherCreationThreshold), | |||
opts: kvOpts, | |||
doneCh: make(chan struct{}), | |||
log: logging.FromContext(ctx).With("pipeline", pipelineName).With("kvName", kvName), | |||
log: logging.FromContext(ctx).With("kvName", kvName), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The log in ctx
comes with pipeline
.
fromBufferPartitionCount: fromBufferPartitionCount, | ||
log: logging.FromContext(ctx).With("bucket", bucket), | ||
log: logging.FromContext(ctx), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The log from the ctx
comes with bucket
.
// Initialize a new empty watermarks DLL with nil values of the size capacity. | ||
// This is to avoid length check: when a new element is added, the tail element will be deleted. | ||
offsetTimeline := OffsetTimeline{ | ||
ctx: ctx, | ||
capacity: c, | ||
log: logging.FromContext(ctx).With("bucket", bucket), | ||
log: logging.FromContext(ctx), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The log already has bucket
.
WatermarkStore
andWatermarkStoreWatcher
directly frombucket
.bucket
(only for logging) as an arg, usectx
to pass it.pipeline
(only for logging) as an arg, usectx
to pass it.