Merge branch 'public-api-refactoring'
* public-api-refactoring:
  v0.3.0
  update README
  display some stats in -verbose mode
  breaking change: make quiet the default
  add support for gzipped input files
  update inline doc
  long tokens work better with reader
  fix Options package name
  starting public API refactorings
miku committed Nov 30, 2014
2 parents fc2482d + 58d3d9d commit f6d4363
Showing 5 changed files with 120 additions and 76 deletions.
10 changes: 8 additions & 2 deletions README.md
@@ -14,16 +14,18 @@ Usage
-----

$ esbulk -h
Usage: esbulk [OPTIONS] JSON
-cpuprofile="": write cpu profile to file
-host="localhost": elasticsearch host
-index="": index name
-memprofile="": write heap profile to file
-port=9200: elasticsearch port
-q=false: do not produce any output
-size=1000: bulk batch size
-type="default": elasticsearch doc type
-v=false: prints current program version
-verbose=false: output basic progress
-w=4: number of workers to use
-z=false: unzip gz'd file on the fly

To index a JSON file that contains one document per line, just run:

@@ -36,4 +38,8 @@ Where `file.ldj` is line delimited JSON, like:
...

By default `esbulk` will use as many parallel workers as there are cores.
To tweak the indexing process, adjust the `size` and `w` parameters.
To tweak the indexing process, adjust the `-size` and `-w` parameters.
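
For example, a hypothetical invocation with eight workers and a batch size of 5000 (illustrative values, not the defaults) might look like:

    $ esbulk -w 8 -size 5000 -index example file.ldj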

You can index from gzipped files as well, using the `-z` flag:

$ esbulk -z -index example file.ldj.gz
106 changes: 35 additions & 71 deletions cmd/esbulk/esbulk.go
@@ -2,80 +2,22 @@ package main

import (
"bufio"
"compress/gzip"
"flag"
"fmt"
"io"
"log"
"net/http"
"os"
"runtime"
"runtime/pprof"
"strings"
"sync"
"time"

"github.com/miku/esbulk"
)

// Options represents bulk indexing options
type Options struct {
Host string
Port int
Index string
DocType string
BatchSize int
Quiet bool
}

// BulkIndex takes a list of documents as strings and indexes them into elasticsearch
func BulkIndex(docs []string, options Options) error {
url := fmt.Sprintf("http://%s:%d/%s/%s/_bulk", options.Host, options.Port, options.Index, options.DocType)
header := fmt.Sprintf(`{"index": {"_index": "%s", "_type": "%s"}}`, options.Index, options.DocType)
var lines []string
for _, doc := range docs {
if len(strings.TrimSpace(doc)) == 0 {
continue
}
lines = append(lines, header)
lines = append(lines, doc)
}
body := fmt.Sprintf("%s\n", strings.Join(lines, "\n"))
response, err := http.Post(url, "application/json", strings.NewReader(body))
if err != nil {
return err
}
// > Caller should close resp.Body when done reading from it.
// Results in a resource leak otherwise.
response.Body.Close()
return nil
}

// Worker will batch index documents that come in on the lines channel
func Worker(id string, options Options, lines chan string, wg *sync.WaitGroup) {
defer wg.Done()
var docs []string
counter := 0
for s := range lines {
docs = append(docs, s)
counter++
if counter%options.BatchSize == 0 {
err := BulkIndex(docs, options)
if err != nil {
log.Fatal(err)
}
if !options.Quiet {
fmt.Fprintf(os.Stderr, "[%s] @%d\n", id, counter)
}
docs = docs[:0]
}
}
err := BulkIndex(docs, options)
if err != nil {
log.Fatal(err)
}
if !options.Quiet {
fmt.Fprintf(os.Stderr, "[%s] @%d\n", id, counter)
}
}

func main() {

version := flag.Bool("v", false, "prints current program version")
@@ -87,7 +29,8 @@ func main() {
port := flag.Int("port", 9200, "elasticsearch port")
batchSize := flag.Int("size", 1000, "bulk batch size")
numWorkers := flag.Int("w", runtime.NumCPU(), "number of workers to use")
quiet := flag.Bool("q", false, "do not produce any output")
verbose := flag.Bool("verbose", false, "output basic progress")
gzipped := flag.Bool("z", false, "unzip gz'd file on the fly")

var PrintUsage = func() {
fmt.Fprintf(os.Stderr, "Usage: %s [OPTIONS] JSON\n", os.Args[0])
@@ -127,21 +70,21 @@ func main() {

runtime.GOMAXPROCS(*numWorkers)

options := Options{
options := esbulk.Options{
Host: *host,
Port: *port,
Index: *indexName,
DocType: *docType,
BatchSize: *batchSize,
Quiet: *quiet,
Verbose: *verbose,
}

queue := make(chan string)
var wg sync.WaitGroup

for i := 0; i < *numWorkers; i++ {
wg.Add(1)
go Worker(fmt.Sprintf("worker-%d", i), options, queue, &wg)
go esbulk.Worker(fmt.Sprintf("worker-%d", i), options, queue, &wg)
}

// set refresh interval to -1
@@ -150,18 +93,34 @@ func main() {
log.Fatal(err)
}

scanner := bufio.NewScanner(file)
for scanner.Scan() {
s := scanner.Text()
queue <- s
reader := bufio.NewReader(file)
if *gzipped {
zreader, err := gzip.NewReader(file)
if err != nil {
log.Fatal(err)
}
reader = bufio.NewReader(zreader)
}

if err := scanner.Err(); err != nil {
log.Fatal(err)
counter := 0
start := time.Now()

for {
line, err := reader.ReadString('\n')
if err == io.EOF {
break
}
if err != nil {
log.Fatal(err)
}
line = strings.TrimSpace(line)
queue <- line
counter += 1
}

close(queue)
wg.Wait()
elapsed := time.Since(start)

defer func() {
_, err = http.NewRequest("PUT", fmt.Sprintf("http://%s:%d/%s/_settings", *host, *port, *indexName), strings.NewReader(`{"index": {"refresh_interval": "1s"}}`))
@@ -182,4 +141,9 @@ func main() {
pprof.WriteHeapProfile(f)
f.Close()
}

if *verbose {
rate := float64(counter) / elapsed.Seconds()
log.Printf("%d docs in %s at %0.3f docs/s with %d workers\n", counter, elapsed, rate, *numWorkers)
}
}
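
The switch above from bufio.Scanner to bufio.Reader reflects the "long tokens work better with reader" item in the commit message: Scanner gives up on lines longer than its token limit (64 KB by default), while ReadString keeps reading until it hits the delimiter. An illustrative sketch of the difference, not part of this commit:

    package main

    import (
        "bufio"
        "fmt"
        "strings"
    )

    func main() {
        long := strings.Repeat("x", 100000) + "\n" // a single ~100 KB line

        // bufio.Scanner stops with "token too long" once a line exceeds
        // its 64 KB default limit (bufio.MaxScanTokenSize).
        scanner := bufio.NewScanner(strings.NewReader(long))
        fmt.Println(scanner.Scan(), scanner.Err()) // false bufio.Scanner: token too long

        // bufio.Reader.ReadString keeps reading until the delimiter,
        // regardless of line length.
        reader := bufio.NewReader(strings.NewReader(long))
        line, err := reader.ReadString('\n')
        fmt.Println(len(line), err) // 100001 <nil>
    }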
71 changes: 70 additions & 1 deletion common.go
@@ -1,4 +1,73 @@
package esbulk

import (
"fmt"
"log"
"net/http"
"strings"
"sync"
)

// Application Version
const Version = "0.2.4"
const Version = "0.3.0"

// Options represents bulk indexing options
type Options struct {
Host string
Port int
Index string
DocType string
BatchSize int
Verbose bool
}

// BulkIndex takes a set of documents as strings and indexes them into elasticsearch
func BulkIndex(docs []string, options Options) error {
url := fmt.Sprintf("http://%s:%d/%s/%s/_bulk", options.Host, options.Port, options.Index, options.DocType)
header := fmt.Sprintf(`{"index": {"_index": "%s", "_type": "%s"}}`, options.Index, options.DocType)
var lines []string
for _, doc := range docs {
if len(strings.TrimSpace(doc)) == 0 {
continue
}
lines = append(lines, header)
lines = append(lines, doc)
}
body := fmt.Sprintf("%s\n", strings.Join(lines, "\n"))
response, err := http.Post(url, "application/json", strings.NewReader(body))
if err != nil {
return err
}
// > Caller should close resp.Body when done reading from it.
// Results in a resource leak otherwise.
response.Body.Close()
return nil
}

// Worker will batch index documents that come in on the lines channel
func Worker(id string, options Options, lines chan string, wg *sync.WaitGroup) {
defer wg.Done()
var docs []string
counter := 0
for s := range lines {
docs = append(docs, s)
counter++
if counter%options.BatchSize == 0 {
err := BulkIndex(docs, options)
if err != nil {
log.Fatal(err)
}
if options.Verbose {
log.Printf("[%s] @%d\n", id, counter)
}
docs = docs[:0]
}
}
err := BulkIndex(docs, options)
if err != nil {
log.Fatal(err)
}
if options.Verbose {
log.Printf("[%s] @%d\n", id, counter)
}
}
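
With Options, BulkIndex, and Worker now exported from the esbulk package, other programs can reuse the indexing pipeline directly. A minimal downstream sketch, assuming an Elasticsearch instance on localhost:9200 and an existing index named "example" (illustrative, not part of this commit):

    package main

    import (
        "fmt"
        "sync"

        "github.com/miku/esbulk"
    )

    func main() {
        options := esbulk.Options{
            Host:      "localhost",
            Port:      9200,
            Index:     "example",
            DocType:   "default",
            BatchSize: 1000,
            Verbose:   true,
        }

        queue := make(chan string)
        var wg sync.WaitGroup

        // Each worker batches documents from the channel and flushes the
        // remainder once the channel is closed.
        wg.Add(1)
        go esbulk.Worker("worker-0", options, queue, &wg)

        for i := 0; i < 3; i++ {
            queue <- fmt.Sprintf(`{"id": %d, "name": "doc-%d"}`, i, i)
        }
        close(queue)
        wg.Wait()
    }

Internally, BulkIndex joins a shared action header with each document into one newline-delimited body and POSTs it to the index's _bulk endpoint.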
2 changes: 1 addition & 1 deletion debian/esbulk/DEBIAN/control
@@ -1,5 +1,5 @@
Package: esbulk
Version: 0.2.4
Version: 0.3.0
Section: utils
Priority: optional
Architecture: amd64
7 changes: 6 additions & 1 deletion packaging/esbulk.spec
@@ -1,6 +1,6 @@
Summary: Fast parallel bulk loading utility for elasticsearch.
Name: esbulk
Version: 0.2.4
Version: 0.3.0
Release: 0
License: MIT
BuildArch: x86_64
@@ -46,6 +46,11 @@ rm -rf %{_topdir}/BUILD/%{name}


%changelog
* Sun Nov 30 2014 Martin Czygan
- 0.3 release
- backwards-incompatible changes: removed -q, added -verbose
- added support for gzipped input files

* Mon Sep 29 2014 Martin Czygan
- 0.2 release, fixed memory leak by closing `response.Body`

