diff --git a/balancer/pkg/balancer.go b/balancer/pkg/balancer.go index 54caad5be..75519be59 100644 --- a/balancer/pkg/balancer.go +++ b/balancer/pkg/balancer.go @@ -13,8 +13,6 @@ import ( "sync" "time" - "github.com/wal-g/tracelog" - "github.com/pg-sharding/spqr/pkg/spqrlog" "github.com/pg-sharding/spqr/router/pkg/client" ) @@ -107,7 +105,7 @@ func (b *Balancer) Init(installation InstallationInterface, coordinator Coordina if err == nil { break } - //tracelog.ErrorLogger.PrintError(err) + //spqrlog.Logger.PrintError(err) fmt.Println("Error: trying to init state by coordinator, but got an error ", err) time.Sleep(b.retryTime) } @@ -115,7 +113,7 @@ func (b *Balancer) Init(installation InstallationInterface, coordinator Coordina for shard, keyRanges := range shardToKeyRanges { shardDistances, err := b.installation.GetKeyDistanceByRanges(shard, keyRanges) if err != nil { - tracelog.DebugLogger.PrintError(err) + spqrlog.Logger.PrintError(err) continue } for rng, dist := range shardDistances { @@ -187,7 +185,7 @@ func (b *Balancer) tryToUpdateShardStats(shard Shard, wg *sync.WaitGroup) { _stats, err := b.installation.GetShardStats(shard, keyRanges) if err != nil { - //tracelog.ErrorLogger.PrintError(err) + //spqrlog.Logger.PrintError(err) fmt.Println("Error: ", err) return } @@ -583,7 +581,7 @@ func (b *Balancer) runTask(task *Action) error { var err error for task.actionStage != actionStageDone { - tracelog.InfoLogger.Printf("Action stage: %v", task.actionStage) + spqrlog.Logger.Printf(spqrlog.INFO, "Action stage: %v", task.actionStage) switch task.actionStage { case actionStagePlan: @@ -709,11 +707,11 @@ func (b *Balancer) planTasks() { shardStats, err := b.installation.GetShardStats(shard, keyRanges) if err != nil { - tracelog.ErrorLogger.PrintError(err) + spqrlog.Logger.PrintError(err) return } - tracelog.InfoLogger.Printf("Plan tasks. ShardStats: %#v", shardStats) + spqrlog.Logger.Printf(spqrlog.INFO, "Plan tasks. ShardStats: %#v", shardStats) b.bestTask = Task{} for i := 0; i < b.plannerCount; i++ { @@ -807,7 +805,7 @@ func (b *Balancer) BrutForceStrategy() { reload := b.reloadRequired b.muReload.Unlock() if !reload { - tracelog.InfoLogger.Println("Check coordinator for reloading requirements") + spqrlog.Logger.Printf(spqrlog.INFO, "Check coordinator for reloading requirements") reload, err = b.coordinator.isReloadRequired() if err != nil { fmt.Println("Error while spqr.isReloadRequired call: ", err) diff --git a/balancer/pkg/db.go b/balancer/pkg/db.go index 72c0574d7..5c74db738 100644 --- a/balancer/pkg/db.go +++ b/balancer/pkg/db.go @@ -13,7 +13,7 @@ import ( "time" _ "github.com/jackc/pgx/v4/stdlib" - "github.com/wal-g/tracelog" + "github.com/pg-sharding/spqr/pkg/spqrlog" "golang.yandex/hasql" "golang.yandex/hasql/checkers" ) @@ -133,7 +133,7 @@ func NewCluster(addrs []string, dbname, user, password, sslMode, sslRootCert str nodes := make([]hasql.Node, 0, len(addrs)) for _, addr := range addrs { connString := ConnString(addr, dbname, user, password, sslMode, sslRootCert) - tracelog.InfoLogger.Printf("Connection string: %v", connString) + spqrlog.Logger.Printf(spqrlog.INFO, "Connection string: %v", connString) db, err := sql.Open("pgx", connString) if err != nil { @@ -144,7 +144,7 @@ func NewCluster(addrs []string, dbname, user, password, sslMode, sslRootCert str nodes = append(nodes, hasql.NewNode(addr, db)) } - tracelog.InfoLogger.Printf("Nodes: %v", nodes) + spqrlog.Logger.Printf(spqrlog.INFO, "Nodes: %v", nodes) return hasql.NewCluster(nodes, checkers.PostgreSQL) } diff --git a/cmd/logproxy/main.go b/cmd/logproxy/main.go index 7a0584174..309d5d946 100644 --- a/cmd/logproxy/main.go +++ b/cmd/logproxy/main.go @@ -1,9 +1,9 @@ package main import ( + "github.com/pg-sharding/spqr/pkg/spqrlog" "github.com/pg-sharding/spqr/test/logproxy" "github.com/spf13/cobra" - "github.com/wal-g/tracelog" ) var rootCmd = &cobra.Command{ @@ -18,21 +18,15 @@ var rootCmd = &cobra.Command{ func Execute() { if err := rootCmd.Execute(); err != nil { - tracelog.ErrorLogger.Fatal(err) + spqrlog.Logger.FatalOnError(err) } } var runCmd = &cobra.Command{ Use: "run", RunE: func(cmd *cobra.Command, args []string) error { - - pr := logproxy.NewProxy() - err := pr.Run() - if err != nil { - tracelog.ErrorLogger.FatalOnError(err) - } - - return err + pr := &logproxy.Proxy{} + return pr.Run() }, } diff --git a/cmd/router/main.go b/cmd/router/main.go index 4fd05a1f8..bbcb8e436 100644 --- a/cmd/router/main.go +++ b/cmd/router/main.go @@ -13,7 +13,6 @@ import ( "github.com/pkg/errors" "github.com/spf13/cobra" - "github.com/wal-g/tracelog" "github.com/pg-sharding/spqr/pkg/config" "github.com/pg-sharding/spqr/router/app" @@ -27,8 +26,8 @@ var ( ) var rootCmd = &cobra.Command{ - Use: "./spqr-router run --config `path-to-config-folder`", - Short: "sqpr-rr", + Use: "spqr-router run --config `path-to-config-folder`", + Short: "sqpr-router", Long: "spqr-router", CompletionOptions: cobra.CompletionOptions{ DisableDefaultCmd: true, @@ -39,7 +38,7 @@ var rootCmd = &cobra.Command{ func Execute() { if err := rootCmd.Execute(); err != nil { - tracelog.ErrorLogger.Fatal(err) + spqrlog.Logger.FatalOnError(err) } } diff --git a/cmd/worldmock/main.go b/cmd/worldmock/main.go index 209ed2dd7..da6177d3e 100644 --- a/cmd/worldmock/main.go +++ b/cmd/worldmock/main.go @@ -1,9 +1,9 @@ package main import ( + "github.com/pg-sharding/spqr/pkg/spqrlog" "github.com/pg-sharding/spqr/test/worldmock" "github.com/spf13/cobra" - "github.com/wal-g/tracelog" ) var rootCmd = &cobra.Command{ @@ -18,7 +18,7 @@ var rootCmd = &cobra.Command{ func Execute() { if err := rootCmd.Execute(); err != nil { - tracelog.ErrorLogger.Fatal(err) + spqrlog.Logger.FatalOnError(err) } } @@ -28,14 +28,8 @@ var addr string var runCmd = &cobra.Command{ Use: "run", RunE: func(cmd *cobra.Command, args []string) error { - w := worldmock.NewWorldMock(addr) - err := w.Run() - if err != nil { - tracelog.ErrorLogger.FatalOnError(err) - } - - return err + return w.Run() }, } diff --git a/go.mod b/go.mod index f5f8b1592..abe120504 100644 --- a/go.mod +++ b/go.mod @@ -18,7 +18,6 @@ require ( github.com/spf13/cobra v1.5.0 github.com/uber/jaeger-client-go v2.30.0+incompatible github.com/uber/jaeger-lib v2.4.1+incompatible - github.com/wal-g/tracelog v0.0.0-20210121175938-897a155375e3 go.etcd.io/etcd/client/v3 v3.5.4 go.uber.org/atomic v1.9.0 golang.yandex/hasql v1.0.0 diff --git a/pkg/client/clientpool.go b/pkg/client/clientpool.go index 1a4e70c45..7af34d12a 100644 --- a/pkg/client/clientpool.go +++ b/pkg/client/clientpool.go @@ -4,7 +4,6 @@ import ( "sync" spqrlog "github.com/pg-sharding/spqr/pkg/spqrlog" - "github.com/wal-g/tracelog" ) type Pool interface { @@ -48,7 +47,9 @@ func (c *PoolImpl) Shutdown() error { for _, cl := range c.pool { go func(cl Client) { - tracelog.InfoLogger.PrintError(cl.Shutdown()) + if err := cl.Shutdown(); err != nil { + spqrlog.Logger.PrintError(err) + } }(cl) } diff --git a/router/pkg/rulerouter/route_pool.go b/router/pkg/rulerouter/route_pool.go index 74e76358c..bf9b9f782 100644 --- a/router/pkg/rulerouter/route_pool.go +++ b/router/pkg/rulerouter/route_pool.go @@ -8,7 +8,6 @@ import ( "github.com/pg-sharding/spqr/pkg/client" "github.com/pg-sharding/spqr/pkg/config" "github.com/pg-sharding/spqr/router/pkg/route" - "github.com/wal-g/tracelog" ) type RoutePool interface { @@ -44,7 +43,7 @@ func (r *RoutePoolImpl) NotifyRoutes(cb func(route *route.Route) error) error { for _, rt := range r.pool { go func(rt *route.Route) { if err := cb(rt); err != nil { - tracelog.InfoLogger.Printf("error while notifying route %v", err) + spqrlog.Logger.Printf(spqrlog.INFO, "error while notifying route %v", err) } }(rt) } @@ -83,7 +82,7 @@ func (r *RoutePoolImpl) MatchRoute(key route.Key, defer r.mu.Unlock() if nroute, ok := r.pool[key]; ok { - tracelog.InfoLogger.Printf("match route %v", key) + spqrlog.Logger.Printf(spqrlog.INFO, "match route %v", key) return nroute, nil } diff --git a/router/pkg/rulerouter/rulerouter.go b/router/pkg/rulerouter/rulerouter.go index f3b8ccc17..f041cda1d 100644 --- a/router/pkg/rulerouter/rulerouter.go +++ b/router/pkg/rulerouter/rulerouter.go @@ -19,7 +19,6 @@ import ( rclient "github.com/pg-sharding/spqr/router/pkg/client" "github.com/pg-sharding/spqr/router/pkg/route" "github.com/pkg/errors" - "github.com/wal-g/tracelog" ) type RuleRouter interface { @@ -48,7 +47,7 @@ type RuleRouterImpl struct { } func (r *RuleRouterImpl) AddWorldShard(key qdb.ShardKey) error { - tracelog.InfoLogger.Printf("added world datashard to rrouter %v", key.Name) + spqrlog.Logger.Printf(spqrlog.INFO, "added world datashard to rrouter %v", key.Name) return nil } diff --git a/router/pkg/trace.go b/router/pkg/trace.go index 1d503cf8d..a26c83c52 100644 --- a/router/pkg/trace.go +++ b/router/pkg/trace.go @@ -27,7 +27,7 @@ func (r *InstanceImpl) initJaegerTracer(rcfg *config.Router) (io.Closer, error) }, } - jLogger := jaegerlog.StdLogger //TODO: replace with tracelog logger + jLogger := jaegerlog.StdLogger //TODO: replace with spqrlog logger jMetricsFactory := metrics.NullFactory // Initialize tracer with a logger and a metrics factory diff --git a/test/logproxy/logproxy.go b/test/logproxy/logproxy.go index 3ed80a11d..09e21f0b7 100644 --- a/test/logproxy/logproxy.go +++ b/test/logproxy/logproxy.go @@ -7,11 +7,13 @@ import ( "os" "github.com/jackc/pgproto3/v2" + "github.com/pg-sharding/spqr/pkg/spqrlog" //"github.com/pg-sharding/spqr/pkg/config" //"github.com/pg-sharding/spqr/router/pkg/client" - "github.com/wal-g/tracelog" ) +const failedToReceiveMessage = "failed to received msg %w" + func getC() (net.Conn, error) { const proto = "tcp" const addr = "[::1]:6432" @@ -26,7 +28,7 @@ func (p *Proxy) Run() error { listener, err := net.Listen("tcp6", "[::1]:5433") if err != nil { - tracelog.ErrorLogger.PrintError(err) + spqrlog.Logger.PrintError(err) return err } defer listener.Close() @@ -55,7 +57,7 @@ func (p *Proxy) Run() error { go func() { if err := p.serv(c); err != nil { - tracelog.ErrorLogger.PrintError(err) + spqrlog.Logger.PrintError(err) } }() @@ -67,7 +69,6 @@ func (p *Proxy) serv(netconn net.Conn) error { conn, err := getC() if err != nil { - fmt.Printf("failed %w", err) return err } @@ -80,7 +81,7 @@ func (p *Proxy) serv(netconn net.Conn) error { // return err // } - // tracelog.InfoLogger.Printf("initialized client connection %s-%s\n", cl.Usr(), cl.DB()) + // spqrlog.Logger.Printf(spqrlog.INFO, "initialized client connection %s-%s\n", cl.Usr(), cl.DB()) // if err := cl.AssignRule(&config.FRRule{ // AuthRule: config.AuthRule{ @@ -93,23 +94,23 @@ func (p *Proxy) serv(netconn net.Conn) error { // if err := cl.Auth(); err != nil { // return err // } - // tracelog.InfoLogger.Printf("client auth OK") + // spqrlog.Logger.Printf(spqrlog.INFO, "client auth OK") cb := func(msg pgproto3.FrontendMessage) { - tracelog.InfoLogger.Printf("received msg %v", msg) + spqrlog.Logger.Printf(spqrlog.INFO, "received msg %v", msg) switch v := msg.(type) { case *pgproto3.Parse: - tracelog.InfoLogger.Printf("received prep stmt %v %v", v.Name, v.Query) + spqrlog.Logger.Printf(spqrlog.INFO, "received prep stmt %v %v", v.Name, v.Query) break case *pgproto3.Query: - tracelog.InfoLogger.Printf("received message %v", v.String) + spqrlog.Logger.Printf(spqrlog.INFO, "received message %v", v.String) default: } } shouldStop := func(msg pgproto3.BackendMessage) bool { - tracelog.InfoLogger.Printf("received msg %v", msg) + spqrlog.Logger.Printf(spqrlog.INFO, "received msg %v", msg) switch msg.(type) { case *pgproto3.ReadyForQuery: @@ -125,24 +126,20 @@ func (p *Proxy) serv(netconn net.Conn) error { cb(msg) if err != nil { - tracelog.ErrorLogger.Printf("failed to received msg %w", err) - return err + return fmt.Errorf(failedToReceiveMessage, err) } if err := frontend.Send(msg); err != nil { - tracelog.ErrorLogger.Printf("failed to received msg %w", err) - return err + return fmt.Errorf(failedToReceiveMessage, err) } for { retmsg, err := frontend.Receive() if err != nil { - tracelog.ErrorLogger.Printf("failed to received msg %w", err) - return err + return fmt.Errorf(failedToReceiveMessage, err) } err = cl.Send(retmsg) if err != nil { - tracelog.ErrorLogger.Printf("failed to received msg %w", err) - return err + return fmt.Errorf(failedToReceiveMessage, err) } if shouldStop(retmsg) { @@ -151,7 +148,3 @@ func (p *Proxy) serv(netconn net.Conn) error { } } } - -func NewProxy() *Proxy { - return &Proxy{} -} diff --git a/test/prepare_statement_basic.go b/test/prepare_statement_basic.go index 5829ec817..61045b136 100644 --- a/test/prepare_statement_basic.go +++ b/test/prepare_statement_basic.go @@ -9,7 +9,7 @@ import ( "time" "github.com/jackc/pgproto3/v2" - "github.com/wal-g/tracelog" + "github.com/pg-sharding/spqr/pkg/spqrlog" ) var readResp = flag.Bool("v", false, "Logs every packet in great detail") @@ -21,26 +21,27 @@ func getC() (net.Conn, error) { return net.Dial(proto, addr) } +var okerr = errors.New("something") + func readCnt(fr *pgproto3.Frontend, count int) error { for i := 0; i < count; i++ { if msg, err := fr.Receive(); err != nil { return err } else { - tracelog.InfoLogger.Printf("received %T msg", msg) + spqrlog.Logger.Printf(spqrlog.INFO, "received %T msg", msg) } } return nil } -var okerr = errors.New("something") func waitRFQ(fr *pgproto3.Frontend) error { for { if msg, err := fr.Receive(); err != nil { return err } else { - tracelog.InfoLogger.Printf("received %+v msg", msg) + spqrlog.Logger.Printf(spqrlog.INFO, "received %+v msg", msg) switch msg.(type) { case *pgproto3.ErrorResponse: return okerr @@ -78,7 +79,7 @@ func prepLong(fr *pgproto3.Frontend, waitforres bool) error { return err } - tracelog.InfoLogger.Printf("reading prep parse") + spqrlog.Logger.Printf(spqrlog.INFO, "reading prep parse") if err := waitRFQ(fr); err != nil { return err } @@ -98,11 +99,11 @@ func prepLong(fr *pgproto3.Frontend, waitforres bool) error { } if !waitforres { - tracelog.InfoLogger.Printf("not reading prep resp") + spqrlog.Logger.Printf(spqrlog.INFO, "not reading prep resp") return nil } - tracelog.InfoLogger.Printf("reading prep resp") + spqrlog.Logger.Printf(spqrlog.INFO, "reading prep resp") return waitRFQ(fr) } @@ -111,7 +112,7 @@ func gaogao(wg *sync.WaitGroup, waitforres bool) { conn, err := getC() if err != nil { - tracelog.ErrorLogger.Printf("failed to get conn %w", err) + spqrlog.Logger.Printf(spqrlog.ERROR, "failed to get conn %w", err) if err != okerr { panic(err) } @@ -129,7 +130,7 @@ func gaogao(wg *sync.WaitGroup, waitforres bool) { "password": "12345678", }, }); err != nil { - tracelog.ErrorLogger.Printf("startup failed %w", err) + spqrlog.Logger.Printf(spqrlog.ERROR, "startup failed %w", err) if err != okerr { panic(err) } @@ -138,7 +139,7 @@ func gaogao(wg *sync.WaitGroup, waitforres bool) { time.Sleep(200 * time.Millisecond) if err := waitRFQ(frontend); err != nil { - tracelog.ErrorLogger.Printf("startup failed %w", err) + spqrlog.Logger.Printf(spqrlog.ERROR, "startup failed %w", err) if err != okerr { panic(err) } @@ -146,14 +147,14 @@ func gaogao(wg *sync.WaitGroup, waitforres bool) { } if err := prepLong(frontend, waitforres); err != nil { - tracelog.ErrorLogger.Printf("prep failed %w", err) + spqrlog.Logger.Printf(spqrlog.ERROR, "prep failed %w", err) if err != okerr { panic(err) } return } - tracelog.InfoLogger.Printf("ok") + spqrlog.Logger.Printf(spqrlog.INFO, "ok") } func main() { diff --git a/test/stress/stress.go b/test/stress/stress.go index e7d59c656..d2bcd0383 100644 --- a/test/stress/stress.go +++ b/test/stress/stress.go @@ -7,8 +7,7 @@ import ( "sync" "time" - "github.com/wal-g/tracelog" - + "github.com/pg-sharding/spqr/pkg/spqrlog" "github.com/jmoiron/sqlx" _ "github.com/lib/pq" "github.com/spf13/cobra" @@ -32,7 +31,7 @@ func getConn(ctx context.Context, dbname string, retryCnt int) (*sqlx.DB, error) for i := 0; i < retryCnt; i++ { db, err := sqlx.ConnectContext(ctx, "postgres", pgConString) if err != nil { - tracelog.ErrorLogger.PrintError(fmt.Errorf("error while connecting to postgresql: %w", err)) + spqrlog.Logger.PrintError(fmt.Errorf("error while connecting to postgresql: %w", err)) continue } return db, nil @@ -51,18 +50,18 @@ func simple() { conn, err := getConn(ctx, dbname, 2) if err != nil { - tracelog.ErrorLogger.PrintError(fmt.Errorf("stress test FAILED %w", err)) + spqrlog.Logger.PrintError(fmt.Errorf("stress test FAILED %w", err)) panic(err) } defer func(conn *sqlx.DB) { err := conn.Close() if err != nil { - tracelog.ErrorLogger.PrintError(err) + spqrlog.Logger.PrintError(err) } }(conn) if _, err := conn.Query(fmt.Sprintf("SELECT * FROM %s WHERE i = %d", relation, 1+r.Intn(10))); err != nil { - tracelog.ErrorLogger.PrintError(err) + spqrlog.Logger.PrintError(err) panic(err) } diff --git a/test/worldmock/worldmock.go b/test/worldmock/worldmock.go index b3bd7d8a9..3e15c49ed 100644 --- a/test/worldmock/worldmock.go +++ b/test/worldmock/worldmock.go @@ -5,15 +5,14 @@ import ( "net" "os" - "github.com/pg-sharding/spqr/router/pkg/datashard" - "github.com/pg-sharding/spqr/router/pkg/route" - "github.com/jackc/pgproto3/v2" - "github.com/wal-g/tracelog" "github.com/pg-sharding/spqr/pkg/config" "github.com/pg-sharding/spqr/pkg/conn" + "github.com/pg-sharding/spqr/pkg/spqrlog" "github.com/pg-sharding/spqr/router/pkg/client" + "github.com/pg-sharding/spqr/router/pkg/datashard" + "github.com/pg-sharding/spqr/router/pkg/route" ) type WorldMock struct { @@ -26,7 +25,6 @@ func (w *WorldMock) Run() error { listener, err := net.Listen("tcp", ":6432") if err != nil { - tracelog.ErrorLogger.PrintError(err) return err } defer listener.Close() @@ -55,7 +53,7 @@ func (w *WorldMock) Run() error { go func() { if err := w.serv(c); err != nil { - tracelog.ErrorLogger.PrintError(err) + spqrlog.Logger.PrintError(err) } }() @@ -72,7 +70,7 @@ func (w *WorldMock) serv(netconn net.Conn) error { return err } - tracelog.InfoLogger.Printf("initialized client connection %s-%s\n", cl.Usr(), cl.DB()) + spqrlog.Logger.Printf(spqrlog.INFO, "initialized client connection %s-%s\n", cl.Usr(), cl.DB()) if err := cl.AssignRule(&config.FrontendRule{ AuthRule: &config.AuthCfg{ @@ -87,24 +85,24 @@ func (w *WorldMock) serv(netconn net.Conn) error { if err := cl.Auth(r); err != nil { return err } - tracelog.InfoLogger.Printf("client auth OK") + spqrlog.Logger.Printf(spqrlog.INFO, "client auth OK") for { msg, err := cl.Receive() if err != nil { - tracelog.ErrorLogger.Printf("failed to received msg %w", err) + spqrlog.Logger.Printf(spqrlog.ERROR, "failed to received msg %w", err) return err } - tracelog.InfoLogger.Printf("received msg %v", msg) + spqrlog.Logger.Printf(spqrlog.INFO, "received msg %v", msg) switch v := msg.(type) { case *pgproto3.Parse: - tracelog.InfoLogger.Printf("received prep stmt %v %v", v.Name, v.Query) + spqrlog.Logger.Printf(spqrlog.INFO, "received prep stmt %v %v", v.Name, v.Query) break case *pgproto3.Query: - tracelog.InfoLogger.Printf("received message %v", v.String) + spqrlog.Logger.Printf(spqrlog.INFO, "received message %v", v.String) _ = cl.ReplyDebugNotice("you are receiving the message from the mock world shard") @@ -136,7 +134,7 @@ func (w *WorldMock) serv(netconn net.Conn) error { }() if err != nil { - tracelog.ErrorLogger.PrintError(err) + spqrlog.Logger.PrintError(err) } default: