first stab at a graphite listener #293

this is a RFC. it has some comments, questions and TODO's in the code.

most notably needs some work

+ if len(elements) != 3 {
+ continue // invalid line
+ }
+ val, err := strconv.ParseFloat(elements[1], 64)
+ if err != nil {
+ continue // invalid line
+ }
+ timestamp, err := strconv.ParseUint(elements[2], 10, 32)
+ if err != nil {
+ continue // invalid line
+ }
+ // this doesn't work yet. should i use the types in protocol/types.go ?
+ // they don't seem to match with what's in protocol.pb.go
+ // and the latter seems a little lowlevel and has attributes that don't apply in this context
+ // also, not sure if i still have to reserve a column for time or sequence number, or whether
+ // that will be handled automatically for me
You don't have to set timestamp if you call to WriteSeriesData as the points come in, it'll be assigned automatically. Sequence number will always get assigned automatically.

You should be using the objects from protocol.pb.go. Basically, create a series object, columns, and then points. Create field values for the points. Timestamp and Sequence number should not be their own fields.

This might give you a better idea:

@Dieterbe Dieterbe referenced this pull request in graphite-ng/graphite-api-experiment

merging projects, uniting contributors (if it makes sense) #7


pushed a bunch of fixes, but now my getAuth() doesn't work because (at startup) len(names) == 0.
how should i fix this? shouldn't always at least 1 admin user exist?

// getAuth assures that the user property is a user with access to the graphite database
func (self *Server) getAuth() {
    // just use any (the first) of the list of admins.
    // can we assume there's always at least 1 ?
    names := self.clusterConfig.GetClusterAdmins()
    self.user = self.clusterConfig.GetClusterAdmin(names[0])
I think this is ready for review. I tested it by feeding it data using the diamond agent and that seems to work fine.
2 notes though:

  • still need to implement check if db exists and create if needed (paul you said you guys would do this IIRC)
  • I did see some errors in the output wrt raft/shard logs, which is seemingly unrelated to the graphite listener. see should i open a new ticket for this?
+ } else {
+ values = append(values, &protocol.FieldValue{DoubleValue: &val})
+ }
+ ts := int64(timestamp * 1000000)
+ sn := uint64(1) // use same SN makes sure that we'll only keep the latest value for a given metric_id-timestamp pair
+ point := &protocol.Point{
+ Timestamp: &ts,
+ Values: values,
+ SequenceNumber: &sn,
+ }
+ series := &protocol.Series{
+ Name: &elements[0],
+ Fields: []string{"value"},
+ Points: []*protocol.Point{point},
+ }
+ // little inefficient for now, later we might want to add multiple series in 1 writePoints request
Agreed. #310 will add that functionality to requests, which will make it possible to update the interface on the coordinator. Will definitely make things more efficient.

This looks good to me. We can help with the database validation. I'd like @jvshahid to take a look too.

For the Raft log thing, log an issue with as much info as you can and we'll see if we can repro. Thanks!

@Dieterbe Dieterbe referenced this pull request

raft/shard log errors #311

