Skip to content

Commit

Permalink
sink: wait for infra-agent to be ready before starting (#346)
Browse files Browse the repository at this point in the history
Co-authored-by: Guillermo Sanchez Gavier <gsanchez@newrelic.com>
Co-authored-by: Paolo Gallina <paologallina1992@gmail.com>
  • Loading branch information
3 people committed Feb 2, 2022
1 parent a0ed2ac commit 1d4c557
Show file tree
Hide file tree
Showing 7 changed files with 255 additions and 50 deletions.
50 changes: 16 additions & 34 deletions cmd/nri-kubernetes/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,32 +2,28 @@ package main

import (
"fmt"
"net"
"os"
"path"
"runtime"
"strconv"
"strings"
"time"

"github.com/newrelic/infra-integrations-sdk/integration"
"github.com/sethgrid/pester"
sdk "github.com/newrelic/infra-integrations-sdk/integration"
log "github.com/sirupsen/logrus"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/util/homedir"

"github.com/newrelic/nri-kubernetes/v3/internal/config"
"github.com/newrelic/nri-kubernetes/v3/internal/storer"
"github.com/newrelic/nri-kubernetes/v3/src/client"
"github.com/newrelic/nri-kubernetes/v3/src/controlplane"
"github.com/newrelic/nri-kubernetes/v3/src/integration"
"github.com/newrelic/nri-kubernetes/v3/src/ksm"
ksmClient "github.com/newrelic/nri-kubernetes/v3/src/ksm/client"
"github.com/newrelic/nri-kubernetes/v3/src/kubelet"
kubeletClient "github.com/newrelic/nri-kubernetes/v3/src/kubelet/client"
"github.com/newrelic/nri-kubernetes/v3/src/prometheus"
"github.com/newrelic/nri-kubernetes/v3/src/sink"
)

const (
Expand Down Expand Up @@ -77,7 +73,19 @@ func main() {
}
}

i, err := createIntegrationWithHTTPSink(c)
iw, err := integration.NewWrapper(
integration.WithLogger(logger),
integration.WithMetadata(integration.Metadata{
Name: integrationName,
Version: integrationVersion,
}),
)
if err != nil {
logger.Errorf("creating integration wrapper: %v", err)
os.Exit(exitIntegration)
}

i, err := iw.Integration(c.Sink.HTTP)
if err != nil {
logger.Errorf("creating integration with http sink: %v", err)
os.Exit(exitIntegration)
Expand Down Expand Up @@ -154,7 +162,7 @@ func main() {
}
}

func runScrapers(c *config.Config, ksmScraper *ksm.Scraper, kubeletScraper *kubelet.Scraper, controlplaneScraper *controlplane.Scraper, i *integration.Integration) error {
func runScrapers(c *config.Config, ksmScraper *ksm.Scraper, kubeletScraper *kubelet.Scraper, controlplaneScraper *controlplane.Scraper, i *sdk.Integration) error {
if c.KSM.Enabled {
err := ksmScraper.Run(i)
if err != nil {
Expand Down Expand Up @@ -273,32 +281,6 @@ func buildClients(c *config.Config) (*clusterClients, error) {
}, nil
}

func createIntegrationWithHTTPSink(config *config.Config) (*integration.Integration, error) {
c := pester.New()
c.Backoff = pester.LinearBackoff
c.MaxRetries = config.Sink.HTTP.Retries
c.Timeout = config.Sink.HTTP.Timeout
c.LogHook = func(e pester.ErrEntry) {
logger.Debugf("sending data to httpSink: %q", e)
}

endpoint := net.JoinHostPort(sink.DefaultAgentForwarderhost, strconv.Itoa(config.Sink.HTTP.Port))

sinkOptions := sink.HTTPSinkOptions{
URL: fmt.Sprintf("http://%s%s", endpoint, sink.DefaultAgentForwarderPath),
Client: c,
}

h, err := sink.NewHTTPSink(sinkOptions)
if err != nil {
return nil, fmt.Errorf("creating HTTPSink: %w", err)
}

cache := storer.NewInMemoryStore(storer.DefaultTTL, storer.DefaultInterval, logger)

return integration.New(integrationName, integrationVersion, integration.Writer(h), integration.Storer(cache))
}

func getK8sConfig(c *config.Config) (*rest.Config, error) {
inclusterConfig, err := rest.InClusterConfig()
if err == nil {
Expand Down
11 changes: 6 additions & 5 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,11 @@ import (
)

const (
DefaultFileName = "nri-kubernetes"
DefaultFilePath = "/etc/newrelic-infra"
DefaultTimeout = 10000 * time.Millisecond
DefaultRetries = 4
DefaultFileName = "nri-kubernetes"
DefaultFilePath = "/etc/newrelic-infra"
DefaultTimeout = 10 * time.Second
DefaultRetries = 3
DefaultAgentTimeout = 3 * time.Second
)

type Config struct {
Expand Down Expand Up @@ -115,7 +116,7 @@ func LoadConfig(filePath string, fileName string) (*Config, error) {

// Sane connection defaults
v.SetDefault("sink.http.port", 0)
v.SetDefault("sink.http.timeout", DefaultTimeout)
v.SetDefault("sink.http.timeout", DefaultAgentTimeout)
v.SetDefault("sink.http.retries", DefaultRetries)

v.SetDefault("kubelet.timeout", DefaultTimeout)
Expand Down
68 changes: 68 additions & 0 deletions src/integration/prober/prober.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package prober

import (
"errors"
"fmt"
"net/http"
"time"

"github.com/newrelic/nri-kubernetes/v3/internal/logutil"
log "github.com/sirupsen/logrus"
)

// Prober is an object that polls and http URL and returns an error if it does not return 200 Ok within the specified
// timeout.
type Prober struct {
timeout time.Duration
backoff time.Duration
Logger *log.Logger
}

var ErrProbeTimeout = errors.New("probe timed out")
var errProbeNotOk = errors.New("probe did not return 200 Ok")

// New creates a Prober that will check an endpoint every backoff seconds.
func New(timeout, backoff time.Duration) *Prober {
return &Prober{
timeout: timeout,
backoff: backoff,
Logger: logutil.Discard,
}
}

// Probe repeatedly hits the specified url with a GET request every Prober.backoff, and blocks until a request returns
// 200, or Prober.timeout passes.
func (p *Prober) Probe(url string) error {
start := time.Now()
for {
if time.Since(start) > p.timeout {
return fmt.Errorf("%w after %s", ErrProbeTimeout, p.timeout)
}

err := p.attempt(url)
if err != nil {
p.Logger.Debug(err)
p.Logger.Debugf("Retrying in %s", p.backoff)
time.Sleep(p.backoff)
continue
}

return nil
}
}

// attempt makes a request to the specified URL and returns an error if it does not return 200.
func (p *Prober) attempt(url string) error {
resp, err := http.Get(url)
if err != nil {
return fmt.Errorf("probe attempt to %s failed: %w", url, err)
}

_ = resp.Body.Close()

if resp.StatusCode != http.StatusOK {
return fmt.Errorf("%w: %d", errProbeNotOk, resp.StatusCode)
}

return nil
}
47 changes: 47 additions & 0 deletions src/integration/prober/prober_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package prober_test

import (
"errors"
"net/http"
"net/http/httptest"
"testing"
"time"

"github.com/newrelic/nri-kubernetes/v3/src/integration/prober"
)

func succeedAfter(duration time.Duration) http.HandlerFunc {
creation := time.Now()
return func(rw http.ResponseWriter, request *http.Request) {
if time.Since(creation) > duration {
rw.WriteHeader(http.StatusOK)
return
}

rw.WriteHeader(http.StatusInternalServerError)
}
}

func TestProber_fails_as_expected(t *testing.T) {
t.Parallel()

p := prober.New(4*time.Second, 300*time.Millisecond)
server := httptest.NewServer(succeedAfter(5 * time.Second))

err := p.Probe(server.URL)
if !errors.Is(err, prober.ErrProbeTimeout) {
t.Fatalf("Expected timeout error, got %v", err)
}
}

func TestProber_succeeds(t *testing.T) {
t.Parallel()

p := prober.New(15*time.Second, 300*time.Millisecond)
server := httptest.NewServer(succeedAfter(5 * time.Second))

err := p.Probe(server.URL)
if errors.Is(err, prober.ErrProbeTimeout) {
t.Fatalf("Expected timeout error, got %v", err)
}
}
14 changes: 7 additions & 7 deletions src/sink/http.go → src/integration/sink/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,13 @@ const (
DefaultAgentForwarderPath = "/v1/data"
)

// Doer is the interface that httpSink client should satisfy.
// Doer is the interface that HTTPSink client should satisfy.
type Doer interface {
Do(req *http.Request) (*http.Response, error)
}

// httpSink holds the configuration of the HTTP sink used by the integration.
type httpSink struct {
// HTTPSink holds the configuration of the HTTP sink used by the integration.
type HTTPSink struct {
url string
client Doer
}
Expand All @@ -32,8 +32,8 @@ type HTTPSinkOptions struct {
Client Doer
}

//NewHTTPSink initialize httpSink struct.
func NewHTTPSink(options HTTPSinkOptions) (io.Writer, error) {
//New initialize HTTPSink struct.
func New(options HTTPSinkOptions) (*HTTPSink, error) {
if options.Client == nil {
return nil, fmt.Errorf("client cannot be nil")
}
Expand All @@ -42,14 +42,14 @@ func NewHTTPSink(options HTTPSinkOptions) (io.Writer, error) {
return nil, fmt.Errorf("url cannot be empty")
}

return &httpSink{
return &HTTPSink{
url: options.URL,
client: options.Client,
}, nil
}

// Write is the function signature needed by the infrastructure SDK package.
func (h httpSink) Write(p []byte) (n int, err error) {
func (h HTTPSink) Write(p []byte) (n int, err error) {
request, err := http.NewRequest("POST", h.url, bytes.NewBuffer(p))
if err != nil {
return 0, fmt.Errorf("preparing request: %w", err)
Expand Down
8 changes: 4 additions & 4 deletions src/sink/http_test.go → src/integration/sink/sink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/newrelic/nri-kubernetes/v3/src/sink"
"github.com/newrelic/nri-kubernetes/v3/src/integration/sink"
)

const (
Expand Down Expand Up @@ -41,7 +41,7 @@ func Test_http_Sink_creation_fails_when_there_is(t *testing.T) {
options := getHTTPSinkOptions(t)
modifyFunc(&options)

_, err := sink.NewHTTPSink(options)
_, err := sink.New(options)
assert.Error(t, err, "error expected since client is nil")
})
}
Expand Down Expand Up @@ -78,7 +78,7 @@ func Test_http_sink_writes_data_successfully_when_within_ctxDeadline(t *testing.
options := getHTTPSinkOptions(t)
options.URL = testURL

h, err := sink.NewHTTPSink(options)
h, err := sink.New(options)
require.NoError(t, err, "no error expected")

_, err = h.Write([]byte("random data"))
Expand Down Expand Up @@ -121,7 +121,7 @@ func Test_http_sink_fails_writing_data_when(t *testing.T) {
c := defaultPesterClient(t)
c.Timeout = tc.requestTimeout

h, err := sink.NewHTTPSink(sink.HTTPSinkOptions{
h, err := sink.New(sink.HTTPSinkOptions{
URL: testURL,
Client: c,
})
Expand Down
Loading

0 comments on commit 1d4c557

Please sign in to comment.