Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optimize memory usage by reducing buffered batches by default #82

Merged
merged 3 commits into from
Sep 24, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
7 changes: 3 additions & 4 deletions .github/workflows/main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,9 @@ jobs:
run: |
go run honnef.co/go/tools/cmd/staticcheck ./...
- name: Codecov
env:
CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }}
run: |
bash <(curl -s https://codecov.io/bash)
uses: codecov/codecov-action@v3
with:
token: ${{ secrets.CODECOV_TOKEN }}
- name: Run GoReleaser
uses: goreleaser/goreleaser-action@v1
with:
Expand Down
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,8 @@ Available tags:

* `experimental` Enable experimental features (eg. fuzzy search)
* `pcre2` Enables PCRE 2 (v10) where able. Currently linux only
* `rare_no_pprof` Disables profiling capabilities, which reduces binary size
* `urfave_cli_no_docs` Disables man and markdown documentation generation, which reduces binary size

**A Note on PCRE (Perl Compatible Regex Library)**

Expand Down
17 changes: 13 additions & 4 deletions cmd/helpers/extractorBuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ func BuildBatcherFromArguments(c *cli.Context) *batchers.Batcher {
concurrentReaders = c.Int("readers")
gunzip = c.Bool("gunzip")
batchSize = c.Int("batch")
batchBuffer = c.Int("batch-buffer")
recursive = c.Bool("recursive")
)

Expand All @@ -55,14 +56,14 @@ func BuildBatcherFromArguments(c *cli.Context) *batchers.Batcher {
if follow {
logger.Println("Cannot follow a stdin stream, not a file")
}
return batchers.OpenReaderToChan("<stdin>", os.Stdin, batchSize)
return batchers.OpenReaderToChan("<stdin>", os.Stdin, batchSize, batchBuffer)
} else if follow { // Read from source file
if gunzip {
logger.Println("Cannot combine -f and -z")
}
return batchers.TailFilesToChan(dirwalk.GlobExpand(fileglobs, recursive), batchSize, followReopen, followPoll, followTail)
return batchers.TailFilesToChan(dirwalk.GlobExpand(fileglobs, recursive), batchSize, batchBuffer, followReopen, followPoll, followTail)
} else { // Read (no-follow) source file(s)
return batchers.OpenFilesToChan(dirwalk.GlobExpand(fileglobs, recursive), gunzip, concurrentReaders, batchSize)
return batchers.OpenFilesToChan(dirwalk.GlobExpand(fileglobs, recursive), gunzip, concurrentReaders, batchSize, batchBuffer)
}
}

Expand Down Expand Up @@ -99,6 +100,8 @@ func BuildExtractorFromArgumentsEx(c *cli.Context, batcher *batchers.Batcher, se
}

