Skip to content

Commit

Permalink
feat: make in memory log file index
Browse files Browse the repository at this point in the history
  • Loading branch information
macrat committed Oct 2, 2022
1 parent b591c71 commit 2dcaff2
Show file tree
Hide file tree
Showing 5 changed files with 360 additions and 10 deletions.
155 changes: 155 additions & 0 deletions internal/store/indexer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
package store

import (
"errors"
"sync"
)

var (
// ErrLogUnmatch is the error returned when the log index can't be used because the log file has been updated.
//
// The append methods of [indexer] return it when the start position of a new range does not equal the end position recorded by the index.
ErrLogUnmatch = errors.New("error: log is unmatch to the index")
)

// indexPeriod is an entry of [indexer].
//
// It summarizes a contiguous byte range of the log file together with the span of timestamps and the number of records found in that range.
// A period whose Size is zero holds no valid record.
type indexPeriod struct {
Start int64 // Start position of this period in the log file, in bytes.
End int64 // End position of this period in the log file, in bytes.
Since int64 // Minimum timestamp, in UNIX time, of the records included in this period.
Until int64 // Maximum timestamp, in UNIX time, of the records included in this period.
Size int64 // Number of log records in this period.
}

// indexer is an index that makes reading the log file faster.
//
// The embedded mutex is taken by the methods without the `WithoutLock` suffix; callers of the `WithoutLock` variants are expected to hold it themselves.
type indexer struct {
sync.Mutex

// periods is the list of indexed periods in the order they were appended.
periods []indexPeriod
// interval is the number of records a period may hold before the next entry starts a new period.
interval int64
}

// newIndexer returns an [indexer] that holds a single empty period and
// starts a new period after every 10000 records.
func newIndexer() *indexer {
	idx := new(indexer)
	idx.interval = 10000
	idx.periods = []indexPeriod{{}}
	return idx
}

// AppendEntry stores a new entry to the [indexer].
//
// `start` is the head position of the record in the log file.
// `end` is the tail position of the record in the log file.
// `time` is the UNIX time of the log entry.
//
// The `start` should be equal to the previous entry's `end`. Otherwise, this method returns [ErrLogUnmatch] because the log file could have been updated or rotated.
//
// The mutex of the indexer is held while the entry is stored by [indexer.AppendEntryWithoutLock].
func (idx *indexer) AppendEntry(start, end, time int64) error {
idx.Lock()
defer idx.Unlock()

return idx.AppendEntryWithoutLock(start, end, time)
}

// AppendEntryWithoutLock stores a new entry to the [indexer] without locking the mutex.
//
// The arguments are the same as [indexer.AppendEntry]: `start` and `end` are
// the byte positions of the record in the log file, and `time` is the UNIX
// time of the record. It returns [ErrLogUnmatch] when `start` is not equal to
// the end position of the latest period.
func (idx *indexer) AppendEntryWithoutLock(start, end, time int64) error {
	last := &idx.periods[len(idx.periods)-1]

	if start != last.End {
		return ErrLogUnmatch
	}

	switch {
	case last.Size == 0:
		// The latest period has no record yet, so it begins at this entry.
		*last = indexPeriod{
			Start: start,
			End:   end,
			Since: time,
			Until: time,
			Size:  1,
		}
	case last.Size < idx.interval:
		// There is still room in the latest period; widen it to cover this entry.
		if time < last.Since {
			last.Since = time
		}
		if time > last.Until {
			last.Until = time
		}
		last.Size++
		last.End = end
	default:
		// The latest period is full, so this entry opens a new one.
		idx.periods = append(idx.periods, indexPeriod{
			Start: start,
			End:   end,
			Since: time,
			Until: time,
			Size:  1,
		})
	}

	return nil
}

// AppendInvalidRangeWithoutLock records a range of the log file that doesn't contain a valid log entry, without locking the mutex.
//
// `start` and `end` have the same meaning as in [indexer.AppendEntry].
// It returns [ErrLogUnmatch] when `start` is not equal to the end position of
// the latest period.
func (idx *indexer) AppendInvalidRangeWithoutLock(start, end int64) error {
	last := &idx.periods[len(idx.periods)-1]

	if start != last.End {
		return ErrLogUnmatch
	}

	if last.Size != 0 {
		// The latest period already has records, so the invalid range is
		// kept as a separate period that has no record.
		idx.periods = append(idx.periods, indexPeriod{
			Start: start,
			End:   end,
		})
		return nil
	}

	// The latest period has no record; just extend it over the invalid range.
	last.End = end
	return nil
}

