Skip to content

Commit

Permalink
drop tracelog using
Browse files Browse the repository at this point in the history
  • Loading branch information
Denchick committed Jan 31, 2023
1 parent 02ee9af commit 18e16c2
Show file tree
Hide file tree
Showing 14 changed files with 68 additions and 107 deletions.
16 changes: 7 additions & 9 deletions balancer/pkg/balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,6 @@ import (
"sync"
"time"

"github.com/wal-g/tracelog"

"github.com/pg-sharding/spqr/pkg/spqrlog"
"github.com/pg-sharding/spqr/router/pkg/client"
)
Expand Down Expand Up @@ -107,15 +105,15 @@ func (b *Balancer) Init(installation InstallationInterface, coordinator Coordina
if err == nil {
break
}
//tracelog.ErrorLogger.PrintError(err)
//spqrlog.Logger.PrintError(err)
fmt.Println("Error: trying to init state by coordinator, but got an error ", err)
time.Sleep(b.retryTime)
}
// TODO parallel
for shard, keyRanges := range shardToKeyRanges {
shardDistances, err := b.installation.GetKeyDistanceByRanges(shard, keyRanges)
if err != nil {
tracelog.DebugLogger.PrintError(err)
spqrlog.Logger.PrintError(err)
continue
}
for rng, dist := range shardDistances {
Expand Down Expand Up @@ -187,7 +185,7 @@ func (b *Balancer) tryToUpdateShardStats(shard Shard, wg *sync.WaitGroup) {
_stats, err := b.installation.GetShardStats(shard, keyRanges)

if err != nil {
//tracelog.ErrorLogger.PrintError(err)
//spqrlog.Logger.PrintError(err)
fmt.Println("Error: ", err)
return
}
Expand Down Expand Up @@ -583,7 +581,7 @@ func (b *Balancer) runTask(task *Action) error {
var err error

for task.actionStage != actionStageDone {
tracelog.InfoLogger.Printf("Action stage: %v", task.actionStage)
spqrlog.Logger.Printf(spqrlog.INFO, "Action stage: %v", task.actionStage)

switch task.actionStage {
case actionStagePlan:
Expand Down Expand Up @@ -709,11 +707,11 @@ func (b *Balancer) planTasks() {

shardStats, err := b.installation.GetShardStats(shard, keyRanges)
if err != nil {
tracelog.ErrorLogger.PrintError(err)
spqrlog.Logger.PrintError(err)
return
}

tracelog.InfoLogger.Printf("Plan tasks. ShardStats: %#v", shardStats)
spqrlog.Logger.Printf(spqrlog.INFO, "Plan tasks. ShardStats: %#v", shardStats)

b.bestTask = Task{}
for i := 0; i < b.plannerCount; i++ {
Expand Down Expand Up @@ -807,7 +805,7 @@ func (b *Balancer) BrutForceStrategy() {
reload := b.reloadRequired
b.muReload.Unlock()
if !reload {
tracelog.InfoLogger.Println("Check coordinator for reloading requirements")
spqrlog.Logger.Printf(spqrlog.INFO, "Check coordinator for reloading requirements")
reload, err = b.coordinator.isReloadRequired()
if err != nil {
fmt.Println("Error while spqr.isReloadRequired call: ", err)
Expand Down
6 changes: 3 additions & 3 deletions balancer/pkg/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
"time"

_ "github.com/jackc/pgx/v4/stdlib"
"github.com/wal-g/tracelog"
"github.com/pg-sharding/spqr/pkg/spqrlog"
"golang.yandex/hasql"
"golang.yandex/hasql/checkers"
)
Expand Down Expand Up @@ -133,7 +133,7 @@ func NewCluster(addrs []string, dbname, user, password, sslMode, sslRootCert str
nodes := make([]hasql.Node, 0, len(addrs))
for _, addr := range addrs {
connString := ConnString(addr, dbname, user, password, sslMode, sslRootCert)
tracelog.InfoLogger.Printf("Connection string: %v", connString)
spqrlog.Logger.Printf(spqrlog.INFO, "Connection string: %v", connString)

db, err := sql.Open("pgx", connString)
if err != nil {
Expand All @@ -144,7 +144,7 @@ func NewCluster(addrs []string, dbname, user, password, sslMode, sslRootCert str
nodes = append(nodes, hasql.NewNode(addr, db))
}

tracelog.InfoLogger.Printf("Nodes: %v", nodes)
spqrlog.Logger.Printf(spqrlog.INFO, "Nodes: %v", nodes)

return hasql.NewCluster(nodes, checkers.PostgreSQL)
}
Expand Down
14 changes: 4 additions & 10 deletions cmd/logproxy/main.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
package main

import (
"github.com/pg-sharding/spqr/pkg/spqrlog"
"github.com/pg-sharding/spqr/test/logproxy"
"github.com/spf13/cobra"
"github.com/wal-g/tracelog"
)

var rootCmd = &cobra.Command{
Expand All @@ -18,21 +18,15 @@ var rootCmd = &cobra.Command{

func Execute() {
if err := rootCmd.Execute(); err != nil {
tracelog.ErrorLogger.Fatal(err)
spqrlog.Logger.FatalOnError(err)
}
}

var runCmd = &cobra.Command{
Use: "run",
RunE: func(cmd *cobra.Command, args []string) error {

pr := logproxy.NewProxy()
err := pr.Run()
if err != nil {
tracelog.ErrorLogger.FatalOnError(err)
}

return err
pr := &logproxy.Proxy{}
return pr.Run()
},
}

Expand Down
7 changes: 3 additions & 4 deletions cmd/router/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (

"github.com/pkg/errors"
"github.com/spf13/cobra"
"github.com/wal-g/tracelog"

"github.com/pg-sharding/spqr/pkg/config"
"github.com/pg-sharding/spqr/router/app"
Expand All @@ -27,8 +26,8 @@ var (
)

var rootCmd = &cobra.Command{
Use: "./spqr-router run --config `path-to-config-folder`",
Short: "sqpr-rr",
Use: "spqr-router run --config `path-to-config-folder`",
Short: "sqpr-router",
Long: "spqr-router",
CompletionOptions: cobra.CompletionOptions{
DisableDefaultCmd: true,
Expand All @@ -39,7 +38,7 @@ var rootCmd = &cobra.Command{

func Execute() {
if err := rootCmd.Execute(); err != nil {
tracelog.ErrorLogger.Fatal(err)
spqrlog.Logger.FatalOnError(err)
}
}

Expand Down
12 changes: 3 additions & 9 deletions cmd/worldmock/main.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
package main

import (
"github.com/pg-sharding/spqr/pkg/spqrlog"
"github.com/pg-sharding/spqr/test/worldmock"
"github.com/spf13/cobra"
"github.com/wal-g/tracelog"
)

var rootCmd = &cobra.Command{
Expand All @@ -18,7 +18,7 @@ var rootCmd = &cobra.Command{

func Execute() {
if err := rootCmd.Execute(); err != nil {
tracelog.ErrorLogger.Fatal(err)
spqrlog.Logger.FatalOnError(err)
}
}

Expand All @@ -28,14 +28,8 @@ var addr string
var runCmd = &cobra.Command{
Use: "run",
RunE: func(cmd *cobra.Command, args []string) error {

w := worldmock.NewWorldMock(addr)
err := w.Run()
if err != nil {
tracelog.ErrorLogger.FatalOnError(err)
}

return err
return w.Run()
},
}

Expand Down
1 change: 0 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ require (
github.com/spf13/cobra v1.5.0
github.com/uber/jaeger-client-go v2.30.0+incompatible
github.com/uber/jaeger-lib v2.4.1+incompatible
github.com/wal-g/tracelog v0.0.0-20210121175938-897a155375e3
go.etcd.io/etcd/client/v3 v3.5.4
go.uber.org/atomic v1.9.0
golang.yandex/hasql v1.0.0
Expand Down
5 changes: 3 additions & 2 deletions pkg/client/clientpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"sync"

spqrlog "github.com/pg-sharding/spqr/pkg/spqrlog"
"github.com/wal-g/tracelog"
)

type Pool interface {
Expand Down Expand Up @@ -48,7 +47,9 @@ func (c *PoolImpl) Shutdown() error {

for _, cl := range c.pool {
go func(cl Client) {
tracelog.InfoLogger.PrintError(cl.Shutdown())
if err := cl.Shutdown(); err != nil {
spqrlog.Logger.PrintError(err)
}
}(cl)
}

Expand Down
5 changes: 2 additions & 3 deletions router/pkg/rulerouter/route_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"github.com/pg-sharding/spqr/pkg/client"
"github.com/pg-sharding/spqr/pkg/config"
"github.com/pg-sharding/spqr/router/pkg/route"
"github.com/wal-g/tracelog"
)

type RoutePool interface {
Expand Down Expand Up @@ -44,7 +43,7 @@ func (r *RoutePoolImpl) NotifyRoutes(cb func(route *route.Route) error) error {
for _, rt := range r.pool {
go func(rt *route.Route) {
if err := cb(rt); err != nil {
tracelog.InfoLogger.Printf("error while notifying route %v", err)
spqrlog.Logger.Printf(spqrlog.INFO, "error while notifying route %v", err)
}
}(rt)
}
Expand Down Expand Up @@ -83,7 +82,7 @@ func (r *RoutePoolImpl) MatchRoute(key route.Key,
defer r.mu.Unlock()

if nroute, ok := r.pool[key]; ok {
tracelog.InfoLogger.Printf("match route %v", key)
spqrlog.Logger.Printf(spqrlog.INFO, "match route %v", key)
return nroute, nil
}

Expand Down
3 changes: 1 addition & 2 deletions router/pkg/rulerouter/rulerouter.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
rclient "github.com/pg-sharding/spqr/router/pkg/client"
"github.com/pg-sharding/spqr/router/pkg/route"
"github.com/pkg/errors"
"github.com/wal-g/tracelog"
)

type RuleRouter interface {
Expand Down Expand Up @@ -48,7 +47,7 @@ type RuleRouterImpl struct {
}

func (r *RuleRouterImpl) AddWorldShard(key qdb.ShardKey) error {
tracelog.InfoLogger.Printf("added world datashard to rrouter %v", key.Name)
spqrlog.Logger.Printf(spqrlog.INFO, "added world datashard to rrouter %v", key.Name)
return nil
}

Expand Down
2 changes: 1 addition & 1 deletion router/pkg/trace.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func (r *InstanceImpl) initJaegerTracer(rcfg *config.Router) (io.Closer, error)
},
}

jLogger := jaegerlog.StdLogger //TODO: replace with tracelog logger
jLogger := jaegerlog.StdLogger //TODO: replace with spqrlog logger
jMetricsFactory := metrics.NullFactory

// Initialize tracer with a logger and a metrics factory
Expand Down
37 changes: 15 additions & 22 deletions test/logproxy/logproxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,13 @@ import (
"os"

"github.com/jackc/pgproto3/v2"
"github.com/pg-sharding/spqr/pkg/spqrlog"
//"github.com/pg-sharding/spqr/pkg/config"
//"github.com/pg-sharding/spqr/router/pkg/client"
"github.com/wal-g/tracelog"
)

const failedToReceiveMessage = "failed to received msg %w"

func getC() (net.Conn, error) {
const proto = "tcp"
const addr = "[::1]:6432"
Expand All @@ -26,7 +28,7 @@ func (p *Proxy) Run() error {

listener, err := net.Listen("tcp6", "[::1]:5433")
if err != nil {
tracelog.ErrorLogger.PrintError(err)
spqrlog.Logger.PrintError(err)
return err
}
defer listener.Close()
Expand Down Expand Up @@ -55,7 +57,7 @@ func (p *Proxy) Run() error {

go func() {
if err := p.serv(c); err != nil {
tracelog.ErrorLogger.PrintError(err)
spqrlog.Logger.PrintError(err)
}
}()

Expand All @@ -67,7 +69,6 @@ func (p *Proxy) serv(netconn net.Conn) error {

conn, err := getC()
if err != nil {
fmt.Printf("failed %w", err)
return err
}

Expand All @@ -80,7 +81,7 @@ func (p *Proxy) serv(netconn net.Conn) error {
// return err
// }

// tracelog.InfoLogger.Printf("initialized client connection %s-%s\n", cl.Usr(), cl.DB())
// spqrlog.Logger.Printf(spqrlog.INFO, "initialized client connection %s-%s\n", cl.Usr(), cl.DB())

// if err := cl.AssignRule(&config.FRRule{
// AuthRule: config.AuthRule{
Expand All @@ -93,23 +94,23 @@ func (p *Proxy) serv(netconn net.Conn) error {
// if err := cl.Auth(); err != nil {
// return err
// }
// tracelog.InfoLogger.Printf("client auth OK")
// spqrlog.Logger.Printf(spqrlog.INFO, "client auth OK")

cb := func(msg pgproto3.FrontendMessage) {
tracelog.InfoLogger.Printf("received msg %v", msg)
spqrlog.Logger.Printf(spqrlog.INFO, "received msg %v", msg)

switch v := msg.(type) {
case *pgproto3.Parse:
tracelog.InfoLogger.Printf("received prep stmt %v %v", v.Name, v.Query)
spqrlog.Logger.Printf(spqrlog.INFO, "received prep stmt %v %v", v.Name, v.Query)
break
case *pgproto3.Query:

tracelog.InfoLogger.Printf("received message %v", v.String)
spqrlog.Logger.Printf(spqrlog.INFO, "received message %v", v.String)
default:
}
}
shouldStop := func(msg pgproto3.BackendMessage) bool {
tracelog.InfoLogger.Printf("received msg %v", msg)
spqrlog.Logger.Printf(spqrlog.INFO, "received msg %v", msg)

switch msg.(type) {
case *pgproto3.ReadyForQuery:
Expand All @@ -125,24 +126,20 @@ func (p *Proxy) serv(netconn net.Conn) error {
cb(msg)

if err != nil {
tracelog.ErrorLogger.Printf("failed to received msg %w", err)
return err
return fmt.Errorf(failedToReceiveMessage, err)
}
if err := frontend.Send(msg); err != nil {
tracelog.ErrorLogger.Printf("failed to received msg %w", err)
return err
return fmt.Errorf(failedToReceiveMessage, err)
}
for {
retmsg, err := frontend.Receive()
if err != nil {
tracelog.ErrorLogger.Printf("failed to received msg %w", err)
return err
return fmt.Errorf(failedToReceiveMessage, err)
}

err = cl.Send(retmsg)
if err != nil {
tracelog.ErrorLogger.Printf("failed to received msg %w", err)
return err
return fmt.Errorf(failedToReceiveMessage, err)
}

if shouldStop(retmsg) {
Expand All @@ -151,7 +148,3 @@ func (p *Proxy) serv(netconn net.Conn) error {
}
}
}

func NewProxy() *Proxy {
return &Proxy{}
}
Loading

0 comments on commit 18e16c2

Please sign in to comment.