Skip to content
This repository has been archived by the owner on Aug 18, 2023. It is now read-only.

Commit

Permalink
Use StartupFinished and do some refactoring for cyclomatic complexity
Browse files Browse the repository at this point in the history
  • Loading branch information
mdubbyap committed Dec 12, 2018
1 parent 6446ef8 commit a2f80da
Show file tree
Hide file tree
Showing 12 changed files with 146 additions and 77 deletions.
41 changes: 24 additions & 17 deletions dp/dpbuffered/bufferedforwarder.go
Expand Up @@ -71,10 +71,11 @@ type BufferedForwarder struct {
cdim *log.CtxDimensions
identifier string

sendTo signalfx.Sink
closeSender func() error
stopContext context.Context
stopFunc context.CancelFunc
sendTo signalfx.Sink
stopContext context.Context
stopFunc context.CancelFunc
closeSender func() error
afterStartup func() error
}

var _ dpsink.Sink = &BufferedForwarder{}
Expand Down Expand Up @@ -347,26 +348,32 @@ func logEvIfFlag(l log.Logger, checker flagChecker, ev []*event.Event, msg strin
}
}

// StartupFinished runs the afterStartup method on the forwarder
func (forwarder *BufferedForwarder) StartupFinished() error {
return forwarder.afterStartup()
}

