Skip to content

Commit

Permalink
Merge pull request #6655 from influxdata/nc-http-subs
Browse files Browse the repository at this point in the history
Add HTTP(s) Subscriptions
  • Loading branch information
Nathaniel Cook committed May 19, 2016
2 parents 393fd5b + d460be3 commit 6b1cc64
Show file tree
Hide file tree
Showing 6 changed files with 243 additions and 88 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
- [#2926](https://github.com/influxdata/influxdb/issues/2926): Support bound parameters in the parser.
- [#1310](https://github.com/influxdata/influxdb/issues/1310): Add https-private-key option to httpd config.
- [#6621](https://github.com/influxdata/influxdb/pull/6621): Add Holt-Winter forecasting function.
- [#6655](https://github.com/influxdata/influxdb/issues/6655): Add HTTP(s) based subscriptions.

### Bugfixes

Expand Down
4 changes: 4 additions & 0 deletions cmd/influxd/run/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,10 @@ func (c *Config) Validate() error {
return err
}

if err := c.Subscriber.Validate(); err != nil {
return err
}

for _, g := range c.GraphiteInputs {
if err := g.Validate(); err != nil {
return fmt.Errorf("invalid graphite config: %v", err)
Expand Down
12 changes: 12 additions & 0 deletions etc/config.sample.toml
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,18 @@ reporting-disabled = false
# https-private-key = ""
max-row-limit = 10000

###
### [subsciber]
###
### Controls the subscriptions, which can be used to fork a copy of all data
### received by the InfluxDB host.
###

[subsciber]
enabled = true
http-timeout = "30s"


###
### [[graphite]]
###
Expand Down
25 changes: 24 additions & 1 deletion services/subscriber/config.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,35 @@
package subscriber

import (
"errors"
"time"

"github.com/influxdata/influxdb/toml"
)

const (
DefaultHTTPTimeout = 30 * time.Second
)

// Config represents a configuration of the subscriber service.
type Config struct {
// Whether to enable to Subscriber service
Enabled bool `toml:"enabled"`

HTTPTimeout toml.Duration `toml:"http-timeout"`
}

// NewConfig returns a new instance of a subscriber config.
func NewConfig() Config {
return Config{Enabled: true}
return Config{
Enabled: true,
HTTPTimeout: toml.Duration(DefaultHTTPTimeout),
}
}

func (c Config) Validate() error {
if c.HTTPTimeout <= 0 {
return errors.New("http-timeout must be greater than 0")
}
return nil
}
39 changes: 39 additions & 0 deletions services/subscriber/http.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package subscriber

import (
"time"

"github.com/influxdata/influxdb/client/v2"
"github.com/influxdata/influxdb/coordinator"
)

// HTTP supports writing points over HTTP using the line protocol.
type HTTP struct {
c client.Client
}

// NewHTTP returns a new HTTP points writer with default options.
func NewHTTP(addr string, timeout time.Duration) (*HTTP, error) {
conf := client.HTTPConfig{
Addr: addr,
Timeout: timeout,
}
c, err := client.NewHTTPClient(conf)
if err != nil {
return nil, err
}
return &HTTP{c: c}, nil
}

// WritePoints writes points over HTTP transport.
func (h *HTTP) WritePoints(p *coordinator.WritePointsRequest) (err error) {
bp, _ := client.NewBatchPoints(client.BatchPointsConfig{
Database: p.Database,
RetentionPolicy: p.RetentionPolicy,
})
for _, pt := range p.Points {
bp.AddPoint(client.NewPointFrom(pt))
}
err = h.c.Write(bp)
return
}
Loading

0 comments on commit 6b1cc64

Please sign in to comment.