diff --git a/internal/store/indexer.go b/internal/store/indexer.go
new file mode 100644
index 00000000..e13bb6da
--- /dev/null
+++ b/internal/store/indexer.go
@@ -0,0 +1,155 @@
+package store
+
+import (
+	"errors"
+	"sync"
+)
+
+var (
+	// ErrLogUnmatch is an error caused when the log index can't be used because the log file has been updated.
+	ErrLogUnmatch = errors.New("error: log is unmatch to the index")
+)
+
+// indexPeriod is an entry of [indexer].
+type indexPeriod struct {
+	Start int64 // Start position in the log file in bytes.
+	End   int64 // End position in the log file in bytes.
+	Since int64 // Minimal timestamp in UNIX time that is included in this period.
+	Until int64 // Maximum timestamp in UNIX time that is included in this period.
+	Size  int64 // Number of log records in this period.
+}
+
+// indexer is the index to make reading the log file faster.
+type indexer struct {
+	sync.Mutex
+
+	periods  []indexPeriod
+	interval int64
+}
+
+// newIndexer creates a new [indexer].
+func newIndexer() *indexer {
+	return &indexer{
+		periods:  make([]indexPeriod, 1),
+		interval: 10000,
+	}
+}
+
+// AppendEntry stores a new entry to the [indexer].
+//
+// `start` is the head position of the record in the log file.
+// `end` is the tail position of the record in the log file.
+// `time` is the UNIX time of the log entry.
+//
+// The `start` should equal the previous entry's `end`. Otherwise, this method returns an error because the log file could have been updated or rotated.
+func (idx *indexer) AppendEntry(start, end, time int64) error {
+	idx.Lock()
+	defer idx.Unlock()
+
+	return idx.AppendEntryWithoutLock(start, end, time)
+}
+
+// AppendEntryWithoutLock stores a new entry to the [indexer] without locking the mutex.
+//
+// The arguments are the same as [indexer.AppendEntry].
+func (idx *indexer) AppendEntryWithoutLock(start, end, time int64) error {
+	i := len(idx.periods) - 1
+
+	if idx.periods[i].End != start {
+		return ErrLogUnmatch
+	}
+
+	if idx.periods[i].Size == 0 {
+		idx.periods[i].Start = start
+		idx.periods[i].End = end
+		idx.periods[i].Since = time
+		idx.periods[i].Until = time
+		idx.periods[i].Size = 1
+	} else if idx.periods[i].Size < idx.interval {
+		if idx.periods[i].Since > time {
+			idx.periods[i].Since = time
+		}
+		if idx.periods[i].Until < time {
+			idx.periods[i].Until = time
+		}
+		idx.periods[i].Size++
+		idx.periods[i].End = end
+	} else {
+		idx.periods = append(idx.periods, indexPeriod{
+			Start: start,
+			End:   end,
+			Since: time,
+			Until: time,
+			Size:  1,
+		})
+	}
+
+	return nil
+}
+
+// AppendInvalidRangeWithoutLock records a range of the log file that doesn't contain a valid log entry, without locking the mutex.
+//
+// The `start` and `end` arguments have the same meaning as in [indexer.AppendEntry].
+func (idx *indexer) AppendInvalidRangeWithoutLock(start, end int64) error {
+	i := len(idx.periods) - 1
+
+	if idx.periods[i].End != start {
+		return ErrLogUnmatch
+	}
+
+	if idx.periods[i].Size == 0 {
+		idx.periods[i].End = end
+	} else {
+		idx.periods = append(idx.periods, indexPeriod{
+			Start: start,
+			End:   end,
+		})
+	}
+
+	return nil
+}
+
+// Search picks up the ranges in the log file that overlap the period specified by the arguments.
+func (idx *indexer) Search(since, until int64) []logRange {
+	idx.Lock()
+	defer idx.Unlock()
+
+	var ranges []logRange
+
+	for _, x := range idx.periods {
+		if x.Since <= until && since <= x.Until && x.Size != 0 {
+			if len(ranges) == 0 || ranges[len(ranges)-1].End != x.Start {
+				ranges = append(ranges, logRange{
+					Start: x.Start,
+					End:   x.End,
+					Size:  x.Size,
+				})
+			} else {
+				ranges[len(ranges)-1].End = x.End
+				ranges[len(ranges)-1].Size += x.Size
+			}
+		}
+	}
+
+	return ranges
+}
+
+// logRange is a range in the log file.
+type logRange struct {
+	Start int64 // Start position in bytes.
+	End   int64 // End position in bytes.
+	Size  int64 // Number of included log entries.
+}
+
+// Reset resets the indexer.
+func (idx *indexer) Reset() {
+	idx.Lock()
+	defer idx.Unlock()
+
+	idx.ResetWithoutLock()
+}
+
+// ResetWithoutLock resets the indexer without locking the mutex.
+func (idx *indexer) ResetWithoutLock() {
+	idx.periods = make([]indexPeriod, 1)
+}
diff --git a/internal/store/indexer_test.go b/internal/store/indexer_test.go
new file mode 100644
index 00000000..7c864354
--- /dev/null
+++ b/internal/store/indexer_test.go
@@ -0,0 +1,79 @@
+package store
+
+import (
+	"testing"
+
+	"github.com/google/go-cmp/cmp"
+)
+
+func Test_indexer(t *testing.T) {
+	idx := newIndexer()
+	idx.interval = 3
+
+	if err := idx.AppendEntry(0, 100, 1); err != nil {
+		t.Fatalf("failed to append log entry: %s", err)
+	}
+
+	if err := idx.AppendEntry(0, 50, 2); err != ErrLogUnmatch {
+		t.Fatalf("append log entry should return ErrLogUnmatch but got: %s", err)
+	}
+
+	for i := int64(2); i <= 4; i++ {
+		if err := idx.AppendEntry(i*50, i*50+50, i); err != nil {
+			t.Fatalf("failed to append log entry[%d]: %s", i, err)
+		}
+	}
+
+	if err := idx.AppendEntry(250, 300, 10); err != nil {
+		t.Fatalf("failed to append log entry: %s", err)
+	}
+
+	for i := int64(6); i <= 7; i++ {
+		if err := idx.AppendEntry(i*50, i*50+50, i); err != nil {
+			t.Fatalf("failed to append log entry[%d]: %s", i, err)
+		}
+	}
+
+	idx.Lock()
+	if err := idx.AppendInvalidRangeWithoutLock(400, 500); err != nil {
+		t.Fatalf("failed to append log entry: %s", err)
+	}
+	idx.Unlock()
+
+	if err := idx.AppendEntry(500, 550, 2); err != nil {
+		t.Fatalf("failed to append log entry: %s", err)
+	}
+
+	// So far, the entries look like this.
+	//
+	// time  start  end  period
+	//    1      0  100       0
+	//    2    100  150       0
+	//    3    150  200       0
+	//    4    200  250       1
+	//   10    250  300       1
+	//    6    300  350       1
+	//    7    350  400       2
+	//    -    400  500       -
+	//    2    500  550       2
+
+	tests := []struct {
+		Since int64
+		Until int64
+		Want  []logRange
+	}{
+		{0, 0, nil},
+		{0, 1, []logRange{{0, 200, 3}}},
+		{2, 3, []logRange{{0, 200, 3}, {500, 550, 1}}},
+		{1, 4, []logRange{{0, 350, 6}, {500, 550, 1}}},
+		{5, 8, []logRange{{200, 400, 4}}},
+		{10, 20, []logRange{{200, 350, 3}}},
+		{11, 20, nil},
+	}
+
+	for _, tt := range tests {
+		if diff := cmp.Diff(tt.Want, idx.Search(tt.Since, tt.Until)); diff != "" {
+			t.Errorf("unexpected range: interested period is %d-%d\n%s", tt.Since, tt.Until, diff)
+		}
+	}
+}
diff --git a/internal/store/scanner.go b/internal/store/scanner.go
index ee942e10..1f427b47 100644
--- a/internal/store/scanner.go
+++ b/internal/store/scanner.go
@@ -1,6 +1,7 @@
 package store
 
 import (
+	"bufio"
 	"os"
 	"sort"
 	"time"
@@ -8,12 +9,79 @@ import (
 	api "github.com/macrat/ayd/lib-ayd"
 )
 
-func newFileScanner(path string, since, until time.Time) (api.LogScanner, error) {
+type fileScanner struct {
+	file      *os.File
+	reader    *bufio.Reader
+	since     time.Time
+	until     time.Time
+	rec       api.Record
+	interests []logRange
+	pos       int64
+}
+
+// newFileScanner creates a new [fileScanner] from a file path, with period specification.
+func newFileScanner(path string, since, until time.Time, interests []logRange) (*fileScanner, error) {
 	f, err := os.OpenFile(path, os.O_RDONLY, 0644)
 	if err != nil {
 		return nil, err
 	}
-	return api.NewLogScannerWithPeriod(f, since, until), nil
+
+	return &fileScanner{
+		file:      f,
+		reader:    bufio.NewReader(f),
+		since:     since,
+		until:     until,
+		interests: interests,
+	}, nil
+}
+
+func (r *fileScanner) Close() error {
+	return r.file.Close()
+}
+
+func (r *fileScanner) seek(pos int64) {
+	r.file.Seek(pos, os.SEEK_SET) // NOTE(review): Seek error is ignored, and os.SEEK_SET is deprecated in favor of io.SeekStart — confirm and fix in a follow-up.
+	r.reader = bufio.NewReader(r.file)
+	r.pos = pos
+}
+
+func (r *fileScanner) Scan() bool {
+	if len(r.interests) == 0 {
+		return false
+	}
+
+	if r.pos < r.interests[0].Start {
+		r.seek(r.interests[0].Start)
+	}
+
+	for {
+		b, err := r.reader.ReadBytes('\n')
+		if err != nil {
+			return false
+		}
+		r.pos += int64(len(b))
+
+		var rec api.Record
+		err = rec.UnmarshalJSON(b)
+		if err == nil && !rec.Time.Before(r.since) && r.until.After(rec.Time) {
+			r.rec = rec
+			return true
+		}
+
+		if r.pos > r.interests[0].End {
+			r.interests = r.interests[1:]
+			if len(r.interests) == 0 {
+				return false
+			}
+			r.seek(r.interests[0].Start)
+		}
+
+		continue // NOTE(review): redundant — this is already the last statement of the loop body.
+	}
+	return false // NOTE(review): unreachable — the for loop above never breaks; go vet reports this line.
+}
+
+func (r *fileScanner) Record() api.Record {
+	return r.rec
 }
 
 type inMemoryScanner struct {
@@ -69,5 +137,7 @@ func (s *Store) OpenLog(since, until time.Time) (api.LogScanner, error) {
 	if s.Path() == "" {
 		return newInMemoryScanner(s, since, until), nil
 	}
-	return newFileScanner(s.Path(), since, until)
+
+	interests := s.index.Search(since.Unix(), until.Unix())
+	return newFileScanner(s.Path(), since, until, interests)
 }
diff --git a/internal/store/store.go b/internal/store/store.go
index b749ebeb..36acf7e5 100644
--- a/internal/store/store.go
+++ b/internal/store/store.go
@@ -20,14 +20,15 @@ const (
 )
 
 var (
-	LogRestoreBytes int64 = 100 * 1024 * 1024
+	LogRestoreBytes int64 = 1024 * 1024 * 1024
 )
 
 type RecordHandler func(api.Record)
 
 // Store is the log handler of Ayd, and it also the database of Ayd.
 type Store struct {
-	path string
+	path      string
+	fileIndex int64
 
 	Console io.Writer
 
@@ -35,6 +36,7 @@ type Store struct {
 	probeHistory     probeHistoryMap
 	currentIncidents map[string]*api.Incident
 	incidentHistory  []*api.Incident
+	index            *indexer
 
 	OnStatusChanged []RecordHandler
 	incidentCount   int
@@ -54,6 +56,7 @@ func New(path string, console io.Writer) (*Store, error) {
 		Console:          console,
 		probeHistory:     make(probeHistoryMap),
 		currentIncidents: make(map[string]*api.Incident),
+		index:            newIndexer(),
 		writeCh:          ch,
 		writerStopped:    make(chan struct{}),
 		healthy:          true,
@@ -140,6 +143,14 @@ func (s *Store) writer(ch <-chan api.Record, stopped chan struct{}) {
 
 		err = f.Close()
 		s.handleError(err, "failed to close log file")
+
+		if s.index.AppendEntry(s.fileIndex, s.fileIndex+reader.Size(), r.Time.Unix()) == nil {
+			s.fileIndex += reader.Size()
+		} else {
+			s.index.Reset()
+			s.fileIndex = reader.Size()
+			s.index.AppendEntry(0, s.fileIndex, r.Time.Unix()) // NOTE(review): error ignored — presumably cannot fail immediately after Reset; confirm.
+		}
 	}
 
 	close(stopped)
@@ -296,6 +307,9 @@ func (s *Store) Restore() error {
 	s.historyLock.Lock()
 	defer s.historyLock.Unlock()
 
+	s.index.Lock()
+	defer s.index.Unlock()
+
 	f, err := os.OpenFile(s.path, os.O_RDONLY|os.O_CREATE, 0644)
 	if err != nil {
 		return err
@@ -303,26 +317,49 @@ func (s *Store) Restore() error {
 	defer f.Close()
 
 	if ret, _ := f.Seek(-LogRestoreBytes, os.SEEK_END); ret != 0 {
 		u := &api.URL{Scheme: "ayd", Opaque: "log"}
-		s.Report(u, api.Record{
+		fmt.Fprintln(s.Console, api.Record{
 			Time:    time.Now(),
 			Status:  api.StatusDegrade,
 			Target:  u,
-			Message: "WARNING: read only last 100MB from log file because it is too large",
+			Message: "WARNING: read only last 1GB from log file because it is too large",
 			Extra: map[string]interface{}{
 				"log_size": ret + LogRestoreBytes,
 			},
 		})
+	} else if info, err := f.Stat(); err == nil && info.Size() > 10*1024*1024 {
+		u := &api.URL{Scheme: "ayd", Opaque: "log"}
+		fmt.Fprintln(s.Console, api.Record{
+			Time:    time.Now(),
+			Status:  api.StatusHealthy,
+			Target:  u,
+			Message: "WARNING: loading large log file",
+			Extra: map[string]interface{}{
+				"log_size": info.Size(),
+			},
+		})
 	}
 
 	s.probeHistory = make(probeHistoryMap)
+	s.fileIndex = 0
+	s.index.ResetWithoutLock()
 
-	scanner := bufio.NewScanner(f)
-	for scanner.Scan() {
-		r, err := api.ParseRecord(scanner.Text())
+	reader := bufio.NewReader(f)
+	for {
+		line, err := reader.ReadBytes('\n')
 		if err != nil {
+			break
+		}
+		l := int64(len(line))
+		s.fileIndex += l
+
+		var r api.Record
+		if err = r.UnmarshalJSON(line); err != nil {
+			s.index.AppendInvalidRangeWithoutLock(s.fileIndex-l, s.fileIndex) // NOTE(review): error ignored — ranges are appended in file order so it presumably cannot fail; confirm.
 			continue
 		}
+		s.index.AppendEntryWithoutLock(s.fileIndex-l, s.fileIndex, r.Time.Unix()) // NOTE(review): error ignored, same reasoning as above.
+
 		if _, ok := r.Target.User.Password(); ok {
 			r.Target.User = url.UserPassword(r.Target.User.Username(), "xxxxx")
 		}
@@ -435,3 +472,8 @@ func (s *Store) MakeReport(probeHistoryLength int) api.Report {
 
 	return report
 }
+
+// SetIndexInterval sets the indexing interval for debugging.
+func (s *Store) SetIndexInterval(interval int64) {
+	s.index.interval = interval
+}
diff --git a/internal/testutil/store.go b/internal/testutil/store.go
index 93b8e213..050bfc08 100644
--- a/internal/testutil/store.go
+++ b/internal/testutil/store.go
@@ -19,6 +19,8 @@ func NewStoreWithConsole(t testing.TB, w io.Writer) *store.Store {
 		t.Fatalf("failed to create store: %s", err)
 	}
 
+	s.SetIndexInterval(3)
+
 	return s
 }
 
@@ -43,6 +45,8 @@ func NewStoreWithLog(t testing.TB) *store.Store {
 		t.Fatalf("failed to create store: %s", err)
 	}
 
+	s.SetIndexInterval(3)
+
 	if err = s.Restore(); err != nil {
 		t.Fatalf("failed to restore store: %s", err)
 	}