Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Coordinator Show clients/pools/backend_connections funcs #265

Merged
merged 14 commits into from
Aug 9, 2023
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ pooler_run:
####################### TESTS #######################

unittest:
go test ./cmd/... ./pkg/... ./router/... ./qdb/...
go test ./cmd/... ./pkg/... ./router/... ./qdb/... ./coordinator/...

regress_local: proxy_2sh_run
./script/regress_local.sh
Expand Down
28 changes: 28 additions & 0 deletions coordinator/provider/coord_pool.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package provider

import (
"fmt"

"github.com/pg-sharding/spqr/pkg/models/kr"
"github.com/pg-sharding/spqr/pkg/pool"
protos "github.com/pg-sharding/spqr/pkg/protos"
"github.com/pg-sharding/spqr/pkg/shard"
)

type CoordPool struct {
pool.ConnectionKepperData
}

func NewCoordPool(info *protos.PoolInfo) *CoordPool {
return &CoordPool{
ConnectionKepperData: *pool.NewConnectionKepperData(info),
}
}

func (r *CoordPool) Connection(clid string, shardKey kr.ShardKey) (shard.Shard, error) {
return nil, fmt.Errorf("unimplemented")
}

func (r *CoordPool) ForEach(cb func(p shard.Shardinfo) error) error {
return fmt.Errorf("unimplemented")
}
50 changes: 50 additions & 0 deletions coordinator/provider/coord_pool_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package provider_test

import (
"testing"

"github.com/pg-sharding/spqr/coordinator/provider"
"github.com/pg-sharding/spqr/pkg/models/kr"
proto "github.com/pg-sharding/spqr/pkg/protos"
"github.com/pg-sharding/spqr/pkg/shard"
"github.com/stretchr/testify/assert"
)

func TestIteratorForeach(t *testing.T) {
assert := assert.New(t)
p := provider.CoordPool{}

err := p.ForEach(func(p shard.Shardinfo) error { return nil })

assert.Error(err)
}

func TestCoordPoolConn(t *testing.T) {
assert := assert.New(t)
p := provider.CoordPool{}

_, err := p.Connection("klid", kr.ShardKey{})

assert.Error(err)
}

func TestCoordPoolNew(t *testing.T) {
assert := assert.New(t)
inf := &proto.PoolInfo{
Id: "id",
DB: "db",
Usr: "usr",
Host: "host",
ConnCount: 1,
IdleConnCount: 2,
QueueSize: 3,
}
p := provider.NewCoordPool(inf)

assert.Equal("db", p.Rule().DB)
assert.Equal("usr", p.Rule().Usr)
assert.Equal("host", p.Hostname())
assert.Equal(1, p.UsedConnectionCount())
assert.Equal(2, p.IdleConnectionCount())
assert.Equal(3, p.QueueResidualSize())
}
22 changes: 20 additions & 2 deletions coordinator/provider/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ func (ci grpcConnectionIterator) ForEach(cb func(sh shard.Shardinfo) error) erro
}

for _, conn := range resp.Conns {
err = cb(NewCoordShardInfo(conn))
err = cb(NewCoordShardInfo(conn, addr))
if err != nil {
return err
}
Expand All @@ -123,7 +123,25 @@ func (ci grpcConnectionIterator) ForEach(cb func(sh shard.Shardinfo) error) erro
}

func (ci grpcConnectionIterator) ForEachPool(cb func(p pool.Pool) error) error {
return fmt.Errorf("not implemented")
return ci.IterRouter(func(cc *grpc.ClientConn, addr string) error {
ctx := context.TODO()
rrBackConn := routerproto.NewPoolServiceClient(cc)

spqrlog.Zero.Debug().Msg("fetch pools with grpc")
resp, err := rrBackConn.ListPools(ctx, &routerproto.ListPoolsRequest{})
if err != nil {
spqrlog.Zero.Error().Msg("error fetching pools with grpc")
return err
}

for _, p := range resp.Pools {
err = cb(NewCoordPool(p))
if err != nil {
return err
}
}
return nil
})
}

