Skip to content

Commit

Permalink
chore: optimize single-record blocking write
Browse files Browse the repository at this point in the history
  • Loading branch information
pabigot committed Jan 4, 2022
1 parent a8363f1 commit 0cf22fe
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 11 deletions.
30 changes: 19 additions & 11 deletions api/writeAPIBlocking.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,8 @@ import (
// }
type WriteAPIBlocking interface {
// WriteRecord writes line protocol record(s) into bucket.
// WriteRecord writes without implicit batching. Batch is created from given number of records
// WriteRecord writes without implicit batching. Batch is created from given number of records.
// Individual arguments can also be batches (multiple records separated by newline).
// Non-blocking alternative is available in the WriteAPI interface
WriteRecord(ctx context.Context, line ...string) error
// WritePoint data point into bucket.
Expand Down Expand Up @@ -80,18 +81,25 @@ func (w *writeAPIBlocking) write(ctx context.Context, line string) error {
}

func (w *writeAPIBlocking) WriteRecord(ctx context.Context, line ...string) error {
if len(line) > 0 {
var sb strings.Builder
for _, line := range line {
b := []byte(line)
b = append(b, 0xa)
if _, err := sb.Write(b); err != nil {
return err
}
if len(line) == 0 {
return nil
}
if len(line) == 1 {
ln := line[0]
if ln[len(ln)-1] != '\n' {
ln += "\n"
}
return w.write(ctx, sb.String())
return w.write(ctx, ln)
}
return nil
var sb strings.Builder
for _, line := range line {
b := []byte(line)
b = append(b, 0xa)
if _, err := sb.Write(b); err != nil {
return err
}
}
return w.write(ctx, sb.String())
}

func (w *writeAPIBlocking) WritePoint(ctx context.Context, point ...*write.Point) error {
Expand Down
15 changes: 15 additions & 0 deletions api/writeAPIBlocking_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"context"
"net"
"net/http"
"strings"
"sync"
"testing"
"time"
Expand Down Expand Up @@ -57,6 +58,20 @@ func TestWriteRecord(t *testing.T) {
require.Equal(t, "invalid: data", err.Error())
}

func TestWriteRecordBatch(t *testing.T) {
service := test.NewTestService(t, "http://localhost:8888")
writeAPI := NewWriteAPIBlocking("my-org", "my-bucket", service, write.DefaultOptions().SetBatchSize(5))
lines := test.GenRecords(10)
batch := strings.Join(lines, "\n")
err := writeAPI.WriteRecord(context.Background(), batch)
require.Nil(t, err)
require.Len(t, service.Lines(), 10)
for i, l := range lines {
assert.Equal(t, l, service.Lines()[i])
}
service.Close()
}

func TestWriteParallel(t *testing.T) {
service := test.NewTestService(t, "http://localhost:8888")
writeAPI := NewWriteAPIBlocking("my-org", "my-bucket", service, write.DefaultOptions().SetBatchSize(5))
Expand Down

0 comments on commit 0cf22fe

Please sign in to comment.