Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Drop wal-g/tracelog usage #132

Merged
merged 2 commits into from
Jan 31, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 7 additions & 9 deletions balancer/pkg/balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,6 @@ import (
"sync"
"time"

"github.com/wal-g/tracelog"

"github.com/pg-sharding/spqr/pkg/spqrlog"
"github.com/pg-sharding/spqr/router/pkg/client"
)
Expand Down Expand Up @@ -107,15 +105,15 @@ func (b *Balancer) Init(installation InstallationInterface, coordinator Coordina
if err == nil {
break
}
//tracelog.ErrorLogger.PrintError(err)
//spqrlog.Logger.PrintError(err)
fmt.Println("Error: trying to init state by coordinator, but got an error ", err)
time.Sleep(b.retryTime)
}
// TODO parallel
for shard, keyRanges := range shardToKeyRanges {
shardDistances, err := b.installation.GetKeyDistanceByRanges(shard, keyRanges)
if err != nil {
tracelog.DebugLogger.PrintError(err)
spqrlog.Logger.PrintError(err)
continue
}
for rng, dist := range shardDistances {
Expand Down Expand Up @@ -187,7 +185,7 @@ func (b *Balancer) tryToUpdateShardStats(shard Shard, wg *sync.WaitGroup) {
_stats, err := b.installation.GetShardStats(shard, keyRanges)

if err != nil {
//tracelog.ErrorLogger.PrintError(err)
//spqrlog.Logger.PrintError(err)
fmt.Println("Error: ", err)
return
}
Expand Down Expand Up @@ -583,7 +581,7 @@ func (b *Balancer) runTask(task *Action) error {
var err error

for task.actionStage != actionStageDone {
tracelog.InfoLogger.Printf("Action stage: %v", task.actionStage)
spqrlog.Logger.Printf(spqrlog.INFO, "Action stage: %v", task.actionStage)

switch task.actionStage {
case actionStagePlan:
Expand Down Expand Up @@ -709,11 +707,11 @@ func (b *Balancer) planTasks() {

shardStats, err := b.installation.GetShardStats(shard, keyRanges)
if err != nil {
tracelog.ErrorLogger.PrintError(err)
spqrlog.Logger.PrintError(err)
return
}

tracelog.InfoLogger.Printf("Plan tasks. ShardStats: %#v", shardStats)
spqrlog.Logger.Printf(spqrlog.INFO, "Plan tasks. ShardStats: %#v", shardStats)

b.bestTask = Task{}
for i := 0; i < b.plannerCount; i++ {
Expand Down Expand Up @@ -807,7 +805,7 @@ func (b *Balancer) BrutForceStrategy() {
reload := b.reloadRequired
b.muReload.Unlock()
if !reload {
tracelog.InfoLogger.Println("Check coordinator for reloading requirements")
spqrlog.Logger.Printf(spqrlog.INFO, "Check coordinator for reloading requirements")
reload, err = b.coordinator.isReloadRequired()
if err != nil {
fmt.Println("Error while spqr.isReloadRequired call: ", err)
Expand Down
6 changes: 3 additions & 3 deletions balancer/pkg/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
"time"

_ "github.com/jackc/pgx/v4/stdlib"
"github.com/wal-g/tracelog"
"github.com/pg-sharding/spqr/pkg/spqrlog"
"golang.yandex/hasql"
"golang.yandex/hasql/checkers"
)
Expand Down Expand Up @@ -133,7 +133,7 @@ func NewCluster(addrs []string, dbname, user, password, sslMode, sslRootCert str
nodes := make([]hasql.Node, 0, len(addrs))
for _, addr := range addrs {
connString := ConnString(addr, dbname, user, password, sslMode, sslRootCert)
tracelog.InfoLogger.Printf("Connection string: %v", connString)
spqrlog.Logger.Printf(spqrlog.INFO, "Connection string: %v", connString)

db, err := sql.Open("pgx", connString)
if err != nil {
Expand All @@ -144,7 +144,7 @@ func NewCluster(addrs []string, dbname, user, password, sslMode, sslRootCert str
nodes = append(nodes, hasql.NewNode(addr, db))
}

tracelog.InfoLogger.Printf("Nodes: %v", nodes)
spqrlog.Logger.Printf(spqrlog.INFO, "Nodes: %v", nodes)

return hasql.NewCluster(nodes, checkers.PostgreSQL)
}
Expand Down
14 changes: 4 additions & 10 deletions cmd/logproxy/main.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
package main

import (
"github.com/pg-sharding/spqr/pkg/spqrlog"
"github.com/pg-sharding/spqr/test/logproxy"
"github.com/spf13/cobra"
"github.com/wal-g/tracelog"
)

var rootCmd = &cobra.Command{
Expand All @@ -18,21 +18,15 @@ var rootCmd = &cobra.Command{

func Execute() {
if err := rootCmd.Execute(); err != nil {
tracelog.ErrorLogger.Fatal(err)
spqrlog.Logger.FatalOnError(err)
}
}

var runCmd = &cobra.Command{
Use: "run",
RunE: func(cmd *cobra.Command, args []string) error {

pr := logproxy.NewProxy()
err := pr.Run()
if err != nil {
tracelog.ErrorLogger.FatalOnError(err)
}

return err
pr := &logproxy.Proxy{}
return pr.Run()
},
}

Expand Down
7 changes: 3 additions & 4 deletions cmd/router/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (

"github.com/pkg/errors"
"github.com/spf13/cobra"
"github.com/wal-g/tracelog"

"github.com/pg-sharding/spqr/pkg/config"
"github.com/pg-sharding/spqr/router/app"
Expand All @@ -27,8 +26,8 @@ var (
)

var rootCmd = &cobra.Command{
Use: "./spqr-router run --config `path-to-config-folder`",
Short: "sqpr-rr",
Use: "spqr-router run --config `path-to-config-folder`",
Short: "sqpr-router",
Long: "spqr-router",
CompletionOptions: cobra.CompletionOptions{
DisableDefaultCmd: true,
Expand All @@ -39,7 +38,7 @@ var rootCmd = &cobra.Command{

func Execute() {
if err := rootCmd.Execute(); err != nil {
tracelog.ErrorLogger.Fatal(err)
spqrlog.Logger.FatalOnError(err)
}
}

Expand Down
12 changes: 3 additions & 9 deletions cmd/worldmock/main.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
package main

import (
"github.com/pg-sharding/spqr/pkg/spqrlog"
"github.com/pg-sharding/spqr/test/worldmock"
"github.com/spf13/cobra"
"github.com/wal-g/tracelog"
)

var rootCmd = &cobra.Command{
Expand All @@ -18,7 +18,7 @@ var rootCmd = &cobra.Command{

func Execute() {
if err := rootCmd.Execute(); err != nil {
tracelog.ErrorLogger.Fatal(err)
spqrlog.Logger.FatalOnError(err)
}
}

Expand All @@ -28,14 +28,8 @@ var addr string
var runCmd = &cobra.Command{
Use: "run",
RunE: func(cmd *cobra.Command, args []string) error {

w := worldmock.NewWorldMock(addr)
err := w.Run()
if err != nil {
tracelog.ErrorLogger.FatalOnError(err)
}

return err
return w.Run()
},
}

Expand Down
1 change: 0 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ require (
github.com/spf13/cobra v1.5.0
github.com/uber/jaeger-client-go v2.30.0+incompatible
github.com/uber/jaeger-lib v2.4.1+incompatible
github.com/wal-g/tracelog v0.0.0-20210121175938-897a155375e3
go.etcd.io/etcd/client/v3 v3.5.4
go.uber.org/atomic v1.9.0
golang.yandex/hasql v1.0.0
Expand Down
5 changes: 3 additions & 2 deletions pkg/client/clientpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"sync"

spqrlog "github.com/pg-sharding/spqr/pkg/spqrlog"
"github.com/wal-g/tracelog"
)

type Pool interface {
Expand Down Expand Up @@ -48,7 +47,9 @@ func (c *PoolImpl) Shutdown() error {

for _, cl := range c.pool {
go func(cl Client) {
tracelog.InfoLogger.PrintError(cl.Shutdown())
if err := cl.Shutdown(); err != nil {
spqrlog.Logger.PrintError(err)
}
}(cl)
}

Expand Down
5 changes: 2 additions & 3 deletions router/pkg/rulerouter/route_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"github.com/pg-sharding/spqr/pkg/client"
"github.com/pg-sharding/spqr/pkg/config"
"github.com/pg-sharding/spqr/router/pkg/route"
"github.com/wal-g/tracelog"
)

type RoutePool interface {
Expand Down Expand Up @@ -44,7 +43,7 @@ func (r *RoutePoolImpl) NotifyRoutes(cb func(route *route.Route) error) error {
for _, rt := range r.pool {
go func(rt *route.Route) {
if err := cb(rt); err != nil {
tracelog.InfoLogger.Printf("error while notifying route %v", err)
spqrlog.Logger.Printf(spqrlog.INFO, "error while notifying route %v", err)
}
}(rt)
}
Expand Down Expand Up @@ -83,7 +82,7 @@ func (r *RoutePoolImpl) MatchRoute(key route.Key,
defer r.mu.Unlock()

if nroute, ok := r.pool[key]; ok {
tracelog.InfoLogger.Printf("match route %v", key)
spqrlog.Logger.Printf(spqrlog.INFO, "match route %v", key)
return nroute, nil
}

Expand Down
3 changes: 1 addition & 2 deletions router/pkg/rulerouter/rulerouter.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
rclient "github.com/pg-sharding/spqr/router/pkg/client"
"github.com/pg-sharding/spqr/router/pkg/route"
"github.com/pkg/errors"
"github.com/wal-g/tracelog"
)

type RuleRouter interface {
Expand Down Expand Up @@ -48,7 +47,7 @@ type RuleRouterImpl struct {
}

func (r *RuleRouterImpl) AddWorldShard(key qdb.ShardKey) error {
tracelog.InfoLogger.Printf("added world datashard to rrouter %v", key.Name)
spqrlog.Logger.Printf(spqrlog.INFO, "added world datashard to rrouter %v", key.Name)
return nil
}

Expand Down
2 changes: 1 addition & 1 deletion router/pkg/trace.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func (r *InstanceImpl) initJaegerTracer(rcfg *config.Router) (io.Closer, error)
},
}

jLogger := jaegerlog.StdLogger //TODO: replace with tracelog logger
jLogger := jaegerlog.StdLogger //TODO: replace with spqrlog logger
jMetricsFactory := metrics.NullFactory

// Initialize tracer with a logger and a metrics factory
Expand Down
37 changes: 15 additions & 22 deletions test/logproxy/logproxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,13 @@ import (
"os"

"github.com/jackc/pgproto3/v2"
"github.com/pg-sharding/spqr/pkg/spqrlog"
//"github.com/pg-sharding/spqr/pkg/config"
//"github.com/pg-sharding/spqr/router/pkg/client"
"github.com/wal-g/tracelog"
)

const failedToReceiveMessage = "failed to received msg %w"

func getC() (net.Conn, error) {
const proto = "tcp"
const addr = "[::1]:6432"
Expand All @@ -26,7 +28,7 @@ func (p *Proxy) Run() error {

listener, err := net.Listen("tcp6", "[::1]:5433")
if err != nil {
tracelog.ErrorLogger.PrintError(err)
spqrlog.Logger.PrintError(err)
return err
}
defer listener.Close()
Expand Down Expand Up @@ -55,7 +57,7 @@ func (p *Proxy) Run() error {

go func() {
if err := p.serv(c); err != nil {
tracelog.ErrorLogger.PrintError(err)
spqrlog.Logger.PrintError(err)
}
}()

Expand All @@ -67,7 +69,6 @@ func (p *Proxy) serv(netconn net.Conn) error {

conn, err := getC()
if err != nil {
fmt.Printf("failed %w", err)
return err
}

Expand All @@ -80,7 +81,7 @@ func (p *Proxy) serv(netconn net.Conn) error {
// return err
// }

// tracelog.InfoLogger.Printf("initialized client connection %s-%s\n", cl.Usr(), cl.DB())
// spqrlog.Logger.Printf(spqrlog.INFO, "initialized client connection %s-%s\n", cl.Usr(), cl.DB())

// if err := cl.AssignRule(&config.FRRule{
// AuthRule: config.AuthRule{
Expand All @@ -93,23 +94,23 @@ func (p *Proxy) serv(netconn net.Conn) error {
// if err := cl.Auth(); err != nil {
// return err
// }
// tracelog.InfoLogger.Printf("client auth OK")
// spqrlog.Logger.Printf(spqrlog.INFO, "client auth OK")

cb := func(msg pgproto3.FrontendMessage) {
tracelog.InfoLogger.Printf("received msg %v", msg)
spqrlog.Logger.Printf(spqrlog.INFO, "received msg %v", msg)

switch v := msg.(type) {
case *pgproto3.Parse:
tracelog.InfoLogger.Printf("received prep stmt %v %v", v.Name, v.Query)
spqrlog.Logger.Printf(spqrlog.INFO, "received prep stmt %v %v", v.Name, v.Query)
break
case *pgproto3.Query:

tracelog.InfoLogger.Printf("received message %v", v.String)
spqrlog.Logger.Printf(spqrlog.INFO, "received message %v", v.String)
default:
}
}
shouldStop := func(msg pgproto3.BackendMessage) bool {
tracelog.InfoLogger.Printf("received msg %v", msg)
spqrlog.Logger.Printf(spqrlog.INFO, "received msg %v", msg)

switch msg.(type) {
case *pgproto3.ReadyForQuery:
Expand All @@ -125,24 +126,20 @@ func (p *Proxy) serv(netconn net.Conn) error {
cb(msg)

if err != nil {
tracelog.ErrorLogger.Printf("failed to received msg %w", err)
return err
return fmt.Errorf(failedToReceiveMessage, err)
}
if err := frontend.Send(msg); err != nil {
tracelog.ErrorLogger.Printf("failed to received msg %w", err)
return err
return fmt.Errorf(failedToReceiveMessage, err)
}
for {
retmsg, err := frontend.Receive()
if err != nil {
tracelog.ErrorLogger.Printf("failed to received msg %w", err)
return err
return fmt.Errorf(failedToReceiveMessage, err)
}

err = cl.Send(retmsg)
if err != nil {
tracelog.ErrorLogger.Printf("failed to received msg %w", err)
return err
return fmt.Errorf(failedToReceiveMessage, err)
}

if shouldStop(retmsg) {
Expand All @@ -151,7 +148,3 @@ func (p *Proxy) serv(netconn net.Conn) error {
}
}
}

func NewProxy() *Proxy {
return &Proxy{}
}
Loading