Skip to content

Commit

Permalink
Fix race
Browse files Browse the repository at this point in the history
  • Loading branch information
zix99 committed Jun 17, 2022
1 parent 7ed831d commit 12c62f1
Show file tree
Hide file tree
Showing 3 changed files with 10 additions and 1 deletion.
6 changes: 6 additions & 0 deletions pkg/extractor/batchers/batcher.go
Expand Up @@ -93,6 +93,12 @@ func (s *Batcher) ReadErrors() int {
return s.errorCount
}

func (s *Batcher) ActiveFileCount() int {
s.mux.Lock()
defer s.mux.Unlock()
return len(s.activeFiles)
}

// StatusString gets a formatted version of the current reader-set
func (s *Batcher) StatusString() string {
var sb strings.Builder
Expand Down
3 changes: 3 additions & 0 deletions pkg/extractor/batchers/batcher_test.go
Expand Up @@ -17,12 +17,14 @@ func TestBatcherTracking(t *testing.T) {

s.startFileReading("abc")
assert.Contains(t, s.StatusString(), "0/5")
assert.Equal(t, s.ActiveFileCount(), 1)

s.stopFileReading("abc")
assert.Equal(t, 5, s.sourceCount)
assert.Equal(t, 1, s.readCount)
assert.Equal(t, 0, s.ReadErrors())
assert.Contains(t, s.StatusString(), "1/5")
assert.Equal(t, s.ActiveFileCount(), 0)
}

func TestReaderToBatcher(t *testing.T) {
Expand All @@ -41,6 +43,7 @@ line3`
assert.Len(t, b2.Batch, 1)
assert.Equal(t, s.errorCount, 0)
assert.Equal(t, s.ReadBytes(), uint64(17))
assert.Equal(t, s.ActiveFileCount(), 0)
}

func TestBatcherWithAutoFlush(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion pkg/extractor/batchers/tailBatcher_test.go
Expand Up @@ -40,7 +40,7 @@ func TestBatchFollowTailFile(t *testing.T) {

batcher := TailFilesToChan(filenames, 1, false, false, true)

for len(batcher.activeFiles) == 0 {
for batcher.ActiveFileCount() == 0 {
time.Sleep(time.Millisecond) // Semi-hack: Wait for the go-routine reader to start and the source to be drained
}

Expand Down

0 comments on commit 12c62f1

Please sign in to comment.