+func (self *Server) ListenAndServe() {
+ self.getAuth()
+ var err error
+ if self.listen_addr != "" {
+ self.conn, err = net.Listen("tcp", self.listen_addr)
+ if err != nil {
+ log.Error("GraphiteServer: Listen: ", err)
+ return
+ }
+ }
+ self.Serve(self.conn)
+func (self *Server) Serve(listener net.Listener) {
+ // not really sure of the use of this shutdown channel,
I think the shutdown here doesn't do much; or at least, not what you hope.

It works in the HTTP API because the underlying net/http module's Serve() method has some intelligent handling of Accept() failures:

In short it determines if it's a temporary or permanent failure, and it aborts the accept loop on permanent failures. That allows the Serve() method to exit, which ends up triggering the defer to send the true on the shutdown channel.

I haven't fully thought through the implications of shutdown in InfluxDB (just started looking at the code today) so I don't know what the right solution in API modules like this is.

(Thank you for doing this; I've used it as a base for my own OpenTSDB ingester: zorkian@b57b611 ... not done yet, but it's a start and you should recognize the structure!)

it is nice to see some other listeners but wouldn't be better if there was a nicer way to have each plugins have its own config without having it merged like that in the rest of the config file ?

  name = "http_json"
  port = 8086

  name = "udp_whatever"
  port = 6666

  name = "http_graphite"
  port = 67888

instead of the current model which seems to be:

  port = 8086

  port = 67888

This is just an idea but it would help cleaning up the config file and show clearly what is an input plugin and allow users to quickly turn them on and off by commenting the whole block.

another idea:

    port = 8086

    port = 6666
    other_param = 42

    port = 67888

The specific syntax does not matter that much to me, what matters is that the plugins are clearly identified in the config and not only by comments.

I agree, the last example you posted looks best to me.


I am not familiar enough withe codebase but could there be a way to have plugins code cleanly split from the core (and to an extend maybe not compiled at all when not needed) ?

Here is an example of what I meant (again that's not the way things are laid out but more the idea behind it):



I am sure that problem will arise sooner than later as more plugins are added especially if they need specific libraries not needed by the influxdb core, another factor is that once some plugins are out there the "architecture" won't be as easy to change as it could be now.

One of the plugins I want to give a try at is json over udp, when the server writing to influxdb is on the same machine I am sure it could provide a nice boost over http and completely remove its overhead.


Any plans to make this ingest the Graphite pickle protocol as well?

Here would be my use case:

I currently have a relay in every data center that forwards data from every server to a data center local graphite instance and a centrally hosted instance as well. So I have one point, where I have all data center specific metrics, but I also have a global view. Problem is that the global instance is getting under water more and more.
What I plan to do is relay data to a global InfluxDB cluster as well and evaluate if it is something I would bet on for full production use (it does sound very promising, but I need some test data).

So what do you think? Maybe there are there better options for me?


I'm not a big fan of the pickle protocol. seems like it got added to some of the python daemons because somebody thought it would be cool but IMHO in reality it doesn't help with anything, is hard to debug, and breaks compatibility between different services.. tools that only support pickle protocol should support the plaintext protocol and in my view everything should just use the plaintext protocol. (or if you want to optimize, something truly cross-language such as msgpack, but that's a whole other story)


I am so with you!

But I think my current setup is actually a common one and it might be something to consider to help people ease into InfluxDB.

I am not sure if there is something similar like the carbon relay for Influx. For me, this is pretty vital, because I do not want every server to be able to directly talk with InfluxDB and do not want to open up a port to the outside world for it. A central relay that could handle this would be cool. But I guess that does not fit into this PR :)

Thanks @Dieterbe. The pull request is merged in. I made some changes and added a test.


cool. note that there's one thing still left todo

~/w/e/influxdb ❯❯❯ ack TODO src/api/graphite/api.go
// TODO: check that database exists and create it if not

(paul said this would be easy for you guys to add and i didn't have to worry about it :))


btw @Frusty here's a relay that talks the plaintext protocol. I'll be using it to send metrics to both carbon and influx.

@Frusty the pickle protocol is too complex to implement reasonably in another language because the pickle itself is tightly tied to the implementation of python.

You're much better off transporting with the line protocol for now.

  1. +6 −0 config.toml.sample
  2. +171 −0 src/api/graphite/api.go
  3. +21 −0 src/configuration/configuration.go
  4. +12 −0 src/server/server.go
6 config.toml.sample
@@ -23,6 +23,12 @@ port = 8086 # binding is disabled if the port isn't set
# ssl-port = 8084 # Ssl support is enabled if you set a port and cert
# ssl-cert = /path/to/cert.pem
+# optionally enable a graphite (carbon) compatible ingestion
+enabled = false
+port = 2003
+database = "" # store graphite data in this database
# Raft configuration
# The raft port should be open between all servers in a cluster.
171 src/api/graphite/api.go
@@ -0,0 +1,171 @@
+// package Graphite provides a tcp listener that you can use to ingest metrics into influxdb
+// via the graphite protocol.
+// it behaves as a carbon daemon, except:
+// no rounding of timestamps to the nearest interval. Upon ingestion of multiple datapoints
+// for a given key within the same interval (possibly but not necessarily the same timestamp),
+// graphite would use one (the latest received) value
+// with a rounded timestamp representing that interval.
+// We store values for every timestamp we receive (only the latest value for a given metric-timestamp pair)
+// so it's up to the user to feed the data in proper intervals
+// (and use round intervals if you plan to rely on that)
+package graphite
+import (
+ "bufio"
+ "cluster"
+ log ""
+ . "common"
+ "coordinator"
+ "io"
+ "net"
+ "protocol"
+ "strconv"
+ "strings"
+ "time"
+type Server struct {
+ listen_addr string
+ database string
+ coordinator coordinator.Coordinator
+ clusterConfig *cluster.ClusterConfiguration
+ conn net.Listener
+ user *cluster.ClusterAdmin
+ shutdown chan bool
+type GraphiteListener interface {
+ Close()
+ getAuth()
+ ListenAndServe()
+ writePoints(protocol.Series) error
+// TODO: check that database exists and create it if not
+func NewServer(listen_addr, database string, coord coordinator.Coordinator, clusterConfig *cluster.ClusterConfiguration) *Server {
+ self := &Server{}
+ self.listen_addr = listen_addr
+ self.database = database
+ self.coordinator = coord
+ self.shutdown = make(chan bool, 1)
+ self.clusterConfig = clusterConfig
+ return self
+// getAuth assures that the user property is a user with access to the graphite database
+// only call this function after everything (i.e. Raft) is initialized, so that there's at least 1 admin user
+func (self *Server) getAuth() {
+ // just use any (the first) of the list of admins.
+ names := self.clusterConfig.GetClusterAdmins()
+ self.user = self.clusterConfig.GetClusterAdmin(names[0])
+func (self *Server) ListenAndServe() {
+ self.getAuth()
+ var err error
+ if self.listen_addr != "" {
+ self.conn, err = net.Listen("tcp", self.listen_addr)
+ if err != nil {
+ log.Error("GraphiteServer: Listen: ", err)
+ return
+ }
+ }
+ self.Serve(self.conn)
+ // as all handling is done through goroutines. maybe we should use a waitgroup
+ defer func() { self.shutdown <- true }()
+ for {
+ conn_in, err := listener.Accept()
+ if err != nil {
+ log.Error("GraphiteServer: Accept: ", err)
+ continue
+ }
+ go self.handleClient(conn_in)
+ }
+func (self *Server) Close() {
+ if self.conn != nil {
+ log.Info("GraphiteServer: Closing graphite server")
+ self.conn.Close()
+ log.Info("GraphiteServer: Waiting for all graphite requests to finish before killing the process")
+ select {
+ case <-time.After(time.Second * 5):
+ log.Error("GraphiteServer: There seems to be a hanging graphite request. Closing anyway")
+ case <-self.shutdown:
+ }
+ }
+func (self *Server) writePoints(series *protocol.Series) error {
+ err := self.coordinator.WriteSeriesData(self.user, self.database, series)
+ if err != nil {
+ switch err.(type) {
+ case AuthorizationError:
+ // user information got stale, get a fresh one (this should happen rarely)
+ self.getAuth()
+ err = self.coordinator.WriteSeriesData(self.user, self.database, series)
+ if err != nil {
+ log.Warn("GraphiteServer: failed to write series after getting new auth: %s\n", err.Error())
+ }
+ default:
+ log.Warn("GraphiteServer: failed write series: %s\n", err.Error())
+ }
+ }
+ return err
+func (self *Server) handleClient(conn_in net.Conn) {
+ defer conn_in.Close()
+ reader := bufio.NewReader(conn_in)
+ for {
+ buf, err := reader.ReadBytes('\n')
+ if err != nil {
+ str := strings.TrimSpace(string(buf))
+ if err != io.EOF {
+ log.Warn("GraphiteServer: connection closed uncleanly/broken: %s\n", err.Error())
+ }
+ if len(str) > 0 {
+ log.Warn("GraphiteServer: incomplete read, line read: '%s'. neglecting line because connection closed because of %s\n", str, err.Error())
+ }
+ return
+ }
+ str := strings.TrimSpace(string(buf))
+ elements := strings.Split(str, " ")
+ if len(elements) != 3 {
+ continue // invalid line
+ }
+ val, err := strconv.ParseFloat(elements[1], 64)
+ if err != nil {
+ continue // invalid line
+ }
+ timestamp, err := strconv.ParseUint(elements[2], 10, 32)
+ if err != nil {
+ continue // invalid line
+ }
+ values := []*protocol.FieldValue{}
+ if i := int64(val); float64(i) == val {
+ values = append(values, &protocol.FieldValue{Int64Value: &i})
+ } else {
+ values = append(values, &protocol.FieldValue{DoubleValue: &val})
+ }
+ ts := int64(timestamp * 1000000)
+ sn := uint64(1) // use same SN makes sure that we'll only keep the latest value for a given metric_id-timestamp pair
+ point := &protocol.Point{
+ Timestamp: &ts,
+ Values: values,
+ SequenceNumber: &sn,
+ }
+ series := &protocol.Series{
+ Name: &elements[0],
+ Fields: []string{"value"},
+ Points: []*protocol.Point{point},
+ }
+ // little inefficient for now, later we might want to add multiple series in 1 writePoints request
+ self.writePoints(series)
+ }
21 src/configuration/configuration.go
@@ -33,6 +33,12 @@ type ApiConfig struct {
Port int
+type GraphiteConfig struct {
+ Enabled bool
+ Port int
+ Database string
type RaftConfig struct {
Port int
Dir string
@@ -125,6 +131,7 @@ type WalConfig struct {
type TomlConfiguration struct {
Admin AdminConfig
Api ApiConfig
+ Graphite GraphiteConfig
Raft RaftConfig
Storage StorageConfig
Cluster ClusterConfig
@@ -142,6 +149,9 @@ type Configuration struct {
ApiHttpSslPort int
ApiHttpCertPath string
ApiHttpPort int
+ GraphiteEnabled bool
+ GraphitePort int
+ GraphiteDatabase string
RaftServerPort int
SeedServers []string
DataDir string
@@ -214,6 +224,9 @@ func parseTomlConfiguration(filename string) (*Configuration, error) {
ApiHttpPort: tomlConfiguration.Api.Port,
ApiHttpCertPath: tomlConfiguration.Api.SslCertPath,
ApiHttpSslPort: tomlConfiguration.Api.SslPort,
+ GraphiteEnabled: tomlConfiguration.Graphite.Enabled,
+ GraphitePort: tomlConfiguration.Graphite.Port,
+ GraphiteDatabase: tomlConfiguration.Graphite.Database,
RaftServerPort: tomlConfiguration.Raft.Port,
RaftDir: tomlConfiguration.Raft.Dir,
ProtobufPort: tomlConfiguration.Cluster.ProtobufPort,
@@ -292,6 +305,14 @@ func (self *Configuration) ApiHttpSslPortString() string {
return fmt.Sprintf("%s:%d", self.BindAddress, self.ApiHttpSslPort)
+func (self *Configuration) GraphitePortString() string {
+ if self.GraphitePort <= 0 {
+ return ""
+ }
+ return fmt.Sprintf("%s:%d", self.BindAddress, self.GraphitePort)
func (self *Configuration) ProtobufPortString() string {
return fmt.Sprintf("%s:%d", self.BindAddress, self.ProtobufPort)
12 src/server/server.go
@@ -2,6 +2,7 @@ package server
import (
+ "api/graphite"
log ""
@@ -16,6 +17,7 @@ type Server struct {
ProtobufServer *coordinator.ProtobufServer
ClusterConfig *cluster.ClusterConfiguration
HttpApi *http.HttpServer
+ GraphiteApi *graphite.Server
AdminServer *admin.HttpServer
Coordinator coordinator.Coordinator
Config *configuration.Configuration
@@ -53,6 +55,7 @@ func NewServer(config *configuration.Configuration) (*Server, error) {
httpApi := http.NewHttpServer(config.ApiHttpPortString(), config.AdminAssetsDir, coord, coord, clusterConfig, raftServer)
httpApi.EnableSsl(config.ApiHttpSslPortString(), config.ApiHttpCertPath)
+ graphiteApi := graphite.NewServer(config.GraphitePortString(), config.GraphiteDatabase, coord, clusterConfig)
adminServer := admin.NewHttpServer(config.AdminAssetsDir, config.AdminHttpPortString())
return &Server{
@@ -60,6 +63,7 @@ func NewServer(config *configuration.Configuration) (*Server, error) {
ProtobufServer: protobufServer,
ClusterConfig: clusterConfig,
HttpApi: httpApi,
+ GraphiteApi: graphiteApi,
Coordinator: coord,
AdminServer: adminServer,
Config: config,
@@ -93,6 +97,14 @@ func (self *Server) ListenAndServe() error {
log.Info("Starting admin interface on port %d", self.Config.AdminHttpPort)
go self.AdminServer.ListenAndServe()
+ if self.Config.GraphiteEnabled {
+ if self.Config.GraphitePort <= 0 || self.Config.GraphiteDatabase == "" {
+ log.Warn("Cannot start graphite server. please check your configuration")
+ } else {
+ log.Info("Starting Graphite Listener on port %d", self.Config.GraphitePort)
+ go self.GraphiteApi.ListenAndServe()
+ }
+ }
log.Info("Starting Http Api server on port %d", self.Config.ApiHttpPort)
return nil
Something went wrong with that request. Please try again.