Skip to content

Resolved go routine leak in flush log function. #190

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 19 additions & 4 deletions klog.go
Original file line number Diff line number Diff line change
Expand Up @@ -404,6 +404,7 @@ type flushSyncWriter interface {
// init sets up the defaults and runs flushDaemon.
func init() {
logging.stderrThreshold = errorLog // Default stderrThreshold is ERROR.
stop, done := make(chan struct{}), make(chan struct{})
logging.setVState(0, nil, false)
logging.logDir = ""
logging.logFile = ""
Expand All @@ -414,7 +415,12 @@ func init() {
logging.addDirHeader = false
logging.skipLogHeaders = false
logging.oneOutput = false
go logging.flushDaemon()
go logging.flushDaemon(stop, done)
go func() {
time.Sleep(20 * time.Second)
close(stop)
Comment on lines +420 to +421
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This really seems like it will cause the logger to stop logging after 20 seconds (or so, when the stop channel is closed).

I'm wondering if what is really needed is a signal handler, but I worry a bit that adding a signal handler directly here would potentially lead to logging information being dropped too early.

I kind of wish that klog require explicit initialization rather than being done in init(), since then we could have the caller provide the stop channel and do the signal handling.

I'm not quite sure what the best path forward is from here, not sure if anyone else has some suggestions.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @detiber , you got it right but I am sure that the piece of code before my change was not even working, the init() will exit before it get chance to flush the logs. There is no way the init() function is waiting for go routine flushing the logs. In my case the flush function will be called 4 times. Now question arise, that do we need to make a function which will work other than init().

}()
<-done
}

// InitFlags is for explicitly initializing the flags.
Expand Down Expand Up @@ -1165,9 +1171,17 @@ func (l *loggingT) createFiles(sev severity) error {
const flushInterval = 5 * time.Second

// flushDaemon periodically flushes the log file buffers.
func (l *loggingT) flushDaemon() {
for range time.NewTicker(flushInterval).C {
l.lockAndFlushAll()
// added select case to quit channels
func (l *loggingT) flushDaemon(stop, done chan struct{}) {
ticker := time.NewTicker(flushInterval)
for {
select {
case <-ticker.C:
l.lockAndFlushAll()
case <-stop:
close(done)
return
}
}
}

Expand Down Expand Up @@ -1541,6 +1555,7 @@ type LogFilter interface {
FilterS(msg string, keysAndValues []interface{}) (string, []interface{})
}

// SetLogFilter will add lg filter
func SetLogFilter(filter LogFilter) {
logging.mu.Lock()
defer logging.mu.Unlock()
Expand Down
12 changes: 9 additions & 3 deletions klog_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,11 @@ func setFlags() {
logging.addDirHeader = false
}

func setTicker() (chan struct{}, chan struct{}) {
stop, done := make(chan struct{}), make(chan struct{})
return stop, done
}

// Test that Info works as advertised.
func TestInfo(t *testing.T) {
setFlags()
Expand Down Expand Up @@ -372,11 +377,12 @@ func TestVmoduleOff(t *testing.T) {

func TestSetOutputDataRace(t *testing.T) {
setFlags()
stop, done := setTicker()
defer logging.swap(logging.newBuffers())
var wg sync.WaitGroup
for i := 1; i <= 50; i++ {
go func() {
logging.flushDaemon()
logging.flushDaemon(stop, done)
}()
}
for i := 1; i <= 50; i++ {
Expand All @@ -388,7 +394,7 @@ func TestSetOutputDataRace(t *testing.T) {
}
for i := 1; i <= 50; i++ {
go func() {
logging.flushDaemon()
logging.flushDaemon(stop, done)
}()
}
for i := 1; i <= 50; i++ {
Expand All @@ -400,7 +406,7 @@ func TestSetOutputDataRace(t *testing.T) {
}
for i := 1; i <= 50; i++ {
go func() {
logging.flushDaemon()
logging.flushDaemon(stop, done)
}()
}
wg.Wait()
Expand Down