fix: memory leak inside session windower #1445

Merged
merged 6 commits into from
Jan 11, 2024

Contributor

@yhl25 yhl25 commented Jan 10, 2024

  • fixes memory leak inside session windower
  • metrics to track number of active and closed windows
  • enhances tickgen to produce out-of-order messages

Signed-off-by: Yashash H L <yashashhl25@gmail.com>
Signed-off-by: Yashash H L <yashashhl25@gmail.com>
@@ -142,20 +143,33 @@ func (r *kafkaSource) Partitions(context.Context) []int32 {
}

func (r *kafkaSource) Read(_ context.Context, count int64) ([]*isb.ReadMessage, error) {
latestEtMap := make(map[int32]int64)
Member

Is this for debugging purposes?

Contributor Author

I added it for debugging; I will remove it.


// print the latest event time for each partition
for partition, latestEt := range latestEtMap {
df.opts.logger.Infow("Latest event time for partition - ", zap.Int32("partition", partition), zap.Int64("latestEt", int64(time.Since(time.Unix(0, latestEt)).Minutes())))
Member

Debugw?

for _, m := range transformerResults {
writeMessages = append(writeMessages, m.writeMessages...)
for _, message := range m.writeMessages {
// we convert each writeMessage to isb.ReadMessage by providing its parent ReadMessage's ReadOffset.
// since we use message event time instead of the watermark to determine and publish source watermarks,
// time.UnixMilli(-1) is assigned to the message watermark. transformedReadMessages are immediately
// used below for publishing source watermarks.
if latestEt, ok := latestEtMap[m.readMessage.ReadOffset.PartitionIdx()]; !ok || message.EventTime.UnixNano() < latestEt {
Member

For all of this debugging logic, we should think about an approach that does not affect performance, e.g. turning it off by default.

Contributor Author

+1

Value *uint64 `json:"value,omitempty" protobuf:"bytes,5,opt,name=value"`
// Jitter is the jitter for the message generation, used to simulate out of order messages
Member

Could you add a more detailed description? An example would be great. I had to look into the code to fully understand how the jitter specification works.

Comment on lines 169 to 172
//// print the latest event time for each partition
//for partition, latestEt := range latestEtMap {
// r.logger.Infow("Latest event time for partition - ", zap.Int32("partition", partition), zap.Int64("latestEt", int64(time.Since(time.Unix(0, latestEt)).Minutes())))
//}
Member

Should we uncomment or remove?

Contributor Author

will remove

@@ -183,6 +192,19 @@ func (w *Windower) CloseWindows(time time.Time) []*window.TimedWindowRequest {
windowOperations = append(windowOperations, operation)
w.closedWindows.InsertBack(win)
}

metrics.ActiveWindowsCount.With(map[string]string{
Member

Just out of curiosity, why do we emit metrics when we call CloseWindows, as opposed to in the other methods that also update the active and closed windows?

Contributor Author

CloseWindows() is invoked once per batch, whereas the other functions are invoked for every message. I think it's good to track the window count once per batch.

@@ -99,8 +105,11 @@ type Windower struct {
closedWindows *window.SortedWindowListByEndTime
}

-func NewWindower(length time.Duration, slide time.Duration) window.TimedWindower {
+func NewWindower(length time.Duration, slide time.Duration, vertexInstance *dfv1.VertexInstance) window.TimedWindower {
Member

Have you thought about adding a new method, EmitMetrics(vertexInstance *dfv1.VertexInstance), to the Windower interface? That way the windower can emit metrics not only after closing windows but also after other operations, and the caller decides when. It also simplifies the Windower struct itself.

Contributor Author

I don't think adding Prometheus metrics functionality to the interface is ideal. The caller shouldn't be tasked with handling these metrics, since they are not directly related to window functionality.

Signed-off-by: Yashash H L <yashashhl25@gmail.com>
@yhl25 yhl25 merged commit 85e76c7 into main Jan 11, 2024
19 checks passed
@yhl25 yhl25 deleted the mem-leak branch January 11, 2024 16:20
3 participants