Commit

Enable follow tailing (#70)

* Enable follow tailing
zix99 committed Jun 8, 2022
1 parent 5360801 commit 0b91037
Showing 10 changed files with 185 additions and 12 deletions.
2 changes: 1 addition & 1 deletion README.md
@@ -21,7 +21,7 @@ See [rare.zdyn.net](https://rare.zdyn.net) or the [docs/ folder](docs/) for the
* Multiple summary formats including: filter (like grep), histogram, bar graphs, and numerical analysis
* File glob expansions (eg `/var/log/*` or `/var/log/*/*.log`) and `-R`
* Optional gzip decompression (with `-z`)
* Following `-f` or re-open following `-F` (use `--poll` to poll)
* Following `-f` or re-open following `-F` (use `--poll` to poll, and `--tail` to tail)
* Ignoring lines that match an expression (with `-i`)
* Aggregating and realtime summary (Don't have to wait for all data to be scanned)
* Multi-threaded reading, parsing, and aggregation (It's fast)
10 changes: 9 additions & 1 deletion cmd/helpers/extractorBuilder.go
@@ -18,6 +18,7 @@ const DefaultArgumentDescriptor = "<-|filename|glob...>"
func BuildBatcherFromArguments(c *cli.Context) *batchers.Batcher {
var (
follow = c.Bool("follow") || c.Bool("reopen")
followTail = c.Bool("tail")
followReopen = c.Bool("reopen")
followPoll = c.Bool("poll")
concurrentReaders = c.Int("readers")
@@ -35,6 +36,9 @@ func BuildBatcherFromArguments(c *cli.Context) *batchers.Batcher {
if followPoll && !follow {
logger.Fatalf("Follow (-f) must be enabled for --poll")
}
if followTail && !follow {
logger.Fatalf("Follow (-f) must be enabled for --tail")
}

fileglobs := c.Args()

@@ -50,7 +54,7 @@ func BuildBatcherFromArguments(c *cli.Context) *batchers.Batcher {
if gunzip {
logger.Println("Cannot combine -f and -z")
}
return batchers.TailFilesToChan(dirwalk.GlobExpand(fileglobs, recursive), batchSize, followReopen, followPoll)
return batchers.TailFilesToChan(dirwalk.GlobExpand(fileglobs, recursive), batchSize, followReopen, followPoll, followTail)
} else { // Read (no-follow) source file(s)
return batchers.OpenFilesToChan(dirwalk.GlobExpand(fileglobs, recursive), gunzip, concurrentReaders, batchSize)
}
@@ -102,6 +106,10 @@ func getExtractorFlags() []cli.Flag {
Name: "poll",
Usage: "When following a file, poll for changes rather than using inotify",
},
cli.BoolFlag{
Name: "tail,t",
Usage: "When following a file, navigate to the end of the file to skip existing content",
},
cli.BoolFlag{
Name: "posix,p",
Usage: "Compile regex as against posix standard",
Binary file modified docs/cli-help.md
2 changes: 1 addition & 1 deletion docs/index.md
@@ -19,7 +19,7 @@ Supports various CLI-based graphing and metric formats (filter (grep-like), hist
* Multiple summary formats including: filter (like grep), histogram, bar graphs, and numerical analysis
* File glob expansions (eg `/var/log/*` or `/var/log/*/*.log`) and `-R`
* Optional gzip decompression (with `-z`)
* Following `-f` or re-open following `-F` (use `--poll` to poll)
* Following `-f` or re-open following `-F` (use `--poll` to poll, and `--tail` to tail)
* Ignoring lines that match an expression (with `-i`)
* Aggregating and realtime summary (Don't have to wait for all data to be scanned)
* Multi-threaded reading, parsing, and aggregation (It's fast)
6 changes: 5 additions & 1 deletion docs/usage/extractor.md
@@ -44,4 +44,8 @@ get something that will count based on keys.

```bash
rare histogram -m '"(\w{3,4}) ([A-Za-z0-9/.@_-]+)' -e '{1} {2}' -b access.log
```
```

## See Also

* [Regular Expressions](regexp.md)
100 changes: 100 additions & 0 deletions docs/usage/input.md
@@ -0,0 +1,100 @@
# Input

*rare* reads the supplied inputs with massive parallelization, rather
than in order. In most cases, you won't need to do anything
other than specify what to read; in some cases, you may want to
tweak a few parameters.

## Input Methods

### Read File(s)

The simplest way to read files is to specify one or more filenames:

`rare <aggregator> file1 file2 file3...`

You can also use simple expansions, such as:

`rare <aggregator> path/**/*.log`

In this case, all `*.log` files in any nested directory under `path/` will be read.

Or you can use recursion, which will read all plain files in the path:

`rare <aggregator> -R path/`
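
If you're curious how expansion and recursion map to code, here's an illustrative Go sketch using the standard library (an approximation only: Go's `filepath.Glob` doesn't support `**`, and *rare* uses its own `dirwalk` package for expansion):

```go
package main

import (
	"fmt"
	"io/fs"
	"path/filepath"
)

func main() {
	// Simple glob expansion, as with `rare <aggregator> path/*.log`
	matches, _ := filepath.Glob("path/*.log")
	fmt.Println(matches)

	// Recursive walk, as with `rare <aggregator> -R path/`
	filepath.WalkDir("path", func(p string, d fs.DirEntry, err error) error {
		if err == nil && d.Type().IsRegular() {
			fmt.Println(p) // a plain file that would be read
		}
		return nil
	})
}
```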

#### gzip

If the files *may* be gzip'd, you can specify `-z` and they will be gunzip'd if possible.
If a file can't be opened as a gzip file, a warning will be logged and it will be
interpreted as a raw file.

`rare <aggregator> -z *.log.gz`
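
Under the hood, this kind of detect-and-fall-back is typically done by checking for the two gzip magic bytes. A minimal Go sketch of the idea (`openMaybeGzip` is a hypothetical helper for illustration, not *rare*'s actual API):

```go
package main

import (
	"compress/gzip"
	"io"
	"log"
	"os"
)

// openMaybeGzip opens a file, decompressing transparently when the
// gzip magic bytes (0x1f, 0x8b) are present, else reading it raw.
func openMaybeGzip(path string) (io.Reader, error) {
	f, err := os.Open(path)
	if err != nil {
		return nil, err
	}

	magic := make([]byte, 2)
	if _, err := io.ReadFull(f, magic); err == nil && magic[0] == 0x1f && magic[1] == 0x8b {
		f.Seek(0, io.SeekStart) // rewind so gzip sees the whole stream
		if gz, err := gzip.NewReader(f); err == nil {
			return gz, nil
		}
		log.Println("not valid gzip, falling back to raw:", path)
	}
	f.Seek(0, io.SeekStart) // read as a plain file
	return f, nil
}

func main() {
	r, err := openMaybeGzip("access.log.gz")
	if err != nil {
		log.Fatal(err)
	}
	io.Copy(os.Stdout, r)
}
```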

### Following File(s)

Like `tail -f`, following files allows you to watch files that are actively being written
to. This is useful, for example, for reading the log of a running application.

**Note:** When following files, all files are opened at once, and the max-readers setting is ignored.

`rare <aggregator> -f app.log`

If the file may be deleted and recreated, such as during log rotation, you can follow with re-open:

`rare <aggregator> -F app.log`

#### Polling (Instead of blocking)

By default, following a file uses `fsnotify`, which monitors files for changes. This should
work fine on most major operating systems. If not, you can enable polling to watch for
changes instead with `--poll`.
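
For reference, the notify-based approach looks roughly like this with the `fsnotify` library (a minimal sketch; *rare*'s internal wiring differs):

```go
package main

import (
	"log"

	"github.com/fsnotify/fsnotify"
)

func main() {
	watcher, err := fsnotify.NewWatcher()
	if err != nil {
		log.Fatal(err)
	}
	defer watcher.Close()

	if err := watcher.Add("app.log"); err != nil {
		log.Fatal(err)
	}

	for {
		select {
		case event := <-watcher.Events:
			if event.Op&fsnotify.Write != 0 {
				log.Println("file grew, read the new bytes:", event.Name)
			}
		case err := <-watcher.Errors:
			log.Println("watch error:", err)
		}
	}
}
```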

#### Tailing

If you wish to start reading only at the end of the file (e.g. to look only at newer entries),
you can specify `-t` or `--tail` to start following from the end.
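
For example: `rare <aggregator> -f --tail app.log`. In this commit the reader drains any existing content (`r.Drain()`) before following; conceptually that's equivalent to seeking to the end first, as in this sketch:

```go
package main

import (
	"io"
	"log"
	"os"
)

func main() {
	f, err := os.Open("app.log")
	if err != nil {
		log.Fatal(err)
	}
	defer f.Close()

	// Skip all existing content: subsequent reads only see newly
	// appended data, which is what --tail does before following.
	if _, err := f.Seek(0, io.SeekEnd); err != nil {
		log.Fatal(err)
	}
	// ...then follow for new writes from here (fsnotify or polling)
}
```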

### Stdin/Pipe

There are two ways to read from a pipe: implicit and explicit.

Implicitly, if *rare* detects that its stdin is a pipe, it will read from it when you don't provide any file arguments:

`cat file.log | rare <aggregator>` or `rare <aggregator> < file.log`

Explicitly, you can pass a single argument of `-` (dash) to force reading from stdin:

`cat file.log | rare <aggregator> -`
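
The implicit detection is usually a stat-mode check on stdin; a minimal sketch of the common Go idiom (not necessarily *rare*'s exact code):

```go
package main

import (
	"fmt"
	"os"
)

// stdinIsPipe reports whether stdin is a pipe/redirect rather than
// an interactive terminal -- the usual check behind implicit reads.
func stdinIsPipe() bool {
	info, err := os.Stdin.Stat()
	if err != nil {
		return false
	}
	return info.Mode()&os.ModeCharDevice == 0
}

func main() {
	if stdinIsPipe() {
		fmt.Println("reading from stdin")
	} else {
		fmt.Println("expecting filename arguments")
	}
}
```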

## Tweaking the Batcher

There are heuristics that optimize how files are read, and they should
work for most cases. If you do find you need to modify how *rare*
reads, you can tweak two things:

* concurrency -- How many files are read at once
* batch size -- How many lines read from a given file are "batched" to send to the expression stage

### Concurrency

Concurrency specifies how many files are opened at once (in the normal, non-follow case). It
defaults to `3`, but is ignored when following files.

Specify with:

`rare <aggregator> --readers=1 file1 file2 file3...`
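
Conceptually, a reader limit is a counting semaphore over file-reading goroutines. A minimal sketch of the idea (illustrative only; `readConcurrently` is a hypothetical helper, not *rare*'s API):

```go
package main

import (
	"bufio"
	"fmt"
	"os"
	"sync"
)

// readConcurrently reads files with at most maxReaders open at once,
// akin to rare's --readers setting (default 3).
func readConcurrently(paths []string, maxReaders int) {
	sem := make(chan struct{}, maxReaders) // counting semaphore
	var wg sync.WaitGroup

	for _, path := range paths {
		wg.Add(1)
		go func(p string) {
			defer wg.Done()
			sem <- struct{}{}        // acquire a reader slot
			defer func() { <-sem }() // release it when done

			f, err := os.Open(p)
			if err != nil {
				return
			}
			defer f.Close()

			scanner := bufio.NewScanner(f)
			lines := 0
			for scanner.Scan() {
				lines++
			}
			fmt.Println(p, lines, "lines")
		}(path)
	}
	wg.Wait()
}

func main() {
	readConcurrently([]string{"file1", "file2", "file3"}, 1)
}
```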

### Batch Sizes

By default, rare reads 1000 lines from a file to form a batch before providing it
to the extractor stage. This significantly speeds up processing, but comes
at the cost of being less real-time when input is generated slowly.

To counteract this, in the *follow* or *stdin* cases, there's also a flush timeout of
250ms. This means that if a new line has been received and that duration has passed,
the batch will be processed regardless of its current size (see the sketch at the end
of this section).

You can tweak the batch size with `--batch`:

`rare <aggregator> --batch=10 ...`
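
For intuition, the size-or-timeout flush described above can be sketched as a `select` loop over the incoming lines and a ticker (illustrative names; *rare*'s actual batcher lives in `pkg/extractor/batchers`):

```go
package main

import (
	"fmt"
	"time"
)

// batchLines groups incoming lines into batches of up to batchSize,
// flushing a partial batch when flushEvery elapses with lines pending.
func batchLines(lines <-chan string, batchSize int, flushEvery time.Duration, out chan<- []string) {
	batch := make([]string, 0, batchSize)
	ticker := time.NewTicker(flushEvery)
	defer ticker.Stop()

	flush := func() {
		if len(batch) > 0 {
			out <- batch
			batch = make([]string, 0, batchSize)
		}
	}

	for {
		select {
		case line, ok := <-lines:
			if !ok { // input closed: emit whatever is left
				flush()
				close(out)
				return
			}
			batch = append(batch, line)
			if len(batch) >= batchSize {
				flush()
			}
		case <-ticker.C:
			flush() // timeout: emit a partial batch to stay realtime
		}
	}
}

func main() {
	lines := make(chan string)
	out := make(chan []string)
	go batchLines(lines, 1000, 250*time.Millisecond, out)
	go func() {
		lines <- "example line"
		close(lines)
	}()
	for b := range out {
		fmt.Println(len(b), "lines flushed")
	}
}
```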
20 changes: 16 additions & 4 deletions docs/usage/overview.md
@@ -3,11 +3,23 @@
Rare is a fast, realtime regex-extraction, and aggregation into common formats
such as histograms, numerical summaries, tables, and more!

Rare is composed of three parts in the pipeline:
Rare is composed of four parts in the pipeline:

1. Extraction (Matching)
2. Expression Building
3. Aggregation
1. Batching (Loading)
2. Extraction (Matching)
3. Expression Building
4. Aggregation

## Input (Batching/Loading)

Input (or batching) is the act of feeding content read from a file (or stdin/pipe) into
the next stages. Most of the time this is invisible: it is simply the pipe or the specified filename.

It is possible to tune the batcher to follow the file or batch in different ways.

Read more at:

* [input](input.md)

## Extraction (Matching)

1 change: 1 addition & 0 deletions mkdocs.yml
@@ -10,6 +10,7 @@ nav:
- Home: index.md
- Usage:
- Overview: usage/overview.md
- Input: usage/input.md
- Extractor: usage/extractor.md
- Expressions: usage/expressions.md
- Aggregators: usage/aggregators.md
8 changes: 7 additions & 1 deletion pkg/extractor/batchers/tailBatcher.go
@@ -8,7 +8,7 @@ import (

// TailFilesToChan tails a set of files to an input batcher that can be consumed by extractor
// unlike a normal file batcher, this will attempt to tail all files at once
func TailFilesToChan(filenames <-chan string, batchSize int, reopen, poll bool) *Batcher {
func TailFilesToChan(filenames <-chan string, batchSize int, reopen, poll, tail bool) *Batcher {
out := newBatcher(128)

go func() {
@@ -28,6 +28,12 @@ func TailFilesToChan(filenames <-chan string, batchSize int, reopen, poll bool)
out.incErrors()
return
}
if tail {
if err := r.Drain(); err != nil {
logger.Print("Unable to tail file source: ", err)
out.incErrors()
}
}

out.syncReaderToBatcherWithTimeFlush(filename, r, batchSize, AutoFlushTimeout)
}(filename)
48 changes: 45 additions & 3 deletions pkg/extractor/batchers/tailBatcher_test.go
@@ -1,19 +1,61 @@
package batchers

import (
"io/ioutil"
"os"
"testing"
"time"

"github.com/stretchr/testify/assert"
)

func TestBatchTailFile(t *testing.T) {
func TestBatchFollowFile(t *testing.T) {
filenames := make(chan string, 1)
filenames <- "tailBatcher_test.go" // me

batcher := TailFilesToChan(filenames, 5, false, false)
batcher := TailFilesToChan(filenames, 5, false, false, false)

batch := <-batcher.c
batch := <-batcher.BatchChan()
assert.Equal(t, "tailBatcher_test.go", batch.Source)
assert.Len(t, batch.Batch, 5)
assert.NotZero(t, batcher.ReadBytes())
}

func TestBatchFollowTailFile(t *testing.T) {
tmp, err := ioutil.TempFile("", "followtest-")
if err != nil {
panic(err)
}
defer tmp.Close()
defer os.Remove(tmp.Name())

// Add test data
for i := 0; i < 10; i++ {
tmp.WriteString("abc\n")
}

// Now tail the file
filenames := make(chan string, 1)
filenames <- tmp.Name()

batcher := TailFilesToChan(filenames, 1, false, false, true)

time.Sleep(300 * time.Millisecond) // Uhg hack cause auto-flushing

// And write some more data
const testLines = 5
for i := 0; i < testLines; i++ {
tmp.WriteString("abc\n")
}

// And finally assert we got what we wanted
for i := 0; i < testLines; i++ {
batch, ok := <-batcher.BatchChan()
assert.True(t, ok)
if ok {
assert.Equal(t, tmp.Name(), batch.Source)
assert.Equal(t, uint64(i+1), batch.BatchStart)
assert.Len(t, batch.Batch, 1)
}
}
}
