
added zap logger to NewEnvironment to be used in scytale
maurafortino committed May 3, 2023
1 parent e4844ac commit 983f6ba
Showing 13 changed files with 162 additions and 222 deletions.
21 changes: 21 additions & 0 deletions adapter/adapter.go
@@ -0,0 +1,21 @@
package adapter

import "go.uber.org/zap"

type Adapter struct {
*zap.Logger
}

// Log makes Adapter implement go-kit's log.Logger by converting
// alternating key/value pairs into zap fields.
func (a Adapter) Log(keyvals ...interface{}) error {
fields := make([]zap.Field, 0, len(keyvals)/2)
// step through the pairs: keyvals[i] is a key, keyvals[j] is its value;
// fmt.Sprint avoids a panic when a key is not a plain string (e.g. go-kit level keys)
for i, j := 0, 1; j < len(keyvals); i, j = i+2, j+2 {
fields = append(fields, zap.Any(fmt.Sprint(keyvals[i]), keyvals[j]))
}

// ignore the case where there's an odd number of keyvals ... that would be a bug
// and we're deprecating webpa-common anyway

a.Logger.Info("", fields...)
return nil
}
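A minimal usage sketch (not part of the commit): wrapping a zap logger in the new Adapter so existing go-kit call sites keep working. sallust.Default() is the same fallback logger this commit uses elsewhere; the key/value pairs are illustrative.

package main

import (
	"github.com/xmidt-org/sallust"
	"github.com/xmidt-org/webpa-common/v2/adapter"
)

func main() {
	// Any component that still expects a go-kit log.Logger can take this value.
	a := adapter.Adapter{Logger: sallust.Default()}

	// Alternating key/value pairs become zap fields on an info-level entry.
	a.Log("msg", "hello from a go-kit call site", "component", "scytale")
}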
26 changes: 15 additions & 11 deletions service/consul/datacenterWatch.go
@@ -7,18 +7,17 @@ import (
"sync"
"time"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/mitchellh/mapstructure"
"github.com/xmidt-org/argus/chrysom"
"github.com/xmidt-org/argus/model"
"github.com/xmidt-org/sallust"
"github.com/xmidt-org/webpa-common/v2/service"
"go.uber.org/zap"
)

// datacenterWatcher checks if datacenters have been updated, based on an interval.
type datacenterWatcher struct {
- logger log.Logger
+ logger *zap.Logger
environment Environment
options Options
inactiveDatacenters map[string]bool
@@ -36,10 +35,10 @@ var (
defaultWatchInterval = 5 * time.Minute
)

- func newDatacenterWatcher(logger log.Logger, environment Environment, options Options) (*datacenterWatcher, error) {
+ func newDatacenterWatcher(logger *zap.Logger, environment Environment, options Options) (*datacenterWatcher, error) {

if logger == nil {
- logger = log.NewNopLogger()
+ logger = sallust.Default()
}

if options.DatacenterWatchInterval <= 0 {
@@ -86,13 +85,13 @@ func newDatacenterWatcher(logger log.Logger, environment Environment, options Op
//create chrysom client and start it
datacenterWatcher.stopListener = listener.Stop
listener.Start(context.Background())
- logger.Log(level.Key(), level.DebugValue(), "msg", "started chrysom, argus client")
+ logger.Debug("started chrysom, argus client")
}

//start consul watch
ticker := time.NewTicker(datacenterWatcher.consulWatchInterval)
go datacenterWatcher.watchDatacenters(ticker)
- logger.Log(level.Key(), level.DebugValue(), "msg", "started consul datacenter watch")
+ logger.Debug("started consul datacenter watch")
return datacenterWatcher, nil

}
@@ -144,7 +143,7 @@ func (d *datacenterWatcher) updateInstancers(datacenters []string) {

}

- func updateInactiveDatacenters(items []model.Item, inactiveDatacenters map[string]bool, lock *sync.RWMutex, logger log.Logger) {
+ func updateInactiveDatacenters(items []model.Item, inactiveDatacenters map[string]bool, lock *sync.RWMutex, logger *zap.Logger) {
chrysomMap := make(map[string]bool)
for _, item := range items {

@@ -154,7 +153,7 @@ func updateInactiveDatacenters(items []model.Item, inactiveDatacenters map[strin
err := mapstructure.Decode(item.Data, &df)

if err != nil {
- logger.Log(level.Key(), level.ErrorValue(), "msg", "failed to decode database results into datacenter filter struct")
+ logger.Error("failed to decode database results into datacenter filter struct")
continue
}

@@ -186,7 +185,8 @@ func createNewInstancer(keys map[string]bool, instancersToAdd service.Instancers
dw.lock.RUnlock()

if found {
- dw.logger.Log(level.Key(), level.InfoValue(), "msg", "datacenter set as inactive", "datacenter name: ", datacenter)
+ field := zap.Any("datacenter name: ", datacenter)
+ dw.logger.Info("datacenter set as inactive", field)
return
}

@@ -203,7 +203,11 @@

// don't create new instancer if it was already created and added to the new instancers map
if instancersToAdd.Has(key) {
- dw.logger.Log(level.Key(), level.WarnValue(), "msg", "skipping duplicate watch", "service", w.Service, "tags", w.Tags, "passingOnly", w.PassingOnly, "datacenter", w.QueryOptions.Datacenter)
+ s := zap.String("service", w.Service)
+ t := zap.Any("tags", w.Tags)
+ p := zap.Bool("passingOnly", w.PassingOnly)
+ d := zap.String("datacenter", w.QueryOptions.Datacenter)
+ dw.logger.Warn("skipping duplicate watch", s, t, p, d)
return
}

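The conversion pattern in this file is mechanical: the go-kit level key/value pair becomes the zap method, and loose "key", value pairs become typed fields. A standalone sketch of the before/after (zap.NewExample is used only so the output is deterministic; the service and tag values are hypothetical):

package main

import "go.uber.org/zap"

func main() {
	logger := zap.NewExample()
	service, tags := "scytale", []string{"stage=prod"}

	// go-kit style, as removed above:
	//   logger.Log(level.Key(), level.WarnValue(), "msg", "skipping duplicate watch",
	//       "service", service, "tags", tags)
	// zap style, as added above: the level is the method, fields carry their types.
	logger.Warn("skipping duplicate watch",
		zap.String("service", service),
		zap.Strings("tags", tags),
	)
}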
11 changes: 6 additions & 5 deletions service/consul/datacenterWatch_test.go
@@ -7,17 +7,18 @@ import (
"testing"
"time"

"github.com/go-kit/log"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/xmidt-org/argus/chrysom"
"github.com/xmidt-org/argus/model"
"github.com/xmidt-org/sallust"
"github.com/xmidt-org/webpa-common/v2/service"
"github.com/xmidt-org/webpa-common/v2/xmetrics"
"go.uber.org/zap"
)

func TestNewDatacenterWatcher(t *testing.T) {
- logger := log.NewNopLogger()
+ logger := sallust.Default()
r, err := xmetrics.NewRegistry(nil, Metrics)
require.Nil(t, err)
envShutdownChan := make(<-chan struct{})
@@ -46,7 +47,7 @@ func TestNewDatacenterWatcher(t *testing.T) {

tests := []struct {
description string
- logger log.Logger
+ logger *zap.Logger
environment Environment
options Options
ctx context.Context
@@ -156,7 +157,7 @@ func TestNewDatacenterWatcher(t *testing.T) {
DatacenterWatchInterval: 10 * time.Second,
},
expectedWatcher: &datacenterWatcher{
- logger: log.NewNopLogger(),
+ logger: logger,
environment: environment{
mockServiceEnvironment, new(mockClient),
},
@@ -247,7 +248,7 @@ }
}

func TestUpdateInactiveDatacenters(t *testing.T) {
- logger := log.NewNopLogger()
+ logger := sallust.Default()

tests := []struct {
description string
58 changes: 22 additions & 36 deletions service/consul/environment.go
@@ -5,16 +5,16 @@ import (
"encoding/base64"
"errors"
"fmt"
"os"
"time"

"github.com/go-kit/kit/sd"
gokitconsul "github.com/go-kit/kit/sd/consul"
"github.com/go-kit/kit/util/conn"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/hashicorp/consul/api"
"github.com/xmidt-org/sallust"
"github.com/xmidt-org/webpa-common/v2/adapter"
"github.com/xmidt-org/webpa-common/v2/service"
"go.uber.org/zap"
)

var (
@@ -26,11 +26,9 @@
// in direct API calls.
type Environment interface {
service.Environment

// Client returns the custom consul Client interface exposed by this package
Client() Client
}

type environment struct {
service.Environment
client Client
Expand All @@ -39,34 +37,28 @@ type environment struct {
func (e environment) Client() Client {
return e.client
}

func generateID() string {
b := make([]byte, 16)
_, err := rand.Read(b)
if err != nil {
// TODO: When does this ever happen?
panic(err)
}

return base64.RawURLEncoding.EncodeToString(b)
}

func ensureIDs(r *api.AgentServiceRegistration) {
if len(r.ID) == 0 {
r.ID = generateID()
}

if r.Check != nil && len(r.Check.CheckID) == 0 {
r.Check.CheckID = generateID()
}

for _, check := range r.Checks {
if len(check.CheckID) == 0 {
check.CheckID = generateID()
}
}
}

func newInstancerKey(w Watch) string {
return fmt.Sprintf(
"%s%s{passingOnly=%t}{datacenter=%s}",
@@ -76,38 +68,39 @@ func newInstancerKey(w Watch) string {
w.QueryOptions.Datacenter,
)
}

func defaultClientFactory(client *api.Client) (Client, ttlUpdater) {
return NewClient(client), client.Agent()
}

var clientFactory = defaultClientFactory
+ var logger = adapter.Adapter{
+ Logger: sallust.Default(),
+ }

- func getDatacenters(l log.Logger, c Client, co Options) ([]string, error) {
+ func getDatacenters(l *zap.Logger, c Client, co Options) ([]string, error) {
datacenters, err := c.Datacenters()
if err == nil {
return datacenters, nil
}

- l.Log(level.Key(), level.ErrorValue(), "msg", "Could not acquire datacenters on initial attempt", "error", err)
+ l.Error("Could not acquire datacenters on initial attempt", zap.Error(err))

d := 30 * time.Millisecond
for retry := 0; retry < co.datacenterRetries(); retry++ {
time.Sleep(d)
d = conn.Exponential(d)

datacenters, err = c.Datacenters()
if err == nil {
return datacenters, nil
}

- l.Log(level.Key(), level.ErrorValue(), "retryCount", retry, "msg", "Could not acquire datacenters", "error", err)
+ l.Error("Could not acquire datacenters", zap.Int("retryCount", retry), zap.Error(err))
}

return nil, errNoDatacenters
}

- func newInstancer(l log.Logger, c Client, w Watch) sd.Instancer {
+ func newInstancer(l *zap.Logger, c Client, w Watch) sd.Instancer {
return service.NewContextualInstancer(
NewInstancer(InstancerOptions{
Client: c,
@@ -126,7 +119,7 @@ func newInstancer(l log.Logger, c Client, w Watch) sd.Instancer {
)
}

- func newInstancers(l log.Logger, c Client, co Options) (i service.Instancers, err error) {
+ func newInstancers(l *zap.Logger, c Client, co Options) (i service.Instancers, err error) {
var datacenters []string
for _, w := range co.watches() {
if w.CrossDatacenter {
@@ -136,78 +129,72 @@ func newInstancers(l log.Logger, c Client, co Options) (i service.Instancers, er
return
}
}

for _, datacenter := range datacenters {
w.QueryOptions.Datacenter = datacenter
key := newInstancerKey(w)
if i.Has(key) {
- l.Log(level.Key(), level.WarnValue(), "msg", "skipping duplicate watch", "service", w.Service, "tags", w.Tags, "passingOnly", w.PassingOnly, "datacenter", w.QueryOptions.Datacenter)
+ l.Warn("skipping duplicate watch", zap.String("service", w.Service), zap.Strings("tags", w.Tags), zap.Bool("passingOnly", w.PassingOnly), zap.String("datacenter", w.QueryOptions.Datacenter))
continue
}
i.Set(key, newInstancer(l, c, w))
}
} else {
key := newInstancerKey(w)
if i.Has(key) {
- l.Log(level.Key(), level.WarnValue(), "msg", "skipping duplicate watch", "service", w.Service, "tags", w.Tags, "passingOnly", w.PassingOnly, "datacenter", w.QueryOptions.Datacenter)
+ l.Warn("skipping duplicate watch", zap.String("service", w.Service), zap.Strings("tags", w.Tags), zap.Bool("passingOnly", w.PassingOnly), zap.String("datacenter", w.QueryOptions.Datacenter))
continue
}
i.Set(key, newInstancer(l, c, w))
}
}

return
}

- func newRegistrars(l log.Logger, registrationScheme string, c gokitconsul.Client, u ttlUpdater, co Options) (r service.Registrars, closer func() error, err error) {
+ func newRegistrars(l *zap.Logger, registrationScheme string, c gokitconsul.Client, u ttlUpdater, co Options) (r service.Registrars, closer func() error, err error) {
var consulRegistrar sd.Registrar
for _, registration := range co.registrations() {
instance := service.FormatInstance(registrationScheme, registration.Address, registration.Port)
if r.Has(instance) {
- l.Log(level.Key(), level.WarnValue(), "msg", "skipping duplicate registration", "instance", instance)
+ l.Warn("skipping duplicate registration", zap.String("instance", instance))
continue
}

if !co.disableGenerateID() {
ensureIDs(&registration)
}

- consulRegistrar, err = NewRegistrar(c, u, &registration, log.With(l, "id", registration.ID, "instance", instance))
+ rid := zap.String("id", registration.ID)
+ in := zap.String("instance", instance)
+ // With returns a new *zap.Logger; wrap it so NewRegistrar still receives a go-kit log.Logger
+ reg := adapter.Adapter{Logger: l.With(rid, in)}
+ consulRegistrar, err = NewRegistrar(c, u, &registration, reg)
if err != nil {
return
}

r.Add(instance, consulRegistrar)
}

return
}

- func NewEnvironment(l log.Logger, registrationScheme string, co Options, eo ...service.Option) (service.Environment, error) {
+ func NewEnvironment(l *zap.Logger, registrationScheme string, co Options, eo ...service.Option) (service.Environment, error) {
if l == nil {
- l = log.NewJSONLogger(log.NewSyncWriter(os.Stdout))
+ l = sallust.Default()
}

if len(co.Watches) == 0 && len(co.Registrations) == 0 {
return nil, service.ErrIncomplete
}

consulClient, err := api.NewClient(co.config())
if err != nil {
return nil, err
}

client, updater := clientFactory(consulClient)
r, closer, err := newRegistrars(l, registrationScheme, client, updater, co)
if err != nil {
return nil, err
}

i, err := newInstancers(l, client, co)
if err != nil {
return nil, err
}

newServiceEnvironment := environment{
service.NewEnvironment(
append(
@@ -216,11 +203,10 @@ func NewEnvironment(l log.Logger, registrationScheme string, co Options, eo ...s
service.WithInstancers(i),
service.WithCloser(closer),
)...), NewClient(consulClient)}

if co.DatacenterWatchInterval > 0 || (len(co.Chrysom.Bucket) > 0 && co.Chrysom.Listen.PullInterval > 0) {
_, err := newDatacenterWatcher(l, newServiceEnvironment, co)
if err != nil {
- l.Log(level.Key(), level.ErrorValue(), "msg", "Could not create datacenter watcher", "error", err)
+ l.Error("Could not create datacenter watcher", zap.Error(err))
}
}

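For callers such as scytale, the visible change is the first argument to NewEnvironment. A hedged sketch of a caller under the signatures in this diff: the Options literal and Watch fields are illustrative (NewEnvironment returns service.ErrIncomplete unless at least one watch or registration is configured), and env.Close is assumed from service.Environment.

package main

import (
	"github.com/xmidt-org/webpa-common/v2/service/consul"
	"go.uber.org/zap"
)

func main() {
	logger, err := zap.NewProduction() // NewEnvironment now takes a *zap.Logger
	if err != nil {
		panic(err)
	}
	defer logger.Sync()

	// Illustrative: at least one watch or registration is required.
	opts := consul.Options{
		Watches: []consul.Watch{
			{Service: "scytale", PassingOnly: true},
		},
	}

	env, err := consul.NewEnvironment(logger, "https", opts)
	if err != nil {
		logger.Error("could not create consul environment", zap.Error(err))
		return
	}
	defer env.Close()
}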
