Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix sd adapter go rountine leak #14775

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
7 changes: 5 additions & 2 deletions mixer/adapter/stackdriver/metric/bufferedClient.go
Expand Up @@ -81,11 +81,14 @@ func batchTimeSeries(series []*monitoringpb.TimeSeries, tsLimit int) [][]*monito
return batches
}

func (b *buffered) start(env adapter.Env, ticker *time.Ticker) {
func (b *buffered) start(env adapter.Env, ticker *time.Ticker, quit chan int) {
env.ScheduleDaemon(func() {
for range ticker.C {
select {
case <-ticker.C:
b.mergeTimeSeries()
b.Send()
case <-quit:
return
}
})
}
Expand Down
9 changes: 6 additions & 3 deletions mixer/adapter/stackdriver/metric/metric.go
Expand Up @@ -74,6 +74,7 @@ type (
client bufferedClient
// We hold a ref for cleanup during Close()
ticker *time.Ticker
quit chan int
}
)

Expand Down Expand Up @@ -169,7 +170,7 @@ func (b *builder) Build(ctx context.Context, env adapter.Env) (adapter.Handler,
}

ticker := time.NewTicker(cfg.PushInterval)

quit := make(chan int)
bianpengyuan marked this conversation as resolved.
Show resolved Hide resolved
var err error
var client *monitoring.MetricClient
if client, err = b.createClient(cfg); err != nil {
Expand All @@ -191,15 +192,16 @@ func (b *builder) Build(ctx context.Context, env adapter.Env) (adapter.Handler,
pushInterval: cfg.PushInterval,
env: env,
}
// We hold on to the ref to the ticker so we can stop it later
buffered.start(env, ticker)
// We hold on to the ref to the ticker so we can stop it later and quit channel to exit the daemon.
buffered.start(env, ticker, quit)
h := &handler{
l: env.Logger(),
now: time.Now,
client: buffered,
md: md,
metricInfo: types,
ticker: ticker,
quit: quit,
}
return h, nil
}
Expand Down Expand Up @@ -266,6 +268,7 @@ func (h *handler) HandleMetric(_ context.Context, vals []*metric.Instance) error

func (h *handler) Close() error {
h.ticker.Stop()
h.quit <- 1
bianpengyuan marked this conversation as resolved.
Show resolved Hide resolved
return h.client.Close()
}

Expand Down