Skip to content

Commit

Permalink
add QueryBinary, an alloc-free way to read all rows into a buffer
Browse files Browse the repository at this point in the history
WIP; goal is alloc-free reads of a query into a Go-provided buffer.
Go code can then parse the simple binary format and alloc if needed
(doing its own cache lookups, including alloc-free m[string([]byte)]
lookups, and returning existing Views if data is unmodified)

Signed-off-by: Brad Fitzpatrick <bradfitz@tailscale.com>
  • Loading branch information
bradfitz committed Mar 26, 2023
1 parent 8a7a943 commit d10e7f3
Show file tree
Hide file tree
Showing 8 changed files with 437 additions and 3 deletions.
185 changes: 185 additions & 0 deletions binary.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,185 @@
// Copyright (c) 2023 Tailscale Inc & AUTHORS All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package sqlite

import (
"context"
"encoding/binary"
"errors"
"fmt"
"math"
"reflect"
"sync"

"github.com/tailscale/sqlite/sqliteh"
"golang.org/x/sys/cpu"
)

// driverConnRawCall bundles a callback suitable for passing to SQLConn.Raw
// with the fields that callback writes its results into. Values are recycled
// through driverConnRawCallPool so that the callback closure is built once per
// pooled value rather than once per call.
type driverConnRawCall struct {
// f is the callback handed to Raw; the pool's New function sets it to a
// closure that type-asserts the driver connection and stores the outcome
// in dc and ok.
f func(driverConn any) error

// results
dc *conn // the driver connection, when it was a *conn
ok bool // whether the type assertion to *conn succeeded
}

// driverConnRawCallPool hands out reusable driverConnRawCall values. Each new
// value carries a callback bound to itself: when invoked with a driver
// connection, the callback records in the value's dc/ok fields whether that
// connection is a *conn, and always reports a nil error.
var driverConnRawCallPool = &sync.Pool{
	New: func() any {
		call := &driverConnRawCall{}
		call.f = func(driverConn any) error {
			dc, isConn := driverConn.(*conn)
			call.dc = dc
			call.ok = isConn
			return nil
		}
		return call
	},
}

// getDriverConn extracts the underlying *conn from sc by way of sc.Raw.
//
// It returns ok == false when Raw reports an error or when the driver
// connection is not a *conn.
//
// The pooled helper is wiped before it goes back into the pool. That keeps the
// pool from holding a reference to the connection after this call returns, and
// ensures a later call can never observe results left over from an earlier
// one (for instance if Raw returned nil without invoking the callback).
func getDriverConn(sc SQLConn) (dc *conn, ok bool) {
	c := driverConnRawCallPool.Get().(*driverConnRawCall)
	// Start from a clean slate as well, in case a value was returned to the
	// pool by code that did not clear it.
	c.dc, c.ok = nil, false

	err := sc.Raw(c.f)

	// Copy the results out, then scrub and recycle the helper.
	dc, ok = c.dc, c.ok
	c.dc, c.ok = nil, false
	driverConnRawCallPool.Put(c)

	if err != nil {
		return nil, false
	}
	return dc, ok
}

// QueryBinary prepares query on sqlconn, binds args to its parameters in
// order, and returns all result rows encoded into a single byte slice that
// can be walked with BinaryResults.Next.
//
// optScratch, if non-empty, is used as the destination buffer; the returned
// BinaryResults then aliases optScratch, so the caller must not reuse the
// scratch buffer while still reading the results. If optScratch is empty, or
// turns out to be too small for the encoded rows, a new buffer is allocated
// and the query is re-run into it.
//
// Only signed integer args (int, int8, int16, int32, int64) are currently
// supported; any other arg type yields an error.
//
// An error is returned if sqlconn's driver connection is not this package's
// connection type, if preparing or binding fails, or if stepping the
// statement fails.
func QueryBinary(ctx context.Context, sqlconn SQLConn, optScratch []byte, query string, args ...any) (BinaryResults, error) {
	c, ok := getDriverConn(sqlconn)
	if !ok {
		return nil, errors.New("sqlconn is not of expected type")
	}
	// NOTE(review): the prepared statement is not explicitly released here;
	// confirm that c.prepare's persist handling covers the non-persist case.
	st, err := c.prepare(ctx, query, IsPersist(ctx))
	if err != nil {
		return nil, err
	}
	buf := optScratch
	if len(buf) == 0 {
		buf = make([]byte, 128)
	}
	for {
		// Each attempt starts from a reset statement, so the args are bound
		// again on every retry.
		st.stmt.ResetAndClear()

		// Bind args. SQLite parameter indexes are 1-based.
		for colIdx, a := range args {
			rv := reflect.ValueOf(a)
			switch rv.Kind() {
			case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64:
				if err := st.stmt.BindInt64(colIdx+1, rv.Int()); err != nil {
					return nil, fmt.Errorf("binding col idx %d to %T (%v): %w", colIdx, a, rv.Int(), err)
				}
			default:
				// TODO(bradfitz): more types, at least strings for stable IDs.
				return nil, fmt.Errorf("unsupported arg type %T", a)
			}
		}

		n, err := st.stmt.StepAllBinary(buf)
		if err == nil {
			return BinaryResults(buf[:n]), nil
		}

		// errors.As (rather than a bare type assertion) also recognizes the
		// size error when an implementation wraps it. The target is declared
		// only on this error path so the success path stays allocation-free.
		var tooSmall sqliteh.BufferSizeTooSmallError
		if !errors.As(err, &tooSmall) {
			return nil, err
		}
		// A required size that does not exceed the current buffer would make
		// this loop spin forever (or make panic on a negative size); treat it
		// as a hard error instead.
		if tooSmall.EncodedSize <= len(buf) {
			return nil, fmt.Errorf("StepAllBinary reported required size %d for a %d-byte buffer: %w", tooSmall.EncodedSize, len(buf), err)
		}
		buf = make([]byte, tooSmall.EncodedSize)
	}
}

