Forbid default distribution & coordinator fixes #490

Merged · 45 commits · Feb 13, 2024

Commits (all by EinKrebs)
4f39682  Changed GetRelationDistribution in MemQDB (Feb 7, 2024)
a02b197  Changed unit tests for memQDB (Feb 7, 2024)
402258d  Restored a unit test for memQDB (Feb 7, 2024)
212e3e2  Changed regression tests execution (Feb 7, 2024)
736c9a2  Fix router/begin regression test (Feb 7, 2024)
37b1d8d  Require distribution in sharding rule/key range definition (Feb 7, 2024)
d711ba1  FIXUP_INTO begin test fix (Feb 7, 2024)
312d7b0  FIXUP_INTO begin test fix (Feb 7, 2024)
2259bbf  Fixed copy_routing regress test (Feb 7, 2024)
c052e88  FIXUP_INTO regression test exec (Feb 7, 2024)
6527425  Fixed router/error regression test (Feb 7, 2024)
a0ceb91  Fixed router/joins regression test (Feb 7, 2024)
41c33c5  Fixed router/mixed_routing regression test (Feb 7, 2024)
aec80cc  Fixed router/multishard regression test (Feb 7, 2024)
ed28a0b  Changed & disabled router/routing_hint regression test (Feb 7, 2024)
f26ae6d  Removed absent test from schedule (Feb 7, 2024)
505b8a4  Fixed router/shard_routing regression test (Feb 7, 2024)
0f98551  Fixed router/single_shard_joins regression test (Feb 7, 2024)
bd209fe  Fixed router/target_session_attrs regression test (Feb 7, 2024)
0e12602  Fixed router/transactions regression test (Feb 7, 2024)
63a2c4d  Fixed router/with_tables regression test (Feb 7, 2024)
b82b2f2  Fixed coordinator test execution (Feb 7, 2024)
03baf7e  Fixed console regression tests (Feb 7, 2024)
72e732e  ListDistribution -> ListDistributions (Feb 8, 2024)
f631098  Implemented missing methods in coordinator.go (Feb 8, 2024)
fce8725  FIXUP_INTO etcdqdb changes (Feb 8, 2024)
443e98e  Added DistributionId to sharding rule proto (Feb 8, 2024)
b1ea093  Fixed CreateDistribution in coordinator (Feb 9, 2024)
7365732  Fix key range union in qdbCoordinator (Feb 9, 2024)
56039c7  Fixed default docker-compose.yaml (Feb 12, 2024)
843dde7  Fixed Split/Unite methods in QdbCoordinator (Feb 12, 2024)
d5f884d  Fixed coordinator feature test (Feb 12, 2024)
0b2e67a  Fixed init.sql in feature tests (Feb 12, 2024)
afdc733  FIXUP_INTO some coordinator-related shit (Feb 12, 2024)
943bc03  More obvious "*-admin" host setup in feature tests (Feb 12, 2024)
6abf912  Fix move feature test (Feb 12, 2024)
adf48ec  Fix move_recover feature test (Feb 12, 2024)
efb40fb  Fix other feature tests (Feb 12, 2024)
12acbaa  Fixed linter (Feb 13, 2024)
b8ea4e3  Fixed unit tests (Feb 13, 2024)
78e38b1  Fixed console regression tests expected output (Feb 13, 2024)
9ef7e59  more refactoring in Makefile (Feb 13, 2024)
926c69f  Removed setting default distribution in meta.ProcessDrop (Feb 13, 2024)
78459bc  Fixed stress test (Feb 13, 2024)
3309dc6  Fixes for proto tests (Feb 13, 2024)
Makefile (3 additions, 3 deletions)

@@ -89,7 +89,7 @@ regress_local: proxy_2sh_run
 	./script/regress_local.sh
 
 regress: build_images
-	docker compose -f test/regress/docker-compose.yaml up --remove-orphans --force-recreate --exit-code-from regress --build coordinator router shard1 shard2 regress qdb01
+	docker compose -f test/regress/docker-compose.yaml down && docker compose -f test/regress/docker-compose.yaml run --build --remove-orphans regress

 hibernate_regress: build_images
 	docker compose -f test/drivers/hibernate-regress/docker-compose.yaml up --remove-orphans --force-recreate --exit-code-from regress --build coordinator router shard1 shard2 regress qdb01

@@ -98,10 +98,10 @@ jdbc_regress: build_images
 	docker compose -f test/drivers/jdbc-regress/docker-compose.yaml up --remove-orphans --force-recreate --exit-code-from regress --build coordinator router shard1 shard2 regress qdb01
 
 gorm_regress: build_images
-	docker compose -f test/drivers/gorm-regress/docker-compose.yaml up --remove-orphans --force-recreate --exit-code-from regress --build coordinator router shard1 shard2 regress qdb01
+	docker compose -f test/drivers/gorm-regress/docker-compose.yaml down && docker compose -f test/drivers/gorm-regress/docker-compose.yaml run --remove-orphans --build regress
 
 xproto_regress: build_images
-	docker compose -f test/xproto/docker-compose.yaml up --remove-orphans --force-recreate --exit-code-from regress --build coordinator router shard1 shard2 regress qdb01
+	docker compose -f test/xproto/docker-compose.yaml down && docker compose -f test/xproto/docker-compose.yaml run --remove-orphans --build regress
 
 e2e: build_images
 	docker compose up --remove-orphans --exit-code-from client --build router coordinator shard1 shard2 qdb01 client
coordinator/provider/coordinator.go (200 additions, 21 deletions)

@@ -4,6 +4,7 @@ import (
"context"
"crypto/tls"
"fmt"
"github.com/pg-sharding/spqr/pkg/models/distributions"
"net"
"time"

@@ -195,8 +196,15 @@ type CoordinatorClient interface {
 
 type qdbCoordinator struct {
 	tlsconfig *tls.Config
-	coordinator.Coordinator
-	db qdb.XQDB
+	db qdb.XQDB
 }
 
+func (qc *qdbCoordinator) ShareKeyRange(id string) error {
+	return qc.db.ShareKeyRange(id)
+}
+
+func (qc *qdbCoordinator) QDB() qdb.QDB {
+	return qc.db
+}
+
 var _ coordinator.Coordinator = &qdbCoordinator{}
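
The struct change above is worth a note: embedding the coordinator.Coordinator interface made qdbCoordinator satisfy the interface at compile time even for methods nobody had implemented, and calling one of those would dereference a nil embedded interface at runtime. Implementing ShareKeyRange and QDB explicitly, with the var _ coordinator.Coordinator assertion kept as the compile-time check, closes that trap. A minimal sketch of the pitfall, with made-up names rather than SPQR's:

package main

import "fmt"

type Store interface {
	Get(id string) string
}

// Embeds the interface it claims to implement: compiles fine, but the
// embedded Store stays nil unless someone assigns it.
type withEmbed struct {
	Store
}

// Implements the method explicitly against its own backend.
type withMethod struct {
	db map[string]string
}

func (w *withMethod) Get(id string) string { return w.db[id] }

var _ Store = &withMethod{} // compile-time assertion, as in the diff above

func main() {
	safe := &withMethod{db: map[string]string{"kr1": "shard1"}}
	fmt.Println(safe.Get("kr1")) // prints: shard1

	risky := &withEmbed{}
	_ = risky // calling risky.Get("kr1") would panic: nil embedded Store
}
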
@@ -281,7 +289,7 @@ func NewCoordinator(tlsconfig *tls.Config, db qdb.XQDB) *qdbCoordinator {
 }
 
 // TODO : unit tests
-func (cc *qdbCoordinator) lockCoordinator(ctx context.Context, initialRouter bool) bool {
+func (qc *qdbCoordinator) lockCoordinator(ctx context.Context, initialRouter bool) bool {
 	registerRouter := func() bool {
 		if !initialRouter {
 			return true
@@ -291,25 +299,25 @@ func (cc *qdbCoordinator) lockCoordinator(ctx context.Context, initialRouter boo
 			Address: fmt.Sprintf("%s:%s", config.RouterConfig().Host, config.RouterConfig().GrpcApiPort),
 			State: qdb.OPENED,
 		}
-		if err := cc.RegisterRouter(ctx, router); err != nil {
+		if err := qc.RegisterRouter(ctx, router); err != nil {
 			spqrlog.Zero.Error().Err(err).Msg("register router when locking coordinator")
 		}
-		if err := cc.SyncRouterMetadata(ctx, router); err != nil {
+		if err := qc.SyncRouterMetadata(ctx, router); err != nil {
 			spqrlog.Zero.Error().Err(err).Msg("sync router metadata when locking coordinator")
 		}
-		if err := cc.UpdateCoordinator(ctx, config.CoordinatorConfig().Host); err != nil {
+		if err := qc.UpdateCoordinator(ctx, config.CoordinatorConfig().Host); err != nil {
 			return false
 		}
 		return true
 	}
 
-	if cc.db.TryCoordinatorLock(context.TODO()) != nil {
+	if qc.db.TryCoordinatorLock(context.TODO()) != nil {
 		for {
 			select {
 			case <-ctx.Done():
 				return false
 			case <-time.After(time.Second):
-				if err := cc.db.TryCoordinatorLock(context.TODO()); err == nil {
+				if err := qc.db.TryCoordinatorLock(context.TODO()); err == nil {
 					return registerRouter()
 				} else {
 					spqrlog.Zero.Error().Err(err).Msg("qdb already taken, waiting for connection")
@@ -324,20 +332,20 @@
 // TODO : unit tests
 // RunCoordinator side effect: it runs an asynchronous goroutine
 // that checks the availability of the SPQR router
-func (cc *qdbCoordinator) RunCoordinator(ctx context.Context, initialRouter bool) {
-	if !cc.lockCoordinator(ctx, initialRouter) {
+func (qc *qdbCoordinator) RunCoordinator(ctx context.Context, initialRouter bool) {
+	if !qc.lockCoordinator(ctx, initialRouter) {
 		return
 	}
 
-	ranges, err := cc.db.ListAllKeyRanges(context.TODO())
+	ranges, err := qc.db.ListAllKeyRanges(context.TODO())
 	if err != nil {
 		spqrlog.Zero.Error().
 			Err(err).
 			Msg("faild to list key ranges")
 	}
 
 	for _, r := range ranges {
-		tx, err := cc.db.GetTransferTx(context.TODO(), r.KeyRangeID)
+		tx, err := qc.db.GetTransferTx(context.TODO(), r.KeyRangeID)
 		if err != nil {
 			continue
 		}

@@ -347,7 +355,7 @@
 			ShardId: tx.ToShardId,
 			Krid: r.KeyRangeID,
 		}
-		err = cc.Move(context.TODO(), &tem)
+		err = qc.Move(context.TODO(), &tem)
 		if err != nil {
 			spqrlog.Zero.Error().Err(err).Msg("failed to move key range")
 		}

@@ -356,13 +364,13 @@
 			datatransfers.ResolvePreparedTransaction(context.TODO(), tx.FromShardId, tx.FromTxName, false)
 		}
 
-		err = cc.db.RemoveTransferTx(context.TODO(), r.KeyRangeID)
+		err = qc.db.RemoveTransferTx(context.TODO(), r.KeyRangeID)
 		if err != nil {
 			spqrlog.Zero.Error().Err(err).Msg("error removing from qdb")
 		}
 	}
 
-	go cc.watchRouters(context.TODO())
+	go qc.watchRouters(context.TODO())
 }
 
 // TODO : unit tests
@@ -683,13 +691,28 @@ func (qc *qdbCoordinator) Split(ctx context.Context, req *kr.SplitKeyRange) erro
 		}
 	}()
 
-	if kr.CmpRangesEqual(req.Bound, krOld.LowerBound) {
+	ds, err := qc.db.GetDistribution(ctx, krOld.DistributionId)
+	if err != nil {
+		return err
+	}
+
+	if kr.CmpRangesEqual(krOld.LowerBound, req.Bound) {
 		return spqrerror.New(spqrerror.SPQR_KEYRANGE_ERROR, "failed to split because bound equals lower of the key range")
 	}
 	if kr.CmpRangesLess(req.Bound, krOld.LowerBound) {
 		return spqrerror.New(spqrerror.SPQR_KEYRANGE_ERROR, "failed to split because bound is out of key range")
 	}
 
+	krs, err := qc.db.ListKeyRanges(ctx, ds.ID)
+	if err != nil {
+		return err
+	}
+	for _, kRange := range krs {
+		if kr.CmpRangesLess(krOld.LowerBound, kRange.LowerBound) && kr.CmpRangesLessEqual(kRange.LowerBound, req.Bound) {
+			return spqrerror.Newf(spqrerror.SPQR_KEYRANGE_ERROR, "failed to split because bound intersects with \"%s\" key range", kRange.KeyRangeID)
+		}
+	}
+
 	krNew := kr.KeyRangeFromDB(
 		&qdb.KeyRange{
 			LowerBound: req.Bound,
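
The added validation makes Split reject three cases: a bound equal to the lower bound of the range being split, a bound below it, and a bound that reaches to or past the lower bound of any other key range in the distribution. The same rule as a self-contained sketch over integer bounds (the real code compares raw bounds via kr.CmpRangesLess and kr.CmpRangesLessEqual; all names below are illustrative):

package main

import "fmt"

// validSplitBound reports whether bound may split the key range starting
// at lower, given the lower bounds of the other ranges in the same
// distribution. Mirrors the checks in qdbCoordinator.Split above.
func validSplitBound(bound, lower int, otherLowers []int) error {
	if bound == lower {
		return fmt.Errorf("bound equals lower of the key range")
	}
	if bound < lower {
		return fmt.Errorf("bound is out of key range")
	}
	for _, l := range otherLowers {
		// another range starts after ours but at or before the bound:
		// the split would cut into that neighbour
		if lower < l && l <= bound {
			return fmt.Errorf("bound intersects key range starting at %d", l)
		}
	}
	return nil
}

func main() {
	fmt.Println(validSplitBound(15, 10, []int{20, 30})) // <nil>: 15 splits [10, 20)
	fmt.Println(validSplitBound(25, 10, []int{20, 30})) // error: crosses the range at 20
}
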
@@ -824,14 +847,30 @@ func (qc *qdbCoordinator) Unite(ctx context.Context, uniteKeyRange *kr.UniteKeyR
 	if krLeft.ShardID != krRight.ShardID {
 		return spqrerror.New(spqrerror.SPQR_KEYRANGE_ERROR, "failed to unite key ranges routing different shards")
 	}
-	if !kr.CmpRangesEqual(krLeft.UpperBound, krRight.LowerBound) {
+	if !kr.CmpRangesEqual(krLeft.LowerBound, krRight.UpperBound) {
 		return spqrerror.New(spqrerror.SPQR_KEYRANGE_ERROR, "failed to unite non-adjacent key ranges")
 	}
+	if krLeft.DistributionId != krRight.DistributionId {
+		return spqrerror.New(spqrerror.SPQR_KEYRANGE_ERROR, "failed to unite key ranges of different distributions")
+	}
+	ds, err := qc.db.GetDistribution(ctx, krLeft.DistributionId)
+	if err != nil {
+		return err
+	}
+	// TODO: check all types when composite keys are supported
+	if kr.CmpRangesLess(krRight.LowerBound, krLeft.LowerBound) {
+		krLeft, krRight = krRight, krLeft
+	}
 
-	krLeft.UpperBound = krRight.UpperBound
+	krs, err := qc.db.ListKeyRanges(ctx, ds.ID)
+	if err != nil {
+		return err
+	}
+	for _, kRange := range krs {
+		if kRange.KeyRangeID != krLeft.KeyRangeID &&
+			kRange.KeyRangeID != krRight.KeyRangeID &&
+			kr.CmpRangesLessEqual(krLeft.LowerBound, kRange.LowerBound) &&
+			kr.CmpRangesLessEqual(kRange.LowerBound, krRight.LowerBound) {
+			return spqrerror.New(spqrerror.SPQR_KEYRANGE_ERROR, "failed to unite non-adjacent key ranges")
+		}
+	}
 
 	if err := qc.db.DropKeyRange(ctx, krRight.KeyRangeID); err != nil {
 		return spqrerror.Newf(spqrerror.SPQR_KEYRANGE_ERROR, "failed to drop an old key range: %s", err.Error())
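
With the upper-bound comparison gone, the rewritten check treats two key ranges as adjacent exactly when no third range of the distribution starts between them (the loop above uses CmpRangesLessEqual on both sides plus ID checks, which amounts to a strict in-between test when lower bounds are unique). Reduced to integers, the predicate is (illustrative names, not SPQR's):

package sketch

// adjacent reports whether the ranges starting at leftLower and
// rightLower (leftLower < rightLower, as ensured by the swap above)
// can be united: no other range may start strictly between them.
func adjacent(leftLower, rightLower int, otherLowers []int) bool {
	for _, l := range otherLowers {
		if leftLower < l && l < rightLower {
			return false // a third range sits between the two
		}
	}
	return true
}
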
@@ -1223,6 +1262,146 @@ func (qc *qdbCoordinator) GetCoordinator(ctx context.Context) (string, error) {
 	return addr, err
 }
 
+// ListDistributions returns all distributions from QDB
+// TODO: unit tests
+func (qc *qdbCoordinator) ListDistributions(ctx context.Context) ([]*distributions.Distribution, error) {
+	distrs, err := qc.db.ListDistributions(ctx)
+	if err != nil {
+		return nil, err
+	}
+	res := make([]*distributions.Distribution, 0)
+	for _, ds := range distrs {
+		res = append(res, distributions.DistributionFromDB(ds))
+	}
+	return res, nil
+}
+
+// CreateDistribution creates distribution in QDB
+// TODO: unit tests
+func (qc *qdbCoordinator) CreateDistribution(ctx context.Context, ds *distributions.Distribution) error {
+	if err := qc.db.CreateDistribution(ctx, distributions.DistributionToDB(ds)); err != nil {
+		return err
+	}
+
+	return qc.traverseRouters(ctx, func(cc *grpc.ClientConn) error {
+		cl := routerproto.NewDistributionServiceClient(cc)
+		resp, err := cl.CreateDistribution(context.TODO(), &routerproto.CreateDistributionRequest{
+			Distributions: []*routerproto.Distribution{distributions.DistributionToProto(ds)},
+		})
+		if err != nil {
+			return err
+		}
+
+		spqrlog.Zero.Debug().
+			Interface("response", resp).
+			Msg("create distribution response")
+		return nil
+	})
+}
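
CreateDistribution above, and every other mutator added in this block, follows one write-through shape: persist the change in QDB first, then replay it on each registered router through traverseRouters so router-local metadata stays in sync with the store. Condensed to a sketch (writeThrough, persist and replay are stand-in names, not SPQR API):

package sketch

import "context"

// writeThrough captures the pattern shared by CreateDistribution,
// DropDistribution and the AlterDistribution* methods: the metadata
// store is authoritative; routers are caches that replay the change.
func writeThrough(
	ctx context.Context,
	persist func(context.Context) error,
	routers []string,
	replay func(ctx context.Context, routerAddr string) error,
) error {
	if err := persist(ctx); err != nil {
		return err // store rejected the change; nothing was propagated
	}
	for _, addr := range routers {
		if err := replay(ctx, addr); err != nil {
			return err // this router is now stale; caller must resync it
		}
	}
	return nil
}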

+// DropDistribution deletes distribution from QDB
+// TODO: unit tests
+func (qc *qdbCoordinator) DropDistribution(ctx context.Context, id string) error {
+	if err := qc.db.DropDistribution(ctx, id); err != nil {
+		return err
+	}
+
+	return qc.traverseRouters(ctx, func(cc *grpc.ClientConn) error {
+		cl := routerproto.NewDistributionServiceClient(cc)
+		resp, err := cl.DropDistribution(context.TODO(), &routerproto.DropDistributionRequest{
+			Ids: []string{id},
+		})
+		if err != nil {
+			return err
+		}
+
+		spqrlog.Zero.Debug().
+			Interface("response", resp).
+			Msg("drop distribution response")
+		return nil
+	})
+}
+
+// GetDistribution retrieves info about distribution from QDB
+// TODO: unit tests
+func (qc *qdbCoordinator) GetDistribution(ctx context.Context, id string) (*distributions.Distribution, error) {
+	ds, err := qc.db.GetDistribution(ctx, id)
+	if err != nil {
+		return nil, err
+	}
+	return distributions.DistributionFromDB(ds), nil
+}
+
+// GetRelationDistribution retrieves info about distribution attached to relation from QDB
+// TODO: unit tests
+func (qc *qdbCoordinator) GetRelationDistribution(ctx context.Context, relName string) (*distributions.Distribution, error) {
+	ds, err := qc.db.GetRelationDistribution(ctx, relName)
+	if err != nil {
+		return nil, err
+	}
+	return distributions.DistributionFromDB(ds), nil
+}
+
+// AlterDistributionAttach attaches relation to distribution
+// TODO: unit tests
+func (qc *qdbCoordinator) AlterDistributionAttach(ctx context.Context, id string, rels []*distributions.DistributedRelation) error {
+	if err := qc.db.AlterDistributionAttach(ctx, id, func() []*qdb.DistributedRelation {
+		qdbRels := make([]*qdb.DistributedRelation, len(rels))
+		for i, rel := range rels {
+			qdbRels[i] = distributions.DistributedRelationToDB(rel)
+		}
+		return qdbRels
+	}()); err != nil {
+		return err
+	}
+
+	return qc.traverseRouters(ctx, func(cc *grpc.ClientConn) error {
+		cl := routerproto.NewDistributionServiceClient(cc)
+		resp, err := cl.AlterDistributionAttach(context.TODO(), &routerproto.AlterDistributionAttachRequest{
+			Id: id,
+			Relations: func() []*routerproto.DistributedRelation {
+				res := make([]*routerproto.DistributedRelation, len(rels))
+				for i, rel := range rels {
+					res[i] = distributions.DistributedRelatitonToProto(rel)
+				}
+				return res
+			}(),
+		})
+		if err != nil {
+			return err
+		}
+
+		spqrlog.Zero.Debug().
+			Interface("response", resp).
+			Msg("attach relation response")
+		return nil
+	})
+}
+
+// AlterDistributionDetach detaches relation from distribution
+// TODO: unit tests
+func (qc *qdbCoordinator) AlterDistributionDetach(ctx context.Context, id string, relName string) error {
+	if err := qc.db.AlterDistributionDetach(ctx, id, relName); err != nil {
+		return err
+	}
+
+	return qc.traverseRouters(ctx, func(cc *grpc.ClientConn) error {
+		cl := routerproto.NewDistributionServiceClient(cc)
+		resp, err := cl.AlterDistributionDetach(context.TODO(), &routerproto.AlterDistributionDetachRequest{
+			Id: id,
+			RelNames: []string{relName},
+		})
+		if err != nil {
+			return err
+		}
+
+		spqrlog.Zero.Debug().
+			Interface("response", resp).
+			Msg("detach relation response")
+		return nil
+	})
+}
+
 func (qc *qdbCoordinator) GetShardInfo(ctx context.Context, shardID string) (*datashards.DataShard, error) {
 	panic("implement or delete me")
 }
docker-compose.yaml (3 additions, 1 deletion)

@@ -38,6 +38,8 @@ services:
- "7003:7003"
hostname: spqr_coordinator
container_name: spqr_coordinator
depends_on:
- qdb01
qdb01:
image: 'bitnami/etcd:latest'
container_name: spqr_qdb_0_1
Expand All @@ -53,7 +55,7 @@ services:
context: .
ports:
- "8432:6432"
- "7013:7003"
- "7013:7000"
hostname: spqr_router_1_1
container_name: spqr_router_1_1
environment:
Expand Down
pkg/coord/adapter.go (2 additions, 2 deletions)

@@ -370,10 +370,10 @@ func (a *Adapter) GetShardInfo(ctx context.Context, shardID string) (*datashards
 }
 
 // TODO : unit tests
-func (a *Adapter) ListDistribution(ctx context.Context) ([]*distributions.Distribution, error) {
+func (a *Adapter) ListDistributions(ctx context.Context) ([]*distributions.Distribution, error) {
 	c := proto.NewDistributionServiceClient(a.conn)
 
-	resp, err := c.ListDistribution(ctx, &proto.ListDistributionRequest{})
+	resp, err := c.ListDistributions(ctx, &proto.ListDistributionsRequest{})
 	if err != nil {
 		return nil, err
 	}
pkg/coord/local/clocal.go (1 addition, 1 deletion)

@@ -36,7 +36,7 @@ type LocalCoordinator struct {
 }
 
 // TODO : unit tests
-func (lc *LocalCoordinator) ListDistribution(ctx context.Context) ([]*distributions.Distribution, error) {
+func (lc *LocalCoordinator) ListDistributions(ctx context.Context) ([]*distributions.Distribution, error) {
 	lc.mu.Lock()
 	defer lc.mu.Unlock()
 