Skip to content
This repository has been archived by the owner on Feb 15, 2023. It is now read-only.

Commit

Permalink
go livesql: support marshaling custom type filter
Browse files Browse the repository at this point in the history
  • Loading branch information
changpingc committed Sep 14, 2018
1 parent d24d249 commit 810bd16
Show file tree
Hide file tree
Showing 3 changed files with 111 additions and 13 deletions.
10 changes: 5 additions & 5 deletions livesql/live.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func (t *dbTracker) processBinlog(update *update) {
}
}

func (t *dbTracker) registerDependency(ctx context.Context, table string, tester sqlgen.Tester, filter sqlgen.Filter) error {
func (t *dbTracker) registerDependency(ctx context.Context, schema *sqlgen.Schema, table string, tester sqlgen.Tester, filter sqlgen.Filter) error {
r := &dbResource{
table: table,
tester: tester,
Expand All @@ -86,7 +86,7 @@ func (t *dbTracker) registerDependency(ctx context.Context, table string, tester
t.remove(r)
})

proto, err := filterToProto(table, filter)
proto, err := filterToProto(schema, table, filter)
if err != nil {
return err
}
Expand Down Expand Up @@ -149,7 +149,7 @@ func (ldb *LiveDB) query(ctx context.Context, query *sqlgen.BaseSelectQuery) ([]
// Register the dependency before we do the query to not miss any updates
// between querying and registering.
// Do not fail the query if this step fails.
_ = ldb.tracker.registerDependency(ctx, query.Table.Name, tester, query.Filter)
_ = ldb.tracker.registerDependency(ctx, ldb.Schema, query.Table.Name, tester, query.Filter)

// Perform the query.
// XXX: This will build the SQL string again... :(
Expand Down Expand Up @@ -211,7 +211,7 @@ func (ldb *LiveDB) Close() error {
}

func (ldb *LiveDB) AddDependency(ctx context.Context, proto *thunderpb.SQLFilter) error {
table, filter, err := filterFromProto(proto)
table, filter, err := filterFromProto(ldb.Schema, proto)
if err != nil {
return err
}
Expand All @@ -221,7 +221,7 @@ func (ldb *LiveDB) AddDependency(ctx context.Context, proto *thunderpb.SQLFilter
return err
}

if err := ldb.tracker.registerDependency(ctx, table, tester, filter); err != nil {
if err := ldb.tracker.registerDependency(ctx, ldb.Schema, table, tester, filter); err != nil {
return err
}
return nil
Expand Down
71 changes: 63 additions & 8 deletions livesql/marshal.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,19 @@ import (
"reflect"
"time"

"github.com/samsarahq/thunder/internal/fields"
"github.com/samsarahq/thunder/sqlgen"
"github.com/samsarahq/thunder/thunderpb"
)

// valueToField must support all valid values for driver.Value:
// int64
// float64
// bool
// []byte
// string
// time.Time
// nil - for NULL values
func valueToField(value interface{}) (*thunderpb.Field, error) {
switch column := sqlgen.MakeCanonical(value).(type) {
case nil:
Expand Down Expand Up @@ -55,26 +64,72 @@ func fieldToValue(field *thunderpb.Field) (interface{}, error) {
}
}

func filterToProto(table string, filter sqlgen.Filter) (*thunderpb.SQLFilter, error) {
func filterToProto(schema *sqlgen.Schema, tableName string, filter sqlgen.Filter) (*thunderpb.SQLFilter, error) {
table, ok := schema.ByName[tableName]
if !ok {
return nil, fmt.Errorf("table %s not found in schema", tableName)
}

fields := make(map[string]*thunderpb.Field, len(filter))
for col, val := range filter {
for columnName, val := range filter {
column, ok := table.ColumnsByName[columnName]
if !ok {
return nil, fmt.Errorf("column %s not found in table %s", columnName, tableName)
}

val, err := column.Descriptor.Valuer(reflect.ValueOf(val)).Value()
if err != nil {
return nil, err
}

field, err := valueToField(val)
if err != nil {
return nil, err
}
fields[col] = field
fields[columnName] = field
}
return &thunderpb.SQLFilter{Table: table, Fields: fields}, nil
return &thunderpb.SQLFilter{Table: tableName, Fields: fields}, nil
}

func filterFromProto(proto *thunderpb.SQLFilter) (string, sqlgen.Filter, error) {
func filterFromProto(schema *sqlgen.Schema, proto *thunderpb.SQLFilter) (string, sqlgen.Filter, error) {
tableName := proto.Table
table, ok := schema.ByName[tableName]
if !ok {
return "", nil, fmt.Errorf("table %s not found in schema", tableName)
}

scanners := table.Scanners.Get().([]interface{})
defer table.Scanners.Put(scanners)

filter := make(sqlgen.Filter, len(proto.Fields))
for col, field := range proto.Fields {
for columnName, field := range proto.Fields {
column, ok := table.ColumnsByName[columnName]
if !ok {
return "", nil, fmt.Errorf("column %s not found in table %s", columnName, tableName)
}

val, err := fieldToValue(field)
if err != nil {
return "", nil, err
}
filter[col] = val

// Scan into a pointer.
ptr := reflect.New(table.Type.Field(column.Order).Type)
target := ptr

// If field type is a pointer, deference pointer pointer.
if ptr.Elem().Kind() == reflect.Ptr {
target = ptr.Elem()
}

scanner := scanners[column.Order].(*fields.Scanner)
scanner.Target(target)

if err := scanner.Scan(val); err != nil {
return "", nil, err
}

filter[columnName] = ptr.Elem().Interface()
}
return proto.Table, filter, nil
return tableName, filter, nil
}
43 changes: 43 additions & 0 deletions livesql/marshal_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package livesql

import (
"testing"

"github.com/samsarahq/thunder/internal/testfixtures"
"github.com/samsarahq/thunder/sqlgen"
)

func TestMarshalCustomType(t *testing.T) {
schema := sqlgen.NewSchema()
type user struct {
Id testfixtures.CustomType `sql:",primary"`
Name string
}
schema.MustRegisterType("users", sqlgen.UniqueId, user{})

var foo testfixtures.CustomType
copy(foo[:], []byte("foo"))

original := sqlgen.Filter{
"id": foo,
"name": "bar",
}
proto, err := filterToProto(schema, "users", original)
if err != nil {
t.Fatal(err)
}

table, filter, err := filterFromProto(schema, proto)
if err != nil {
t.Fatal(err)
}
if table != "users" {
t.Fatalf("table name %s is not users", table)
}
if filter["id"] != foo {
t.Fatalf("filter[id] does not match: %T", filter["id"])
}
if filter["name"] != "bar" {
t.Fatal("filter[name] does not match")
}
}

0 comments on commit 810bd16

Please sign in to comment.