Merge branch 'force-flush' into 'develop'
Force flush

See merge request noc/influxdb/influxdb-relay!27
Lucas SANTONI committed Dec 21, 2018
2 parents ddc6c20 + 94f081d commit ed7f5d8
Showing 5 changed files with 100 additions and 47 deletions.
47 changes: 25 additions & 22 deletions docs/buffering.md
@@ -1,42 +1,45 @@
# buffering

The relay can be configured to buffer failed requests for HTTP backends. The
intent of this logic is to reduce the number of failures during short outages or
periodic network issues.

> This retry logic is **NOT** sufficient for long periods of downtime as all
> data is buffered in RAM

Buffering has the following configuration options (configured per HTTP backend):

* buffer-size-mb -- An upper limit on how much point data to keep in memory (in
  MB)
* max-batch-kb -- A maximum size on the aggregated batches that will be
  submitted (in KB)
* max-delay-interval -- The max delay between retry attempts per backend. The
  initial retry delay is 500ms and is doubled after every failure, as sketched
  below.
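A rough sketch of that backoff schedule, as a standalone Go program (assuming
`max-delay-interval = "5s"` as in the sample config; the variable names are
illustrative, not the relay's internals):

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	const maxDelay = 5 * time.Second // max-delay-interval
	delay := 500 * time.Millisecond  // initial retry delay

	for attempt := 1; attempt <= 6; attempt++ {
		fmt.Printf("retry %d after %v\n", attempt, delay)
		// Double the delay after every failure, capped at the maximum
		delay *= 2
		if delay > maxDelay {
			delay = maxDelay
		}
	}
	// Prints 500ms, 1s, 2s, 4s, 5s, 5s: the delay doubles until it
	// hits max-delay-interval and then stays there
}
```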

If the buffer is full then requests are dropped and an error is logged. If a
request makes it into the buffer it is retried until success.

Retries are serialized to a single backend. In addition, writes will be
aggregated and batched as long as the body of the request remains less than
`max-batch-kb`. For example, with `max-batch-kb = 50`, two buffered 20 KB
writes are merged into a single post, while a third would start a new batch.
If buffered requests succeed then there is no delay between subsequent
attempts.

One can force the retry buffer(s) to be flushed by querying the `/admin/flush`
route. Any data stored in the buffer(s) will be lost.
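A minimal client sketch for triggering a flush (assuming the relay's HTTP
listener is bound to `127.0.0.1:9096` as in the sample config; the handler
added in this commit does not restrict the HTTP method and replies with a
JSON-encoded 200):

```go
package main

import (
	"fmt"
	"net/http"
)

func main() {
	// Any data still held in the retry buffers is dropped by this call
	resp, err := http.Post("http://127.0.0.1:9096/admin/flush", "text/plain", nil)
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	fmt.Println(resp.Status) // expect "200 OK"
}
```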

If the relay stays alive for the entire duration of a downed backend server
without filling that server's allocated buffer, and it can stay online until
the entire buffer is flushed, then no operator intervention is required to
"recover" the data. The data will simply be batched together and written out
to the recovered server in the order it was received.

*NOTE*: The limits for buffering are not hard limits on the memory usage of the
application, and there will be additional overhead that would be much more
challenging to account for. The limits listed are just for the amount of point
line protocol (including any added timestamps, if applicable). Factors such as
small incoming batch sizes and a smaller max batch size will increase the
overhead in the buffer. There is also the general application memory overhead
to account for. This means that a machine with 2GB of memory should not have
buffers that sum up to _almost_ 2GB; for example, two backends with
`buffer-size-mb = 512` each already account for 1GB of point data before any
overhead. The buffering feature will only be activated when at least two
InfluxDB backends are configured. In addition, at least one backend always has
to be active for buffering to work.
4 changes: 2 additions & 2 deletions examples/sample_buffered.conf
@@ -5,7 +5,7 @@
name = "example-http-influxdb"
bind-addr = "127.0.0.1:9096"
output = [
{ name="local-influxdb01", location = "http://127.0.0.1:8086/write", timeout="10s", type="influxdb" buffer-size-mb = 100, max-batch-kb = 50, max-delay-interval = "5s"},
{ name="local-influxdb02", location = "http://127.0.0.1:7086/write", timeout="10s", type="influxdb" buffer-size-mb = 100, max-batch-kb = 50, max-delay-interval = "5s" },
{ name="local-influxdb01", location = "http://127.0.0.1:8086/write", timeout="10s", type="influxdb", buffer-size-mb = 100, max-batch-kb = 50, max-delay-interval = "5s"},
{ name="local-influxdb02", location = "http://127.0.0.1:7086/write", timeout="10s", type="influxdb", buffer-size-mb = 100, max-batch-kb = 50, max-delay-interval = "5s" },
]
# EOF
49 changes: 29 additions & 20 deletions relay/http.go
@@ -70,6 +70,7 @@ var (
"/ping": (*HTTP).handlePing,
"/status": (*HTTP).handleStatus,
"/admin": (*HTTP).handleAdmin,
"/admin/flush": (*HTTP).handleFlush,
"/health": (*HTTP).handleHealth,
}

@@ -316,27 +317,35 @@ type httpBackend struct {
// validateRegexps checks if a request on this backend matches
// all the measurement and tag regular expressions for this backend
func (b *httpBackend) validateRegexps(ps models.Points) error {
	// For each point
	for _, p := range ps {
		// Check if the measurement of each point
		// matches ALL measurement regular expressions
		m := p.Name()
		for _, r := range b.measurementRegexps {
			if !r.Match(m) {
				return errors.New("bad measurement")
			}
		}

		// For each tag of each point
		for _, t := range p.Tags() {
			// Check if each tag of each point
			// matches ALL tag regular expressions
			for _, r := range b.tagRegexps {
				if !r.Match(t.Key) {
					return errors.New("bad tag")
				}
			}
		}
	}

	return nil
}

// getRetryBuffer returns the backend's poster as a *retryBuffer if
// buffering is enabled for this backend, or nil otherwise
func (b *httpBackend) getRetryBuffer() *retryBuffer {
	if p, ok := b.poster.(*retryBuffer); ok {
		return p
	}

	return nil
22 changes: 22 additions & 0 deletions relay/http_handlers.go
@@ -221,6 +221,28 @@ func (h *HTTP) handleAdmin(w http.ResponseWriter, r *http.Request, _ time.Time)
	}
}

func (h *HTTP) handleFlush(w http.ResponseWriter, r *http.Request, start time.Time) {
	if h.log {
		h.logger.Println("Flushing buffers...")
	}

	for _, b := range h.backends {
		r := b.getRetryBuffer()

		if r != nil {
			if h.log {
				h.logger.Println("Flushing " + b.name)
			}

			r.empty()
		} else if h.log {
			// No retry buffer means there is nothing to flush for this backend
			h.logger.Println("NOT flushing " + b.name + " (is empty)")
		}
	}

	jsonResponse(w, response{http.StatusOK, http.StatusText(http.StatusOK)})
}

func (h *HTTP) handleStandard(w http.ResponseWriter, r *http.Request, start time.Time) {
	if r.Method != http.MethodPost {
		w.Header().Set("Allow", http.MethodPost)
25 changes: 22 additions & 3 deletions relay/retry.go
@@ -23,6 +23,7 @@ type Operation func() error
// There is no delay between attempts of different operations.
type retryBuffer struct {
	buffering int32
	flushing  int32

	initialInterval time.Duration
	multiplier      time.Duration
@@ -76,7 +77,7 @@ func (r *retryBuffer) post(buf []byte, query string, auth string, endpoint string
	// batch.wg.Wait()
	// We do not wait for the WaitGroup because we don't want
	// to leave the connection open
	// The client will receive a 202 which closes the connection and
	// invites it to send further requests
	return &responseData{StatusCode: http.StatusAccepted}, err
}
@@ -93,8 +94,19 @@ func (r *retryBuffer) run() {

	interval := r.initialInterval
	for {
		// When a flush has been requested, drain the buffer
		// without posting anything to the backend
		if atomic.LoadInt32(&r.flushing) == 1 {
			atomic.StoreInt32(&r.buffering, 0)
			batch.wg.Done()

			// Leave flushing mode once the last buffered batch is dropped
			if r.list.size == 0 {
				atomic.StoreInt32(&r.flushing, 0)
			}

			break
		}

		resp, err := r.p.post(buf.Bytes(), batch.query, batch.auth, batch.endpoint)
		if err == nil && resp.StatusCode/100 != 5 {
			batch.resp = resp
			atomic.StoreInt32(&r.buffering, 0)
			batch.wg.Done()
@@ -161,6 +173,13 @@ func (l *bufferList) getStats() map[string]string {
	return stats
}

// empty drops all buffered batches by putting the buffer in flushing
// mode. This makes it possible to get rid of 'impossible' queries that
// would otherwise be retried forever, without restarting the whole relay
func (r *retryBuffer) empty() {
	atomic.StoreInt32(&r.flushing, 1)
}
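// As a miniature sketch of this flush protocol (illustrative names,
// not the relay's actual types): empty raises the flag, run drains
// its queue without posting while the flag is up, and the flag is
// lowered once the queue has been emptied:
//
//	atomic.StoreInt32(&flushing, 1) // what empty does
//	for _, b := range queue {
//		if atomic.LoadInt32(&flushing) == 1 {
//			drop(b) // drained, never posted to the backend
//			continue
//		}
//		post(b)
//	}
//	atomic.StoreInt32(&flushing, 0) // resume normal retries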

// pop will remove and return the first element of the list, blocking if necessary
func (l *bufferList) pop() *batch {
	l.cond.L.Lock()
