From 46147cf0ea4257520b81063141e0c0d02391cf2f Mon Sep 17 00:00:00 2001 From: Todd Beets Date: Sun, 10 Sep 2023 23:12:45 -0700 Subject: [PATCH 1/2] Add logfile_max_archives feature and test. --- logger/log.go | 80 ++++++++++++++++++++++++++--- server/log.go | 10 ++++ server/opts.go | 3 ++ test/log_test.go | 129 +++++++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 215 insertions(+), 7 deletions(-) create mode 100644 test/log_test.go diff --git a/logger/log.go b/logger/log.go index b271696739..81d0c9c741 100644 --- a/logger/log.go +++ b/logger/log.go @@ -18,6 +18,9 @@ import ( "fmt" "log" "os" + "path/filepath" + "sort" + "strings" "sync" "sync/atomic" "time" @@ -128,13 +131,14 @@ type fileLogger struct { out int64 canRotate int32 sync.Mutex - l *Logger - f writerAndCloser - limit int64 - olimit int64 - pid string - time bool - closed bool + l *Logger + f writerAndCloser + limit int64 + olimit int64 + pid string + time bool + closed bool + archiveLimit int } func newFileLogger(filename, pidPrefix string, time bool) (*fileLogger, error) { @@ -169,6 +173,12 @@ func (l *fileLogger) setLimit(limit int64) { } } +func (l *fileLogger) setArchiveLimit(limit int) { + l.Lock() + l.archiveLimit = limit + l.Unlock() +} + func (l *fileLogger) logDirect(label, format string, v ...interface{}) int { var entrya = [256]byte{} var entry = entrya[:0] @@ -190,6 +200,46 @@ func (l *fileLogger) logDirect(label, format string, v ...interface{}) int { return len(entry) } +func (l *fileLogger) archivePurge(fname string) { + // Evaluate number of saved backups for purge + // l readlock held + var backups []string + lDir := filepath.Dir(fname) + lBase := filepath.Base(fname) + entries, err := os.ReadDir(lDir) + if err != nil { + l.logDirect(l.l.errorLabel, "Unable to read directory %q for log purge (%v), will attempt next rotation", lDir, err) + return + } + for _, entry := range entries { + if entry.IsDir() || entry.Name() == lBase || !strings.HasPrefix(entry.Name(), lBase) { + continue + } + stamp, found := strings.CutPrefix(entry.Name(), fmt.Sprintf("%s%s", lBase, ".")) + if found { + _, err := time.Parse("2006:01:02:15:04:05.999999999", strings.Replace(stamp, ".", ":", 5)) + if err == nil { + backups = append(backups, entry.Name()) + } + } + } + archives := len(backups) + if archives > l.archiveLimit { + // Oldest to latest + sort.Slice(backups, func(i, j int) bool { + return backups[i] < backups[j] + }) + for i := 0; i < archives-l.archiveLimit; i++ { + if err := os.Remove(fmt.Sprintf("%s%s%s", lDir, string(os.PathSeparator), backups[i])); err != nil { + l.logDirect(l.l.errorLabel, "Unable to remove backup log file %q (%v), will attempt next rotation", backups[i], err) + // Bail fast, we'll try again next rotation + return + } + l.logDirect(l.l.infoLabel, "Removed archived log file %q", backups[i]) + } + } +} + func (l *fileLogger) Write(b []byte) (int, error) { if atomic.LoadInt32(&l.canRotate) == 0 { n, err := l.f.Write(b) @@ -225,6 +275,9 @@ func (l *fileLogger) Write(b []byte) (int, error) { n := l.logDirect(l.l.infoLabel, "Rotated log, backup saved as %q", bak) l.out = int64(n) l.limit = l.olimit + if l.archiveLimit > 0 { + l.archivePurge(fname) + } } } l.Unlock() @@ -257,6 +310,19 @@ func (l *Logger) SetSizeLimit(limit int64) error { return nil } +// SetArchiveLimit sets the number of archived log files that will be retained +func (l *Logger) SetArchiveLimit(limit int) error { + l.Lock() + if l.fl == nil { + l.Unlock() + return fmt.Errorf("can set log archive limit only for file logger") + } + fl := l.fl + l.Unlock() + fl.setArchiveLimit(limit) + return nil +} + // NewTestLogger creates a logger with output directed to Stderr with a prefix. // Useful for tracing in tests when multiple servers are in the same pid func NewTestLogger(prefix string, time bool) *Logger { diff --git a/server/log.go b/server/log.go index 60ca41c2fe..8ad511cc38 100644 --- a/server/log.go +++ b/server/log.go @@ -72,6 +72,16 @@ func (s *Server) ConfigureLogger() { l.SetSizeLimit(opts.LogSizeLimit) } } + if opts.LogMaxArchives > 0 { + if l, ok := log.(*srvlog.Logger); ok { + al := int(opts.LogMaxArchives) + if int64(al) != opts.LogMaxArchives { + // set to default (no max) on overflow + al = 0 + } + l.SetArchiveLimit(al) + } + } } else if opts.RemoteSyslog != "" { log = srvlog.NewRemoteSysLogger(opts.RemoteSyslog, opts.Debug, opts.Trace) } else if syslog { diff --git a/server/opts.go b/server/opts.go index f88d1c2917..7f9c32e46e 100644 --- a/server/opts.go +++ b/server/opts.go @@ -311,6 +311,7 @@ type Options struct { PortsFileDir string `json:"-"` LogFile string `json:"-"` LogSizeLimit int64 `json:"-"` + LogMaxArchives int64 `json:"-"` Syslog bool `json:"-"` RemoteSyslog string `json:"-"` Routes []*url.URL `json:"-"` @@ -999,6 +1000,8 @@ func (o *Options) processConfigFileLine(k string, v interface{}, errors *[]error o.LogFile = v.(string) case "logfile_size_limit", "log_size_limit": o.LogSizeLimit = v.(int64) + case "logfile_max_archives", "log_max_archives": + o.LogMaxArchives = v.(int64) case "syslog": o.Syslog = v.(bool) trackExplicitVal(o, &o.inConfig, "Syslog", o.Syslog) diff --git a/test/log_test.go b/test/log_test.go new file mode 100644 index 0000000000..fc2663dc6f --- /dev/null +++ b/test/log_test.go @@ -0,0 +1,129 @@ +package test + +import ( + "fmt" + "os" + "path/filepath" + "runtime" + "testing" + "time" + + "github.com/nats-io/nats-server/v2/server" +) + +func RunServerWithLogging(opts *server.Options) *server.Server { + if opts == nil { + opts = &DefaultTestOptions + } + opts.NoLog = false + opts.Cluster.PoolSize = -1 + opts.Cluster.Compression.Mode = server.CompressionOff + opts.LeafNode.Compression.Mode = server.CompressionOff + s, err := server.NewServer(opts) + if err != nil || s == nil { + panic(fmt.Sprintf("No NATS Server object returned: %v", err)) + } + s.ConfigureLogger() + go s.Start() + if !s.ReadyForConnections(10 * time.Second) { + panic("Unable to start NATS Server in Go Routine") + } + return s +} + +func TestLogMaxArchives(t *testing.T) { + // With logfile_size_limit set to small 100 characters, plain startup rotates 8 times + for _, test := range []struct { + name string + config string + totEntriesExpected int + }{ + { + "Default implicit, no max archives, expect 0 purged archives", + ` + port: -1 + log_file: %s + logfile_size_limit: 100 + `, + 9, + }, + { + "Default explicit, no max archives, expect 0 purged archives", + ` + port: -1 + log_file: %s + logfile_size_limit: 100 + logfile_max_archives: 0 + `, + 9, + }, + { + "Default explicit - negative val, no max archives, expect 0 purged archives", + ` + port: -1 + log_file: %s + logfile_size_limit: 100 + logfile_max_archives: -42 + `, + 9, + }, + { + "1-archive limit, expect 7 purged archives", + ` + port: -1 + log_file: %s + logfile_size_limit: 100 + logfile_max_archives: 1 + `, + 2, + }, + { + "5-archive limit, expect 4 purged archives", + ` + port: -1 + log_file: %s + logfile_size_limit: 100 + logfile_max_archives: 5 + `, + 6, + }, + { + "100-archive limit, expect 0 purged archives", + ` + port: -1 + log_file: %s + logfile_size_limit: 100 + logfile_max_archives: 100 + `, + 9, + }, + } { + t.Run(test.name, func(t *testing.T) { + d, err := os.MkdirTemp("", "logtest") + if err != nil { + t.Fatalf("Error creating temp dir: %v", err) + } + content := fmt.Sprintf(test.config, filepath.Join(d, "nats-server.log")) + // server config does not like plain windows backslash + if runtime.GOOS == "windows" { + content = filepath.ToSlash(content) + } + opts, err := server.ProcessConfigFile(createConfFile(t, []byte(content))) + if err != nil { + t.Fatalf("Error processing config file: %v", err) + } + s := RunServerWithLogging(opts) + if s == nil { + t.Fatalf("No NATS Server object returned") + } + s.Shutdown() + entries, err := os.ReadDir(d) + if err != nil { + t.Fatalf("Error reading dir: %v", err) + } + if len(entries) != test.totEntriesExpected { + t.Fatalf("Expected %d log files, got %d", test.totEntriesExpected, len(entries)) + } + }) + } +} From 349e718d39f6d09220b25579e2631344a58292b2 Mon Sep 17 00:00:00 2001 From: Todd Beets Date: Fri, 15 Sep 2023 22:08:09 -0700 Subject: [PATCH 2/2] Changes for max log files option (active plus backups); remove redundant lexical sort of backups; adjust test --- logger/log.go | 56 +++++++++++++++++++++--------------------------- server/log.go | 8 +++---- server/opts.go | 6 +++--- test/log_test.go | 30 ++++++++++++++------------ 4 files changed, 48 insertions(+), 52 deletions(-) diff --git a/logger/log.go b/logger/log.go index 81d0c9c741..c90a50fd0b 100644 --- a/logger/log.go +++ b/logger/log.go @@ -19,7 +19,6 @@ import ( "log" "os" "path/filepath" - "sort" "strings" "sync" "sync/atomic" @@ -131,14 +130,14 @@ type fileLogger struct { out int64 canRotate int32 sync.Mutex - l *Logger - f writerAndCloser - limit int64 - olimit int64 - pid string - time bool - closed bool - archiveLimit int + l *Logger + f writerAndCloser + limit int64 + olimit int64 + pid string + time bool + closed bool + maxNumFiles int } func newFileLogger(filename, pidPrefix string, time bool) (*fileLogger, error) { @@ -173,9 +172,9 @@ func (l *fileLogger) setLimit(limit int64) { } } -func (l *fileLogger) setArchiveLimit(limit int) { +func (l *fileLogger) setMaxNumFiles(max int) { l.Lock() - l.archiveLimit = limit + l.maxNumFiles = max l.Unlock() } @@ -200,9 +199,7 @@ func (l *fileLogger) logDirect(label, format string, v ...interface{}) int { return len(entry) } -func (l *fileLogger) archivePurge(fname string) { - // Evaluate number of saved backups for purge - // l readlock held +func (l *fileLogger) logPurge(fname string) { var backups []string lDir := filepath.Dir(fname) lBase := filepath.Base(fname) @@ -215,27 +212,24 @@ func (l *fileLogger) archivePurge(fname string) { if entry.IsDir() || entry.Name() == lBase || !strings.HasPrefix(entry.Name(), lBase) { continue } - stamp, found := strings.CutPrefix(entry.Name(), fmt.Sprintf("%s%s", lBase, ".")) - if found { + if stamp, found := strings.CutPrefix(entry.Name(), fmt.Sprintf("%s%s", lBase, ".")); found { _, err := time.Parse("2006:01:02:15:04:05.999999999", strings.Replace(stamp, ".", ":", 5)) if err == nil { backups = append(backups, entry.Name()) } } } - archives := len(backups) - if archives > l.archiveLimit { - // Oldest to latest - sort.Slice(backups, func(i, j int) bool { - return backups[i] < backups[j] - }) - for i := 0; i < archives-l.archiveLimit; i++ { - if err := os.Remove(fmt.Sprintf("%s%s%s", lDir, string(os.PathSeparator), backups[i])); err != nil { + currBackups := len(backups) + maxBackups := l.maxNumFiles - 1 + if currBackups > maxBackups { + // backups sorted oldest to latest based on timestamped lexical filename (ReadDir) + for i := 0; i < currBackups-maxBackups; i++ { + if err := os.Remove(filepath.Join(lDir, string(os.PathSeparator), backups[i])); err != nil { l.logDirect(l.l.errorLabel, "Unable to remove backup log file %q (%v), will attempt next rotation", backups[i], err) // Bail fast, we'll try again next rotation return } - l.logDirect(l.l.infoLabel, "Removed archived log file %q", backups[i]) + l.logDirect(l.l.infoLabel, "Purged log file %q", backups[i]) } } } @@ -275,8 +269,8 @@ func (l *fileLogger) Write(b []byte) (int, error) { n := l.logDirect(l.l.infoLabel, "Rotated log, backup saved as %q", bak) l.out = int64(n) l.limit = l.olimit - if l.archiveLimit > 0 { - l.archivePurge(fname) + if l.maxNumFiles > 0 { + l.logPurge(fname) } } } @@ -310,16 +304,16 @@ func (l *Logger) SetSizeLimit(limit int64) error { return nil } -// SetArchiveLimit sets the number of archived log files that will be retained -func (l *Logger) SetArchiveLimit(limit int) error { +// SetMaxNumFiles sets the number of archived log files that will be retained +func (l *Logger) SetMaxNumFiles(max int) error { l.Lock() if l.fl == nil { l.Unlock() - return fmt.Errorf("can set log archive limit only for file logger") + return fmt.Errorf("can set log max number of files only for file logger") } fl := l.fl l.Unlock() - fl.setArchiveLimit(limit) + fl.setMaxNumFiles(max) return nil } diff --git a/server/log.go b/server/log.go index 8ad511cc38..fdd8d85c2b 100644 --- a/server/log.go +++ b/server/log.go @@ -72,14 +72,14 @@ func (s *Server) ConfigureLogger() { l.SetSizeLimit(opts.LogSizeLimit) } } - if opts.LogMaxArchives > 0 { + if opts.LogMaxFiles > 0 { if l, ok := log.(*srvlog.Logger); ok { - al := int(opts.LogMaxArchives) - if int64(al) != opts.LogMaxArchives { + al := int(opts.LogMaxFiles) + if int64(al) != opts.LogMaxFiles { // set to default (no max) on overflow al = 0 } - l.SetArchiveLimit(al) + l.SetMaxNumFiles(al) } } } else if opts.RemoteSyslog != "" { diff --git a/server/opts.go b/server/opts.go index 7f9c32e46e..508587bb4c 100644 --- a/server/opts.go +++ b/server/opts.go @@ -311,7 +311,7 @@ type Options struct { PortsFileDir string `json:"-"` LogFile string `json:"-"` LogSizeLimit int64 `json:"-"` - LogMaxArchives int64 `json:"-"` + LogMaxFiles int64 `json:"-"` Syslog bool `json:"-"` RemoteSyslog string `json:"-"` Routes []*url.URL `json:"-"` @@ -1000,8 +1000,8 @@ func (o *Options) processConfigFileLine(k string, v interface{}, errors *[]error o.LogFile = v.(string) case "logfile_size_limit", "log_size_limit": o.LogSizeLimit = v.(int64) - case "logfile_max_archives", "log_max_archives": - o.LogMaxArchives = v.(int64) + case "logfile_max_num", "log_max_num": + o.LogMaxFiles = v.(int64) case "syslog": o.Syslog = v.(bool) trackExplicitVal(o, &o.inConfig, "Syslog", o.Syslog) diff --git a/test/log_test.go b/test/log_test.go index fc2663dc6f..69ce112744 100644 --- a/test/log_test.go +++ b/test/log_test.go @@ -39,7 +39,7 @@ func TestLogMaxArchives(t *testing.T) { totEntriesExpected int }{ { - "Default implicit, no max archives, expect 0 purged archives", + "Default implicit, no max logs, expect 0 purged logs", ` port: -1 log_file: %s @@ -48,52 +48,52 @@ func TestLogMaxArchives(t *testing.T) { 9, }, { - "Default explicit, no max archives, expect 0 purged archives", + "Default explicit, no max logs, expect 0 purged logs", ` port: -1 log_file: %s logfile_size_limit: 100 - logfile_max_archives: 0 + logfile_max_num: 0 `, 9, }, { - "Default explicit - negative val, no max archives, expect 0 purged archives", + "Default explicit - negative val, no max logs, expect 0 purged logs", ` port: -1 log_file: %s logfile_size_limit: 100 - logfile_max_archives: -42 + logfile_max_num: -42 `, 9, }, { - "1-archive limit, expect 7 purged archives", + "1-max num, expect 8 purged logs", ` port: -1 log_file: %s logfile_size_limit: 100 - logfile_max_archives: 1 + logfile_max_num: 1 `, - 2, + 1, }, { - "5-archive limit, expect 4 purged archives", + "5-max num, expect 4 purged logs; use opt alias", ` port: -1 log_file: %s - logfile_size_limit: 100 - logfile_max_archives: 5 + log_size_limit: 100 + log_max_num: 5 `, - 6, + 5, }, { - "100-archive limit, expect 0 purged archives", + "100-max num, expect 0 purged logs", ` port: -1 log_file: %s logfile_size_limit: 100 - logfile_max_archives: 100 + logfile_max_num: 100 `, 9, }, @@ -117,6 +117,8 @@ func TestLogMaxArchives(t *testing.T) { t.Fatalf("No NATS Server object returned") } s.Shutdown() + // Windows filesystem can be a little pokey on the flush, so wait a bit after shutdown... + time.Sleep(500 * time.Millisecond) entries, err := os.ReadDir(d) if err != nil { t.Fatalf("Error reading dir: %v", err)