Skip to content

Commit

Permalink
Refactor init sql code
Browse files Browse the repository at this point in the history
  • Loading branch information
reshke committed Apr 1, 2024
1 parent 6f10f30 commit a2a5133
Show file tree
Hide file tree
Showing 18 changed files with 141 additions and 61 deletions.
8 changes: 6 additions & 2 deletions cmd/router/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ import (
"github.com/pg-sharding/spqr/pkg/datatransfers"
"github.com/pg-sharding/spqr/pkg/spqrlog"
"github.com/pg-sharding/spqr/qdb"
router "github.com/pg-sharding/spqr/router"
"github.com/pg-sharding/spqr/router/app"
"github.com/pg-sharding/spqr/router/instance"
"github.com/pkg/errors"
"github.com/sevlyar/go-daemon"
"github.com/spf13/cobra"
Expand Down Expand Up @@ -135,6 +135,10 @@ var runCmd = &cobra.Command{
spqrlog.Zero.Debug().Msg("daemon started")
}

if rcfg.UseCoordinatorInit && rcfg.UseInitSQL {
return fmt.Errorf("Cannot use initSQL andd coordinator-based init simultaneously")
}

ctx, cancelCtx := context.WithCancel(context.Background())
defer cancelCtx()

Expand Down Expand Up @@ -181,7 +185,7 @@ var runCmd = &cobra.Command{
/* will change on reload */
rcfg.PgprotoDebug = rcfg.PgprotoDebug || pgprotoDebug
rcfg.ShowNoticeMessages = rcfg.ShowNoticeMessages || pgprotoDebug
router, err := router.NewRouter(ctx, rcfg, os.Getenv("NOTIFY_SOCKET"), persist)
router, err := instance.NewRouter(ctx, rcfg, os.Getenv("NOTIFY_SOCKET"), persist)
if err != nil {
return errors.Wrap(err, "router failed to start")
}
Expand Down
3 changes: 0 additions & 3 deletions coordinator/provider/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ import (
"github.com/pg-sharding/spqr/pkg/pool"
routerproto "github.com/pg-sharding/spqr/pkg/protos"
"github.com/pg-sharding/spqr/qdb"
router "github.com/pg-sharding/spqr/router"
psqlclient "github.com/pg-sharding/spqr/router/client"
"github.com/pg-sharding/spqr/router/port"
"github.com/pg-sharding/spqr/router/route"
Expand Down Expand Up @@ -179,8 +178,6 @@ func (r *routerConn) ID() string {
return r.id
}

var _ router.Router = &routerConn{}

func DialRouter(r *topology.Router) (*grpc.ClientConn, error) {
spqrlog.Zero.Debug().
Str("router-id", r.ID).
Expand Down
3 changes: 3 additions & 0 deletions docker/router/cfg.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@ host: '[spqr_router_1_1]'
router_port: '6432'
admin_console_port: '7432'
grpc_api_port: '7000'

init_sql: /spqr/docker/router/init.sql
use_init_sql: true

router_mode: PROXY
log_level: debug
time_quantiles:
Expand Down
3 changes: 3 additions & 0 deletions docker/router/cfg2.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@ host: '[spqr_router_1_2]'
router_port: '6432'
admin_console_port: '7432'
grpc_api_port: '7000'

init_sql: /spqr/docker/router/init.sql
use_init_sql: true

router_mode: PROXY
log_level: debug
time_quantiles:
Expand Down
2 changes: 2 additions & 0 deletions examples/2shardproxy.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ world_shard_fallback: true
router_mode: PROXY

init_sql: "examples/init.sql"
use_init_sql: true

memqdb_backup_path: "memqdb.json"

frontend_tls:
Expand Down
5 changes: 4 additions & 1 deletion pkg/config/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,10 @@ type Router struct {
WorldShardFallback bool `json:"world_shard_fallback" toml:"world_shard_fallback" yaml:"world_shard_fallback"`
ShowNoticeMessages bool `json:"show_notice_messages" toml:"show_notice_messages" yaml:"show_notice_messages"`

InitSQL string `json:"init_sql" toml:"init_sql" yaml:"init_sql"`
InitSQL string `json:"init_sql" toml:"init_sql" yaml:"init_sql"`
UseInitSQL bool `json:"use_init_sql" toml:"use_init_sql" yaml:"use_init_sql"`
UseCoordinatorInit bool `json:"use_coordinator_init" toml:"use_coordinator_init" yaml:"use_coordinator_init"`

MemqdbBackupPath string `json:"memqdb_backup_path" toml:"memqdb_backup_path" yaml:"memqdb_backup_path"`
MemqdbPersistent bool `json:"memqdb_persistent" toml:"memqdb_persistent" yaml:"memqdb_persistent"`
RouterMode string `json:"router_mode" toml:"router_mode" yaml:"router_mode"`
Expand Down
6 changes: 3 additions & 3 deletions router/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,17 @@ import (

reuse "github.com/libp2p/go-reuseport"
"github.com/pg-sharding/spqr/pkg/spqrlog"
router "github.com/pg-sharding/spqr/router"
rgrpc "github.com/pg-sharding/spqr/router/grpc"
"github.com/pg-sharding/spqr/router/instance"
"github.com/pg-sharding/spqr/router/port"
"google.golang.org/grpc"
)

type App struct {
spqr *router.InstanceImpl
spqr *instance.InstanceImpl
}

func NewApp(sg *router.InstanceImpl) *App {
func NewApp(sg *instance.InstanceImpl) *App {
return &App{
spqr: sg,
}
Expand Down
2 changes: 1 addition & 1 deletion router/frontend.go → router/frontend/frontend.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package app
package frontend

import (
"fmt"
Expand Down
12 changes: 6 additions & 6 deletions router/frontend_test.go → router/frontend/frontend_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package app_test
package frontend_test

import (
"io"
Expand All @@ -11,7 +11,7 @@ import (
"github.com/pg-sharding/spqr/pkg/models/kr"
"github.com/pg-sharding/spqr/pkg/shard"
"github.com/pg-sharding/spqr/pkg/txstatus"
app "github.com/pg-sharding/spqr/router"
"github.com/pg-sharding/spqr/router/frontend"
mockcl "github.com/pg-sharding/spqr/router/mock/client"
mockqr "github.com/pg-sharding/spqr/router/mock/qrouter"
mocksrv "github.com/pg-sharding/spqr/router/mock/server"
Expand Down Expand Up @@ -41,7 +41,7 @@ func TestFrontendSimpleEOF(t *testing.T) {

cmngr.EXPECT().UnRouteCB(gomock.Any(), gomock.Any()).Times(1)

err := app.Frontend(qr, cl, cmngr, &config.Router{}, nil)
err := frontend.Frontend(qr, cl, cmngr, &config.Router{}, nil)

assert.NoError(err, "")
}
Expand Down Expand Up @@ -146,7 +146,7 @@ func TestFrontendSimple(t *testing.T) {

cl.EXPECT().Receive().Times(1).Return(nil, io.EOF)

err := app.Frontend(qr, cl, cmngr, &config.Router{}, nil)
err := frontend.Frontend(qr, cl, cmngr, &config.Router{}, nil)

assert.NoError(err, "")
}
Expand Down Expand Up @@ -293,7 +293,7 @@ func TestFrontendXProto(t *testing.T) {

cl.EXPECT().Receive().Times(1).Return(nil, io.EOF)

err := app.Frontend(qr, cl, cmngr, &config.Router{}, nil)
err := frontend.Frontend(qr, cl, cmngr, &config.Router{}, nil)

assert.NoError(err, "")
}
Expand Down Expand Up @@ -406,7 +406,7 @@ func TestFrontendSimpleCopyIn(t *testing.T) {

cl.EXPECT().Receive().Times(1).Return(nil, io.EOF)

err := app.Frontend(qr, cl, cmngr, &config.Router{}, nil)
err := frontend.Frontend(qr, cl, cmngr, &config.Router{}, nil)

assert.NoError(err, "")
}
5 changes: 5 additions & 0 deletions router/instance/etcd.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package instance

type EtcdMetadataBootstraper struct {
RouterMetadataBootstraper
}
9 changes: 9 additions & 0 deletions router/instance/init.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package instance

import (
"context"
)

type RouterMetadataBootstraper interface {
InitializeMetadata(ctx context.Context, r RouterInstance) error
}
76 changes: 32 additions & 44 deletions router/instance.go → router/instance/instance.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
package app
package instance

import (
"context"
"crypto/tls"
"fmt"
"net"
"os"
"time"

"github.com/pg-sharding/spqr/pkg/config"
Expand All @@ -14,18 +13,22 @@ import (
"github.com/pg-sharding/spqr/pkg/spqrlog"
"github.com/pg-sharding/spqr/pkg/workloadlog"
"github.com/pg-sharding/spqr/qdb"
"github.com/pg-sharding/spqr/router/client"
"github.com/pg-sharding/spqr/router/console"
"github.com/pg-sharding/spqr/router/frontend"
"github.com/pg-sharding/spqr/router/poolmgr"
"github.com/pg-sharding/spqr/router/port"
"github.com/pg-sharding/spqr/router/qrouter"
"github.com/pg-sharding/spqr/router/rulerouter"
sdnotifier "github.com/pg-sharding/spqr/router/sdnotifier"
)

type Router interface {
type RouterInstance interface {
Addr() string
ID() string
Initialized() bool
Initialize() bool

Console() console.Console
}

type InstanceImpl struct {
Expand All @@ -43,6 +46,11 @@ type InstanceImpl struct {
notifier *sdnotifier.Notifier
}

// Console implements RouterInstance.
func (r *InstanceImpl) Console() console.Console {
return r.AdmConsole
}

func (r *InstanceImpl) ID() string {
return "noid"
}
Expand All @@ -55,15 +63,13 @@ func (r *InstanceImpl) Initialized() bool {
return r.Qrouter.Initialized()
}

var _ Router = &InstanceImpl{}
func (r *InstanceImpl) Initialize() bool {
return r.Qrouter.Initialize()
}

func NewRouter(ctx context.Context, rcfg *config.Router, ns string, persist bool) (*InstanceImpl, error) {
/* TODO: fix by adding configurable setting */
skipInitSQL := false
if _, err := os.Stat(rcfg.MemqdbBackupPath); !persist && err == nil {
skipInitSQL = true
}
var _ RouterInstance = &InstanceImpl{}

func NewRouter(ctx context.Context, rcfg *config.Router, ns string, persist bool) (*InstanceImpl, error) {
var db *qdb.MemQDB
var err error

Expand Down Expand Up @@ -130,37 +136,7 @@ func NewRouter(ctx context.Context, rcfg *config.Router, ns string, persist bool
return nil, err
}

if !skipInitSQL {
for _, fname := range []string{
rcfg.InitSQL,
} {
if len(fname) == 0 {
continue
}
queries, err := localConsole.Qlog().Recover(ctx, fname)
if err != nil {
spqrlog.Zero.Error().Err(err).Msg("failed to initialize router")
return nil, err
}

spqrlog.Zero.Info().Msg("executing init sql")
for _, query := range queries {
spqrlog.Zero.Info().Str("query", query).Msg("")
if err := localConsole.ProcessQuery(ctx, query, client.NewFakeClient()); err != nil {
spqrlog.Zero.Error().Err(err).Msg("")
}
}

spqrlog.Zero.Info().
Int("count", len(queries)).
Str("filename", fname).
Msg("successfully init queries from file")
}
}

qr.Initialize()

return &InstanceImpl{
r := &InstanceImpl{
RuleRouter: rr,
Qrouter: qr,
AdmConsole: localConsole,
Expand All @@ -170,7 +146,19 @@ func NewRouter(ctx context.Context, rcfg *config.Router, ns string, persist bool
WithJaeger: rcfg.WithJaeger,
Writer: writ,
notifier: notifier,
}, nil
}

/* initialize metadata */
if rcfg.UseInitSQL {
i := NewInitSQLMetadataBootstraper(rcfg.InitSQL)
if err := i.InitializeMetadata(ctx, r); err != nil {
return nil, err
}
} else if rcfg.UseCoordinatorInit {
panic("implement me")
}

return r, nil
}

func (r *InstanceImpl) serv(netconn net.Conn, pt port.RouterPortType) error {
Expand Down Expand Up @@ -206,7 +194,7 @@ func (r *InstanceImpl) serv(netconn net.Conn, pt port.RouterPortType) error {
_, _ = routerClient.Route().ReleaseClient(routerClient.ID())
}()

return Frontend(r.Qrouter, routerClient, cmngr, r.RuleRouter.Config(), r.Writer)
return frontend.Frontend(r.Qrouter, routerClient, cmngr, r.RuleRouter.Config(), r.Writer)
}

func (r *InstanceImpl) Run(ctx context.Context, listener net.Listener, pt port.RouterPortType) error {
Expand Down
54 changes: 54 additions & 0 deletions router/instance/sqlfile.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package instance

import (
"context"

"github.com/pg-sharding/spqr/pkg/spqrlog"
"github.com/pg-sharding/spqr/router/client"
)

type InitSQLMetadataBootstraper struct {
InitSQLFIle string
}

// InitializeMetadata implements RouterMetadataBootstraper.
func (i *InitSQLMetadataBootstraper) InitializeMetadata(ctx context.Context, r RouterInstance) error {
for _, fname := range []string{
// rcfg.InitSQL,
i.InitSQLFIle,
} {
if len(fname) == 0 {
continue
}
queries, err := r.Console().Qlog().Recover(ctx, fname)
if err != nil {
spqrlog.Zero.Error().Err(err).Msg("failed to initialize router")
return err
}

spqrlog.Zero.Info().Msg("executing init sql")
for _, query := range queries {
spqrlog.Zero.Info().Str("query", query).Msg("")
if err := r.Console().ProcessQuery(ctx, query, client.NewFakeClient()); err != nil {
spqrlog.Zero.Error().Err(err).Msg("")
}
}

spqrlog.Zero.Info().
Int("count", len(queries)).
Str("filename", fname).
Msg("successfully init queries from file")
}

r.Initialize()

return nil
}

func NewInitSQLMetadataBootstraper(InitSQLFIle string) RouterMetadataBootstraper {
return &InitSQLMetadataBootstraper{
InitSQLFIle: InitSQLFIle,
}
}

var _ RouterMetadataBootstraper = &InitSQLMetadataBootstraper{}
2 changes: 1 addition & 1 deletion router/trace.go → router/instance/trace.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package app
package instance

import (
"io"
Expand Down
3 changes: 3 additions & 0 deletions test/feature/conf/router_with_backup_and_initsql.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,10 @@ grpc_api_port: '7000'
router_mode: PROXY
log_level: debug
memqdb_backup_path: memqdb.backup

init_sql: /spqr/test/feature/conf/init.sql
use_init_sql: true

time_quantiles:
- 0.75
world_shard_fallback: true
Expand Down
Loading

0 comments on commit a2a5133

Please sign in to comment.