Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add collectd stats #4050

Merged
merged 1 commit into from
Sep 9, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
With this release InfluxDB is moving to Go 1.5.

### Features
- [#4050](https://github.com/influxdb/influxdb/pull/4050): Add stats to collectd
- [#3771](https://github.com/influxdb/influxdb/pull/3771): Close idle Graphite TCP connections
- [#3755](https://github.com/influxdb/influxdb/issues/3755): Add option to build script. Thanks @fg2it
- [#3863](https://github.com/influxdb/influxdb/pull/3863): Move to Go 1.5
Expand Down
39 changes: 34 additions & 5 deletions services/collectd/service.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
package collectd

import (
"expvar"
"fmt"
"log"
"net"
"os"
"strings"
"sync"
"time"

"github.com/influxdb/influxdb"
"github.com/influxdb/influxdb/cluster"
"github.com/influxdb/influxdb/meta"
"github.com/influxdb/influxdb/tsdb"
Expand All @@ -16,6 +19,17 @@ import (

const leaderWaitTimeout = 30 * time.Second

// statistics gathered by the collectd service.
const (
statPointsReceived = "points_rx"
statBytesReceived = "bytes_rx"
statPointsParseFail = "points_parse_fail"
statReadFail = "read_fail"
statBatchesTrasmitted = "batches_tx"
statPointsTransmitted = "points_tx"
statBatchesTransmitFail = "batches_tx_fail"
)

// pointsWriter is an internal interface to make testing easier.
type pointsWriter interface {
WritePoints(p *cluster.WritePointsRequest) error
Expand All @@ -42,6 +56,9 @@ type Service struct {
batcher *tsdb.PointBatcher
typesdb gollectd.Types
addr net.Addr

// expvar-based stats.
statMap *expvar.Map
}

// NewService returns a new instance of the collectd service.
Expand All @@ -59,6 +76,12 @@ func NewService(c Config) *Service {
func (s *Service) Open() error {
s.Logger.Printf("Starting collectd service")

// Configure expvar monitoring. It's OK to do this even if the service fails to open and
// should be done before any data could arrive for the service.
key := strings.Join([]string{"collectd", s.Config.BindAddress}, ":")
tags := map[string]string{"bind": s.Config.BindAddress}
s.statMap = influxdb.NewStatistics(key, "collectd", tags)

if s.Config.BindAddress == "" {
return fmt.Errorf("bind address is blank")
} else if s.Config.Database == "" {
Expand Down Expand Up @@ -182,10 +205,12 @@ func (s *Service) serve() {

n, _, err := s.ln.ReadFromUDP(buffer)
if err != nil {
s.statMap.Add(statReadFail, 1)
s.Logger.Printf("collectd ReadFromUDP error: %s", err)
continue
}
if n > 0 {
s.statMap.Add(statBytesReceived, int64(n))
s.handleMessage(buffer[:n])
}
}
Expand All @@ -194,6 +219,7 @@ func (s *Service) serve() {
func (s *Service) handleMessage(buffer []byte) {
packets, err := gollectd.Packets(buffer, s.typesdb)
if err != nil {
s.statMap.Add(statPointsParseFail, 1)
s.Logger.Printf("Collectd parse error: %s", err)
return
}
Expand All @@ -202,6 +228,7 @@ func (s *Service) handleMessage(buffer []byte) {
for _, p := range points {
s.batcher.In() <- p
}
s.statMap.Add(statPointsReceived, int64(len(points)))
}
}

Expand All @@ -213,15 +240,17 @@ func (s *Service) writePoints() {
case <-s.stop:
return
case batch := <-s.batcher.Out():
req := &cluster.WritePointsRequest{
if err := s.PointsWriter.WritePoints(&cluster.WritePointsRequest{
Database: s.Config.Database,
RetentionPolicy: s.Config.RetentionPolicy,
ConsistencyLevel: cluster.ConsistencyLevelAny,
Points: batch,
}
if err := s.PointsWriter.WritePoints(req); err != nil {
s.Logger.Printf("failed to write batch: %s", err)
continue
}); err == nil {
s.statMap.Add(statBatchesTrasmitted, 1)
s.statMap.Add(statPointsTransmitted, int64(len(batch)))
} else {
s.Logger.Printf("failed to write point batch to database %q: %s", s.Config.Database, err)
s.statMap.Add(statBatchesTransmitFail, 1)
}
}
}
Expand Down