Skip to content

Commit

Permalink
Merge pull request #4753 from weaviate/trengrj/logrus-raft
Browse files Browse the repository at this point in the history
RAFT: Replace slog with logrus for logs consistency
  • Loading branch information
moogacs committed Apr 24, 2024
2 parents 049f946 + 5048fb6 commit d03e1b3
Show file tree
Hide file tree
Showing 13 changed files with 219 additions and 76 deletions.
28 changes: 1 addition & 27 deletions adapters/handlers/rest/configure_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
"context"
"encoding/json"
"fmt"
"log/slog"
"net"
"net/http"
_ "net/http/pprof"
Expand Down Expand Up @@ -283,7 +282,7 @@ func MakeAppState(ctx context.Context, options *swag.CommandLineOptionsGroup) *s
DB: nil,
Parser: schema.NewParser(appState.Cluster, vectorIndex.ParseAndValidateConfig, migrator),
AddrResolver: appState.Cluster,
Logger: sLogger(),
Logger: appState.Logger,
LogLevel: logLevel(),
LogJSONFormat: !logTextFormat(),
IsLocalHost: appState.ServerConfig.Config.Cluster.Localhost,
Expand Down Expand Up @@ -648,31 +647,6 @@ func logger() *logrus.Logger {
return logger
}

// sLogger returns an initialized standard logger
// This logger should replace logrus in future
func sLogger() *slog.Logger {
opts := slog.HandlerOptions{}
switch os.Getenv("LOG_LEVEL") {
case "debug":
opts.Level = slog.LevelDebug
case "warn":
opts.Level = slog.LevelWarn
case "error":
opts.Level = slog.LevelError
default:
opts.Level = slog.LevelInfo
}

var handler slog.Handler
if logTextFormat() {
handler = slog.NewTextHandler(os.Stderr, &opts)
} else {
handler = slog.NewJSONHandler(os.Stderr, &opts)
}

return slog.New(handler)
}

