Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Data transfer #235

Merged
merged 19 commits into from
Jul 26, 2023
32 changes: 13 additions & 19 deletions cmd/mover/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ func (p *ProxyW) Write(bt []byte) (int, error) {
spqrlog.Zero.Debug().
Bytes("bytes", bt).
Msg("got bytes")

return p.w.Write(bt)
}

Expand Down Expand Up @@ -69,11 +68,10 @@ func moveData(ctx context.Context, from, to *pgx.Conn, keyRange kr.KeyRange, key
SELECT table_schema, table_name
FROM information_schema.columns
WHERE column_name=$1;
`, key.Entries()[0])
`, key.Entries()[0].Column)
if err != nil {
return err
}

var ress []MoveTableRes

for rows.Next() {
Expand Down Expand Up @@ -103,21 +101,8 @@ WHERE column_name=$1;
w: w,
}

ch := make(chan struct{})

go func() {
spqrlog.Zero.Debug().Msg("sending rows to dest shard")
_, err := txTo.Conn().PgConn().CopyFrom(ctx,
r, fmt.Sprintf("COPY %s.%s FROM STDIN", v.TableSchema, v.TableName))
if err != nil {
spqrlog.Zero.Error().Err(err).Msg("copy in failed")
}

ch <- struct{}{}
}()

qry := fmt.Sprintf("copy (delete from %s.%s WHERE %s >= %s and %s <= %s returning *) to stdout", v.TableSchema, v.TableName,
key.Entries()[0], keyRange.LowerBound, key.Entries()[0], keyRange.UpperBound)
key.Entries()[0].Column, keyRange.LowerBound, key.Entries()[0].Column, keyRange.UpperBound)

spqrlog.Zero.Debug().
Str("query", qry).
Expand All @@ -132,9 +117,14 @@ WHERE column_name=$1;
spqrlog.Zero.Error().Err(err).Msg("error closing pipe")
}

spqrlog.Zero.Debug().Msg("copy cmd executed")
_, err = txTo.Conn().PgConn().CopyFrom(ctx,
r, fmt.Sprintf("COPY %s.%s FROM STDIN", v.TableSchema, v.TableName))
if err != nil {
spqrlog.Zero.Debug().Msg("copy in failed")
return err
}

<-ch
spqrlog.Zero.Debug().Msg("copy cmd executed")
}

_ = txTo.Commit(ctx)
Expand Down Expand Up @@ -165,6 +155,10 @@ func main() {
return
}

//entrys := []shrule.ShardingRuleEntry{*shrule.NewShardingRuleEntry("id", "nohash")}
//my_rule := shrule.NewShardingRule("r1", "fast", entrys)
//db.AddShardingRule(context.TODO(), shrule.ShardingRuleToDB(my_rule))

shRule, err := db.GetShardingRule(context.TODO(), *shkey)
if err != nil {
spqrlog.Zero.Error().Err(err).Msg("")
Expand Down
45 changes: 43 additions & 2 deletions coordinator/provider/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"net"
"time"

"github.com/pg-sharding/spqr/pkg/datatransfers"
"github.com/pg-sharding/spqr/pkg/meta"
"github.com/pg-sharding/spqr/pkg/models/topology"
"github.com/pg-sharding/spqr/pkg/shard"
Expand Down Expand Up @@ -227,6 +228,38 @@ func NewCoordinator(db qdb.QDB) *qdbCoordinator {
db: db,
}

ranges, err := db.ListKeyRanges(context.TODO())
if err != nil {
spqrlog.Zero.Error().
Err(err).
Msg("faild to list key ranges")
}

for _, r := range ranges {
tx, err := db.GetTransferTx(context.TODO(), r.KeyRangeID)
if tx == nil || err != nil {
continue
}
if tx.ToStatus == "commit" {
datatransfers.ResolvePreparedTransaction(context.TODO(), tx.FromShardId, tx.FromTxName, true)
tem := kr.MoveKeyRange{
ShardId: tx.ToShardId,
Krid: r.KeyRangeID,
}
err = cc.Move(context.TODO(), &tem)
if err != nil {
spqrlog.Zero.Error().Err(err).Msg("failed to move key range")
}
} else {
datatransfers.ResolvePreparedTransaction(context.TODO(), tx.ToShardId, tx.ToTxName, false)
datatransfers.ResolvePreparedTransaction(context.TODO(), tx.FromShardId, tx.FromTxName, false)
}
err = db.RemoveTransferTx(context.TODO(), r.KeyRangeID)
if err != nil {
spqrlog.Zero.Error().Err(err).Msg("error removing from qdb")
}
}

go cc.watchRouters(context.TODO())
return cc
}
Expand Down Expand Up @@ -593,6 +626,7 @@ func (qc *qdbCoordinator) Move(ctx context.Context, req *kr.MoveKeyRange) error
Str("shard-id", req.ShardId).
Msg("qdb coordinator move key range")

