
sometimes less is more
turns out search workers really are pure overhead, mainly just increasing CPU usage. even one worker had a negative impact. it helps to understand how pgzip achieves its performance.

same goes for the pigz, grep and jq pipeline: wrapping it in GNU parallel actually almost doubles the time it takes to crunch through the cname dataset.
nscuro committed Jul 28, 2020
1 parent 8b34ed6 commit 42fd306
Showing 4 changed files with 40 additions and 225 deletions.
29 changes: 13 additions & 16 deletions README.md
@@ -4,7 +4,7 @@

*Swiftly search [FDNS](https://github.com/rapid7/sonar/wiki/Forward-DNS) datasets from Rapid7 Open Data*

**Disclaimer**: You can do most of what *fdnssearch* does with [`bash`, `curl`, `pigz`, `jq` and GNU `parallel`](https://github.com/rapid7/sonar/wiki/Analyzing-Datasets). This is nothing revolutionary. *fdnssearch* simply offers a [nicer UX](#usage), some QoL features and slightly better [performance](#performance).
**Disclaimer**: You can do most of what *fdnssearch* does with [`bash`, `curl`, `pigz` and `jq`](https://github.com/rapid7/sonar/wiki/Analyzing-Datasets). This is nothing revolutionary. *fdnssearch* simply offers a [nicer UX](#usage) and some [QoL features](#interoperability).

## Installation

@@ -78,33 +78,30 @@ $ fdnssearch -f /path/to/datasets/2020-05-23-1590208726-fdns_a.json.gz -d exampl

### Performance

*fdnssearch* utilizes the `klauspost/pgzip` library for [performant gzip decompression](https://github.com/klauspost/pgzip#decompression-1).
Each decompressed dataset entry immediately spawns a [goroutine](https://golangbot.com/goroutines/) ("*search worker*") that takes care of
filtering and parsing. This means that the faster your source medium (internet connection, HDD or SSD), the more goroutines will run concurrently.
I/O really is the only limiting factor here, no matter where you load your datasets from.

For me, *fdnssearch* is even quite a bit faster than the `pigz`, `parallel` and `jq` approach:
*fdnssearch* uses *klauspost*'s [`pgzip`](https://github.com/klauspost/pgzip) for performant decompression of the datasets.
Thanks to `pgzip`, the performance of *fdnssearch* is pretty much on par with the `pigz`, `grep` and `jq` approach:

```bash
$ time pigz -dc /path/to/datasets/2020-06-28-1593366733-fdns_cname.json.gz \
| parallel --gnu --pipe "grep '\.google\.com'" \
| parallel --gnu --pipe "jq '. | select(.name | endswith(\".google.com\")) | .name'" \
| grep 'google\.com' \
| jq '. | select(.name | endswith(".google.com")) | select(.type == "cname") | .name' \
> /dev/null
pigz -dc /path/to/datasets/2020-06-28-1593366733-fdns_cname.json.gz 62.84s user 41.95s system 113% cpu 1:32.02 total
parallel --gnu --pipe "grep '\.google\.com'" 185.31s user 78.92s system 287% cpu 1:32.02 total
parallel --gnu --pipe > /dev/null 6.12s user 1.08s system 7% cpu 1:32.06 total
pigz -dc /path/to/datasets/2020-06-28-1593366733-fdns_cname.json.gz 57.22s user 41.11s system 212% cpu 46.269 total
grep --color=auto --exclude-dir={.bzr,CVS,.git,.hg,.svn,.idea,.tox} 18.78s user 6.91s system 55% cpu 46.268 total
jq > /dev/null 2.59s user 0.07s system 5% cpu 46.268 total

$ time fdnssearch -d google.com -t cname \
$ time fdnssearch -d google.com -t cname --quiet \
-f /path/to/datasets/2020-06-28-1593366733-fdns_cname.json.gz \
--quiet > /dev/null
fdnssearch -d google.com -t cname -f --quiet > /dev/null 405.62s user 60.74s system 683% cpu 1:08.26 total
> /dev/null
fdnssearch -d google.com -t cname -f --quiet > /dev/null 64.58s user 0.85s system 144% cpu 45.266 total
```

This is with an [Intel i7 8700K](https://ark.intel.com/content/www/us/en/ark/products/126684/intel-core-i7-8700k-processor-12m-cache-up-to-4-70-ghz.html) and a [Samsung 970 EVO NVMe M.2 SSD](https://www.samsung.com/us/computing/memory-storage/solid-state-drives/ssd-970-evo-nvme-m2-500gb-mz-v7e500bw/) on Windows 10 in WSL 2. Your mileage may vary.
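
For context on where that speed comes from: `pgzip` splits the compressed stream into blocks and decompresses several of them ahead of the consumer in parallel goroutines. Below is a minimal, self-contained sketch of streaming a dataset through it; the file path is illustrative, and the block size and count mirror the values this commit sets.

```go
package main

import (
	"bufio"
	"fmt"
	"log"
	"os"
	"runtime"

	"github.com/klauspost/pgzip"
)

func main() {
	file, err := os.Open("fdns_cname.json.gz") // illustrative path
	if err != nil {
		log.Fatal(err)
	}
	defer file.Close()

	// Decompress up to NumCPU blocks of 5 MB each ahead of the consumer,
	// in parallel goroutines, while the loop below drains the stream.
	reader, err := pgzip.NewReaderN(file, 5000000, runtime.NumCPU())
	if err != nil {
		log.Fatal(err)
	}
	defer reader.Close()

	entries := 0
	scanner := bufio.NewScanner(reader)
	for scanner.Scan() {
		entries++ // one JSON entry per line
	}
	if err := scanner.Err(); err != nil {
		log.Fatal(err)
	}
	fmt.Println("entries:", entries)
}
```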

### Deduplication

*fdnssearch* will not perform deduplication in order to provide search results as quick and efficient as possible. Use tools like `uniq` or `sort` for this.
*fdnssearch* will not perform deduplication in order to provide search results as quickly and efficiently as possible.
Use tools like `uniq` or `sort` for this.

Given a file `results.txt` which only contains record names, deduplication can be achieved with:

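```bash
# a minimal sketch; the exact command in the full README may differ
sort -u results.txt
```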
8 changes: 6 additions & 2 deletions cmd/fdnssearch/main.go
@@ -5,6 +5,7 @@ import (
"log"
"net/http"
"os"
"runtime"
"time"

"github.com/klauspost/pgzip"
@@ -21,6 +22,9 @@ var (
Run: runCmd,
}

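	// pgzip reads ahead up to pgzipBlocks blocks of pgzipBlockSize bytes
	// each, decompressing them in parallel; larger values trade memory
	// for throughput.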
pgzipBlocks = runtime.NumCPU()
pgzipBlockSize = 5000000 // 5MB

pDatasetFiles []string
pSearchDomains []string
pExcludedDomains []string
@@ -91,7 +95,7 @@ func runCmd(_ *cobra.Command, _ []string) {
return
}

gzipReader, err := pgzip.NewReader(file)
gzipReader, err := pgzip.NewReaderN(file, pgzipBlockSize, pgzipBlocks)
if err != nil {
logger.Err(err)
return
@@ -162,7 +166,7 @@ func runCmd(_ *cobra.Command, _ []string) {
return
}

gzipReader, err := pgzip.NewReader(res.Body)
gzipReader, err := pgzip.NewReaderN(res.Body, pgzipBlockSize, pgzipBlocks)
if err != nil {
logger.Err(err)
return
70 changes: 11 additions & 59 deletions internal/search/search.go
@@ -6,42 +6,12 @@ import (
"fmt"
"io"
"strings"
"sync"

"github.com/nscuro/fdnssearch/internal/dataset"
"github.com/valyala/fastjson"
)

type searchWorkerContext struct {
chunk string
domains *[]string
exclusions *[]string
types *[]string
jsonParserPool *fastjson.ParserPool
resultsChan chan<- dataset.Entry
errorsChan chan<- error
waitGroup *sync.WaitGroup
}

func searchWorker(ctx searchWorkerContext) {
defer ctx.waitGroup.Done()

entry, err := filter(ctx.chunk, ctx.types, ctx.domains, ctx.exclusions, ctx.jsonParserPool)
if err != nil {
ctx.errorsChan <- err
return
} else if entry == nil {
return
}

ctx.resultsChan <- *entry
}

func filter(chunk string, types *[]string, domains *[]string, exclusions *[]string, jsonParserPool *fastjson.ParserPool) (*dataset.Entry, error) {
// prevent the necessity to decode entries that definitely
// do not match the given search criteria. decoding json is
// drastically more computationally expensive than this simple
// loop.
func filter(chunk string, types *[]string, domains *[]string, exclusions *[]string, jsonParser *fastjson.Parser) (*dataset.Entry, error) {
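	// Cheap substring pre-screen: skip full JSON parsing for chunks that
	// cannot contain any of the search domains, since decoding JSON is far
	// more expensive than this loop.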
possibleMatch := false
for _, domain := range *domains {
if strings.Contains(chunk, domain) {
@@ -53,19 +23,13 @@ func filter(chunk string, types *[]string, domains *[]string, exclusions *[]stri
return nil, nil
}

jsonParser := jsonParserPool.Get()
parsedEntry, err := jsonParser.Parse(chunk)
if err != nil {
jsonParserPool.Put(jsonParser)
return nil, fmt.Errorf("failed to parse entry: %w", err)
}

// parse everything we need in advance so jsonParser can
// be put back into the pool as fast as possible
entryName := string(parsedEntry.GetStringBytes("name"))
entryValue := string(parsedEntry.GetStringBytes("value"))
entryType := string(parsedEntry.GetStringBytes("type"))
jsonParserPool.Put(jsonParser)

// filter by type
if len(*types) > 0 {
@@ -124,7 +88,7 @@ type Options struct {
}

type Searcher struct {
jsonParserPool fastjson.ParserPool
jsonParser fastjson.Parser
}

func NewSearcher() *Searcher {
@@ -136,20 +100,13 @@ func (s Searcher) Search(ctx context.Context, options Options) (<-chan dataset.E
return nil, nil, fmt.Errorf("invalid options: %w", err)
}

resultsChan := make(chan dataset.Entry, 10)
resultsChan := make(chan dataset.Entry, 100)
errorsChan := make(chan error)
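	// resultsChan is buffered so the scan loop can run ahead of slow
	// consumers instead of blocking on every single entry.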

go func() {
defer close(resultsChan)
defer close(errorsChan)

// wait group for search workers
waitGroup := sync.WaitGroup{}

// pool for fastjson.Parser to encourage reusing
// of instances without causing race conditions
jsonParserPool := fastjson.ParserPool{}

scanner := bufio.NewScanner(options.DatasetReader)
scanLoop:
for scanner.Scan() {
@@ -160,20 +117,15 @@ func (s Searcher) Search(ctx context.Context, options Options) (<-chan dataset.E
default:
}

waitGroup.Add(1)
go searchWorker(searchWorkerContext{
chunk: scanner.Text(),
domains: &options.Domains,
exclusions: &options.Exclusions,
types: &options.Types,
jsonParserPool: &jsonParserPool,
resultsChan: resultsChan,
errorsChan: errorsChan,
waitGroup: &waitGroup,
})
entry, err := filter(scanner.Text(), &options.Types, &options.Domains, &options.Exclusions, &s.jsonParser)
if err != nil {
errorsChan <- err
continue
} else if entry == nil {
continue
}
resultsChan <- *entry
}

waitGroup.Wait()
}()

return resultsChan, errorsChan, nil
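
For reference, here is a minimal sketch of how the reworked synchronous search could be driven. This is hypothetical usage rather than code from this commit: the `search` import path is internal to the module, the `Options` field names are taken from this diff, and `entry.Name` is assumed from the `name` JSON key.

```go
package main

import (
	"context"
	"fmt"
	"log"
	"os"
	"runtime"

	"github.com/klauspost/pgzip"
	"github.com/nscuro/fdnssearch/internal/search"
)

func main() {
	file, err := os.Open("fdns_cname.json.gz") // illustrative path
	if err != nil {
		log.Fatal(err)
	}
	defer file.Close()

	gzipReader, err := pgzip.NewReaderN(file, 5000000, runtime.NumCPU())
	if err != nil {
		log.Fatal(err)
	}
	defer gzipReader.Close()

	searcher := search.NewSearcher()
	resultsChan, errorsChan, err := searcher.Search(context.Background(), search.Options{
		DatasetReader: gzipReader,
		Domains:       []string{"example.com"},
		Types:         []string{"cname"},
	})
	if err != nil {
		log.Fatal(err)
	}

	// Drain both channels until the search goroutine closes them; a nil
	// channel disables its select case.
	for resultsChan != nil || errorsChan != nil {
		select {
		case entry, ok := <-resultsChan:
			if !ok {
				resultsChan = nil
				continue
			}
			fmt.Println(entry.Name)
		case err, ok := <-errorsChan:
			if !ok {
				errorsChan = nil
				continue
			}
			log.Println(err)
		}
	}
}
```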