// BinaryResults is the result of QueryBinary: the encoded rows of a query,
// held in a single byte slice.
//
// You should not depend on its specific format and parse it via its methods
// instead. Call Next repeatedly to consume one token at a time from the front
// of the slice.
type BinaryResults []byte

// BinaryToken is a single item decoded from a BinaryResults stream by
// BinaryResults.Next. The boolean fields say what kind of token it is.
type BinaryToken struct {
	StartRow bool // beginning of a row
	EndRow   bool // end of a row
	EndRows  bool // end of the whole result set
	IsInt    bool // if so, use Int() method
	IsFloat  bool // if so, use Float() method
	IsNull   bool // a NULL column value
	IsBytes  bool // if so, the payload is in Bytes
	Error    bool // the stream was empty, truncated, or had an unknown tag

	x     uint64 // raw 64-bit payload backing Int and Float
	Bytes []byte // payload of an IsBytes token
}

// String returns a short human-readable description of the token. When more
// than one kind flag is set, the first match in the order checked below wins.
func (t *BinaryToken) String() string {
	if t.StartRow {
		return "start-row"
	}
	if t.EndRow {
		return "end-row"
	}
	if t.EndRows {
		return "end-rows"
	}
	if t.IsNull {
		return "null"
	}
	if t.IsInt {
		return fmt.Sprintf("int: %v", t.Int())
	}
	if t.IsFloat {
		return fmt.Sprintf("float: %g", t.Float())
	}
	if t.IsBytes {
		return fmt.Sprintf("bytes: %q", t.Bytes)
	}
	if t.Error {
		return "error"
	}
	return "unknown"
}

// Int returns the token's 64-bit payload reinterpreted as a signed integer.
// It is meaningful when IsInt is set.
func (t *BinaryToken) Int() int64 {
	raw := t.x
	return int64(raw)
}

// Float returns the token's 64-bit payload reinterpreted as IEEE 754
// float64 bits. It is meaningful when IsFloat is set.
func (t *BinaryToken) Float() float64 {
	raw := t.x
	return math.Float64frombits(raw)
}

