Skip to content

Commit

Permalink
Check column name with rules while routing. (#152)
Browse files Browse the repository at this point in the history
* Check column name with rules while routing.

* Fix tests
  • Loading branch information
reshke authored Jun 16, 2023
1 parent e1a1b07 commit 015ccad
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 7 deletions.
5 changes: 4 additions & 1 deletion router/pkg/frontend.go
Original file line number Diff line number Diff line change
Expand Up @@ -344,6 +344,8 @@ func Frontend(qr qrouter.QueryRouter, cl client.RouterClient, cmngr rulerouter.P
_ = cl.ReplyDebugNoticef("process frontend for route %s %s", cl.Usr(), cl.DB())
rst := rulerouter.NewRelayState(qr, cl, cmngr, rcfg)

defer rst.Close()

var msg pgproto3.FrontendMessage
var err error

Expand All @@ -366,10 +368,11 @@ func Frontend(qr qrouter.QueryRouter, cl client.RouterClient, cmngr rulerouter.P
case io.ErrUnexpectedEOF:
fallthrough
case io.EOF:
return rst.Close()
return nil
// ok
default:
spqrlog.Logger.Printf(spqrlog.DEBUG5, "client %p iter done with error: %v", rst.Client(), err)
rst.Client().ReplyErrMsg(err.Error())
}
}
}
Expand Down
24 changes: 20 additions & 4 deletions router/pkg/qrouter/proxy_routing.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,10 +101,7 @@ func (qr *ProxyQrouter) DeparseExprShardingEntries(expr *pgquery.Node, meta *Rou
// pure table column ref
return "", colnames[0], nil
case 2:
//
meta.rels[meta.tableAliases[colnames[0]]] = append(meta.rels[meta.tableAliases[colnames[0]]],
colnames[1])

// check that column matches sharding rule
return colnames[0], colnames[1], nil
default:
return "", "", ComplexQuery
Expand Down Expand Up @@ -219,6 +216,24 @@ func (qr *ProxyQrouter) routeByClause(ctx context.Context, expr *pgquery.Node, m

spqrlog.Logger.Printf(spqrlog.DEBUG5, "deparsed columns references %+v", colname)

if rls, err := qr.qdb.ListShardingRules(ctx); err != nil {
return err
} else {
ok := false
for i := range rls {
for _, c := range rls[i].Entries {
if c.Column == colname {
ok = true
break
}
}
}
spqrlog.Logger.Printf(spqrlog.DEBUG5, "skip column %v: no rule mathing", colname)
if !ok {
return nil
}
}

if resolvedRelation, ok := meta.tableAliases[alias]; ok {
// TBD: postpone routing from here to root of parsing tree

Expand Down Expand Up @@ -517,6 +532,7 @@ func (qr *ProxyQrouter) Route(ctx context.Context, parsedStmt *pgquery.ParseResu
spqrlog.Logger.Printf(spqrlog.DEBUG1, "Temporarily skip the route error: %v", route_err)
continue
}
spqrlog.Logger.Printf(spqrlog.DEBUG5, "calculated route %+v for table/cols %v %+v", currroute, tname, cols)
if route == nil {
route = currroute
} else {
Expand Down
6 changes: 4 additions & 2 deletions router/pkg/qrouter/qrouter.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"

"github.com/pg-sharding/spqr/pkg/meta"
"github.com/pg-sharding/spqr/pkg/spqrlog"

pgquery "github.com/pganalyze/pg_query_go/v4"
"github.com/pkg/errors"
Expand All @@ -20,13 +21,14 @@ type ShardRoute interface {
}

func combine(sh1, sh2 ShardRoute) ShardRoute {
spqrlog.Logger.Printf(spqrlog.DEBUG5, "combine route %+v with %+v", sh1, sh2)
switch shq1 := sh1.(type) {
case *MultiMatchRoute:
return &MultiMatchRoute{}
return sh2
case *DataShardRoute:
switch shq2 := sh2.(type) {
case *MultiMatchRoute:
return &MultiMatchRoute{}
return sh1
case *DataShardRoute:
if shq2.Shkey.Name == shq1.Shkey.Name {
return sh1
Expand Down

0 comments on commit 015ccad

Please sign in to comment.