func getExtractorFlags() []cli.Flag {
workerCount := runtime.NumCPU()/2 + 1

return []cli.Flag{
&cli.BoolFlag{
Name: "follow",
Expand Down Expand Up @@ -173,12 +176,18 @@ func getExtractorFlags() []cli.Flag {
Usage: "Specifies io batching size. Set to 1 for immediate input",
Value: 1000,
},
&cli.IntFlag{
Name: "batch-buffer",
Category: cliCategoryTweaking,
Usage: "Specifies how many batches to read-ahead. Impacts memory usage, can improve performance",
Value: workerCount * 2, // Keep 2 batches ready for each worker
},
&cli.IntFlag{
Name: "workers",
Aliases: []string{"w"},
Category: cliCategoryTweaking,
Usage: "Set number of data processors",
Value: runtime.NumCPU()/2 + 1,
Value: workerCount,
},
&cli.IntFlag{
Name: "readers",
Expand Down
2 changes: 1 addition & 1 deletion cmd/helpers/updatingAggregator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func (s *VirtualAggregator) ParseErrors() uint64 {

func TestAggregationLoop(t *testing.T) {
// Build a real extractor
batcher := batchers.OpenReaderToChan("test", ioutil.NopCloser(strings.NewReader(testData)), 1)
batcher := batchers.OpenReaderToChan("test", ioutil.NopCloser(strings.NewReader(testData)), 1, 1)
ex, err := extractor.New(batcher.BatchChan(), &extractor.Config{
Regex: `(\d+)`,
Extract: "val:{1}",
Expand Down
2 changes: 2 additions & 0 deletions docs/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,8 @@ Available tags:

* `experimental` Enable experimental features (eg. fuzzy search)
* `pcre2` Enables PCRE 2 (v10) where able. Currently linux only
* `rare_no_pprof` Disables profiling capabilities, which reduces binary size
* `urfave_cli_no_docs` Disables man and markdown documentation generation, which reduces binary size


## Quickstart
Expand Down
4 changes: 4 additions & 0 deletions docs/usage/input.md
Original file line number Diff line number Diff line change
Expand Up @@ -98,3 +98,7 @@ that the batch will be processed irregardless of its current size.
You can tweak this value with `--batch`

`rare <aggreagator> --batch=10 ...`

In addition, you can tweak how many batches are buffered for the extractor with `--batch-buffer`.
By default, it will buffer 2 batches for every worker. More buffers take more memory, and
may slightly improve performance or handle intermittent IO better.
4 changes: 2 additions & 2 deletions pkg/extractor/batchers/fileBatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ import (

// openFilesToChan takes an iterated channel of filenames, options, and loads them all with
// a max concurrency. Returns a channel that will populate with input batches
func OpenFilesToChan(filenames <-chan string, gunzip bool, concurrency int, batchSize int) *Batcher {
out := newBatcher(128)
func OpenFilesToChan(filenames <-chan string, gunzip bool, concurrency int, batchSize, batchBuffer int) *Batcher {
out := newBatcher(batchBuffer)
sema := make(chan struct{}, concurrency)

// Load as many files as the sema allows
Expand Down
2 changes: 1 addition & 1 deletion pkg/extractor/batchers/fileBatcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ func TestOpenFilesToChan(t *testing.T) {
filenames <- "fileBatcher_test.go" // me!
close(filenames)

batches := OpenFilesToChan(filenames, false, 1, 1)
batches := OpenFilesToChan(filenames, false, 1, 1, 1)

total := 0
var lastStart uint64 = 0
Expand Down
4 changes: 2 additions & 2 deletions pkg/extractor/batchers/readerBatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ import (
"io"
)

func OpenReaderToChan(sourceName string, reader io.ReadCloser, batchSize int) *Batcher {
out := newBatcher(128)
func OpenReaderToChan(sourceName string, reader io.ReadCloser, batchSize, batchBuffer int) *Batcher {
out := newBatcher(batchBuffer)

go func() {
defer reader.Close()
Expand Down
2 changes: 1 addition & 1 deletion pkg/extractor/batchers/readerBatcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (

func TestOpenReaderToChan(t *testing.T) {
r := ioutil.NopCloser(strings.NewReader("Hello\nthere\nbob"))
b := OpenReaderToChan("src", r, 1)
b := OpenReaderToChan("src", r, 1, 1)

b1 := <-b.BatchChan()
assert.Equal(t, "src", b1.Source)
Expand Down
4 changes: 2 additions & 2 deletions pkg/extractor/batchers/tailBatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ import (

// TailFilesToChan tails a set of files to an input batcher that can be consumed by extractor
// unlike a normal file batcher, this will attempt to tail all files at once
func TailFilesToChan(filenames <-chan string, batchSize int, reopen, poll, tail bool) *Batcher {
out := newBatcher(128)
func TailFilesToChan(filenames <-chan string, batchSize, batchBuffer int, reopen, poll, tail bool) *Batcher {
out := newBatcher(batchBuffer)

go func() {
var wg sync.WaitGroup
Expand Down
4 changes: 2 additions & 2 deletions pkg/extractor/batchers/tailBatcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ func TestBatchFollowFile(t *testing.T) {
filenames := make(chan string, 1)
filenames <- "tailBatcher_test.go" // me

batcher := TailFilesToChan(filenames, 5, false, false, false)
batcher := TailFilesToChan(filenames, 5, 1, false, false, false)

batch := <-batcher.BatchChan()
assert.Equal(t, "tailBatcher_test.go", batch.Source)
Expand All @@ -38,7 +38,7 @@ func TestBatchFollowTailFile(t *testing.T) {
filenames := make(chan string, 1)
filenames <- tmp.Name()

batcher := TailFilesToChan(filenames, 1, false, false, true)
batcher := TailFilesToChan(filenames, 1, 1, false, false, true)

for batcher.ActiveFileCount() == 0 {
time.Sleep(time.Millisecond) // Semi-hack: Wait for the go-routine reader to start and the source to be drained
Expand Down