Skip to content

Commit

Permalink
Dataspace v3 (#322)
Browse files Browse the repository at this point in the history
* Rebase of some work

* Add Delete Dataspace Support

* fix

* gen

* correction

* Add validation to add kr and shr

* Correction

* Fix lint test

* refactor set command

* refactor set command

* Fix

* fix regress

* fir feature

* fix feature

* fix feature

* fix feature

* fix feature

* fix feature

* fix feature

* fix feature

* fix feature

* change proto

* gen

* fix feature

* fix feature

* fix feature

* fix feature

* Refactor

* Refactor

* Refactor

* Refactor

* Refactor

* Refactor

* feature fix

* feature fix

* feature fix

* change HARD flag

* gen

* fix regress

* fix feture

* to gen

* gen

* refactor

* refactor

* Add new test to delete dataspace

* To gen

* gen

* implement dataspace methonds in adapter.go

* refactor set command

* Change error message

* fix regress

* fix tests

---------

Co-authored-by: root <root@spqr-test.ru-central1.internal>
  • Loading branch information
CBists and root committed Nov 29, 2023
1 parent ce8bfaf commit 0124e07
Show file tree
Hide file tree
Showing 104 changed files with 2,897 additions and 1,167 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,5 @@ y.output
memqdb.json
test/feature/generatedFeatures
test/feature/logs
yacc/console/gram.y.save
.DS_Store
6 changes: 6 additions & 0 deletions balancer/pkg/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,12 @@ func (c BalancerClient) DB() string {
return "DefaultDB"
}

func (c BalancerClient) DS() string {
return "default"
}

func (c BalancerClient) SetDS(_ string) {}

func (c BalancerClient) ID() string {
return "balancerID"
}
Expand Down
100 changes: 63 additions & 37 deletions coordinator/provider/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,7 @@ func (cc *qdbCoordinator) RunCoordinator(ctx context.Context, initialRouter bool
return
}

ranges, err := cc.db.ListKeyRanges(context.TODO())
ranges, err := cc.db.ListAllKeyRanges(context.TODO())
if err != nil {
spqrlog.Zero.Error().
Err(err).
Expand Down Expand Up @@ -373,12 +373,12 @@ func (qc *qdbCoordinator) traverseRouters(ctx context.Context, cb func(cc *grpc.
}

if err := cb(cc); err != nil {
return err
spqrlog.Zero.Debug().Err(err).Str("router id", rtr.ID).Msg("traverse routers")
}

return nil
}(); err != nil {
spqrlog.Zero.Debug().Err(err).Str("router id", rtr.ID).Msg("traverse routers")
return err
}
}

Expand Down Expand Up @@ -407,8 +407,38 @@ func (qc *qdbCoordinator) AddRouter(ctx context.Context, router *topology.Router
return qc.db.AddRouter(ctx, topology.RouterToDB(router))
}

func (qc *qdbCoordinator) ListShardingRules(ctx context.Context) ([]*shrule.ShardingRule, error) {
rulesList, err := qc.db.ListShardingRules(ctx)
func (qc *qdbCoordinator) getAllListShardingRules(ctx context.Context) ([]*shrule.ShardingRule, error) {
rulesList, err := qc.db.ListAllShardingRules(ctx)
if err != nil {
return nil, err
}

shRules := make([]*shrule.ShardingRule, 0, len(rulesList))
for _, rule := range rulesList {
shRules = append(shRules, shrule.ShardingRuleFromDB(rule))
}

return shRules, nil
}

func (qc *qdbCoordinator) ListShardingRules(ctx context.Context, dataspace string) ([]*shrule.ShardingRule, error) {
rulesList, err := qc.db.ListShardingRules(ctx, dataspace)
if err != nil {
return nil, err
}

shRules := make([]*shrule.ShardingRule, 0, len(rulesList))
for _, rule := range rulesList {
if rule.DataspaceId == dataspace {
shRules = append(shRules, shrule.ShardingRuleFromDB(rule))
}
}

return shRules, nil
}

func (qc *qdbCoordinator) ListAllShardingRules(ctx context.Context) ([]*shrule.ShardingRule, error) {
rulesList, err := qc.db.ListAllShardingRules(ctx)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -505,44 +535,38 @@ func (qc *qdbCoordinator) AddKeyRange(ctx context.Context, keyRange *kr.KeyRange
return err
}

resp, err := qc.db.ListRouters(ctx)
if err != nil {
return err
}

// notify all routers
for _, r := range resp {
cc, err := DialRouter(&topology.Router{
ID: r.ID,
Address: r.Addr(),
})
if err != nil {
return err
}

return qc.traverseRouters(ctx, func(cc *grpc.ClientConn) error {
cl := routerproto.NewKeyRangeServiceClient(cc)
resp, err := cl.AddKeyRange(ctx, &routerproto.AddKeyRangeRequest{
KeyRangeInfo: keyRange.ToProto(),
})

if err != nil {
spqrlog.Zero.Debug().
Str("router", r.ID).
Err(err).
Msg("etcdqdb: notify router add key range")
continue
return err
}

spqrlog.Zero.Debug().
Interface("response", resp).
Msg("add key range response")
return nil
})
}

func (qc *qdbCoordinator) ListKeyRanges(ctx context.Context, dataspace string) ([]*kr.KeyRange, error) {
keyRanges, err := qc.db.ListKeyRanges(ctx, dataspace)
if err != nil {
return nil, err
}

return nil
}
keyr := make([]*kr.KeyRange, 0, len(keyRanges))
for _, keyRange := range keyRanges {
keyr = append(keyr, kr.KeyRangeFromDB(keyRange))
}

func (qc *qdbCoordinator) ListKeyRanges(ctx context.Context) ([]*kr.KeyRange, error) {
keyRanges, err := qc.db.ListKeyRanges(ctx)
return keyr, nil
}
func (qc *qdbCoordinator) ListAllKeyRanges(ctx context.Context) ([]*kr.KeyRange, error) {
keyRanges, err := qc.db.ListAllKeyRanges(ctx)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -635,10 +659,11 @@ func (qc *qdbCoordinator) Split(ctx context.Context, req *kr.SplitKeyRange) erro

krNew := kr.KeyRangeFromDB(
&qdb.KeyRange{
LowerBound: req.Bound,
UpperBound: krOld.UpperBound,
KeyRangeID: req.Krid,
ShardID: krOld.ShardID,
LowerBound: req.Bound,
UpperBound: krOld.UpperBound,
KeyRangeID: req.Krid,
ShardID: krOld.ShardID,
DataspaceId: krOld.DataspaceId,
},
)

Expand Down Expand Up @@ -783,7 +808,8 @@ func (qc *qdbCoordinator) Unite(ctx context.Context, uniteKeyRange *kr.UniteKeyR
if err := qc.traverseRouters(ctx, func(cc *grpc.ClientConn) error {
cl := routerproto.NewKeyRangeServiceClient(cc)
resp, err := cl.MergeKeyRange(ctx, &routerproto.MergeKeyRangeRequest{
Bound: krRight.LowerBound,
Bound: krRight.LowerBound,
Dataspace: krRight.DataspaceId,
})

spqrlog.Zero.Debug().
Expand Down Expand Up @@ -835,7 +861,7 @@ func (qc *qdbCoordinator) Move(ctx context.Context, req *kr.MoveKeyRange) error
if err != nil {
return err
}
shardingRules, err := qc.ListShardingRules(ctx)
shardingRules, err := qc.getAllListShardingRules(ctx)
if err != nil {
return err
}
Expand Down Expand Up @@ -916,7 +942,7 @@ func (qc *qdbCoordinator) SyncRouterMetadata(ctx context.Context, qRouter *topol
defer cc.Close()

// Configure sharding rules.
shardingRules, err := qc.db.ListShardingRules(ctx)
shardingRules, err := qc.db.ListAllShardingRules(ctx)
if err != nil {
return err
}
Expand All @@ -942,7 +968,7 @@ func (qc *qdbCoordinator) SyncRouterMetadata(ctx context.Context, qRouter *topol
Msg("add sharding rules response")

// Configure key ranges.
keyRanges, err := qc.db.ListKeyRanges(ctx)
keyRanges, err := qc.db.ListAllKeyRanges(ctx)
if err != nil {
return err
}
Expand Down
14 changes: 6 additions & 8 deletions coordinator/provider/keyranges.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ func (c *CoordinatorService) AddKeyRange(ctx context.Context, request *protos.Ad
UpperBound: []byte(request.KeyRangeInfo.KeyRange.UpperBound),
ID: request.KeyRangeInfo.Krid,
ShardID: request.KeyRangeInfo.ShardId,
Dataspace: "default",
})
if err != nil {
return nil, err
Expand Down Expand Up @@ -51,8 +52,8 @@ func (c *CoordinatorService) UnlockKeyRange(ctx context.Context, request *protos
return &protos.ModifyReply{}, nil
}

func (c *CoordinatorService) KeyRangeIDByBounds(ctx context.Context, keyRange *protos.KeyRange) (string, error) {
krsqb, err := c.impl.ListKeyRanges(ctx)
func (c *CoordinatorService) KeyRangeIDByBounds(ctx context.Context, keyRange *protos.KeyRange, dataspace string) (string, error) {
krsqb, err := c.impl.ListKeyRanges(ctx, dataspace)
if err != nil {
return "", err
}
Expand Down Expand Up @@ -82,12 +83,9 @@ func (c *CoordinatorService) SplitKeyRange(ctx context.Context, request *protos.
return &protos.ModifyReply{}, nil
}

func (c *CoordinatorService) ListKeyRange(ctx context.Context, _ *protos.ListKeyRangeRequest) (*protos.KeyRangeReply, error) {
if c.impl == nil {
return &protos.KeyRangeReply{}, nil
}
func (c *CoordinatorService) ListKeyRange(ctx context.Context, request *protos.ListKeyRangeRequest) (*protos.KeyRangeReply, error) {

krsqb, err := c.impl.ListKeyRanges(ctx)
krsqb, err := c.impl.ListAllKeyRanges(ctx)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -115,7 +113,7 @@ func (c *CoordinatorService) MoveKeyRange(ctx context.Context, request *protos.M
}

func (c *CoordinatorService) MergeKeyRange(ctx context.Context, request *protos.MergeKeyRangeRequest) (*protos.ModifyReply, error) {
krsqb, err := c.impl.ListKeyRanges(ctx)
krsqb, err := c.impl.ListAllKeyRanges(ctx)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion coordinator/provider/sharding_rules.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ func (s *ShardingRulesService) AddShardingRules(ctx context.Context, request *pr
}

func (s *ShardingRulesService) ListShardingRules(ctx context.Context, request *protos.ListShardingRuleRequest) (*protos.ListShardingRuleReply, error) {
rules, err := s.impl.ListShardingRules(ctx)
rules, err := s.impl.ListAllShardingRules(ctx)
if err != nil {
return nil, err
}
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ require (
google.golang.org/genproto v0.0.0-20230822172742-b8732ec3820d // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20230822172742-b8732ec3820d // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20230822172742-b8732ec3820d // indirect
google.golang.org/grpc/cmd/protoc-gen-go-grpc v1.3.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
gotest.tools/v3 v3.5.1 // indirect
)
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,8 @@ google.golang.org/genproto/googleapis/rpc v0.0.0-20230822172742-b8732ec3820d h1:
google.golang.org/genproto/googleapis/rpc v0.0.0-20230822172742-b8732ec3820d/go.mod h1:+Bk1OCOj40wS2hwAMA+aCW9ypzm63QTBBHp6lQ3p+9M=
google.golang.org/grpc v1.59.0 h1:Z5Iec2pjwb+LEOqzpB2MR12/eKFhDPhuqW91O+4bwUk=
google.golang.org/grpc v1.59.0/go.mod h1:aUPDwccQo6OTjy7Hct4AfBPD1GptF4fyUjIkQ9YtF98=
google.golang.org/grpc/cmd/protoc-gen-go-grpc v1.3.0 h1:rNBFJjBCOgVr9pWD7rs/knKL4FRTKgpZmsRfV214zcA=
google.golang.org/grpc/cmd/protoc-gen-go-grpc v1.3.0/go.mod h1:Dk1tviKTvMCz5tvh7t+fh94dhmQVHuCt2OzJB3CTW9Y=
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
google.golang.org/protobuf v1.31.0 h1:g0LDEJHgrBl9N9r17Ru3sqWhkIx2NB67okBHPwC7hs8=
Expand Down
1 change: 1 addition & 0 deletions pkg/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ type Client interface {

Usr() string
DB() string
DS() string

Send(msg pgproto3.BackendMessage) error
SendCtx(ctx context.Context, msg pgproto3.BackendMessage) error
Expand Down
65 changes: 53 additions & 12 deletions pkg/clientinteractor/interactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,14 @@ func (pi *PSQLInteractor) CompleteMsg(rowCnt int) error {
return nil
}

func (pi *PSQLInteractor) GetDataspace() string {
return pi.cl.DS()
}

func (pi *PSQLInteractor) SetDataspace(dataspace string) {
pi.cl.SetParam("dataspace", dataspace)
}

// TEXTOID https://github.com/postgres/postgres/blob/master/src/include/catalog/pg_type.dat#L81
const TEXTOID = 25

Expand Down Expand Up @@ -185,6 +193,7 @@ func (pi *PSQLInteractor) KeyRanges(krs []*kr.KeyRange) error {
&pgproto3.RowDescription{Fields: []pgproto3.FieldDescription{
TextOidFD("Key range ID"),
TextOidFD("Shard ID"),
TextOidFD("Dataspace ID"),
TextOidFD("Lower bound"),
TextOidFD("Upper bound"),
},
Expand All @@ -201,6 +210,7 @@ func (pi *PSQLInteractor) KeyRanges(krs []*kr.KeyRange) error {
Values: [][]byte{
[]byte(keyRange.ID),
[]byte(keyRange.ShardID),
[]byte(keyRange.Dataspace),
keyRange.LowerBound,
keyRange.UpperBound,
},
Expand All @@ -209,18 +219,7 @@ func (pi *PSQLInteractor) KeyRanges(krs []*kr.KeyRange) error {
return err
}
}

for _, msg := range []pgproto3.BackendMessage{
&pgproto3.CommandComplete{},
&pgproto3.ReadyForQuery{},
} {
if err := pi.cl.Send(msg); err != nil {
spqrlog.Zero.Error().Err(err).Msg("")
return err
}
}

return nil
return pi.CompleteMsg(0)
}

func (pi *PSQLInteractor) AddKeyRange(ctx context.Context, keyRange *kr.KeyRange) error {
Expand Down Expand Up @@ -473,6 +472,7 @@ func (pi *PSQLInteractor) ShardingRules(ctx context.Context, rules []*shrule.Sha
for _, msg := range []pgproto3.BackendMessage{
&pgproto3.RowDescription{Fields: []pgproto3.FieldDescription{
TextOidFD("Sharding Rule ID"),
TextOidFD("Dataspace ID"),
TextOidFD("Table Name"),
TextOidFD("Columns"),
TextOidFD("Hash Function"),
Expand Down Expand Up @@ -504,6 +504,7 @@ func (pi *PSQLInteractor) ShardingRules(ctx context.Context, rules []*shrule.Sha
if err := pi.cl.Send(&pgproto3.DataRow{
Values: [][]byte{
[]byte(rule.Id),
[]byte(rule.Dataspace),
[]byte(tableName),
[]byte(entries.String()),
[]byte(hashFunctions.String()),
Expand All @@ -517,6 +518,30 @@ func (pi *PSQLInteractor) ShardingRules(ctx context.Context, rules []*shrule.Sha
return pi.CompleteMsg(0)
}

func (pi *PSQLInteractor) Dataspaces(ctx context.Context, dataspaces []*dataspaces.Dataspace) error {
for _, msg := range []pgproto3.BackendMessage{
&pgproto3.RowDescription{Fields: []pgproto3.FieldDescription{
TextOidFD("Dataspace ID"),
}},
} {
if err := pi.cl.Send(msg); err != nil {
spqrlog.Zero.Error().Err(err).Msg("")
return err
}
}
for _, dataspace := range dataspaces {
if err := pi.cl.Send(&pgproto3.DataRow{
Values: [][]byte{
[]byte(dataspace.Id),
},
}); err != nil {
spqrlog.Zero.Error().Err(err).Msg("")
return err
}
}
return pi.CompleteMsg(0)
}

func (pi *PSQLInteractor) ReportError(err error) error {
if err == nil {
return nil
Expand Down Expand Up @@ -717,6 +742,22 @@ func (pi *PSQLInteractor) AddDataspace(ctx context.Context, ks *dataspaces.Datas
return pi.CompleteMsg(0)
}

func (pi *PSQLInteractor) DropDataspace(ctx context.Context, ids []string) error {
if err := pi.WriteHeader("drop dataspace"); err != nil {
spqrlog.Zero.Error().Err(err).Msg("")
return err
}

for _, id := range ids {
if err := pi.WriteDataRow(fmt.Sprintf("drop dataspace %s", id)); err != nil {
spqrlog.Zero.Error().Err(err).Msg("")
return err
}
}

return pi.CompleteMsg(0)
}

func (pi *PSQLInteractor) ReportStmtRoutedToAllShards(ctx context.Context) error {
if err := pi.WriteHeader("explain query"); err != nil {
spqrlog.Zero.Error().Err(err).Msg("")
Expand Down
Loading

0 comments on commit 0124e07

Please sign in to comment.