// Next decodes the token at the front of r, advances r past it, and returns
// it.
//
// Each token starts with a one-byte tag:
//
//	'('  start of a row
//	')'  end of a row
//	'E'  end of all rows
//	'n'  NULL value
//	'i'  integer: 8 payload bytes in native byte order
//	'f'  float:   8 payload bytes in native byte order
//	'b'  bytes:   8-byte native-order length, then that many bytes
//
// A token with Error set is returned when r is empty, when the tag is not
// recognized, or when the payload is truncated. The bytes examined before
// the problem was detected have already been consumed from r in that case.
//
// The Bytes field of a bytes token aliases r's underlying storage; it is not
// a copy.
func (r *BinaryResults) Next() BinaryToken {
	if len(*r) == 0 {
		return BinaryToken{Error: true}
	}
	first := (*r)[0]
	*r = (*r)[1:]

	switch first {
	case '(':
		return BinaryToken{StartRow: true}
	case ')':
		return BinaryToken{EndRow: true}
	case 'E':
		return BinaryToken{EndRows: true}
	case 'n':
		return BinaryToken{IsNull: true}
	case 'i', 'f', 'b':
		// All three kinds begin with a fixed 8-byte native-endian word.
		if len(*r) < 8 {
			return BinaryToken{Error: true}
		}
		var word uint64
		if cpu.IsBigEndian {
			word = binary.BigEndian.Uint64((*r)[:8])
		} else {
			word = binary.LittleEndian.Uint64((*r)[:8])
		}
		*r = (*r)[8:]

		switch first {
		case 'i':
			return BinaryToken{IsInt: true, x: word}
		case 'f':
			return BinaryToken{IsFloat: true, x: word}
		}

		// 'b': word is the payload length. Compare as unsigned so that a
		// corrupt length with the high bit set is rejected rather than
		// turning into a negative slice bound and panicking.
		if word > uint64(len(*r)) {
			return BinaryToken{Error: true}
		}
		// Cap the capacity at the payload length so that an append to
		// Bytes by the caller cannot overwrite the tokens that follow.
		payload := (*r)[:word:word]
		*r = (*r)[word:]
		return BinaryToken{IsBytes: true, Bytes: payload}
	default:
		return BinaryToken{Error: true}
	}
}
110 changes: 110 additions & 0 deletions binary_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
// Copyright (c) 2023 Tailscale Inc & AUTHORS All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package sqlite

import (
"context"
"math"
"reflect"
"testing"

"github.com/google/go-cmp/cmp"
)

// TestQueryBinary inserts a handful of rows covering integer extremes, floats,
// text, blobs and NULLs, then checks that QueryBinary returns them as the
// expected token sequence and that a repeat query into a reused buffer stays
// within the allocation budget.
func TestQueryBinary(t *testing.T) {
	ctx := WithPersist(context.Background())
	db := openTestDB(t)
	exec(t, db, "CREATE TABLE t (id INTEGER PRIMARY KEY, f REAL, txt TEXT, blb BLOB)")
	exec(t, db, "INSERT INTO t VALUES (?, ?, ?, ?)", math.MinInt64, 1.0, "text-a", "blob-a")
	exec(t, db, "INSERT INTO t VALUES (?, ?, ?, ?)", -1, -1.0, "text-b", "blob-b")
	exec(t, db, "INSERT INTO t VALUES (?, ?, ?, ?)", 0, 0, "text-c", "blob-c")
	exec(t, db, "INSERT INTO t VALUES (?, ?, ?, ?)", 20, 2, "text-d", "blob-d")
	exec(t, db, "INSERT INTO t VALUES (?, ?, ?, ?)", math.MaxInt64, nil, "text-e", "blob-e")
	exec(t, db, "INSERT INTO t VALUES (?, ?, ?, ?)", 42, 0.25, "text-f", nil)
	exec(t, db, "INSERT INTO t VALUES (?, ?, ?, ?)", 43, 1.75, "text-g", nil)

	// Named dbConn rather than conn so it does not shadow the package's
	// conn type.
	dbConn, err := db.Conn(ctx)
	if err != nil {
		t.Fatal(err)
	}
	// Return the connection when the test ends instead of leaking it.
	defer dbConn.Close()

	// The 100-byte scratch buffer is deliberately smaller than the encoded
	// result, so this also exercises the grow-and-retry path.
	buf, err := QueryBinary(ctx, dbConn, make([]byte, 100), "SELECT * FROM t ORDER BY id")
	if err != nil {
		t.Fatal(err)
	}
	t.Logf("Got %d bytes: %q", len(buf), buf)

	var got []string
	iter := buf // Next advances iter; buf keeps the full result for reuse below.
	for len(iter) > 0 {
		tok := iter.Next()
		got = append(got, tok.String())
		if tok.Error {
			break
		}
	}
	want := []string{
		"start-row", "int: -9223372036854775808", "float: 1", "bytes: \"text-a\"", "bytes: \"blob-a\"", "end-row",
		"start-row", "int: -1", "float: -1", "bytes: \"text-b\"", "bytes: \"blob-b\"", "end-row",
		"start-row", "int: 0", "float: 0", "bytes: \"text-c\"", "bytes: \"blob-c\"", "end-row",
		"start-row", "int: 20", "float: 2", "bytes: \"text-d\"", "bytes: \"blob-d\"", "end-row",
		"start-row", "int: 42", "float: 0.25", "bytes: \"text-f\"", "null", "end-row",
		"start-row", "int: 43", "float: 1.75", "bytes: \"text-g\"", "null", "end-row",
		"start-row", "int: 9223372036854775807", "null", "bytes: \"text-e\"", "bytes: \"blob-e\"", "end-row",
		"end-rows",
	}
	if !reflect.DeepEqual(got, want) {
		t.Errorf("wrong results\n got: %q\nwant: %q\n\ndiff:\n%s", got, want, cmp.Diff(want, got))
	}

	// Re-run the query into the already correctly sized buffer and make sure
	// the steady-state path stays cheap.
	allocs := int(testing.AllocsPerRun(10000, func() {
		_, err := QueryBinary(ctx, dbConn, buf, "SELECT * FROM t")
		if err != nil {
			t.Fatal(err)
		}
	}))
	const maxAllocs = 5 // as of Go 1.20
	if allocs > maxAllocs {
		t.Errorf("allocs = %v; want max %v", allocs, maxAllocs)
	}
}

