Skip to content

Commit

Permalink
Change dataspace to keyspace, drop upper bound from key range definition
Browse files Browse the repository at this point in the history
  • Loading branch information
reshke committed Jan 22, 2024
1 parent d56f058 commit abdd6aa
Show file tree
Hide file tree
Showing 46 changed files with 1,736 additions and 2,571 deletions.
39 changes: 21 additions & 18 deletions balancer/pkg/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,28 +33,31 @@ type Coordinator struct {
balancerServiceClient routerproto.BalancerServiceClient
shardServiceClient routerproto.ShardServiceClient
keyRangeServiceClient routerproto.KeyRangeServiceClient
keySpaceServiceClient routerproto.KeySpaceServiceClient
operationServiceClient routerproto.OperationServiceClient
}

func (c *Coordinator) showKeyRanges() ([]*kr.KeyRange, error) {
respList, err := c.keyRangeServiceClient.ListKeyRange(context.Background(), &routerproto.ListKeyRangeRequest{})
if err != nil {
return nil, err
}

res := make([]*kr.KeyRange, 0, len(respList.KeyRangesInfo))
for _, keyRangeInfo := range respList.KeyRangesInfo {
keyRange := &kr.KeyRange{
LowerBound: []byte(keyRangeInfo.GetKeyRange().GetLowerBound()),
UpperBound: []byte(keyRangeInfo.GetKeyRange().GetUpperBound()),
ShardID: keyRangeInfo.GetShardId(),
ID: keyRangeInfo.GetKrid(),
}
// respList, err := c.keyRangeServiceClient.ListKeyRange(context.Background(), &routerproto.ListKeyRangeRequest{})
// if err != nil {
// return nil, err
// }

res = append(res, keyRange)
}
// res := make([]*kr.KeyRange, 0, len(respList.KeyRangesInfo))
// for _, keyRangeInfo := range respList.KeyRangesInfo {
// // ds, err := c
// // keyRange := kr.KeyRangeFromProto(keyRangeInfo, )
// // keyRange := &kr.KeyRange{
// // LowerBound: keyRangeInfo.GetKeyRange().GetLowerBound(),
// // ShardID: keyRangeInfo.GetShardId(),
// // ID: keyRangeInfo.GetKrid(),
// // }

// res = append(res, keyRange)
// }

return res, nil
// return res, nil
return nil, nil
}