var _ connectiterator.ConnectIterator = &grpcConnectionIterator{}
Expand Down
8 changes: 7 additions & 1 deletion coordinator/provider/shards.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,18 +80,24 @@ func (s *ShardServer) GetShardInfo(ctx context.Context, shardRequest *protos.Sha

type CoordShardInfo struct {
underlying *routerproto.BackendConnectionsInfo
router string
}

func NewCoordShardInfo(conn *routerproto.BackendConnectionsInfo) shard.Shardinfo {
func NewCoordShardInfo(conn *routerproto.BackendConnectionsInfo, router string) shard.Shardinfo {
return &CoordShardInfo{
underlying: conn,
router: router,
}
}

func (c *CoordShardInfo) DB() string {
return c.underlying.Dbname
}

func (c *CoordShardInfo) Router() string {
return c.router
}

func (c *CoordShardInfo) Usr() string {
return c.underlying.Dbname
}
Expand Down
2 changes: 1 addition & 1 deletion docker/router/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
FROM spqr-base-image

ENTRYPOINT /spqr/spqr-router run -c ${ROUTER_CONFIG=/spqr/docker/router/cfg.yaml} --proto-debug
ENTRYPOINT CONFIG_PATH=${ROUTER_CONFIG=/spqr/docker/router/cfg.yaml} && CUR_HOST=$(cat ${CONFIG_PATH} | grep "host:") && sed -i "s/${CUR_HOST}/${ROUTER_HOST=${CUR_HOST}}/g" ${CONFIG_PATH} && /spqr/spqr-router run -c ${CONFIG_PATH} --proto-debug
12 changes: 9 additions & 3 deletions pkg/clientinteractor/interactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ func (pi *PSQLInteractor) Databases(dbs []string) error {
func (pi *PSQLInteractor) Pools(_ context.Context, ps []pool.Pool) error {
if err := pi.WriteHeader(
"pool id",
"pool router",
"pool db",
"pool usr",
"pool host",
Expand All @@ -126,10 +127,10 @@ func (pi *PSQLInteractor) Pools(_ context.Context, ps []pool.Pool) error {
spqrlog.Zero.Error().Err(err).Msg("")
return err
}

for _, p := range ps {
if err := pi.WriteDataRow(
fmt.Sprintf("%p", p),
p.RouterName(),
p.Rule().DB,
p.Rule().Usr,
p.Hostname(),
Expand Down Expand Up @@ -715,14 +716,19 @@ func (pi *PSQLInteractor) KillClient(clientID string) error {
}

func (pi *PSQLInteractor) BackendConnections(ctx context.Context, shs []shard.Shardinfo) error {
if err := pi.WriteHeader("backend connection id", "shard key name", "hostname", "user", "dbname", "sync", "tx_served", "tx status"); err != nil {
if err := pi.WriteHeader("backend connection id", "router", "shard key name", "hostname", "user", "dbname", "sync", "tx_served", "tx status"); err != nil {
spqrlog.Zero.Error().Err(err).Msg("")
return err
}

for _, sh := range shs {
router := "no data"
s, ok := sh.(shard.CoordShardinfo)
if ok {
router = s.Router()
}

if err := pi.WriteDataRow(sh.ID(), sh.ShardKeyName(), sh.InstanceHostname(), sh.Usr(), sh.DB(), strconv.FormatInt(sh.Sync(), 10), strconv.FormatInt(sh.TxServed(), 10), sh.TxStatus().String()); err != nil {
if err := pi.WriteDataRow(sh.ID(), router, sh.ShardKeyName(), sh.InstanceHostname(), sh.Usr(), sh.DB(), strconv.FormatInt(sh.Sync(), 10), strconv.FormatInt(sh.TxServed(), 10), sh.TxStatus().String()); err != nil {
spqrlog.Zero.Error().Err(err).Msg("")
return err
}
Expand Down
93 changes: 93 additions & 0 deletions pkg/pool/conn_keeper.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
package pool

import (
"fmt"
"sync"

"github.com/pg-sharding/spqr/pkg/config"
protos "github.com/pg-sharding/spqr/pkg/protos"
"github.com/pg-sharding/spqr/pkg/shard"
)

type ConnectionKepperData struct {
Id string
DB string
Usr string
Host string
Router string
ConnCount int64
IdleConnCount int64
QueueSize int64

m sync.RWMutex
}

func NewConnectionKepperData(info *protos.PoolInfo) *ConnectionKepperData {
return &ConnectionKepperData{
Id: info.Id,
DB: info.DB,
Usr: info.Usr,
Router: info.RouterName,
Host: info.Host,
ConnCount: info.ConnCount,
IdleConnCount: info.IdleConnCount,
QueueSize: info.QueueSize,
m: sync.RWMutex{},
}
}

func (r *ConnectionKepperData) Put(host shard.Shard) error {
return fmt.Errorf("unimplemented")
}

func (r *ConnectionKepperData) Discard(sh shard.Shard) error {
return fmt.Errorf("unimplemented")
}

func (r *ConnectionKepperData) UsedConnectionCount() int {
r.m.Lock()
defer r.m.Unlock()

return int(r.ConnCount)
}

func (r *ConnectionKepperData) IdleConnectionCount() int {
r.m.Lock()
defer r.m.Unlock()

return int(r.IdleConnCount)
}

func (r *ConnectionKepperData) QueueResidualSize() int {
r.m.Lock()
defer r.m.Unlock()

return int(r.QueueSize)
}

func (r *ConnectionKepperData) Hostname() string {
r.m.Lock()
defer r.m.Unlock()

return r.Host
}

func (r *ConnectionKepperData) RouterName() string {
r.m.Lock()
defer r.m.Unlock()
return r.Router
}

func (r *ConnectionKepperData) List() []shard.Shard {
return nil
}

func (r *ConnectionKepperData) Rule() *config.BackendRule {
r.m.Lock()
defer r.m.Unlock()

return &config.BackendRule{
DB: r.DB,
Usr: r.Usr,
}
}
96 changes: 96 additions & 0 deletions pkg/pool/conn_keeper_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
package pool_test

import (
"testing"

"github.com/pg-sharding/spqr/pkg/datashard"
"github.com/pg-sharding/spqr/pkg/pool"
"github.com/stretchr/testify/assert"
)

func TestCoordConnectionKepperRule(t *testing.T) {
assert := assert.New(t)

k := pool.ConnectionKepperData{
DB: "db",
Usr: "usr",
}

br := k.Rule()

assert.Equal("db", br.DB)
assert.Equal("usr", br.Usr)
}

func TestCoordConnectionKepperList(t *testing.T) {
assert := assert.New(t)

k := pool.ConnectionKepperData{}

assert.Nil(k.List())
}

func TestCoordConnectionKepperHostname(t *testing.T) {
assert := assert.New(t)

k := pool.ConnectionKepperData{
Host: "host",
}

assert.Equal("host", k.Hostname())
}

func TestCoordConnectionKepperQueueResidualSize(t *testing.T) {
assert := assert.New(t)
k := pool.ConnectionKepperData{
QueueSize: 3,
}

assert.Equal(3, k.QueueResidualSize())
}

func TestCoordConnectionKepperConnectionCount(t *testing.T) {
assert := assert.New(t)
k := pool.ConnectionKepperData{
ConnCount: 1,
IdleConnCount: 2,
}

assert.Equal(1, k.UsedConnectionCount())
assert.Equal(2, k.IdleConnectionCount())
}

func TestCoordConnectionKepperControlling(t *testing.T) {
assert := assert.New(t)
k := pool.ConnectionKepperData{}
err := k.Put(&datashard.Conn{})
err1 := k.Discard(&datashard.Conn{})

assert.Error(err)
assert.Error(err1)
}

func TestCoordConnectionKepperThreading(t *testing.T) {
assert := assert.New(t)
inf := &pool.ConnectionKepperData{
Id: "id",
DB: "db",
Usr: "usr",
Host: "host",
ConnCount: 1,
IdleConnCount: 2,
QueueSize: 3,
}
for k := 0; k < 100; k++ {
go func() {
for i := 0; i < 100; i++ {
assert.Equal("db", inf.Rule().DB)
assert.Equal("usr", inf.Rule().Usr)
assert.Equal("host", inf.Hostname())
assert.Equal(1, inf.UsedConnectionCount())
assert.Equal(2, inf.IdleConnectionCount())
assert.Equal(3, inf.QueueResidualSize())
}
}()
}
}
1 change: 1 addition & 0 deletions pkg/pool/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ type ConnectionKepper interface {
QueueResidualSize() int

Hostname() string
RouterName() string

List() []shard.Shard

Expand Down
Loading
Loading