//move i qdb
if err := qc.traverseRouters(ctx, func(cc *grpc.ClientConn) error {
cl := routerproto.NewKeyRangeServiceClient(cc)
lockResp, err := cl.LockKeyRange(ctx, &routerproto.LockKeyRangeRequest{
Expand Down Expand Up @@ -634,6 +668,15 @@ func (qc *qdbCoordinator) Move(ctx context.Context, req *kr.MoveKeyRange) error
}
}()

//move between shards
keyRange, _ := qc.db.GetKeyRange(ctx, req.Krid)
shardingRules, _ := qc.ListShardingRules(ctx)
err = datatransfers.MoveKeys(ctx, keyRange.ShardID, req.ShardId, *keyRange, shardingRules, &qc.db)
if err != nil {
spqrlog.Zero.Error().Msg("failed to move rows")
return err
}

krmv.ShardID = req.ShardId
if err := ops.ModifyKeyRangeWithChecks(ctx, qc.db, kr.KeyRangeFromDB(krmv)); err != nil {
// TODO: check if unlock here is ok
Expand Down Expand Up @@ -663,7 +706,6 @@ func (qc *qdbCoordinator) Move(ctx context.Context, req *kr.MoveKeyRange) error
Interface("response", unlockResp).
Msg("unlock key range response")
}()

moveResp, err := cl.MoveKeyRange(ctx, &routerproto.MoveKeyRangeRequest{
KeyRange: kr.KeyRangeFromDB(krmv).ToProto(),
})
Expand All @@ -674,7 +716,6 @@ func (qc *qdbCoordinator) Move(ctx context.Context, req *kr.MoveKeyRange) error
}); err != nil {
return err
}
// do phyc keys move
return nil
}

Expand Down
1 change: 1 addition & 0 deletions docker/coordinator/cfg.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,4 @@ addr: '[spqr_coordinator]:7002'
http_addr: 'spqr_coordinator:7003'
qdb_addr: '[spqr_qdb_0_1]:2379'
log_level: INFO
shard_data: '/spqr/docker/coordinator/shard_data.yaml'
13 changes: 13 additions & 0 deletions docker/coordinator/shard_data.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
shards:
sh1:
db: db1
user: user1
Password: 12345678
host: 'spqr_shard_1'
port: '6432'
sh2:
db: db1
user: user1
Password: 12345678
host: 'spqr_shard_2'
port: '6432'
1 change: 1 addition & 0 deletions docker/shard/bin/setup
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ EOF
cat >> /var/lib/postgresql/13/main/postgresql.conf <<-EOF
listen_addresses = '*'
port = 6432
max_prepared_transactions = 5
EOF

sudo -u postgres /usr/lib/postgresql/13/bin/pg_ctl -D /var/lib/postgresql/13/main/ start
Expand Down
42 changes: 39 additions & 3 deletions docker/tests/bin/move.sh
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,21 @@ sleep 20

set -ex

psql "host=spqr_coordinator sslmode=disable user=user1 dbname=db1 port=7002" -c 'ADD SHARDING RULE r1 COLUMNS w_id;' || {
echo "ERROR: tests failed"
exit 1
}

psql "host=spqr_coordinator sslmode=disable user=user1 dbname=db1 port=7002" -c 'ADD KEY RANGE krid1 FROM 1 TO 10 ROUTE TO sh1;' || {
echo "ERROR: tests failed"
exit 1
}

psql "host=spqr_coordinator sslmode=disable user=user1 dbname=db1 port=7002" -c 'ADD KEY RANGE krid2 FROM 11 TO 20 ROUTE TO sh2;' || {
echo "ERROR: tests failed"
exit 1
}

psql "host=spqr_router_1_1 sslmode=disable user=user1 dbname=db1 port=6432" -c 'CREATE TABLE xMove(w_id INT, s TEXT)' || {
echo "ERROR: tests failed"
exit 1
Expand All @@ -24,23 +39,44 @@ psql "host=spqr_router_1_1 sslmode=disable user=user1 dbname=db1 port=6432" -c "
exit 1
}

