Optimize memory usage by reducing buffered batches by default (#82)
With default settings, rare consistently used ~50MB of memory. These changes lower that to ~10MB while maintaining performance. For IO-burst systems, the number of buffered batches can be increased via the CLI.
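For an IO-burst system, an invocation like the following raises the read-ahead (illustrative value; `--batch-buffer` is the flag this commit adds):

`rare <aggregator> --batch-buffer=64 ...`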
zix99 committed Sep 24, 2022
1 parent 1e7191b commit 5edf15b
Showing 12 changed files with 35 additions and 19 deletions.
7 changes: 3 additions & 4 deletions .github/workflows/main.yml
@@ -28,10 +28,9 @@ jobs:
       run: |
         go run honnef.co/go/tools/cmd/staticcheck ./...
     - name: Codecov
-      env:
-        CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }}
-      run: |
-        bash <(curl -s https://codecov.io/bash)
+      uses: codecov/codecov-action@v3
+      with:
+        token: ${{ secrets.CODECOV_TOKEN }}
     - name: Run GoReleaser
       uses: goreleaser/goreleaser-action@v1
       with:
2 changes: 2 additions & 0 deletions README.md
@@ -87,6 +87,8 @@ Available tags:
 
 * `experimental` Enable experimental features (eg. fuzzy search)
 * `pcre2` Enables PCRE 2 (v10) where able. Currently linux only
+* `rare_no_pprof` Disables profiling capabilities, which reduces binary size
+* `urfave_cli_no_docs` Disables man and markdown documentation generation, which reduces binary size
 
 **A Note on PCRE (Perl Compatible Regex Library)**
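As a usage sketch (assuming the standard Go toolchain; the tag names come from the list added above), these tags are supplied at compile time:

    go build -tags rare_no_pprof,urfave_cli_no_docs .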
17 changes: 13 additions & 4 deletions cmd/helpers/extractorBuilder.go
@@ -30,6 +30,7 @@ func BuildBatcherFromArguments(c *cli.Context) *batchers.Batcher {
         concurrentReaders = c.Int("readers")
         gunzip            = c.Bool("gunzip")
         batchSize         = c.Int("batch")
+        batchBuffer       = c.Int("batch-buffer")
         recursive         = c.Bool("recursive")
     )
@@ -55,14 +56,14 @@ func BuildBatcherFromArguments(c *cli.Context) *batchers.Batcher {
         if follow {
             logger.Println("Cannot follow a stdin stream, not a file")
         }
-        return batchers.OpenReaderToChan("<stdin>", os.Stdin, batchSize)
+        return batchers.OpenReaderToChan("<stdin>", os.Stdin, batchSize, batchBuffer)
     } else if follow { // Read from source file
         if gunzip {
             logger.Println("Cannot combine -f and -z")
         }
-        return batchers.TailFilesToChan(dirwalk.GlobExpand(fileglobs, recursive), batchSize, followReopen, followPoll, followTail)
+        return batchers.TailFilesToChan(dirwalk.GlobExpand(fileglobs, recursive), batchSize, batchBuffer, followReopen, followPoll, followTail)
     } else { // Read (no-follow) source file(s)
-        return batchers.OpenFilesToChan(dirwalk.GlobExpand(fileglobs, recursive), gunzip, concurrentReaders, batchSize)
+        return batchers.OpenFilesToChan(dirwalk.GlobExpand(fileglobs, recursive), gunzip, concurrentReaders, batchSize, batchBuffer)
     }
 }
@@ -99,6 +100,8 @@ func BuildExtractorFromArgumentsEx(c *cli.Context, batcher *batchers.Batcher, se
 }
 
 func getExtractorFlags() []cli.Flag {
+    workerCount := runtime.NumCPU()/2 + 1
+
     return []cli.Flag{
         &cli.BoolFlag{
             Name: "follow",
@@ -173,12 +176,18 @@ func getExtractorFlags() []cli.Flag {
             Usage: "Specifies io batching size. Set to 1 for immediate input",
             Value: 1000,
         },
+        &cli.IntFlag{
+            Name:     "batch-buffer",
+            Category: cliCategoryTweaking,
+            Usage:    "Specifies how many batches to read-ahead. Impacts memory usage, can improve performance",
+            Value:    workerCount * 2, // Keep 2 batches ready for each worker
+        },
         &cli.IntFlag{
             Name:     "workers",
             Aliases:  []string{"w"},
             Category: cliCategoryTweaking,
             Usage:    "Set number of data processors",
-            Value:    runtime.NumCPU()/2 + 1,
+            Value:    workerCount,
         },
         &cli.IntFlag{
             Name: "readers",
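Why this lowers memory: batches queue in a buffered channel, so resident memory scales roughly with batchBuffer * batchSize * average line size, and this commit replaces the hard-coded capacity of 128 batches with 2 per worker. A minimal Go sketch of that mechanism (assumed names, not the project's actual code):

package main

import "fmt"

// Batch is a hypothetical stand-in for the project's input-batch type.
type Batch struct {
    Source string
    Lines  []string
}

// newBatchChan mirrors the idea behind newBatcher(batchBuffer): the channel
// capacity caps how many batches can sit in memory ahead of the workers.
func newBatchChan(batchBuffer int) chan Batch {
    return make(chan Batch, batchBuffer)
}

func main() {
    workers := 4
    ch := newBatchChan(workers * 2) // the new default: 2 read-ahead batches per worker
    ch <- Batch{Source: "example.log", Lines: []string{"line1", "line2"}}
    close(ch)
    for b := range ch {
        fmt.Println(b.Source, len(b.Lines)) // prints: example.log 2
    }
}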
2 changes: 1 addition & 1 deletion cmd/helpers/updatingAggregator_test.go
@@ -29,7 +29,7 @@ func (s *VirtualAggregator) ParseErrors() uint64 {
 
 func TestAggregationLoop(t *testing.T) {
     // Build a real extractor
-    batcher := batchers.OpenReaderToChan("test", ioutil.NopCloser(strings.NewReader(testData)), 1)
+    batcher := batchers.OpenReaderToChan("test", ioutil.NopCloser(strings.NewReader(testData)), 1, 1)
     ex, err := extractor.New(batcher.BatchChan(), &extractor.Config{
         Regex:   `(\d+)`,
         Extract: "val:{1}",
2 changes: 2 additions & 0 deletions docs/index.md
@@ -83,6 +83,8 @@ Available tags:
 
 * `experimental` Enable experimental features (eg. fuzzy search)
 * `pcre2` Enables PCRE 2 (v10) where able. Currently linux only
+* `rare_no_pprof` Disables profiling capabilities, which reduces binary size
+* `urfave_cli_no_docs` Disables man and markdown documentation generation, which reduces binary size
 
 
 ## Quickstart
4 changes: 4 additions & 0 deletions docs/usage/input.md
@@ -98,3 +98,7 @@ that the batch will be processed regardless of its current size.
 You can tweak this value with `--batch`
 
 `rare <aggregator> --batch=10 ...`
+
+In addition, you can tweak how many batches are buffered for the extractor with `--batch-buffer`.
+By default, it buffers 2 batches for every worker. More buffers use more memory, and may
+slightly improve performance or better absorb intermittent IO.
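For example (illustrative values only), a run against bursty network storage might pair a larger batch size with more read-ahead:

`rare <aggregator> --batch=5000 --batch-buffer=32 ...`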
4 changes: 2 additions & 2 deletions pkg/extractor/batchers/fileBatcher.go
@@ -10,8 +10,8 @@ import (
 
 // OpenFilesToChan takes an iterated channel of filenames, options, and loads them all with
 // a max concurrency. Returns a channel that will populate with input batches
-func OpenFilesToChan(filenames <-chan string, gunzip bool, concurrency int, batchSize int) *Batcher {
-    out := newBatcher(128)
+func OpenFilesToChan(filenames <-chan string, gunzip bool, concurrency int, batchSize, batchBuffer int) *Batcher {
+    out := newBatcher(batchBuffer)
     sema := make(chan struct{}, concurrency)
 
     // Load as many files as the sema allows
2 changes: 1 addition & 1 deletion pkg/extractor/batchers/fileBatcher_test.go
@@ -13,7 +13,7 @@ func TestOpenFilesToChan(t *testing.T) {
     filenames <- "fileBatcher_test.go" // me!
     close(filenames)
 
-    batches := OpenFilesToChan(filenames, false, 1, 1)
+    batches := OpenFilesToChan(filenames, false, 1, 1, 1)
 
     total := 0
     var lastStart uint64 = 0
4 changes: 2 additions & 2 deletions pkg/extractor/batchers/readerBatcher.go
@@ -4,8 +4,8 @@ import (
     "io"
 )
 
-func OpenReaderToChan(sourceName string, reader io.ReadCloser, batchSize int) *Batcher {
-    out := newBatcher(128)
+func OpenReaderToChan(sourceName string, reader io.ReadCloser, batchSize, batchBuffer int) *Batcher {
+    out := newBatcher(batchBuffer)
 
     go func() {
         defer reader.Close()
2 changes: 1 addition & 1 deletion pkg/extractor/batchers/readerBatcher_test.go
@@ -10,7 +10,7 @@ import (
 
 func TestOpenReaderToChan(t *testing.T) {
     r := ioutil.NopCloser(strings.NewReader("Hello\nthere\nbob"))
-    b := OpenReaderToChan("src", r, 1)
+    b := OpenReaderToChan("src", r, 1, 1)
 
     b1 := <-b.BatchChan()
     assert.Equal(t, "src", b1.Source)
4 changes: 2 additions & 2 deletions pkg/extractor/batchers/tailBatcher.go
@@ -8,8 +8,8 @@ import (
 
 // TailFilesToChan tails a set of files to an input batcher that can be consumed by extractor
 // unlike a normal file batcher, this will attempt to tail all files at once
-func TailFilesToChan(filenames <-chan string, batchSize int, reopen, poll, tail bool) *Batcher {
-    out := newBatcher(128)
+func TailFilesToChan(filenames <-chan string, batchSize, batchBuffer int, reopen, poll, tail bool) *Batcher {
+    out := newBatcher(batchBuffer)
 
     go func() {
         var wg sync.WaitGroup
4 changes: 2 additions & 2 deletions pkg/extractor/batchers/tailBatcher_test.go
@@ -13,7 +13,7 @@ func TestBatchFollowFile(t *testing.T) {
     filenames := make(chan string, 1)
     filenames <- "tailBatcher_test.go" // me
 
-    batcher := TailFilesToChan(filenames, 5, false, false, false)
+    batcher := TailFilesToChan(filenames, 5, 1, false, false, false)
 
     batch := <-batcher.BatchChan()
     assert.Equal(t, "tailBatcher_test.go", batch.Source)
@@ -38,7 +38,7 @@ func TestBatchFollowTailFile(t *testing.T) {
     filenames := make(chan string, 1)
     filenames <- tmp.Name()
 
-    batcher := TailFilesToChan(filenames, 1, false, false, true)
+    batcher := TailFilesToChan(filenames, 1, 1, false, false, true)
 
     for batcher.ActiveFileCount() == 0 {
         time.Sleep(time.Millisecond) // Semi-hack: Wait for the go-routine reader to start and the source to be drained