// BenchmarkQueryBinaryParallel measures QueryBinary for a single-row lookup by
// primary key, with each parallel worker using its own connection and its own
// reusable scratch buffer.
func BenchmarkQueryBinaryParallel(b *testing.B) {
	ctx := WithPersist(context.Background())
	db := openTestDB(b)
	exec(b, db, "CREATE TABLE t (id INTEGER PRIMARY KEY, f REAL, txt TEXT, blb BLOB)")
	exec(b, db, "INSERT INTO t VALUES (?, ?, ?, ?)", 42, 0.25, "text-f", "some big big big big blob so big like so many bytes even")

	b.ResetTimer()
	b.ReportAllocs()
	b.RunParallel(func(pb *testing.PB) {
		// Named dbConn rather than conn so it does not shadow the package's
		// conn type.
		dbConn, err := db.Conn(ctx)
		if err != nil {
			b.Error(err)
			return
		}
		// Each worker releases its connection when it finishes.
		defer dbConn.Close()

		buf := make([]byte, 250)

		for pb.Next() {
			res, err := QueryBinary(ctx, dbConn, buf, "SELECT id, f, txt, blb FROM t WHERE id=?", 42)
			if err != nil {
				b.Error(err)
				return
			}
			tok := res.Next()
			if !tok.StartRow {
				// String has a pointer receiver, so call it explicitly;
				// formatting the value with %v would not use it.
				b.Errorf("didn't get start row; got %v", tok.String())
				return
			}
			tok = res.Next()
			if !tok.IsInt || tok.Int() != 42 {
				b.Errorf("got %v; want 42", tok.String())
				return
			}
		}
	})
}
19 changes: 19 additions & 0 deletions cgosqlite/cgosqlite.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ package cgosqlite
// #include "cgosqlite.h"
import "C"
import (
"errors"
"sync"
"time"
"unsafe"
Expand Down Expand Up @@ -120,6 +121,7 @@ type Stmt struct {
// used as scratch space when calling into cgo
rowid, changes C.sqlite3_int64
duration C.int64_t
encodedSize C.int
}

// Open implements sqliteh.OpenFunc.
Expand Down Expand Up @@ -416,6 +418,23 @@ func (stmt *Stmt) ColumnDeclType(col int) string {
return res
}

// StepAllBinary calls the C helper ts_sqlite_step_all, handing it dstBuf as
// the output buffer, and returns the encoded size the helper reported.
//
// dstBuf must be non-empty. When the reported encoded size is larger than
// dstBuf, a sqliteh.BufferSizeTooSmallError carrying that size is returned so
// the caller can retry with a bigger buffer; this check takes precedence over
// the helper's own return code. Otherwise a non-OK return code is converted
// to an error via errCode.
func (stmt *Stmt) StepAllBinary(dstBuf []byte) (n int, err error) {
	bufLen := len(dstBuf)
	if bufLen == 0 {
		return 0, errors.New("zero-length buffer to StepAllBinary")
	}

	// stmt.encodedSize is per-statement scratch space that the C side fills in.
	ret := C.ts_sqlite_step_all(stmt.stmt.int(), (*C.char)(unsafe.Pointer(&dstBuf[0])), C.int(bufLen), &stmt.encodedSize)

	size := int(stmt.encodedSize)
	if size > bufLen {
		return 0, sqliteh.BufferSizeTooSmallError{EncodedSize: size}
	}
	if stepErr := errCode(ret); stepErr != nil {
		return 0, stepErr
	}
	return size, nil
}

var emptyCStr = C.CString("")

func errCode(code C.int) error { return sqliteh.CodeAsError(sqliteh.Code(code)) }
Expand Down
Loading

0 comments on commit d10e7f3

Please sign in to comment.