psql "host=spqr_router_1_1 sslmode=disable user=user1 dbname=db1 port=7432" -c "LOCK KEY RANGE krid2;" || {
psql "host=spqr_coordinator sslmode=disable user=user1 dbname=db1 port=7002" -c "LOCK KEY RANGE krid2;" || {
echo "ERROR: tests failed"
exit 1
}

psql "host=spqr_router_1_1 sslmode=disable user=user1 dbname=db1 port=7432" -c "MOVE KEY RANGE krid2 to sh1;" || {
psql "host=spqr_coordinator sslmode=disable user=user1 dbname=db1 port=7002" -c "MOVE KEY RANGE krid2 to sh1;" || {
echo "ERROR: tests failed"
exit 1
}

psql "host=spqr_router_1_1 sslmode=disable user=user1 dbname=db1 port=7432" -c "UNLOCK KEY RANGE krid2;" || {
psql "host=spqr_coordinator sslmode=disable user=user1 dbname=db1 port=7002" -c "UNLOCK KEY RANGE krid2;" || {
echo "ERROR: tests failed"
exit 1
}

out=$(psql "host=spqr_shard_1 sslmode=disable user=user1 dbname=db1 port=6432" -c "select * from xMove")
test "$out" = " w_id | s
------+-----
1 | 001
11 | 002
(2 rows)"

out=$(psql "host=spqr_shard_2 sslmode=disable user=user1 dbname=db1 port=6432" -c "select * from xMove")
test "$out" = " w_id | s
------+---
(0 rows)"

psql "host=spqr_router_1_1 sslmode=disable user=user1 dbname=db1 port=6432" -c "select * from xMove where w_id = 11" || {
echo "ERROR: tests failed"
exit 1
}

psql "host=spqr_coordinator sslmode=disable user=user1 dbname=db1 port=7002" -c "DROP KEY RANGE ALL;" || {
echo "ERROR: tests failed"
exit 1
}
psql "host=spqr_coordinator sslmode=disable user=user1 dbname=db1 port=7002" -c 'DROP SHARDING RULE ALL;' || {
echo "ERROR: tests failed"
exit 1
}

1 change: 1 addition & 0 deletions examples/coordinator.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,5 @@ addr: 'localhost:7002'
http_addr: 'localhost:7003'
qdb_addr: 'localhost:2379'
log_level: INFO
shard_data: '/spqr/docker/coordinator/shard_data.yaml'

9 changes: 5 additions & 4 deletions pkg/config/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,11 @@ import (
var cfgCoordinator Coordinator

type Coordinator struct {
LogLevel string `json:"log_level" toml:"log_level" yaml:"log_level"`
QdbAddr string `json:"qdb_addr" toml:"qdb_addr" yaml:"qdb_addr"`
HttpAddr string `json:"http_addr" toml:"http_addr" yaml:"http_addr"`
Addr string `json:"addr" toml:"addr" yaml:"addr"`
LogLevel string `json:"log_level" toml:"log_level" yaml:"log_level"`
QdbAddr string `json:"qdb_addr" toml:"qdb_addr" yaml:"qdb_addr"`
HttpAddr string `json:"http_addr" toml:"http_addr" yaml:"http_addr"`
Addr string `json:"addr" toml:"addr" yaml:"addr"`
ShardDataCfg string `json:"shard_data" toml:"shard_data" yaml:"shard_data"`
}

func LoadCoordinatorCfg(cfgPath string) error {
Expand Down
66 changes: 66 additions & 0 deletions pkg/config/data_transfers.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package config

import (
"encoding/json"
"fmt"
"log"
"os"
"strings"

"github.com/BurntSushi/toml"
"gopkg.in/yaml.v2"
)

type DatatransferConnections struct {
ShardsData map[string]*ShardConnect `json:"shards" toml:"shards" yaml:"shards"`
}

type ShardConnect struct {
Host string `json:"host" toml:"host" yaml:"host"`
Port string `json:"port" toml:"port" yaml:"port"`
DB string `json:"db" toml:"db" yaml:"db"`
User string `json:"user" toml:"user" yaml:"user"`
Password string `json:"password" toml:"password" yaml:"password"`
}

func LoadShardDataCfg(cfgPath string) (*DatatransferConnections, error) {
var cfg DatatransferConnections
file, err := os.Open(cfgPath)
if err != nil {
return &cfg, err
}

defer func(file *os.File) {
err := file.Close()
if err != nil {
log.Fatalf("failed to close config file: %v", err)
}
}(file)

if err := initShardDataConfig(file, &cfg); err != nil {
return &cfg, err
}

configBytes, err := json.MarshalIndent(cfg, "", " ")
if err != nil {
return &cfg, err
}

fmt.Println("Running config:", string(configBytes))

return &cfg, nil
}

func initShardDataConfig(file *os.File, cfg *DatatransferConnections) error {
if strings.HasSuffix(file.Name(), ".toml") {
_, err := toml.NewDecoder(file).Decode(&cfg)
return err
}
if strings.HasSuffix(file.Name(), ".yaml") {
return yaml.NewDecoder(file).Decode(&cfg)
}
if strings.HasSuffix(file.Name(), ".json") {
return json.NewDecoder(file).Decode(&cfg)
}
return fmt.Errorf("unknown config format type: %s. Use .toml, .yaml or .json suffix in filename", file.Name())
}
Loading
Loading