Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix for bulk indexing error (#20) plus TTL addition #21

Merged
merged 8 commits into from
Jul 12, 2013
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 11 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,14 +1,21 @@
elastigo
========

Golang based Elasticsearch client, implements core api for Indexing and searching.
Golang based Elasticsearch client, implements core api for Indexing and searching. GoDoc http://godoc.org/github.com/mattbaird/elastigo

status updates
========================

* 2013-1-26, expansion of search dsl for greater coverage
* 2012-12-30, new bulk indexing and search dsl
* 2012-10-12, early in development, not ready for production yet.
* *2013-7-10* Improvements/changes to bulk indexor (includes breaking changes to support TTL),
Search dsl supports And/Or/Not
* *SearchDsl* should still be considered beta at this
point, there will be minor breaking changes as more of the
elasticsearch feature set is implemented.
* *2013-1-26* expansion of search dsl for greater coverage
* *2012-12-30* new bulk indexing and search dsl
* *2012-10-12* early in development, not ready for production yet.



Search Examples
-------------------------
Expand Down
86 changes: 65 additions & 21 deletions core/bulk.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ var (
BulkErrorCt uint64

// There is one Global Bulk Indexor for convenience
bulkIndexor *BulkIndexor
GlobalBulkIndexor *BulkIndexor
)

type ErrorBuffer struct {
Expand All @@ -40,9 +40,9 @@ type ErrorBuffer struct {
// done := make(chan bool)
// BulkIndexorGlobalRun(100, done)
func BulkIndexorGlobalRun(maxConns int, done chan bool) {
if bulkIndexor == nil {
bulkIndexor = NewBulkIndexor(maxConns)
bulkIndexor.Run(done)
if GlobalBulkIndexor == nil {
GlobalBulkIndexor = NewBulkIndexor(maxConns)
GlobalBulkIndexor.Run(done)
}
}

Expand Down Expand Up @@ -71,18 +71,29 @@ type BulkIndexor struct {
// buffers
sendBuf chan *bytes.Buffer
buf *bytes.Buffer
// Buffer for Max number of time before forcing flush
BufferDelayMax time.Duration
// Max buffer size in bytes before flushing to elasticsearch
BulkMaxBuffer int // 1048576
// Max number of Docs to hold in buffer before forcing flush
BulkMaxDocs int // 100

// Number of documents we have send through so far on this session
docCt int
// Max number of http connections in flight at one time
maxConns int
// Was the last send induced by time? or if not, by max docs/size?
lastSendorByTime bool
mu sync.Mutex
// If we are indexing enough docs per bufferdelaymax, we won't need to do time
// based eviction, else we do.
needsTimeBasedFlush bool
mu sync.Mutex
}

func NewBulkIndexor(maxConns int) *BulkIndexor {
b := BulkIndexor{sendBuf: make(chan *bytes.Buffer, maxConns)}
b.lastSendorByTime = true
b.needsTimeBasedFlush = true
b.BulkMaxBuffer = BulkMaxBuffer
b.BulkMaxDocs = BulkMaxDocs
b.BufferDelayMax = time.Duration(BulkDelaySeconds) * time.Second
b.buf = new(bytes.Buffer)
b.maxConns = maxConns
b.bulkChannel = make(chan []byte, 100)
Expand All @@ -97,9 +108,12 @@ func NewBulkIndexor(maxConns int) *BulkIndexor {
// BulkIndexorGlobalRun(100, done)
func NewBulkIndexorErrors(maxConns, retrySeconds int) *BulkIndexor {
b := BulkIndexor{sendBuf: make(chan *bytes.Buffer, maxConns)}
b.lastSendorByTime = true
b.needsTimeBasedFlush = true
b.buf = new(bytes.Buffer)
b.maxConns = maxConns
b.BulkMaxBuffer = BulkMaxBuffer
b.BulkMaxDocs = BulkMaxDocs
b.BufferDelayMax = time.Duration(BulkDelaySeconds) * time.Second
b.RetryForSeconds = retrySeconds
b.bulkChannel = make(chan []byte, 100)
b.ErrorChannel = make(chan *ErrorBuffer, 20)
Expand Down Expand Up @@ -168,17 +182,19 @@ func (b *BulkIndexor) startHttpSendor() {
// start a timer for checking back and forcing flush every BulkDelaySeconds seconds
// even if we haven't hit max messages/size
func (b *BulkIndexor) startTimer() {
u.Debug("Starting Bulk timer with delay = ", BulkDelaySeconds)
ticker := time.NewTicker(time.Second * time.Duration(BulkDelaySeconds))
ticker := time.NewTicker(b.BufferDelayMax)
log.Println("Starting timer with delay = ", b.BufferDelayMax)
go func() {
for _ = range ticker.C {
b.mu.Lock()
// don't send unless last sendor was the time,
// otherwise an indication of other thresholds being hit
// where time isn't needed
if b.buf.Len() > 0 && b.lastSendorByTime {
b.lastSendorByTime = true
if b.buf.Len() > 0 && b.needsTimeBasedFlush {
b.needsTimeBasedFlush = true
b.send(b.buf)
} else if b.buf.Len() > 0 {
b.needsTimeBasedFlush = true
}
b.mu.Unlock()

Expand All @@ -194,8 +210,8 @@ func (b *BulkIndexor) startDocChannel() {
b.mu.Lock()
b.docCt += 1
b.buf.Write(docBytes)
if b.buf.Len() >= BulkMaxBuffer || b.docCt >= BulkMaxDocs {
b.lastSendorByTime = false
if b.buf.Len() >= b.BulkMaxBuffer || b.docCt >= b.BulkMaxDocs {
b.needsTimeBasedFlush = false
//log.Printf("Send due to size: docs=%d bufsize=%d", b.docCt, b.buf.Len())
b.send(b.buf)
}
Expand All @@ -214,9 +230,9 @@ func (b *BulkIndexor) send(buf *bytes.Buffer) {
// The index bulk API adds or updates a typed JSON document to a specific index, making it searchable.
// it operates by buffering requests, and occasionally flushing to elasticsearch
// http://www.elasticsearch.org/guide/reference/api/bulk.html
func (b *BulkIndexor) Index(index string, _type string, id string, date *time.Time, data interface{}) error {
func (b *BulkIndexor) Index(index string, _type string, id, ttl string, date *time.Time, data interface{}) error {
//{ "index" : { "_index" : "test", "_type" : "type1", "_id" : "1" } }
by, err := IndexBulkBytes(index, _type, id, date, data)
by, err := IndexBulkBytes(index, _type, id, ttl, date, data)
if err != nil {
u.Error(err)
return err
Expand All @@ -239,7 +255,7 @@ func BulkSend(buf *bytes.Buffer) error {

// Given a set of arguments for index, type, id, data create a set of bytes that is formatted for bulk index
// http://www.elasticsearch.org/guide/reference/api/bulk.html
func IndexBulkBytes(index string, _type string, id string, date *time.Time, data interface{}) ([]byte, error) {
func IndexBulkBytes(index string, _type string, id, ttl string, date *time.Time, data interface{}) ([]byte, error) {
//{ "index" : { "_index" : "test", "_type" : "type1", "_id" : "1" } }
buf := bytes.Buffer{}
buf.WriteString(`{"index":{"_index":"`)
Expand All @@ -248,6 +264,10 @@ func IndexBulkBytes(index string, _type string, id string, date *time.Time, data
buf.WriteString(_type)
buf.WriteString(`","_id":"`)
buf.WriteString(id)
if len(ttl) > 0 {
buf.WriteString(`","ttl":"`)
buf.WriteString(ttl)
}
if date != nil {
buf.WriteString(`","_timestamp":"`)
buf.WriteString(strconv.FormatInt(date.UnixNano()/1e6, 10))
Expand Down Expand Up @@ -275,16 +295,40 @@ func IndexBulkBytes(index string, _type string, id string, date *time.Time, data

// The index bulk API adds or updates a typed JSON document to a specific index, making it searchable.
// it operates by buffering requests, and ocassionally flushing to elasticsearch
//
// This uses the one Global Bulk Indexor, you can also create your own non-global indexors and use the
// Index functions of that
//
// http://www.elasticsearch.org/guide/reference/api/bulk.html
func IndexBulk(index string, _type string, id string, date *time.Time, data interface{}) error {
//{ "index" : { "_index" : "test", "_type" : "type1", "_id" : "1" } }
if bulkIndexor == nil {
if GlobalBulkIndexor == nil {
panic("Must have Global Bulk Indexor to use this Func")
}
by, err := IndexBulkBytes(index, _type, id, "", date, data)
if err != nil {
return err
}
GlobalBulkIndexor.bulkChannel <- by
return nil
}

// The index bulk API adds or updates a typed JSON document to a specific index, making it searchable.
// it operates by buffering requests, and occasionally flushing to elasticsearch.
//
// This uses the one Global Bulk Indexor, you can also create your own non-global indexors and use the
// IndexTtl functions of that
//
// http://www.elasticsearch.org/guide/reference/api/bulk.html
func IndexBulkTtl(index string, _type string, id, ttl string, date *time.Time, data interface{}) error {
//{ "index" : { "_index" : "test", "_type" : "type1", "_id" : "1" } }
if GlobalBulkIndexor == nil {
panic("Must have Global Bulk Indexor to use this Func")
}
by, err := IndexBulkBytes(index, _type, id, date, data)
by, err := IndexBulkBytes(index, _type, id, ttl, date, data)
if err != nil {
return err
}
bulkIndexor.bulkChannel <- by
GlobalBulkIndexor.bulkChannel <- by
return nil
}
55 changes: 42 additions & 13 deletions core/bulk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,40 +30,68 @@ func init() {
}
func TestBulk(t *testing.T) {
InitTests(true)
indexor := NewBulkIndexor(10)
indexor := NewBulkIndexor(3)
indexor.BulkSendor = func(buf *bytes.Buffer) error {
messageSets += 1
totalBytesSent += buf.Len()
buffers = append(buffers, buf)
log.Println(string(buf.Bytes()))
u.Debug(string(buf.Bytes()))
return BulkSend(buf)
}
done := make(chan bool)
indexor.Run(done)

date := time.Unix(1257894000, 0)
data := map[string]interface{}{"name": "smurfs", "age": 22, "date": time.Unix(1257894000, 0)}
err := indexor.Index("users", "user", "1", &date, data)
err := indexor.Index("users", "user", "1", "", &date, data)

WaitFor(func() bool {
return len(buffers) > 0
}, 5)
// part of request is url, so lets factor that in
//totalBytesSent = totalBytesSent - len(*eshost)
Assert(len(buffers) == 1, t, "Should have sent one operation but was %d", len(buffers))
Assert(BulkErrorCt == 0 && err == nil, t, "Should not have any errors %v", err)
Assert(totalBytesSent == 145, t, "Should have sent 135 bytes but was %v", totalBytesSent)
u.Assert(len(buffers) == 1, t, "Should have sent one operation but was %d", len(buffers))
u.Assert(BulkErrorCt == 0 && err == nil, t, "Should not have any errors %v", err)
u.Assert(totalBytesSent == 145, t, "Should have sent 135 bytes but was %v", totalBytesSent)

err = indexor.Index("users", "user", "2", nil, data)
err = indexor.Index("users", "user", "2", "", nil, data)

WaitFor(func() bool {
return len(buffers) > 1
}, 5)
totalBytesSent = totalBytesSent - len(*eshost)
Assert(len(buffers) == 2, t, "Should have nil error, and another buffer")
u.Assert(len(buffers) == 2, t, "Should have nil error, and another buffer")

u.Assert(BulkErrorCt == 0 && err == nil, t, "Should not have any errors")
u.Assert(u.CloseInt(totalBytesSent, 257), t, "Should have sent 257 bytes but was %v", totalBytesSent)

}
func TestBulkSmallBatch(t *testing.T) {
InitTests(true)

done := make(chan bool)

date := time.Unix(1257894000, 0)
data := map[string]interface{}{"name": "smurfs", "age": 22, "date": time.Unix(1257894000, 0)}

// Now tests small batches
indexorsm := NewBulkIndexor(1)
indexorsm.BufferDelayMax = 100 * time.Millisecond
indexorsm.BulkMaxDocs = 2
messageSets = 0
indexorsm.BulkSendor = func(buf *bytes.Buffer) error {
messageSets += 1
return BulkSend(buf)
}
indexorsm.Run(done)
<-time.After(time.Millisecond * 20)

indexorsm.Index("users", "user", "2", "", &date, data)
indexorsm.Index("users", "user", "3", "", &date, data)
indexorsm.Index("users", "user", "4", "", &date, data)
<-time.After(time.Millisecond * 200)
Assert(messageSets == 2, t, "Should have sent 2 message sets %d", messageSets)

Assert(BulkErrorCt == 0 && err == nil, t, "Should not have any errors")
Assert(u.CloseInt(totalBytesSent, 257), t, "Should have sent 257 bytes but was %v", totalBytesSent)
}

func TestBulkErrors(t *testing.T) {
Expand All @@ -82,7 +110,7 @@ func TestBulkErrors(t *testing.T) {
for i := 0; i < 20; i++ {
date := time.Unix(1257894000, 0)
data := map[string]interface{}{"name": "smurfs", "age": 22, "date": time.Unix(1257894000, 0)}
indexor.Index("users", "user", strconv.Itoa(i), &date, data)
indexor.Index("users", "user", strconv.Itoa(i), "", &date, data)
}
}()
for errBuf := range indexor.ErrorChannel {
Expand All @@ -91,6 +119,7 @@ func TestBulkErrors(t *testing.T) {
break
}
u.Assert(errorCt > 0, t, "ErrorCt should be > 0 %d", errorCt)

}

/*
Expand All @@ -106,7 +135,7 @@ func BenchmarkBulkSend(b *testing.B) {
b.StartTimer()
totalBytes := 0
sets := 0
bulkIndexor.BulkSendor = func(buf *bytes.Buffer) error {
GlobalBulkIndexor.BulkSendor = func(buf *bytes.Buffer) error {
totalBytes += buf.Len()
sets += 1
//log.Println("got bulk")
Expand Down Expand Up @@ -142,7 +171,7 @@ func BenchmarkBulkSendBytes(b *testing.B) {
b.StartTimer()
totalBytes := 0
sets := 0
bulkIndexor.BulkSendor = func(buf *bytes.Buffer) error {
GlobalBulkIndexor.BulkSendor = func(buf *bytes.Buffer) error {
totalBytes += buf.Len()
sets += 1
return BulkSend(buf)
Expand Down
Loading