// NewBufferedForwarder is used only by this package to create a forwarder that buffers its
// datapoint channel
func NewBufferedForwarder(ctx context.Context, config *Config, sendTo signalfx.Sink, closeIt func() error, logger log.Logger) *BufferedForwarder {
func NewBufferedForwarder(ctx context.Context, config *Config, sendTo signalfx.Sink, closeIt, afterStartup func() error, logger log.Logger) *BufferedForwarder {
config = pointer.FillDefaultFrom(config, DefaultConfig).(*Config)
logCtx := log.NewContext(logger).With(logkey.Struct, "BufferedForwarder")
logCtx.Log(logkey.Config, config)
context, cancel := context.WithCancel(ctx)
ret := &BufferedForwarder{
stopFunc: cancel,
stopContext: context,
dpChan: make(chan []*datapoint.Datapoint, *config.BufferSize),
eChan: make(chan []*event.Event, *config.BufferSize),
tChan: make(chan []*trace.Span, *config.BufferSize),
config: config,
sendTo: sendTo,
closeSender: closeIt,
logger: logCtx,
checker: config.Checker,
cdim: config.Cdim,
identifier: *config.Name,
stopFunc: cancel,
stopContext: context,
dpChan: make(chan []*datapoint.Datapoint, *config.BufferSize),
eChan: make(chan []*event.Event, *config.BufferSize),
tChan: make(chan []*trace.Span, *config.BufferSize),
config: config,
sendTo: sendTo,
closeSender: closeIt,
afterStartup: afterStartup,
logger: logCtx,
checker: config.Checker,
cdim: config.Cdim,
identifier: *config.Name,
}
ret.start()
return ret
Expand Down
11 changes: 6 additions & 5 deletions dp/dpbuffered/bufferedforwarder_test.go
Expand Up @@ -65,7 +65,8 @@ func TestBufferedForwarderBasic(t *testing.T) {
buf := &bytes.Buffer{}
threadWriter := &threadSafeWriter{Writer: buf}
l := log.NewLogfmtLogger(threadWriter, log.Panic)
bf := NewBufferedForwarder(ctx, config, sendTo, c, l)
bf := NewBufferedForwarder(ctx, config, sendTo, c, c, l)
So(bf.StartupFinished(), ShouldBeNil)
datas := []*datapoint.Datapoint{
dptest.DP(),
dptest.DP(),
Expand Down Expand Up @@ -196,7 +197,7 @@ func TestBufferedForwarderContexts(t *testing.T) {
}

sendTo := dptest.NewBasicSink()
bf := NewBufferedForwarder(ctx, config, sendTo, c, log.Discard)
bf := NewBufferedForwarder(ctx, config, sendTo, c, c, log.Discard)
assert.NoError(t, bf.AddDatapoints(ctx, datas))
canceledContext, cancelFunc := context.WithCancel(ctx)
waiter := make(chan struct{})
Expand Down Expand Up @@ -265,7 +266,7 @@ func TestBufferedForwarderContextsEvent(t *testing.T) {
spans := []*trace.Span{{}}

sendTo := dptest.NewBasicSink()
bf := NewBufferedForwarder(ctx, config, sendTo, c, log.Discard)
bf := NewBufferedForwarder(ctx, config, sendTo, c, c, log.Discard)
assert.NoError(t, bf.AddEvents(ctx, events))
assert.NoError(t, bf.AddSpans(ctx, spans))
canceledContext, cancelFunc := context.WithCancel(ctx)
Expand Down Expand Up @@ -313,7 +314,7 @@ func TestBufferedForwarderMaxTotalDatapoints(t *testing.T) {
}
ctx := context.Background()
sendTo := dptest.NewBasicSink()
bf := NewBufferedForwarder(ctx, config, sendTo, c, log.Discard)
bf := NewBufferedForwarder(ctx, config, sendTo, c, c, log.Discard)
defer func() {
assert.NoError(t, bf.Close())
}()
Expand Down Expand Up @@ -348,7 +349,7 @@ func TestBufferedForwarderMaxTotalEvents(t *testing.T) {
}
ctx := context.Background()
sendTo := dptest.NewBasicSink()
bf := NewBufferedForwarder(ctx, config, sendTo, c, log.Discard)
bf := NewBufferedForwarder(ctx, config, sendTo, c, c, log.Discard)
defer func() {
assert.NoError(t, bf.Close())
}()
Expand Down
134 changes: 79 additions & 55 deletions main.go
Expand Up @@ -294,7 +294,7 @@ func forwarderName(f *config.ForwardTo) string {

var errDupeForwarder = errors.New("cannot duplicate forwarder names or types without names")

func setupForwarders(ctx context.Context, hostname string, tk timekeeper.TimeKeeper, loader *config.Loader, loadedConfig *config.ProxyConfig, logger log.Logger, scheduler *sfxclient.Scheduler, Checker *dpsink.ItemFlagger, cdim *log.CtxDimensions, manager *etcdManager) ([]protocol.Forwarder, error) {
func setupForwarders(ctx context.Context, tk timekeeper.TimeKeeper, loader *config.Loader, loadedConfig *config.ProxyConfig, logger log.Logger, scheduler *sfxclient.Scheduler, Checker *dpsink.ItemFlagger, cdim *log.CtxDimensions, manager *etcdManager) ([]protocol.Forwarder, error) {
allForwarders := make([]protocol.Forwarder, 0, len(loadedConfig.ForwardTo))
nameMap := make(map[string]bool)
for idx, forwardConfig := range loadedConfig.ForwardTo {
Expand Down Expand Up @@ -335,7 +335,7 @@ func setupForwarders(ctx context.Context, hostname string, tk timekeeper.TimeKee
Name: forwardConfig.Name,
Cdim: cdim,
}
bf := dpbuffered.NewBufferedForwarder(ctx, bconf, endingSink, forwarder.Close, limitedLogger)
bf := dpbuffered.NewBufferedForwarder(ctx, bconf, endingSink, forwarder.Close, forwarder.StartupFinished, limitedLogger)
allForwarders = append(allForwarders, bf)

groupName := fmt.Sprintf("%s_f_%d", name, idx)
Expand All @@ -347,7 +347,7 @@ func setupForwarders(ctx context.Context, hostname string, tk timekeeper.TimeKee
"name": name,
"direction": "forwarder",
"source": "proxy",
"host": hostname,
"host": *loadedConfig.ServerName,
"type": forwardConfig.Type,
})
}
Expand Down Expand Up @@ -598,6 +598,47 @@ func (p *proxy) scheduleStatCollection(ctx context.Context, scheduler *sfxclient
return finishedContext, cancelFunc
}

func (p *proxy) setupForwardersAndListeners(ctx context.Context, loader *config.Loader, loadedConfig *config.ProxyConfig, logger log.Logger, scheduler *sfxclient.Scheduler) (signalfx.Sink, error) {
var err error
p.forwarders, err = setupForwarders(ctx, p.tk, loader, loadedConfig, logger, scheduler, &p.debugSink, &p.ctxDims, p.etcdMgr)
if err != nil {
p.logger.Log(log.Err, err, "Unable to setup forwarders")
return nil, errors.Annotate(err, "unable to setup forwarders")
}

dpSinks, eSinks, tSinks := splitSinks(p.forwarders)

dmux := &demultiplexer.Demultiplexer{
DatapointSinks: dpSinks,
EventSinks: eSinks,
TraceSinks: tSinks,
Logger: log.NewOnePerSecond(logger),
LateDuration: loadedConfig.LateThresholdDuration,
FutureDuration: loadedConfig.FutureThresholdDuration,
}
scheduler.AddCallback(dmux)

p.versionMetric.RepoURL = "https://github.com/signalfx/metricproxy"
p.versionMetric.FileName = "/buildInfo.json"
scheduler.AddCallback(&p.versionMetric)

multiplexer := signalfx.FromChain(dmux, signalfx.NextWrap(signalfx.UnifyNextSinkWrap(&p.debugSink)))

p.listeners, err = setupListeners(p.tk, *loadedConfig.ServerName, loader, loadedConfig.ListenFrom, multiplexer, logger, scheduler)
if err != nil {
p.logger.Log(log.Err, err, "Unable to setup listeners")
return nil, errors.Annotate(err, "cannot setup listeners from configuration")
}

var errs []error
for _, f := range p.forwarders {
err = f.StartupFinished()
errs = append(errs, err)
log.IfErr(logger, err)
}
return multiplexer, FirstNonNil(errs...)
}

func (p *proxy) run(ctx context.Context) error {
p.debugSink.CtxFlagCheck = &p.debugContext
p.logger.Log(logkey.ConfigFile, p.flags.configFileName, "Looking for config file")
Expand All @@ -616,10 +657,12 @@ func (p *proxy) run(ctx context.Context) error {
scheduler := p.setupScheduler(*loadedConfig.ServerName)

if err := p.setupDebugServer(loadedConfig, logger, scheduler); err != nil {
p.logger.Log(log.Err, "debug server failed", err)
return err
}

p.etcdMgr.setup(loadedConfig)

if err := p.etcdMgr.start(); err != nil {
p.logger.Log(log.Err, "unable to start etcd server", err)
return err
Expand All @@ -635,60 +678,31 @@ func (p *proxy) run(ctx context.Context) error {
chain := p.createCommonHTTPChain(loadedConfig)
loader := config.NewLoader(ctx, logger, Version, &p.debugContext, &p.debugSink, &p.ctxDims, chain)

forwarders, err := setupForwarders(ctx, *loadedConfig.ServerName, p.tk, loader, loadedConfig, logger, scheduler, &p.debugSink, &p.ctxDims, p.etcdMgr)
if err != nil {
p.logger.Log(log.Err, err, "Unable to setup forwarders")
return errors.Annotate(err, "unable to setup forwarders")
}

p.forwarders = forwarders
dpSinks, eSinks, tSinks := splitSinks(forwarders)

dmux := &demultiplexer.Demultiplexer{
DatapointSinks: dpSinks,
EventSinks: eSinks,
TraceSinks: tSinks,
Logger: log.NewOnePerSecond(logger),
LateDuration: loadedConfig.LateThresholdDuration,
FutureDuration: loadedConfig.FutureThresholdDuration,
}
scheduler.AddCallback(dmux)

p.versionMetric.RepoURL = "https://github.com/signalfx/metricproxy"
p.versionMetric.FileName = "/buildInfo.json"
scheduler.AddCallback(&p.versionMetric)

multiplexer := signalfx.FromChain(dmux, signalfx.NextWrap(signalfx.UnifyNextSinkWrap(&p.debugSink)))

listeners, err := setupListeners(p.tk, *loadedConfig.ServerName, loader, loadedConfig.ListenFrom, multiplexer, logger, scheduler)
if err != nil {
p.logger.Log(log.Err, err, "Unable to setup listeners")
return errors.Annotate(err, "cannot setup listeners from configuration")
}
p.listeners = listeners

finishedContext, cancelFunc := p.scheduleStatCollection(ctx, scheduler, loadedConfig, multiplexer)
defer cancelFunc()
multiplexer, err := p.setupForwardersAndListeners(ctx, loader, loadedConfig, logger, scheduler)
if err == nil {
finishedContext, cancelFunc := p.scheduleStatCollection(ctx, scheduler, loadedConfig, multiplexer)

// Schedule datapoint collection to a Discard sink so we can get the stats in Expvar()
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
err := scheduler.Schedule(finishedContext)
logger.Log(log.Err, err, logkey.Struct, "scheduler", "Schedule finished")
wg.Done()
}()
if p.setupDoneSignal != nil {
close(p.setupDoneSignal)
}

// Schedule datapoint collection to a Discard sink so we can get the stats in Expvar()
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
err := scheduler.Schedule(finishedContext)
logger.Log(log.Err, err, logkey.Struct, "scheduler", "Schedule finished")
wg.Done()
}()
logger.Log("Setup done. Blocking!")
if p.setupDoneSignal != nil {
close(p.setupDoneSignal)
}
select {
case <-ctx.Done():
case <-p.signalChan:
err = p.gracefulShutdown()
logger.Log("Setup done. Blocking!")
select {
case <-ctx.Done():
case <-p.signalChan:
err = p.gracefulShutdown()
}
cancelFunc()
wg.Wait()
}
cancelFunc()
wg.Wait()
return err
}

Expand All @@ -699,3 +713,13 @@ func main() {
signal.Notify(mainInstance.signalChan, syscall.SIGTERM)
log.IfErr(log.DefaultLogger, mainInstance.main(context.Background()))
}

// FirstNonNil returns what it says it does
func FirstNonNil(errs ...error) error {
for _, err := range errs {
if err != nil {
return err
}
}
return nil
}
7 changes: 7 additions & 0 deletions main_test.go
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/signalfx/metricproxy/protocol/carbon"
"github.com/signalfx/metricproxy/protocol/signalfx"
. "github.com/smartystreets/goconvey/convey"
"github.com/stretchr/testify/assert"
)

const configEtcd = `
Expand Down Expand Up @@ -515,6 +516,12 @@ func formatTargetAddresses(targetClusters []string) (targetAddresses string) {
return
}

var errTest = errors.New("test")

func Test_NonNil(t *testing.T) {
assert.Equal(t, FirstNonNil(errTest), errTest)
}

func TestProxyCluster(t *testing.T) {
Convey("a setup proxy cluster", t, func() {
ctx, cancelfunc := context.WithCancel(context.Background())
Expand Down
5 changes: 5 additions & 0 deletions protocol/csv/csvforwarder.go
Expand Up @@ -47,6 +47,11 @@ type Forwarder struct {

var _ dpsink.Sink = &Forwarder{}

// StartupFinished can be called if you want to do something after startup is complete
func (f *Forwarder) StartupFinished() error {
return nil
}

// Datapoints returns nothing and exists to satisfy the protocol.Forwarder interface
func (f *Forwarder) Datapoints() []*datapoint.Datapoint {
return f.GetFilteredDatapoints()
Expand Down
1 change: 1 addition & 0 deletions protocol/csv/csvforwarder_test.go
Expand Up @@ -38,6 +38,7 @@ func TestFilenameForwarder(t *testing.T) {
assert.NoError(t, f.AddEvents(ctx, []*event.Event{dptest.E()}))
assert.NoError(t, f.AddSpans(ctx, []*trace.Span{{}}))
assert.Equal(t, int64(0), f.Pipeline())
assert.Nil(t, f.StartupFinished())
}

func TestFilenameForwarderBadFilename(t *testing.T) {
Expand Down
11 changes: 11 additions & 0 deletions protocol/protocol.go
Expand Up @@ -24,6 +24,7 @@ type Forwarder interface {
Pipeline
sfxclient.Collector
io.Closer
StartupHook
}

// Listener is the basic interface anything that listens for new metrics must implement
Expand All @@ -38,6 +39,11 @@ type HealthChecker interface {
CloseHealthCheck()
}

// StartupHook interface allows a forwarder to present a callback after startup if it needs to do something that requires a fully running metricproxy
type StartupHook interface {
StartupFinished() error
}

// Pipeline returns the number of items still in flight that need to be drained
type Pipeline interface {
Pipeline() int64
Expand All @@ -48,6 +54,11 @@ type UneventfulForwarder struct {
DatapointForwarder
}

// StartupFinished is to be called after startup is finished
func (u *UneventfulForwarder) StartupFinished() error {
return nil
}

// AddEvents does nothing and returns nil
func (u *UneventfulForwarder) AddEvents(ctx context.Context, events []*event.Event) error {
return nil
Expand Down
1 change: 1 addition & 0 deletions protocol/protocol_test.go
Expand Up @@ -11,6 +11,7 @@ func TestUneventfulForwarder(t *testing.T) {
assert.Equal(t, u.AddEvents(nil, nil), nil)
assert.Equal(t, u.AddSpans(nil, nil), nil)
assert.Equal(t, int64(0), u.Pipeline())
assert.Equal(t, u.StartupFinished(), nil)
}

func TestDimMakers(t *testing.T) {
Expand Down
5 changes: 5 additions & 0 deletions protocol/signalfx/signalfxforwarder.go
Expand Up @@ -192,3 +192,8 @@ func (connector *Forwarder) AddSpans(ctx context.Context, spans []*trace.Span) e
func (connector *Forwarder) Pipeline() int64 {
return atomic.LoadInt64(&connector.stats.pipeline)
}

// StartupFinished calls the same interface on the sampler as a hook called by run() after the proxy is up and running
func (connector *Forwarder) StartupFinished() error {
return connector.sampler.StartupFinished()
}
1 change: 1 addition & 0 deletions protocol/signalfx/signalfxlistener_test.go
Expand Up @@ -251,6 +251,7 @@ func TestSignalfxListener(t *testing.T) {
TraceURL: pointer.String(fmt.Sprintf("%s/v1/trace", baseURI)),
}
forwarder, err := NewForwarder(forwardConfig)
So(forwarder.StartupFinished(), ShouldBeNil)
So(err, ShouldBeNil)
So(len(forwarder.Datapoints()), ShouldEqual, 7)
So(forwarder.Pipeline(), ShouldEqual, 0)
Expand Down

0 comments on commit a2f80da

Please sign in to comment.