Skip to content
This repository has been archived by the owner on Feb 15, 2023. It is now read-only.

Commit

Permalink
go thunder/livesql: serialize and restore sqlgen.Tester
Browse files Browse the repository at this point in the history
This commit adds conversion between thunderpb.SQLFilter
and sqlgen.Tester. It takes each primitive typed valued in filter
and casts them into canonical types so that equality check in
tester still works after we convert serialized filter back into tester.
  • Loading branch information
changpingc committed Sep 12, 2018
1 parent a22b4d3 commit 7547ade
Show file tree
Hide file tree
Showing 3 changed files with 150 additions and 5 deletions.
34 changes: 30 additions & 4 deletions livesql/live.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/samsarahq/thunder/internal"
"github.com/samsarahq/thunder/reactive"
"github.com/samsarahq/thunder/sqlgen"
"github.com/samsarahq/thunder/thunderpb"
)

// dbResource tracks changes to a specific table matching a filter
Expand Down Expand Up @@ -75,7 +76,7 @@ func (t *dbTracker) processBinlog(update *update) {
}
}

func (t *dbTracker) registerDependency(ctx context.Context, table string, tester sqlgen.Tester) {
func (t *dbTracker) registerDependency(ctx context.Context, table string, tester sqlgen.Tester, filter sqlgen.Filter) error {
r := &dbResource{
table: table,
tester: tester,
Expand All @@ -84,9 +85,16 @@ func (t *dbTracker) registerDependency(ctx context.Context, table string, tester
r.resource.Cleanup(func() {
t.remove(r)
})
reactive.AddDependency(ctx, r.resource)

proto, err := filterToProto(table, filter)
if err != nil {
return err
}

reactive.AddDependency(ctx, r.resource, proto)

t.add(r)
return nil
}

// LiveDB is a SQL client that supports live updating queries.
Expand Down Expand Up @@ -120,7 +128,6 @@ func (ldb *LiveDB) query(ctx context.Context, query *sqlgen.BaseSelectQuery) ([]
if !reactive.HasRerunner(ctx) || ldb.HasTx(ctx) {
return ldb.DB.BaseQuery(ctx, query)
}

selectQuery, err := query.MakeSelectQuery()
if err != nil {
return nil, err
Expand All @@ -141,7 +148,9 @@ func (ldb *LiveDB) query(ctx context.Context, query *sqlgen.BaseSelectQuery) ([]

// Register the dependency before we do the query to not miss any updates
// between querying and registering.
ldb.tracker.registerDependency(ctx, query.Table.Name, tester)
if err := ldb.tracker.registerDependency(ctx, query.Table.Name, tester, query.Filter); err != nil {
return nil, err
}

// Perform the query.
// XXX: This will build the SQL string again... :(
Expand Down Expand Up @@ -201,3 +210,20 @@ func (ldb *LiveDB) QueryRow(ctx context.Context, result interface{}, filter sqlg
func (ldb *LiveDB) Close() error {
return ldb.Conn.Close()
}

func (ldb *LiveDB) AddDependency(ctx context.Context, proto *thunderpb.SQLFilter) error {
table, filter, err := filterFromProto(proto)
if err != nil {
return err
}

tester, err := ldb.Schema.MakeTester(table, filter)
if err != nil {
return err
}

if err := ldb.tracker.registerDependency(ctx, table, tester, filter); err != nil {
return err
}
return nil
}
80 changes: 80 additions & 0 deletions livesql/marshal.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package livesql

import (
"fmt"
"reflect"
"time"

"github.com/samsarahq/thunder/sqlgen"
"github.com/samsarahq/thunder/thunderpb"
)

func valueToField(value interface{}) (*thunderpb.Field, error) {
switch column := sqlgen.MakeCanonical(value).(type) {
case nil:
return &thunderpb.Field{Kind: thunderpb.FieldKind_Null}, nil
case bool:
return &thunderpb.Field{Kind: thunderpb.FieldKind_Bool, Bool: column}, nil
case int64:
return &thunderpb.Field{Kind: thunderpb.FieldKind_Int, Int: column}, nil
case uint64:
return &thunderpb.Field{Kind: thunderpb.FieldKind_Uint, Uint: column}, nil
case string:
return &thunderpb.Field{Kind: thunderpb.FieldKind_String, String_: column}, nil
case []byte:
return &thunderpb.Field{Kind: thunderpb.FieldKind_Bytes, Bytes: column}, nil
case float64:
return &thunderpb.Field{Kind: thunderpb.FieldKind_Float64, Float64: column}, nil
case time.Time:
return &thunderpb.Field{Kind: thunderpb.FieldKind_Time, Time: column}, nil
default:
return nil, fmt.Errorf("unknown type %s", reflect.TypeOf(column))
}
}

func fieldToValue(field *thunderpb.Field) (interface{}, error) {
switch field.Kind {
case thunderpb.FieldKind_Null:
return nil, nil
case thunderpb.FieldKind_Bool:
return field.Bool, nil
case thunderpb.FieldKind_Int:
return field.Int, nil
case thunderpb.FieldKind_Uint:
return field.Uint, nil
case thunderpb.FieldKind_String:
return field.String, nil
case thunderpb.FieldKind_Bytes:
return field.Bytes, nil
case thunderpb.FieldKind_Float64:
return field.Float64, nil
case thunderpb.FieldKind_Time:
return field.Time, nil
default:
return nil, fmt.Errorf("unknown kind %s", field.Kind.String())
}
}

func filterToProto(table string, filter sqlgen.Filter) (*thunderpb.SQLFilter, error) {
fields := make(map[string]*thunderpb.Field, len(filter))
for col, val := range filter {
field, err := valueToField(val)
if err != nil {
return nil, err
}
fields[col] = field
}
return &thunderpb.SQLFilter{Table: table, Fields: fields}, nil
}

func filterFromProto(proto *thunderpb.SQLFilter) (string, sqlgen.Filter, error) {
filter := make(sqlgen.Filter, len(proto.Fields))
for col, field := range proto.Fields {
val, err := fieldToValue(field)
if err != nil {
return "", nil, err
}
filter[col] = val
}
return proto.Table, filter, nil
}
41 changes: 40 additions & 1 deletion sqlgen/reflect.go
Original file line number Diff line number Diff line change
Expand Up @@ -638,7 +638,7 @@ func (t *tester) Test(row interface{}) bool {
// coerces some pointer types to make filters more idiomatic
expected := coerce(reflect.ValueOf(t.values[i]))
value := coerce(struc.FieldByIndex(column.Index))
if expected != value {
if MakeCanonical(expected) != MakeCanonical(value) {
return false
}
}
Expand Down Expand Up @@ -680,3 +680,42 @@ func (t *Table) extractRow(row interface{}) Filter {

return f
}

func MakeCanonical(v interface{}) interface{} {
switch v := v.(type) {
case bool:
return bool(v)
case int8:
return int64(v)
case int16:
return int64(v)
case int32:
return int64(v)
case int64:
return int64(v)
case int:
return int64(v)
case uint8:
return uint64(v)
case uint16:
return uint64(v)
case uint32:
return uint64(v)
case uint64:
return uint64(v)
case uint:
return uint64(v)
case string:
return string(v)
case []byte:
return []byte(v)
case float32:
return float64(v)
case float64:
return float64(v)
case time.Time:
return time.Time(v)
default:
return v
}
}

0 comments on commit 7547ade

Please sign in to comment.