func (c *Coordinator) Init(addr string, maxRetriesCount int) error {
Expand Down Expand Up @@ -134,7 +137,7 @@ func (c *Coordinator) initKeyRanges() (map[Shard][]KeyRange, error) {
if !ok {
res[shard] = []KeyRange{}
}
res[shard] = append(res[shard], KeyRange{left: kr.KeyRange.LowerBound, right: kr.KeyRange.UpperBound})
res[shard] = append(res[shard], KeyRange{left: string(kr.KeyRange.LowerBound[0])})
}

return res, nil
Expand Down Expand Up @@ -200,7 +203,7 @@ func (c *Coordinator) mergeKeyRanges(border *string) error {

func (c *Coordinator) moveKeyRange(rng KeyRange, shardTo Shard) error {
resp, err := c.keyRangeServiceClient.MoveKeyRange(context.Background(), &routerproto.MoveKeyRangeRequest{
KeyRange: &routerproto.KeyRangeInfo{KeyRange: &routerproto.KeyRange{LowerBound: rng.left, UpperBound: rng.right}},
KeyRange: &routerproto.KeyRangeInfo{KeyRange: &routerproto.KeyRange{}},
ToShardId: strconv.Itoa(shardTo.id),
})
if err != nil {
Expand Down
34 changes: 2 additions & 32 deletions cmd/mover/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"github.com/jackc/pgx/v5"
_ "github.com/lib/pq"
"github.com/pg-sharding/spqr/pkg/models/kr"
"github.com/pg-sharding/spqr/pkg/models/shrule"
"github.com/pg-sharding/spqr/pkg/spqrlog"
"github.com/pg-sharding/spqr/qdb"
)
Expand Down Expand Up @@ -42,7 +41,7 @@ func (p *ProxyW) Write(bt []byte) (int, error) {
}

// TODO : unit tests
func moveData(ctx context.Context, from, to *pgx.Conn, keyRange kr.KeyRange, key *shrule.ShardingRule) error {
func moveData(ctx context.Context, from, to *pgx.Conn, keyRange kr.KeyRange, keyNextRange kr.KeyRange, ks ) error {
txFrom, err := from.BeginTx(ctx, pgx.TxOptions{})
if err != nil {
return err
Expand All @@ -65,28 +64,6 @@ func moveData(ctx context.Context, from, to *pgx.Conn, keyRange kr.KeyRange, key
}
}(txFrom)

rows, err := txFrom.Query(ctx, `
SELECT table_schema, table_name
FROM information_schema.columns
WHERE column_name=$1;
`, key.Entries()[0].Column)
if err != nil {
return err
}
var ress []MoveTableRes

for rows.Next() {
var curres MoveTableRes
err = rows.Scan(&curres.TableSchema, &curres.TableName)
if err != nil {
return err
}

ress = append(ress, curres)
}

rows.Close()

for _, v := range ress {
spqrlog.Zero.Debug().
Str("schema", v.TableSchema).
Expand Down Expand Up @@ -160,15 +137,8 @@ func main() {
//my_rule := shrule.NewShardingRule("r1", "fast", entrys)
//db.AddShardingRule(context.TODO(), shrule.ShardingRuleToDB(my_rule))

shRule, err := db.GetShardingRule(context.TODO(), *shkey)
if err != nil {
spqrlog.Zero.Error().Err(err).Msg("")
return
}

if err := moveData(ctx,
connFrom, connTo, kr.KeyRange{LowerBound: []byte(*lb), UpperBound: []byte(*ub)},
shrule.ShardingRuleFromDB(shRule)); err != nil {
connFrom, connTo, kr.KeyRange{LowerBound: []byte(*lb)}); err != nil {
spqrlog.Zero.Error().Err(err).Msg("")
}
}
2 changes: 0 additions & 2 deletions coordinator/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,13 +98,11 @@ func (app *App) ServeGrpc(wg *sync.WaitGroup) error {
krserv := provider.NewKeyRangeService(app.coordinator)
rrserv := provider.NewRouterService(app.coordinator)
toposerv := provider.NewTopologyService(app.coordinator)
shardingRulesServ := provider.NewShardingRulesServer(app.coordinator)
shardServ := provider.NewShardServer(app.coordinator)

protos.RegisterKeyRangeServiceServer(serv, krserv)
protos.RegisterRouterServiceServer(serv, rrserv)
protos.RegisterTopologyServiceServer(serv, toposerv)
protos.RegisterShardingRulesServiceServer(serv, shardingRulesServ)
protos.RegisterShardServiceServer(serv, shardServ)

httpAddr := config.CoordinatorConfig().HttpAddr
Expand Down
129 changes: 16 additions & 113 deletions coordinator/provider/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -438,116 +438,11 @@ func (qc *qdbCoordinator) getAllListShardingRules(ctx context.Context) ([]*shrul
return shRules, nil
}

// TODO : unit tests
func (qc *qdbCoordinator) ListShardingRules(ctx context.Context, dataspace string) ([]*shrule.ShardingRule, error) {
rulesList, err := qc.db.ListShardingRules(ctx, dataspace)
if err != nil {
return nil, err
}

shRules := make([]*shrule.ShardingRule, 0, len(rulesList))
for _, rule := range rulesList {
if rule.DataspaceId == dataspace {
shRules = append(shRules, shrule.ShardingRuleFromDB(rule))
}
}

return shRules, nil
}

// TODO : unit tests
func (qc *qdbCoordinator) ListAllShardingRules(ctx context.Context) ([]*shrule.ShardingRule, error) {
rulesList, err := qc.db.ListAllShardingRules(ctx)
if err != nil {
return nil, err
}

shRules := make([]*shrule.ShardingRule, 0, len(rulesList))
for _, rule := range rulesList {
shRules = append(shRules, shrule.ShardingRuleFromDB(rule))
}

return shRules, nil
}

// TODO : unit tests
func (qc *qdbCoordinator) AddShardingRule(ctx context.Context, rule *shrule.ShardingRule) error {
// Store sharding rule to metadb.
if err := ops.AddShardingRuleWithChecks(ctx, qc.db, rule); err != nil {
return err
}

return qc.traverseRouters(ctx, func(cc *grpc.ClientConn) error {
cl := routerproto.NewShardingRulesServiceClient(cc)
resp, err := cl.AddShardingRules(context.TODO(), &routerproto.AddShardingRuleRequest{
Rules: []*routerproto.ShardingRule{shrule.ShardingRuleToProto(rule)},
})
if err != nil {
return err
}

spqrlog.Zero.Debug().
Interface("response", resp).
Msg("add sharding rules response")
return nil
})
}

// TODO : unit tests
func (qc *qdbCoordinator) DropShardingRuleAll(ctx context.Context) ([]*shrule.ShardingRule, error) {
spqrlog.Zero.Debug().Msg("qdb coordinator dropping all sharding keys")

if err := qc.traverseRouters(ctx, func(cc *grpc.ClientConn) error {
cl := routerproto.NewShardingRulesServiceClient(cc)
// TODO: support drop sharding rules all in grpc somehow
listResp, err := cl.ListShardingRules(context.TODO(), &routerproto.ListShardingRuleRequest{})
if err != nil {
return err
}

var ids []string
for _, v := range listResp.Rules {
ids = append(ids, v.Id)
}

spqrlog.Zero.Debug().
Interface("response", listResp).
Msg("list sharding rules response")

dropResp, err := cl.DropShardingRules(ctx, &routerproto.DropShardingRuleRequest{
Id: ids,
})

spqrlog.Zero.Debug().
Interface("response", dropResp).
Msg("drop sharding rules response")

return err
}); err != nil {
return nil, err
}

// Drop sharding rules from qdb.
rules, err := qc.db.DropShardingRuleAll(ctx)
if err != nil {
return nil, err
}

var ret []*shrule.ShardingRule

for _, v := range rules {
ret = append(ret, shrule.ShardingRuleFromDB(v))
}

return ret, nil
}

// TODO : unit tests
func (qc *qdbCoordinator) AddKeyRange(ctx context.Context, keyRange *kr.KeyRange) error {
// add key range to metadb
spqrlog.Zero.Debug().
Bytes("lower-bound", keyRange.LowerBound).
Bytes("upper-bound", keyRange.UpperBound).
Bytes("lower-bound", keyRange.OutFunc(0)).
Str("shard-id", keyRange.ShardID).
Str("key-range-id", keyRange.ID).
Msg("add key range")
Expand Down Expand Up @@ -580,10 +475,15 @@ func (qc *qdbCoordinator) ListKeyRanges(ctx context.Context, dataspace string) (
if err != nil {
return nil, err
}
ds, err := qc.db.GetDataspace(ctx, dataspace)
if err != nil {
return nil, err
}

keyr := make([]*kr.KeyRange, 0, len(keyRanges))
for _, keyRange := range keyRanges {
keyr = append(keyr, kr.KeyRangeFromDB(keyRange))

keyr = append(keyr, kr.KeyRangeFromDB(keyRange, ds.ColTypes))
}

return keyr, nil
Expand All @@ -598,7 +498,11 @@ func (qc *qdbCoordinator) ListAllKeyRanges(ctx context.Context) ([]*kr.KeyRange,

keyr := make([]*kr.KeyRange, 0, len(keyRanges))
for _, keyRange := range keyRanges {
keyr = append(keyr, kr.KeyRangeFromDB(keyRange))
ds, err := qc.db.GetDataspace(ctx, keyRange.DataspaceId)
if err != nil {
return nil, err
}
keyr = append(keyr, kr.KeyRangeFromDB(keyRange, ds.ColTypes))
}

return keyr, nil
Expand All @@ -616,7 +520,9 @@ func (qc *qdbCoordinator) LockKeyRange(ctx context.Context, keyRangeID string) (
return nil, err
}

keyRange := kr.KeyRangeFromDB(keyRangeDB)
ds, err := qc.db.GetDataspace(ctx, keyRangeDB.DataspaceId)

keyRange := kr.KeyRangeFromDB(keyRangeDB, ds.ColTypes)

return keyRange, qc.traverseRouters(ctx, func(cc *grpc.ClientConn) error {
cl := routerproto.NewKeyRangeServiceClient(cc)
Expand Down Expand Up @@ -689,21 +595,18 @@ func (qc *qdbCoordinator) Split(ctx context.Context, req *kr.SplitKeyRange) erro
krNew := kr.KeyRangeFromDB(
&qdb.KeyRange{
LowerBound: req.Bound,
UpperBound: krOld.UpperBound,
KeyRangeID: req.Krid,
ShardID: krOld.ShardID,
DataspaceId: krOld.DataspaceId,
},
)

spqrlog.Zero.Debug().
Bytes("lower-bound", krNew.LowerBound).
Bytes("upper-bound", krNew.UpperBound).
Bytes("lower-bound", krNew.OutFunc(0)).
Str("shard-id", krNew.ShardID).
Str("id", krNew.ID).
Msg("new key range")

krOld.UpperBound = req.Bound
if err := ops.ModifyKeyRangeWithChecks(ctx, qc.db, kr.KeyRangeFromDB(krOld)); err != nil {
return err
}
Expand Down
31 changes: 6 additions & 25 deletions coordinator/provider/keyranges.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,12 @@ type CoordinatorService struct {

// TODO : unit tests
func (c *CoordinatorService) AddKeyRange(ctx context.Context, request *protos.AddKeyRangeRequest) (*protos.ModifyReply, error) {
err := c.impl.AddKeyRange(ctx, &kr.KeyRange{
LowerBound: []byte(request.KeyRangeInfo.KeyRange.LowerBound),
UpperBound: []byte(request.KeyRangeInfo.KeyRange.UpperBound),
ID: request.KeyRangeInfo.Krid,
ShardID: request.KeyRangeInfo.ShardId,
Dataspace: "default",
})
ds, err := c.impl.GetDataspace(ctx, request.KeyRangeInfo.DataspaceId)
if err != nil {
return nil, err
}

err = c.impl.AddKeyRange(ctx, kr.KeyRangeFromProto(request.KeyRangeInfo, ds.ColTypes))
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -55,24 +54,6 @@ func (c *CoordinatorService) UnlockKeyRange(ctx context.Context, request *protos
return &protos.ModifyReply{}, nil
}

// TODO : unit tests
func (c *CoordinatorService) KeyRangeIDByBounds(ctx context.Context, keyRange *protos.KeyRange, dataspace string) (string, error) {
krsqb, err := c.impl.ListKeyRanges(ctx, dataspace)
if err != nil {
return "", err
}

// TODO: choose a key range without matching to exact bounds.
for _, krqb := range krsqb {
if string(krqb.LowerBound) == keyRange.GetLowerBound() &&
string(krqb.UpperBound) == keyRange.GetUpperBound() {
return krqb.ID, nil
}
}

return "", fmt.Errorf("key range not found")
}

// TODO : unit tests
func (c *CoordinatorService) SplitKeyRange(ctx context.Context, request *protos.SplitKeyRangeRequest) (*protos.ModifyReply, error) {
splitKR := &kr.SplitKeyRange{
Expand Down
Loading

0 comments on commit abdd6aa

Please sign in to comment.