func logLevel() string {
switch level := os.Getenv("LOG_LEVEL"); level {
case "trace", "debug", "warn", "error":
Expand Down
4 changes: 2 additions & 2 deletions cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@ package cluster
import (
"context"
"fmt"
"log/slog"
"time"

"github.com/sirupsen/logrus"
"github.com/weaviate/weaviate/cluster/store"
"github.com/weaviate/weaviate/cluster/transport"
)
Expand All @@ -30,7 +30,7 @@ type Service struct {

client *transport.Client
rpcService *transport.Service
logger *slog.Logger
logger *logrus.Logger
}

func New(cfg store.Config) *Service {
Expand Down
4 changes: 2 additions & 2 deletions cluster/store/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,10 @@ package store
import (
"context"
"fmt"
"log/slog"
"math/rand"
"time"

"github.com/sirupsen/logrus"
cmd "github.com/weaviate/weaviate/cluster/proto/api"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
Expand Down Expand Up @@ -53,7 +53,7 @@ func NewBootstrapper(joiner joiner, raftID, raftAddr string, r addressResolver)
}

// Do iterates over a list of servers in an attempt to join this node to a cluster.
func (b *Bootstrapper) Do(ctx context.Context, serverPortMap map[string]int, lg *slog.Logger, voter bool) error {
func (b *Bootstrapper) Do(ctx context.Context, serverPortMap map[string]int, lg *logrus.Logger, voter bool) error {
ticker := time.NewTicker(jitter(b.retryPeriod, b.jitter))
servers := make([]string, 0, len(serverPortMap))
defer ticker.Stop()
Expand Down
2 changes: 1 addition & 1 deletion cluster/store/bootstrap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func TestBootStrapper(t *testing.T) {
b.jitter = time.Millisecond
test.doBefore(m)
ctx, cancel := context.WithTimeout(ctx, time.Millisecond*100)
err := b.Do(ctx, test.servers, NewMockSLog(t).Logger, test.voter)
err := b.Do(ctx, test.servers, NewMockLogger(t).Logger, test.voter)
cancel()
if test.success && err != nil {
t.Errorf("%s: %v", test.name, err)
Expand Down
4 changes: 2 additions & 2 deletions cluster/store/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ import (
"encoding/json"
"errors"
"fmt"
"log/slog"

"github.com/sirupsen/logrus"
command "github.com/weaviate/weaviate/cluster/proto/api"
gproto "google.golang.org/protobuf/proto"
)
Expand All @@ -32,7 +32,7 @@ type localDB struct {
Schema *schema
store Indexer
parser Parser
log *slog.Logger
log *logrus.Logger
}

func (db *localDB) SetIndexer(idx Indexer) {
Expand Down
179 changes: 179 additions & 0 deletions cluster/store/logger.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,179 @@
// _ _
// __ _____ __ ___ ___ __ _| |_ ___
// \ \ /\ / / _ \/ _` \ \ / / |/ _` | __/ _ \
// \ V V / __/ (_| |\ V /| | (_| | || __/
// \_/\_/ \___|\__,_| \_/ |_|\__,_|\__\___|
//
// Copyright © 2016 - 2024 Weaviate B.V. All rights reserved.
//
// CONTACT: hello@weaviate.io
//

package store

import (
"fmt"
"io"
"log"
"os"

"github.com/hashicorp/go-hclog"
"github.com/sirupsen/logrus"
)

func NewHCLogrusLogger(name string, logger *logrus.Logger) hclog.Logger {
return &hclogLogrus{
entry: logrus.NewEntry(logger),
name: fmt.Sprintf("%s ", name),
}
}

type hclogLogrus struct {
entry *logrus.Entry
name string
}

func (hclogger *hclogLogrus) GetLevel() hclog.Level {
switch hclogger.entry.Logger.Level {
case logrus.TraceLevel:
return hclog.Trace
case logrus.DebugLevel:
return hclog.Debug
case logrus.InfoLevel:
return hclog.Info
case logrus.WarnLevel:
return hclog.Warn
case logrus.ErrorLevel:
case logrus.FatalLevel:
case logrus.PanicLevel:
return hclog.Error
}
return hclog.DefaultLevel
}

func (hclogger *hclogLogrus) Log(level hclog.Level, msg string, args ...interface{}) {
switch level {
case hclog.Trace:
hclogger.Trace(msg, args...)
case hclog.Debug:
hclogger.Debug(msg, args...)
case hclog.Info:
case hclog.NoLevel:
hclogger.Info(msg, args...)
case hclog.Warn:
hclogger.Warn(msg, args...)
case hclog.Error:
hclogger.Error(msg, args...)
case hclog.Off:
}
}

func (hclogger *hclogLogrus) ImpliedArgs() []interface{} {
var fields []interface{}
for k, v := range hclogger.entry.Data {
fields = append(fields, k)
fields = append(fields, v)
}
return fields
}

func (hclogger *hclogLogrus) Name() string {
return hclogger.name
}

func (hclogger *hclogLogrus) Trace(msg string, args ...interface{}) {
hclogger.logToLogrus(logrus.TraceLevel, msg, args...)
}

func (hclogger *hclogLogrus) Debug(msg string, args ...interface{}) {
hclogger.logToLogrus(logrus.DebugLevel, msg, args...)
}

func (hclogger *hclogLogrus) Info(msg string, args ...interface{}) {
hclogger.logToLogrus(logrus.InfoLevel, msg, args...)
}

func (hclogger *hclogLogrus) Warn(msg string, args ...interface{}) {
hclogger.logToLogrus(logrus.WarnLevel, msg, args...)
}

func (hclogger *hclogLogrus) Error(msg string, args ...interface{}) {
hclogger.logToLogrus(logrus.ErrorLevel, msg, args...)
}

func (hclogger *hclogLogrus) logToLogrus(level logrus.Level, msg string, args ...interface{}) {
logger := hclogger.entry
if len(args) > 0 {
logger = hclogger.LoggerWith(args)
}
logger.Log(level, hclogger.name+msg)
}

func (hclogger *hclogLogrus) IsTrace() bool {
return hclogger.entry.Logger.IsLevelEnabled(logrus.TraceLevel)
}

func (hclogger *hclogLogrus) IsDebug() bool {
return hclogger.entry.Logger.IsLevelEnabled(logrus.DebugLevel)
}

func (hclogger *hclogLogrus) IsInfo() bool {
return hclogger.entry.Logger.IsLevelEnabled(logrus.InfoLevel)
}

func (hclogger *hclogLogrus) IsWarn() bool {
return hclogger.entry.Logger.IsLevelEnabled(logrus.WarnLevel)
}

func (hclogger *hclogLogrus) IsError() bool {
return hclogger.entry.Logger.IsLevelEnabled(logrus.ErrorLevel)
}

func (hclogger *hclogLogrus) With(args ...interface{}) hclog.Logger {
return &hclogLogrus{
entry: hclogger.LoggerWith(args),
}
}

func (hclogger *hclogLogrus) LoggerWith(args []interface{}) *logrus.Entry {
l := hclogger.entry
ml := len(args)
var key string
for i := 0; i < ml-1; i += 2 {
keyVal := args[i]
if keyStr, ok := keyVal.(string); ok {
key = keyStr
} else {
key = fmt.Sprintf("%v", keyVal)
}
val := args[i+1]
if f, ok := val.(hclog.Format); ok {
val = fmt.Sprintf(f[0].(string), f[1:])
}
l = l.WithField(key, val)
}
return l
}

func (hclogger *hclogLogrus) Named(name string) hclog.Logger {
return hclogger.ResetNamed(name + hclogger.name)
}

func (hclogger *hclogLogrus) ResetNamed(name string) hclog.Logger {
return &hclogLogrus{
name: name,
entry: hclogger.entry,
}
}

func (hclogger *hclogLogrus) SetLevel(l hclog.Level) {
hclogger.entry.Level = logrus.Level(l)
}

func (hclogger *hclogLogrus) StandardLogger(*hclog.StandardLoggerOptions) *log.Logger {
return log.Default()
}

func (hclogger *hclogLogrus) StandardWriter(*hclog.StandardLoggerOptions) io.Writer {
return os.Stdout
}
4 changes: 2 additions & 2 deletions cluster/store/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@ import (
"context"
"encoding/json"
"fmt"
"log/slog"
"time"

"github.com/sirupsen/logrus"
cmd "github.com/weaviate/weaviate/cluster/proto/api"
"github.com/weaviate/weaviate/entities/models"
"github.com/weaviate/weaviate/usecases/sharding"
Expand All @@ -29,7 +29,7 @@ import (
type Service struct {
store *Store
cl client
log *slog.Logger
log *logrus.Logger
}

// client to communicate with remote services
Expand Down
18 changes: 6 additions & 12 deletions cluster/store/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,16 @@ import (
"errors"
"fmt"
"io"
"log/slog"
"net"
"os"
"path/filepath"
"sync"
"sync/atomic"
"time"

"github.com/hashicorp/go-hclog"
"github.com/hashicorp/raft"
raftbolt "github.com/hashicorp/raft-boltdb/v2"
"github.com/sirupsen/logrus"
"github.com/weaviate/weaviate/cluster/proto/api"
"github.com/weaviate/weaviate/entities/models"
"google.golang.org/protobuf/proto"
Expand Down Expand Up @@ -117,7 +116,7 @@ type Config struct {
DB Indexer
Parser Parser
AddrResolver addressResolver
Logger *slog.Logger
Logger *logrus.Logger
LogLevel string
LogJSONFormat bool
Voter bool
Expand Down Expand Up @@ -153,7 +152,7 @@ type Store struct {
nodeID string
host string
db *localDB
log *slog.Logger
log *logrus.Logger
logLevel string
logJsonFormat bool

Expand Down Expand Up @@ -289,8 +288,7 @@ func (st *Store) init() (logCache *raft.LogCache, err error) {

st.transport, err = st.addResolver.NewTCPTransport(
address, tcpAddr,
tcpMaxPool, tcpTimeout,
st.logLevel, st.logJsonFormat)
tcpMaxPool, tcpTimeout, st.log)
if err != nil {
return nil, fmt.Errorf("transport address=%v tcpAddress=%v maxPool=%v timeOut=%v: %w",
address, tcpAddr, tcpMaxPool, tcpTimeout, err)
Expand Down Expand Up @@ -776,12 +774,8 @@ func (st *Store) raftConfig() *raft.Config {

cfg.LocalID = raft.ServerID(st.nodeID)
cfg.LogLevel = st.logLevel
logger := hclog.New(&hclog.LoggerOptions{
Name: "raft",
Level: hclog.LevelFromString(st.logLevel),
JSONFormat: st.logJsonFormat,
Output: os.Stderr,
})

logger := NewHCLogrusLogger("raft", st.log)
cfg.Logger = logger

return cfg
Expand Down
Loading

0 comments on commit d03e1b3

Please sign in to comment.