// Search picks up the ranges of the log file that can include records in the period specified by the arguments.
//
// `since` and `until` are UNIX times, and both ends are treated as inclusive
// when they are compared with the periods in the index. Periods that have no
// record are ignored. Matched periods that are adjacent in the log file are
// merged into a single [logRange]. The result is nil if nothing matched.
func (idx *indexer) Search(since, until int64) []logRange {
	idx.Lock()
	defer idx.Unlock()

	var found []logRange

	for i := range idx.periods {
		p := &idx.periods[i]

		if p.Size == 0 || p.Since > until || p.Until < since {
			continue
		}

		// Extend the previous range when this period directly follows it.
		if n := len(found); n > 0 && found[n-1].End == p.Start {
			found[n-1].End = p.End
			found[n-1].Size += p.Size
			continue
		}

		found = append(found, logRange{
			Start: p.Start,
			End:   p.End,
			Size:  p.Size,
		})
	}

	return found
}

// logRange is a range in the log file.
//
// [indexer.Search] returns values of this type to tell which parts of the log file should be read.
type logRange struct {
Start int64 // Start position in the log file, in bytes.
End int64 // End position in the log file, in bytes.
Size int64 // Number of log entries included in this range.
}

// Reset resets the indexer.
//
// The mutex of the indexer is held while [indexer.ResetWithoutLock] discards the recorded periods.
func (idx *indexer) Reset() {
idx.Lock()
defer idx.Unlock()

idx.ResetWithoutLock()
}

// ResetWithoutLock resets the indexer without locking the mutex.
//
// All recorded periods are replaced by a single empty period. The interval is
// left as it is.
func (idx *indexer) ResetWithoutLock() {
	idx.periods = []indexPeriod{{}}
}
79 changes: 79 additions & 0 deletions internal/store/indexer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
package store

import (
"testing"

"github.com/google/go-cmp/cmp"
)

// Test_indexer builds an index with a small interval through a fixed sequence of appends, and then checks the ranges reported by Search for several periods.
func Test_indexer(t *testing.T) {
idx := newIndexer()
idx.interval = 3

if err := idx.AppendEntry(0, 100, 1); err != nil {
t.Fatalf("failed to append log entry: %s", err)
}

// The start position doesn't follow the previous entry, so this append has to be rejected.
if err := idx.AppendEntry(0, 50, 2); err != ErrLogUnmatch {
t.Fatalf("append log entry should return ErrLogUnmatch but got: %s", err)
}

for i := int64(2); i <= 4; i++ {
if err := idx.AppendEntry(i*50, i*50+50, i); err != nil {
t.Fatalf("failed to append log entry[%d]: %s", i, err)
}
}

// An entry whose timestamp is out of order.
if err := idx.AppendEntry(250, 300, 10); err != nil {
t.Fatalf("failed to append log entry: %s", err)
}

for i := int64(6); i <= 7; i++ {
if err := idx.AppendEntry(i*50, i*50+50, i); err != nil {
t.Fatalf("failed to append log entry[%d]: %s", i, err)
}
}

idx.Lock()
if err := idx.AppendInvalidRangeWithoutLock(400, 500); err != nil {
t.Fatalf("failed to append log entry: %s", err)
}
idx.Unlock()

if err := idx.AppendEntry(500, 550, 2); err != nil {
t.Fatalf("failed to append log entry: %s", err)
}

// At this point, the entries look like this.
//
// time start end period
// 1 0 100 0
// 2 100 150 0
// 3 150 200 0
// 4 200 250 1
// 10 250 300 1
// 6 300 350 1
// 7 350 400 2
// - 400 500 - (invalid range; it opens period 3 without any record)
// 2 500 550 3 (takes over the period opened by the invalid range, which then starts at 500)

tests := []struct {
Since int64
Until int64
Want []logRange
}{
{0, 0, nil},
{0, 1, []logRange{{0, 200, 3}}},
{2, 3, []logRange{{0, 200, 3}, {500, 550, 1}}},
{1, 4, []logRange{{0, 350, 6}, {500, 550, 1}}},
{5, 8, []logRange{{200, 400, 4}}},
{10, 20, []logRange{{200, 350, 3}}},
{11, 20, nil},
}

for _, tt := range tests {
if diff := cmp.Diff(tt.Want, idx.Search(tt.Since, tt.Until)); diff != "" {
t.Errorf("unexpected range: interested period is %d-%d\n%s", tt.Since, tt.Until, diff)
}
}
}
76 changes: 73 additions & 3 deletions internal/store/scanner.go
Original file line number Diff line number Diff line change
@@ -1,19 +1,87 @@
package store

