Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor init sql code #588

Merged
merged 4 commits into from
Apr 2, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions cmd/router/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ import (
"github.com/pg-sharding/spqr/pkg/datatransfers"
"github.com/pg-sharding/spqr/pkg/spqrlog"
"github.com/pg-sharding/spqr/qdb"
router "github.com/pg-sharding/spqr/router"
"github.com/pg-sharding/spqr/router/app"
"github.com/pg-sharding/spqr/router/instance"
"github.com/pkg/errors"
"github.com/sevlyar/go-daemon"
"github.com/spf13/cobra"
Expand Down Expand Up @@ -135,6 +135,10 @@ var runCmd = &cobra.Command{
spqrlog.Zero.Debug().Msg("daemon started")
}

if rcfg.UseCoordinatorInit && rcfg.UseInitSQL {
return fmt.Errorf("Cannot use initSQL andd coordinator-based init simultaneously")
}

ctx, cancelCtx := context.WithCancel(context.Background())
defer cancelCtx()

Expand Down Expand Up @@ -181,7 +185,7 @@ var runCmd = &cobra.Command{
/* will change on reload */
rcfg.PgprotoDebug = rcfg.PgprotoDebug || pgprotoDebug
rcfg.ShowNoticeMessages = rcfg.ShowNoticeMessages || pgprotoDebug
router, err := router.NewRouter(ctx, rcfg, os.Getenv("NOTIFY_SOCKET"), persist)
router, err := instance.NewRouter(ctx, rcfg, os.Getenv("NOTIFY_SOCKET"), persist)
if err != nil {
return errors.Wrap(err, "router failed to start")
}
Expand Down
17 changes: 0 additions & 17 deletions coordinator/provider/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ import (
"github.com/pg-sharding/spqr/pkg/pool"
routerproto "github.com/pg-sharding/spqr/pkg/protos"
"github.com/pg-sharding/spqr/qdb"
router "github.com/pg-sharding/spqr/router"
psqlclient "github.com/pg-sharding/spqr/router/client"
"github.com/pg-sharding/spqr/router/port"
"github.com/pg-sharding/spqr/router/route"
Expand Down Expand Up @@ -165,22 +164,6 @@ func (ci grpcConnectionIterator) ForEachPool(cb func(p pool.Pool) error) error {

var _ connectiterator.ConnectIterator = &grpcConnectionIterator{}

type routerConn struct {
routerproto.KeyRangeServiceClient
addr string
id string
}

func (r *routerConn) Addr() string {
return r.addr
}

func (r *routerConn) ID() string {
return r.id
}

var _ router.Router = &routerConn{}

func DialRouter(r *topology.Router) (*grpc.ClientConn, error) {
spqrlog.Zero.Debug().
Str("router-id", r.ID).
Expand Down
3 changes: 3 additions & 0 deletions docker/router/cfg.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@ host: '[spqr_router_1_1]'
router_port: '6432'
admin_console_port: '7432'
grpc_api_port: '7000'

init_sql: /spqr/docker/router/init.sql
use_init_sql: true

router_mode: PROXY
log_level: debug
time_quantiles:
Expand Down
3 changes: 3 additions & 0 deletions docker/router/cfg2.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@ host: '[spqr_router_1_2]'
router_port: '6432'
admin_console_port: '7432'
grpc_api_port: '7000'

init_sql: /spqr/docker/router/init.sql
use_init_sql: true

router_mode: PROXY
log_level: debug
time_quantiles:
Expand Down
2 changes: 2 additions & 0 deletions examples/2shardproxy.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ world_shard_fallback: true
router_mode: PROXY

init_sql: "examples/init.sql"
use_init_sql: true

memqdb_backup_path: "memqdb.json"

frontend_tls:
Expand Down
5 changes: 4 additions & 1 deletion pkg/config/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,10 @@ type Router struct {
WorldShardFallback bool `json:"world_shard_fallback" toml:"world_shard_fallback" yaml:"world_shard_fallback"`
ShowNoticeMessages bool `json:"show_notice_messages" toml:"show_notice_messages" yaml:"show_notice_messages"`

InitSQL string `json:"init_sql" toml:"init_sql" yaml:"init_sql"`
InitSQL string `json:"init_sql" toml:"init_sql" yaml:"init_sql"`
UseInitSQL bool `json:"use_init_sql" toml:"use_init_sql" yaml:"use_init_sql"`
UseCoordinatorInit bool `json:"use_coordinator_init" toml:"use_coordinator_init" yaml:"use_coordinator_init"`

MemqdbBackupPath string `json:"memqdb_backup_path" toml:"memqdb_backup_path" yaml:"memqdb_backup_path"`
MemqdbPersistent bool `json:"memqdb_persistent" toml:"memqdb_persistent" yaml:"memqdb_persistent"`
RouterMode string `json:"router_mode" toml:"router_mode" yaml:"router_mode"`
Expand Down
6 changes: 3 additions & 3 deletions router/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,17 @@ import (

reuse "github.com/libp2p/go-reuseport"
"github.com/pg-sharding/spqr/pkg/spqrlog"
router "github.com/pg-sharding/spqr/router"
rgrpc "github.com/pg-sharding/spqr/router/grpc"
"github.com/pg-sharding/spqr/router/instance"
"github.com/pg-sharding/spqr/router/port"
"google.golang.org/grpc"
)

type App struct {
spqr *router.InstanceImpl
spqr *instance.InstanceImpl
}

func NewApp(sg *router.InstanceImpl) *App {
func NewApp(sg *instance.InstanceImpl) *App {
return &App{
spqr: sg,
}
Expand Down
2 changes: 1 addition & 1 deletion router/frontend.go → router/frontend/frontend.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package app
package frontend

import (
"fmt"
Expand Down
12 changes: 6 additions & 6 deletions router/frontend_test.go → router/frontend/frontend_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package app_test
package frontend_test

import (
"io"
Expand All @@ -11,7 +11,7 @@ import (
"github.com/pg-sharding/spqr/pkg/models/kr"
"github.com/pg-sharding/spqr/pkg/shard"
"github.com/pg-sharding/spqr/pkg/txstatus"
app "github.com/pg-sharding/spqr/router"
"github.com/pg-sharding/spqr/router/frontend"
mockcl "github.com/pg-sharding/spqr/router/mock/client"
mockqr "github.com/pg-sharding/spqr/router/mock/qrouter"
mocksrv "github.com/pg-sharding/spqr/router/mock/server"
Expand Down Expand Up @@ -41,7 +41,7 @@ func TestFrontendSimpleEOF(t *testing.T) {

cmngr.EXPECT().UnRouteCB(gomock.Any(), gomock.Any()).Times(1)

err := app.Frontend(qr, cl, cmngr, &config.Router{}, nil)
err := frontend.Frontend(qr, cl, cmngr, &config.Router{}, nil)

assert.NoError(err, "")
}
Expand Down Expand Up @@ -146,7 +146,7 @@ func TestFrontendSimple(t *testing.T) {

cl.EXPECT().Receive().Times(1).Return(nil, io.EOF)

err := app.Frontend(qr, cl, cmngr, &config.Router{}, nil)
err := frontend.Frontend(qr, cl, cmngr, &config.Router{}, nil)

assert.NoError(err, "")
}
Expand Down Expand Up @@ -293,7 +293,7 @@ func TestFrontendXProto(t *testing.T) {

cl.EXPECT().Receive().Times(1).Return(nil, io.EOF)

err := app.Frontend(qr, cl, cmngr, &config.Router{}, nil)
err := frontend.Frontend(qr, cl, cmngr, &config.Router{}, nil)

assert.NoError(err, "")
}
Expand Down Expand Up @@ -406,7 +406,7 @@ func TestFrontendSimpleCopyIn(t *testing.T) {

cl.EXPECT().Receive().Times(1).Return(nil, io.EOF)

err := app.Frontend(qr, cl, cmngr, &config.Router{}, nil)
err := frontend.Frontend(qr, cl, cmngr, &config.Router{}, nil)

assert.NoError(err, "")
}
5 changes: 5 additions & 0 deletions router/instance/etcd.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package instance

type EtcdMetadataBootstraper struct {
RouterMetadataBootstraper
}
9 changes: 9 additions & 0 deletions router/instance/init.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package instance

import (
"context"
)

type RouterMetadataBootstraper interface {
InitializeMetadata(ctx context.Context, r RouterInstance) error
}
79 changes: 35 additions & 44 deletions router/instance.go → router/instance/instance.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
package app
package instance

import (
"context"
"crypto/tls"
"fmt"
"net"
"os"
"time"

"github.com/pg-sharding/spqr/pkg/config"
Expand All @@ -14,18 +13,22 @@ import (
"github.com/pg-sharding/spqr/pkg/spqrlog"
"github.com/pg-sharding/spqr/pkg/workloadlog"
"github.com/pg-sharding/spqr/qdb"
"github.com/pg-sharding/spqr/router/client"
"github.com/pg-sharding/spqr/router/console"
"github.com/pg-sharding/spqr/router/frontend"
"github.com/pg-sharding/spqr/router/poolmgr"
"github.com/pg-sharding/spqr/router/port"
"github.com/pg-sharding/spqr/router/qrouter"
"github.com/pg-sharding/spqr/router/rulerouter"
sdnotifier "github.com/pg-sharding/spqr/router/sdnotifier"
)

type Router interface {
type RouterInstance interface {
Addr() string
ID() string
Initialized() bool
Initialize() bool

Console() console.Console
}

type InstanceImpl struct {
Expand All @@ -43,6 +46,11 @@ type InstanceImpl struct {
notifier *sdnotifier.Notifier
}

// Console implements RouterInstance.
func (r *InstanceImpl) Console() console.Console {
return r.AdmConsole
}

func (r *InstanceImpl) ID() string {
return "noid"
}
Expand All @@ -55,15 +63,13 @@ func (r *InstanceImpl) Initialized() bool {
return r.Qrouter.Initialized()
}

var _ Router = &InstanceImpl{}
func (r *InstanceImpl) Initialize() bool {
return r.Qrouter.Initialize()
}

func NewRouter(ctx context.Context, rcfg *config.Router, ns string, persist bool) (*InstanceImpl, error) {
/* TODO: fix by adding configurable setting */
skipInitSQL := false
if _, err := os.Stat(rcfg.MemqdbBackupPath); !persist && err == nil {
skipInitSQL = true
}
var _ RouterInstance = &InstanceImpl{}

func NewRouter(ctx context.Context, rcfg *config.Router, ns string, persist bool) (*InstanceImpl, error) {
var db *qdb.MemQDB
var err error

Expand Down Expand Up @@ -130,37 +136,7 @@ func NewRouter(ctx context.Context, rcfg *config.Router, ns string, persist bool
return nil, err
}

if !skipInitSQL {
for _, fname := range []string{
rcfg.InitSQL,
} {
if len(fname) == 0 {
continue
}
queries, err := localConsole.Qlog().Recover(ctx, fname)
if err != nil {
spqrlog.Zero.Error().Err(err).Msg("failed to initialize router")
return nil, err
}

spqrlog.Zero.Info().Msg("executing init sql")
for _, query := range queries {
spqrlog.Zero.Info().Str("query", query).Msg("")
if err := localConsole.ProcessQuery(ctx, query, client.NewFakeClient()); err != nil {
spqrlog.Zero.Error().Err(err).Msg("")
}
}

spqrlog.Zero.Info().
Int("count", len(queries)).
Str("filename", fname).
Msg("successfully init queries from file")
}
}

qr.Initialize()

return &InstanceImpl{
r := &InstanceImpl{
RuleRouter: rr,
Qrouter: qr,
AdmConsole: localConsole,
Expand All @@ -170,7 +146,22 @@ func NewRouter(ctx context.Context, rcfg *config.Router, ns string, persist bool
WithJaeger: rcfg.WithJaeger,
Writer: writ,
notifier: notifier,
}, nil
}

/* initialize metadata */
if rcfg.UseInitSQL {
i := NewInitSQLMetadataBootstraper(rcfg.InitSQL)
if err := i.InitializeMetadata(ctx, r); err != nil {
return nil, err
}
} else if rcfg.UseCoordinatorInit {
panic("implement me")
} else {
/* TODO: maybe error-out? */
r.Initialize()
}

return r, nil
}

func (r *InstanceImpl) serv(netconn net.Conn, pt port.RouterPortType) error {
Expand Down Expand Up @@ -206,7 +197,7 @@ func (r *InstanceImpl) serv(netconn net.Conn, pt port.RouterPortType) error {
_, _ = routerClient.Route().ReleaseClient(routerClient.ID())
}()

return Frontend(r.Qrouter, routerClient, cmngr, r.RuleRouter.Config(), r.Writer)
return frontend.Frontend(r.Qrouter, routerClient, cmngr, r.RuleRouter.Config(), r.Writer)
}

func (r *InstanceImpl) Run(ctx context.Context, listener net.Listener, pt port.RouterPortType) error {
Expand Down
54 changes: 54 additions & 0 deletions router/instance/sqlfile.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package instance

import (
"context"

"github.com/pg-sharding/spqr/pkg/spqrlog"
"github.com/pg-sharding/spqr/router/client"
)

type InitSQLMetadataBootstraper struct {
InitSQLFIle string
}

// InitializeMetadata implements RouterMetadataBootstraper.
func (i *InitSQLMetadataBootstraper) InitializeMetadata(ctx context.Context, r RouterInstance) error {
for _, fname := range []string{
// rcfg.InitSQL,
i.InitSQLFIle,
} {
if len(fname) == 0 {
continue
}
queries, err := r.Console().Qlog().Recover(ctx, fname)
if err != nil {
spqrlog.Zero.Error().Err(err).Msg("failed to initialize router")
return err
}

spqrlog.Zero.Info().Msg("executing init sql")
for _, query := range queries {
spqrlog.Zero.Info().Str("query", query).Msg("")
if err := r.Console().ProcessQuery(ctx, query, client.NewFakeClient()); err != nil {
spqrlog.Zero.Error().Err(err).Msg("")
}
}

spqrlog.Zero.Info().
Int("count", len(queries)).
Str("filename", fname).
Msg("successfully init queries from file")
}

r.Initialize()

return nil
}

func NewInitSQLMetadataBootstraper(InitSQLFIle string) RouterMetadataBootstraper {
return &InitSQLMetadataBootstraper{
InitSQLFIle: InitSQLFIle,
}
}

var _ RouterMetadataBootstraper = &InitSQLMetadataBootstraper{}
Loading
Loading