Skip to content
This repository has been archived by the owner on Mar 1, 2022. It is now read-only.

Commit

Permalink
Preparing for new adapter version release (#93)
Browse files Browse the repository at this point in the history
* INT-524 update istio adapter to use the latest go-metrics package

* incorporated code review comments

* incorporate code review comments, added max buffer size and batch size attributes for direct injestion in config.proto

* incorporate code review comments, added FlushInterval and MAxBufferSize parameters for Direct config since the values were not pushed to WF cluster without this

* Fixed buffer full issue and updated adapter version

* Removed unused dependency

* Implemented review comments.

* Minor change

* Modified to have helm template variables

* Reverting previous changes and making config files same as master

* Implemented review comments

* Minor change

Co-authored-by: Baburaj Velayudhan <bvelayudhan@vmware.com>
  • Loading branch information
srinivas-kandula and baburajvelayudhan committed Apr 29, 2020
1 parent 29b48dd commit 4177baf
Show file tree
Hide file tree
Showing 4 changed files with 132 additions and 33 deletions.
7 changes: 3 additions & 4 deletions go.mod
Expand Up @@ -10,17 +10,16 @@ require (
github.com/mackerelio/go-osstat v0.0.0-20181013165219-b878220919ec
github.com/natefinch/lumberjack v2.0.0+incompatible // indirect
github.com/pkg/errors v0.8.1 // indirect
github.com/rcrowley/go-metrics v0.0.0-20180503174638-e2704e165165
github.com/rcrowley/go-metrics v0.0.0-20190706150252-9beb055b7962
github.com/spf13/cobra v0.0.0-20180821161202-6fd8e29b07d8 // indirect
github.com/spf13/pflag v1.0.1 // indirect
github.com/stretchr/testify v1.3.0 // indirect
github.com/wavefrontHQ/go-metrics-wavefront v0.9.0
github.com/wavefronthq/go-metrics-wavefront v1.0.2
github.com/wavefronthq/wavefront-sdk-go v0.9.5
go.uber.org/atomic v1.3.2 // indirect
go.uber.org/multierr v1.1.0 // indirect
go.uber.org/zap v1.9.1 // indirect
google.golang.org/grpc v1.16.0
gopkg.in/natefinch/lumberjack.v2 v2.0.0 // indirect
gopkg.in/yaml.v2 v2.2.2 // indirect
istio.io/api v0.0.0-20190416154520-4a9a2a12a700 // v1.1.15
istio.io/istio v0.0.0-20190913154610-b12614cbfb7a // v1.1.15
)
Expand Down
40 changes: 33 additions & 7 deletions go.sum
@@ -1,13 +1,19 @@
cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/ajstarks/svgo v0.0.0-20180226025133-644b8db467af/go.mod h1:K08gAheRH3/J6wwsYMMT4xOr94bZjxIelGM0+d/wbFw=
github.com/caio/go-tdigest v2.3.0+incompatible h1:zP6nR0nTSUzlSqqr7F/LhslPlSZX/fZeGmgmwj2cxxY=
github.com/caio/go-tdigest v2.3.0+incompatible/go.mod h1:sHQM/ubZStBUmF1WbB8FAm8q9GjDajLC5T7ydxE3JHI=
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/fogleman/gg v1.2.1-0.20190220221249-0403632d5b90/go.mod h1:R/bRT+9gY/C5z7JzPU0zXsXHKM4/ayA+zqcVNZzPa1k=
github.com/gogo/googleapis v0.0.0-20180223154316-0cd9801be74a h1:dR8+Q0uO5S2ZBcs2IH6VBKYwSxPo2vYCYq0ot0mu7xA=
github.com/gogo/googleapis v0.0.0-20180223154316-0cd9801be74a/go.mod h1:gf4bu3Q80BeJ6H1S1vYPm8/ELATdvryBaNFGgqEef3s=
github.com/gogo/protobuf v1.1.1 h1:72R+M5VuhED/KujmZVcIquuo8mBgX4oVda//DQb3PXo=
github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ=
github.com/golang/freetype v0.0.0-20170609003504-e2365dfdc4a0/go.mod h1:E/TSTwGwJL78qG/PmXZO1EjYhfJinVAhrmmHX6Z8B9k=
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b h1:VKtxabqXZkF25pY9ekfRL6a582T4P37/31XEstQ5p58=
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q=
github.com/golang/lint v0.0.0-20180702182130-06c8688daad7/go.mod h1:tluoj9z5200jBnyusfRPU2LqT6J+DAorxEvtC7LHB+E=
Expand All @@ -20,7 +26,10 @@ github.com/hashicorp/go-multierror v0.0.0-20180717150148-3d5d8f294aa0 h1:j30noez
github.com/hashicorp/go-multierror v0.0.0-20180717150148-3d5d8f294aa0/go.mod h1:JMRHfdO9jKNzS/+BTlxCjKNQHg/jZAft8U7LloJvN7I=
github.com/inconshreveable/mousetrap v1.0.0 h1:Z8tu5sraLXCXIcARxBp/8cbvlwVa7Z1NHg9XEKhtSvM=
github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8=
github.com/jung-kurt/gofpdf v1.0.3-0.20190309125859-24315acbbda5/go.mod h1:7Id9E/uU8ce6rXgefFLlgrJj/GYY22cpxn+r32jIOes=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/leesper/go_rng v0.0.0-20190531154944-a612b043e353 h1:X/79QL0b4YJVO5+OsPH9rF2u428CIrGL/jLmPsoOQQ4=
github.com/leesper/go_rng v0.0.0-20190531154944-a612b043e353/go.mod h1:N0SVk0uhy+E1PZ3C9ctsPRlvOPAFPkCNlcPBDkt0N3U=
github.com/mackerelio/go-osstat v0.0.0-20181013165219-b878220919ec h1:zmKkStGDOkrlXpfWgexVI9aElDuRd9P/+JFg6XuJwlQ=
github.com/mackerelio/go-osstat v0.0.0-20181013165219-b878220919ec/go.mod h1:sRByAXz76nwXkhnEDpyxB17EbiP+qYBT+oA/CdhC5fQ=
github.com/natefinch/lumberjack v2.0.0+incompatible h1:4QJd3OLAMgj7ph+yZTuX13Ld4UpgHp07nNdFX7mqFfM=
Expand All @@ -29,23 +38,30 @@ github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/rcrowley/go-metrics v0.0.0-20180503174638-e2704e165165 h1:nkcn14uNmFEuGCb2mBZbBb24RdNRL08b/wb+xBOYpuk=
github.com/rcrowley/go-metrics v0.0.0-20180503174638-e2704e165165/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
github.com/rcrowley/go-metrics v0.0.0-20190706150252-9beb055b7962 h1:eUm8ma4+yPknhXtkYlWh3tMkE6gBjXZToDned9s2gbQ=
github.com/rcrowley/go-metrics v0.0.0-20190706150252-9beb055b7962/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
github.com/spf13/cobra v0.0.0-20180821161202-6fd8e29b07d8 h1:3WLp0QKKpOEFRU2CnzYeh0Va/u17OBe4mms9OaHkjZM=
github.com/spf13/cobra v0.0.0-20180821161202-6fd8e29b07d8/go.mod h1:1l0Ry5zgKvJasoi3XT1TypsSe7PqH0Sj9dhYf7v3XqQ=
github.com/spf13/pflag v1.0.1 h1:aCvUg6QPl3ibpQUxyLkrEkCHtPqYJL4x9AuhqVqFis4=
github.com/spf13/pflag v1.0.1/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnInEg4=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/wavefrontHQ/go-metrics-wavefront v0.9.0 h1:DBj656apE4IH8S9Hvp3DGXZBBMaVoXenDMkikXkKLJI=
github.com/wavefrontHQ/go-metrics-wavefront v0.9.0/go.mod h1:5nUOqBHWQSKs/5884Au00NYW+ZZ9NDA0qOYEKq2wjqI=
github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJyk=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/wavefronthq/go-metrics-wavefront v1.0.2 h1:6glPNj6QzTVQD0KA9+WKpAAkkkydlXe0YAcUXkyEdLQ=
github.com/wavefronthq/go-metrics-wavefront v1.0.2/go.mod h1:HuUs+CDX00uWJbnbk2vbnG+tko6hN1pdnC6qaHK/9Fo=
github.com/wavefronthq/wavefront-sdk-go v0.9.5 h1:PCMP+lL6u31ZBMqNvbtwV0cQywJIs+g1ijFj8/ohIhU=
github.com/wavefronthq/wavefront-sdk-go v0.9.5/go.mod h1:w1jMUOL5ARz+qQTdqwkeNfY9Cp0bB+90jThq169rKFA=
go.uber.org/atomic v1.3.2 h1:2Oa65PReHzfn29GpvgsYwloV9AVFHPDk8tYxt2c2tr4=
go.uber.org/atomic v1.3.2/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
go.uber.org/multierr v1.1.0 h1:HoEmRHQPVSqub6w2z2d2EOVs2fjyFRGyofhKuyDq0QI=
go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0=
go.uber.org/zap v1.9.1 h1:XCJQEf3W6eZaVwhRBof6ImoYGJSITeKWsyeh3HFu/5o=
go.uber.org/zap v1.9.1/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q=
golang.org/x/exp v0.0.0-20180321215751-8460e604b9de/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20180807140117-3d87b88a115f/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20190125153040-c74c464bbbf2 h1:y102fOLFqhV41b+4GPiJoa0k/x+pJcEi2/HB1Y5T6fU=
golang.org/x/exp v0.0.0-20190125153040-c74c464bbbf2/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/image v0.0.0-20180708004352-c73c2afc3b81/go.mod h1:ux5Hcp/YLpHSI86hEcLt0YII63i6oz57MZXIpbrjZUs=
golang.org/x/lint v0.0.0-20180702182130-06c8688daad7/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
golang.org/x/net v0.0.0-20180826012351-8a410e7b638d h1:g9qWBGx4puODJTMVyoPrpoxPFgVGd+z1DZwjfRu4d0I=
golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
Expand All @@ -56,7 +72,16 @@ golang.org/x/sys v0.0.0-20180830151530-49385e6e1522 h1:Ve1ORMCxvRmSXBwJK+t3Oy+V2
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/tools v0.0.0-20180525024113-a5b4c53f6e8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20180828015842-6cd1fcedba52/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190206041539-40960b6deb8e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
gonum.org/v1/gonum v0.0.0-20180816165407-929014505bf4/go.mod h1:Y+Yx5eoAFn32cQvJDxZx5Dpnq+c3wtXuadVZAcxbbBo=
gonum.org/v1/gonum v0.6.0/go.mod h1:9mxDZsDKxgMAuccQkewq682L+0eCu4dCN2yonUJTCLU=
gonum.org/v1/gonum v0.6.2 h1:4r+yNT0+8SWcOkXP+63H2zQbN+USnC73cjGUxnDF94Q=
gonum.org/v1/gonum v0.6.2/go.mod h1:9mxDZsDKxgMAuccQkewq682L+0eCu4dCN2yonUJTCLU=
gonum.org/v1/netlib v0.0.0-20190313105609-8cb42192e0e0 h1:OE9mWmgKkjJyEmDAAtGMPjXu+YNeGvK9VTSHY6+Qihc=
gonum.org/v1/netlib v0.0.0-20190313105609-8cb42192e0e0/go.mod h1:wa6Ws7BG/ESfp6dHfk7C6KdzKA7wR7u/rKwOGE66zvw=
gonum.org/v1/plot v0.0.0-20190515093506-e2840ee46a6b/go.mod h1:Wt8AAjI+ypCyYX3nZBvf6cAIx93T+c/OS2HFAYskSZc=
google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM=
google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8 h1:Nw54tB0rB7hY/N0NQvRW8DG4Yk3Q6T9cu9RcFQDu1tc=
google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc=
Expand All @@ -73,3 +98,4 @@ istio.io/api v0.0.0-20190416154520-4a9a2a12a700 h1:L7XTFapXB3oQyW2+5UwMJ6alba/GK
istio.io/api v0.0.0-20190416154520-4a9a2a12a700/go.mod h1:hhLFQmpHia8zgaM37vb2ml9iS5NfNfqZGRt1pS9aVEo=
istio.io/istio v0.0.0-20190913154610-b12614cbfb7a h1:7wxVxWFgQZgPwW3LfHbH0kDxEXd1/Me5M5Q+IqXGgD0=
istio.io/istio v0.0.0-20190913154610-b12614cbfb7a/go.mod h1:OWBySrQjjk549IhxWCt7DTl9ZSsXdvbgm+SmgGVRsGA=
rsc.io/pdf v0.1.1/go.mod h1:n8OzWcQ6Sp37PL01nO98y4iUCRdTGarVfzxY20ICaU4=
2 changes: 1 addition & 1 deletion wavefront/system_stats.go
Expand Up @@ -21,7 +21,7 @@ import (

"github.com/mackerelio/go-osstat/cpu"
metrics "github.com/rcrowley/go-metrics"
wf "github.com/wavefrontHQ/go-metrics-wavefront"
wf "github.com/wavefronthq/go-metrics-wavefront/reporting"
"istio.io/istio/pkg/log"
)

Expand Down
116 changes: 95 additions & 21 deletions wavefront/wavefront.go
Expand Up @@ -25,10 +25,14 @@ import (
"context"
"fmt"
"net"
"strconv"
"strings"

metrics "github.com/rcrowley/go-metrics"
"github.com/vmware/wavefront-adapter-for-istio/wavefront/config"
wf "github.com/wavefrontHQ/go-metrics-wavefront"
wf "github.com/wavefronthq/go-metrics-wavefront/reporting"
"github.com/wavefronthq/wavefront-sdk-go/application"
"github.com/wavefronthq/wavefront-sdk-go/senders"

"google.golang.org/grpc"

Expand All @@ -48,25 +52,38 @@ type (

// WavefrontAdapter supports metric template.
WavefrontAdapter struct {
listener net.Listener
server *grpc.Server
reporterInitialized bool
listener net.Listener
server *grpc.Server
reporter wf.WavefrontMetricsReporter
}
)

// ensure that WavefrontAdapter implements the HandleMetricServiceServer interface.
var _ metric.HandleMetricServiceServer = &WavefrontAdapter{}

// createWavefrontReporter creates a reporter that periodically flushes metrics to Wavefront.
func createWavefrontReporter(cfg *config.Params) {
hostTags := map[string]string{"source": cfg.Source}
func (wa *WavefrontAdapter) createWavefrontReporter(cfg *config.Params) {
var sender senders.Sender
flushInterval := int(cfg.FlushInterval.Seconds())
if direct := cfg.GetDirect(); direct != nil {
go wf.WavefrontDirect(metrics.DefaultRegistry, cfg.FlushInterval, hostTags, cfg.Prefix, direct.Server, direct.Token)
sender = createDirectSender(direct, flushInterval)
} else if proxy := cfg.GetProxy(); proxy != nil {
addr, _ := net.ResolveTCPAddr("tcp", proxy.Address)
go wf.WavefrontProxy(metrics.DefaultRegistry, cfg.FlushInterval, hostTags, cfg.Prefix, addr)
sender = createProxySender(proxy, flushInterval)
}

if sender != nil {
wa.reporter = wf.NewReporter(
sender,
application.New("wavefront-istio-adapter", "wavefront-istio-adapter"),
wf.Source(cfg.Source),
wf.Prefix(cfg.Prefix),
wf.LogErrors(true),
)
} else {
log.Fatalf("Wavefront sender is not initialized.")
}

hostTags := map[string]string{"source": cfg.Source}
createSystemStatsReporter(hostTags)
}

Expand Down Expand Up @@ -101,21 +118,74 @@ func (wa *WavefrontAdapter) setLogLevel(cfg *config.Params) {
// verifyAndInitReporter checks if the Wavefront reporter is initialized, and if
// not, initializes it.
func (wa *WavefrontAdapter) verifyAndInitReporter(cfg *config.Params) {
if !wa.reporterInitialized {
if wa.reporter == nil {
log.Infof("trying to init wavefront reporter, config: %s", cfg.String())

wa.setLogLevel(cfg)

if err := config.ValidateCredentials(cfg); err != nil {
log.Errorf("failed to create wavefront reporter, err: %s, config: %s", err.Error(), cfg.String())
} else {
createWavefrontReporter(cfg)
wa.reporterInitialized = true
wa.createWavefrontReporter(cfg)
log.Infof("wavefront reporter successfully initialized, config: %s", cfg.String())
}
}
}

// creates wavefront direct sender
func createDirectSender(direct *config.Params_WavefrontDirect, flushInterval int) senders.Sender {
directCfg := &senders.DirectConfiguration{
Server: direct.Server,
Token: direct.Token,
FlushIntervalSeconds: flushInterval,
BatchSize: 10000,
MaxBufferSize: 50000,
}
sender, err := senders.NewDirectSender(directCfg)
if err != nil {
log.Fatalf("Error creating direct sender: %v", err)
return nil
}
return sender
}

// creates wavefront proxy sender
func createProxySender(proxy *config.Params_WavefrontProxy, flushInterval int) senders.Sender {
addr, err := net.ResolveTCPAddr("tcp", proxy.Address)
if err != nil {
log.Fatalf("Cannot resolve proxy address %v", err)
return nil
}

// extract proxy ip and port from address
proxyInfo := strings.Split(addr.String(), ":")

// address must be in the form <proxyhost:port>
if len(proxyInfo) != 2 {
log.Fatalf("Proxy address and/or port number is missing.")
return nil
}

// numeric port number expected
portNum, err := strconv.Atoi(proxyInfo[1])
if err != nil {
log.Fatalf("Invalid port number %v", err)
return nil
}

proxyCfg := &senders.ProxyConfiguration{
Host: proxyInfo[0],
MetricsPort: portNum,
FlushIntervalSeconds: flushInterval,
}

sender, err := senders.NewProxySender(proxyCfg)
if err != nil {
log.Fatalf("Error creating proxy sender: %v", err)
return nil
}
return sender
}

// createMetricMap creates a map of metric names and the corresponding MetricInfo objects.
func createMetricMap(ms []*config.Params_MetricInfo) map[string]*config.Params_MetricInfo {
metricMap := make(map[string]*config.Params_MetricInfo)
Expand Down Expand Up @@ -163,7 +233,7 @@ func translateSample(s *config.Params_MetricInfo_Sample) metrics.Sample {

// writeMetrics extracts metric information from metric.InstanceMsgs and writes
// it to the Wavefront metric registry.
func writeMetrics(cfg *config.Params, insts []*metric.InstanceMsg) {
func (wa *WavefrontAdapter) writeMetrics(cfg *config.Params, insts []*metric.InstanceMsg) {
metricMap := createMetricMap(cfg.Metrics)
for _, inst := range insts {
metric, metricFound := metricMap[inst.Name]
Expand All @@ -181,7 +251,7 @@ func writeMetrics(cfg *config.Params, insts []*metric.InstanceMsg) {
if float64Val, err := translateToFloat64(value); err != nil {
log.Warnf("couldn't translate metric value: %s %v, err: %v", metricName, value, err)
} else {
gauge := wf.GetOrRegisterMetric(metricName, metrics.NewGaugeFloat64(), tags).(metrics.GaugeFloat64)
gauge := wa.reporter.GetOrRegisterMetric(metricName, metrics.NewGaugeFloat64(), tags).(metrics.GaugeFloat64)
gauge.Update(float64Val)
log.Debugf("updated gauge metric %s with %v, tags: %v", metricName, float64Val, tags)
}
Expand All @@ -191,7 +261,7 @@ func writeMetrics(cfg *config.Params, insts []*metric.InstanceMsg) {
log.Warnf("couldn't translate metric value: %s %v, err: %v", metricName, value, err)
} else {
deltaMetricName := wf.DeltaCounterName(metricName)
counter := wf.GetOrRegisterMetric(deltaMetricName, metrics.NewCounter(), tags).(metrics.Counter)
counter := wa.reporter.GetOrRegisterMetric(deltaMetricName, metrics.NewCounter(), tags).(metrics.Counter)
counter.Inc(int64Val)
log.Debugf("updated delta counter metric %s with %v, tags: %v", deltaMetricName, int64Val, tags)
}
Expand All @@ -204,7 +274,7 @@ func writeMetrics(cfg *config.Params, insts []*metric.InstanceMsg) {
if histogram == nil {
sample := translateSample(metric.Sample)
histogram = metrics.NewHistogram(sample)
wf.RegisterMetric(metricName, histogram, tags)
wa.reporter.RegisterMetric(metricName, histogram, tags)
}
histogram.(metrics.Histogram).Update(int64Val)
log.Debugf("updated histogram metric %s with %v, tags: %v", metricName, int64Val, tags)
Expand Down Expand Up @@ -239,7 +309,7 @@ func (wa *WavefrontAdapter) HandleMetric(ctx context.Context, r *metric.HandleMe
}

// write metrics
writeMetrics(cfg, r.Instances)
wa.writeMetrics(cfg, r.Instances)

log.Infof("metrics were processed successfully!")
return &v1beta1.ReportResult{}, nil
Expand Down Expand Up @@ -300,6 +370,10 @@ func (wa *WavefrontAdapter) Close() error {
if wa.listener != nil {
_ = wa.listener.Close()
}
if wa.reporter != nil {
wa.reporter.Close()
}

return nil
}

Expand All @@ -315,9 +389,9 @@ func NewWavefrontAdapter(addr string) (Server, error) {
}

adapter := &WavefrontAdapter{
listener: listener,
server: grpc.NewServer(),
reporterInitialized: false,
listener: listener,
server: grpc.NewServer(),
reporter: nil,
}
metric.RegisterHandleMetricServiceServer(adapter.server, adapter)
fmt.Printf("listening on \"%v\"\n", adapter.Addr())
Expand Down

0 comments on commit 4177baf

Please sign in to comment.