import (
"bufio"
"os"
"sort"
"time"

api "github.com/macrat/ayd/lib-ayd"
)

func newFileScanner(path string, since, until time.Time) (api.LogScanner, error) {
// fileScanner reads records from a log file, visiting only the byte ranges listed in interests.
type fileScanner struct {
file *os.File // The opened log file.
reader *bufio.Reader // Buffered reader over file; replaced whenever the scanner seeks.
since time.Time // Records before this time are skipped.
until time.Time // Records at or after this time are skipped.
rec api.Record // The record found by the latest successful Scan.
interests []logRange // Ranges of the log file that have not been finished yet.
pos int64 // Current reading position in the log file, in bytes.
}

// newFileScanner creates a new [fileScanner] from a file path, with a period specification.
//
// `since` and `until` are the period that the scanner uses to filter records.
// `interests` is the list of byte ranges in the file that the scanner reads.
//
// The file is opened for reading only. The error from opening the file is
// returned as is. The caller has to call Close on the returned scanner to
// release the file.
func newFileScanner(path string, since, until time.Time, interests []logRange) (*fileScanner, error) {
	f, err := os.Open(path)
	if err != nil {
		return nil, err
	}

	return &fileScanner{
		file:      f,
		reader:    bufio.NewReader(f),
		since:     since,
		until:     until,
		interests: interests,
	}, nil
}

// Close closes the underlying log file and returns the error reported by it, if any.
func (r *fileScanner) Close() error {
return r.file.Close()
}

// seek moves the reading position to pos, which is an offset in bytes from
// the beginning of the file.
//
// The buffered reader is replaced with a new one so that bytes buffered from
// the old position are thrown away.
func (r *fileScanner) seek(pos int64) {
	// The result of Seek is not checked; the position is recorded as pos regardless.
	_, _ = r.file.Seek(pos, os.SEEK_SET)

	r.pos = pos
	r.reader = bufio.NewReader(r.file)
}

// Scan reads the log file until it finds the next record in the period of the scanner.
//
// It returns true when a record is found; the record can be read via Record.
// It returns false when there is no range left to read, or when reading a
// line from the file fails (including reaching the end of the file).
//
// Only the ranges in `interests` are visited. Lines that can't be parsed as a
// record, and records out of the period, are skipped.
func (r *fileScanner) Scan() bool {
	if len(r.interests) == 0 {
		return false
	}

	// Jump ahead if the current position is before the range to read.
	if r.pos < r.interests[0].Start {
		r.seek(r.interests[0].Start)
	}

	for {
		b, err := r.reader.ReadBytes('\n')
		if err != nil {
			return false
		}
		r.pos += int64(len(b))

		var rec api.Record
		err = rec.UnmarshalJSON(b)
		// since is inclusive and until is exclusive.
		if err == nil && !rec.Time.Before(r.since) && r.until.After(rec.Time) {
			r.rec = rec
			return true
		}

		// Move to the next range once the position has passed the current one.
		//
		// NOTE(review): the comparison is strict, so the line that begins
		// exactly at End is still read before moving on — confirm that this
		// is intended.
		if r.pos > r.interests[0].End {
			r.interests = r.interests[1:]
			if len(r.interests) == 0 {
				return false
			}
			r.seek(r.interests[0].Start)
		}
	}
}

// Record returns the record stored by the latest call of Scan that returned true.
func (r *fileScanner) Record() api.Record {
return r.rec
}

type inMemoryScanner struct {
Expand Down Expand Up @@ -69,5 +137,7 @@ func (s *Store) OpenLog(since, until time.Time) (api.LogScanner, error) {
if s.Path() == "" {
return newInMemoryScanner(s, since, until), nil
}
return newFileScanner(s.Path(), since, until)

interests := s.index.Search(since.Unix(), until.Unix())
return newFileScanner(s.Path(), since, until, interests)
}
Loading

0 comments on commit 2dcaff2

Please sign in to comment.