refactor: writers package (previously: Data Race On Stop) #28

Merged
merged 4 commits into from Aug 5, 2019


GeorgeMac

@GeorgeMac GeorgeMac commented Jul 12, 2019

Closes: #39, #31 and #29

This PR now introduces a few fundamental reorganizations of responsibility. Prior to this change:

  1. (*influxdb.Client).Write handled its own retry + backoff logic.
  2. An *influxdb.LPWriter handled buffering and periodic flushing of metrics to a target bucket via (*influxdb.Client).Write.

This PR does the following:

  1. Removes the retry + backoff functionality from (*influxdb.Client).Write.
  2. Introduces some new types in a package writer which replace the *LPWriter.

The new types are as follows:

  • *BucketWriter decorates a client with an organisation and a bucket, and simplifies Write(org, bucket, ...metric) to a simple Write(...Metric).
  • *BufferedWriter decorates the simpler Writer interface and buffers up writes. When the buffer size is exceeded it flushes automatically. Otherwise writer.Flush() can be called manually.
  • *PointWriter decorates a buffered writer, makes it safe for concurrent use and ensures Flush is called automatically and periodically. It ensures Flush is called even if the buffer is not exceeded, once an interval has elapsed since the last call to Flush. That last call could have been triggered either by the previous period or by the buffer being exceeded.
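As a rough illustration of the buffering decorator described above, here is a minimal sketch. It is not the code from this PR: `Metric`, `Writer`, `recorder` and `NewBuffered` are hypothetical stand-ins, the `Write` signature returning only an `error` is an assumption, and "buffer exceeded" is interpreted here as "buffer has reached its configured size".

```go
package main

import "fmt"

// Metric is a hypothetical stand-in for the client's metric type.
type Metric string

// Writer is an assumed shape for the simplified writer interface.
type Writer interface {
	Write(m ...Metric) error
}

// recorder is a fake downstream writer that records every batch it receives.
type recorder struct{ batches [][]Metric }

func (r *recorder) Write(m ...Metric) error {
	// Copy the batch so later reuse of the caller's slice cannot alter it.
	r.batches = append(r.batches, append([]Metric(nil), m...))
	return nil
}

// BufferedWriter buffers metrics and flushes once the buffer reaches size.
// Like the type described above, it is not safe for concurrent use.
type BufferedWriter struct {
	w    Writer
	buf  []Metric
	size int
}

func NewBuffered(w Writer, size int) *BufferedWriter {
	return &BufferedWriter{w: w, buf: make([]Metric, 0, size), size: size}
}

// Write appends each metric and flushes whenever the buffer is full.
func (b *BufferedWriter) Write(m ...Metric) error {
	for _, metric := range m {
		b.buf = append(b.buf, metric)
		if len(b.buf) >= b.size {
			if err := b.Flush(); err != nil {
				return err
			}
		}
	}
	return nil
}

// Flush sends any buffered metrics downstream and empties the buffer.
func (b *BufferedWriter) Flush() error {
	if len(b.buf) == 0 {
		return nil
	}
	err := b.w.Write(b.buf...)
	b.buf = b.buf[:0]
	return err
}

func main() {
	rec := &recorder{}
	bw := NewBuffered(rec, 2)

	bw.Write("a", "b", "c")       // "a" and "b" fill the buffer and are flushed
	fmt.Println(len(rec.batches)) // 1

	bw.Flush()                    // manual flush sends the remaining "c"
	fmt.Println(len(rec.batches)) // 2
}
```

Because the decorator only depends on the small `Writer` interface, it can wrap a bucket-scoped writer or a test double interchangeably.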

The following is taken from the doc.go example:

import (
	"time"

	"github.com/influxdata/influxdb-client-go"
	"github.com/influxdata/influxdb-client-go/writer"
)

func main() {
	cli, _ := influxdb.New("http://localhost:9999", "some-token")

	wr := writer.New(cli, influxdb.Organisation("influx"), influxdb.Bucket("default"), writer.WithBufferSize(10))

	wr.Write(influxdb.NewRowMetric(
		map[string]interface{}{
			"value": 16,
		},
		"temp_celsius",
		map[string]string{
			"room": "living_room",
		},
		time.Now(),
	),
		influxdb.NewRowMetric(
			map[string]interface{}{
				"value": 17,
			},
			"temp_celsius",
			map[string]string{
				"room": "living_room",
			},
			time.Now(),
		))

	wr.Close()
}

Calling writer.New composes all the writers correctly and starts the periodic flushing.

This is all demonstrated in the doc.go:

// Package writer contains useful types for buffering, batching and periodically syncing
// writes onto a provided metric writing client.
//
// The following example demonstrates the usage of a *writer.PointWriter. This is designed to
// buffer calls to Write metrics and flush them in configurable batch sizes (see WithBufferSize).
// It is also designed to periodically flush the buffer if a configurable duration elapses between
// calls to Write. This is useful to ensure metrics are flushed to the client during a pause in their
// production.
//
// Example Usage
//
//	import (
//		"time"
//
//		"github.com/influxdata/influxdb-client-go"
//		"github.com/influxdata/influxdb-client-go/writer"
//	)
//
//	func main() {
//		cli, _ := influxdb.New("http://localhost:9999", "some-token")
//
//		wr := writer.New(cli, influxdb.Organisation("influx"), influxdb.Bucket("default"), writer.WithBufferSize(10))
//
//		wr.Write(influxdb.NewRowMetric(
//			map[string]interface{}{
//				"value": 16,
//			},
//			"temp_celsius",
//			map[string]string{
//				"room": "living_room",
//			},
//			time.Now(),
//		),
//			influxdb.NewRowMetric(
//				map[string]interface{}{
//					"value": 17,
//				},
//				"temp_celsius",
//				map[string]string{
//					"room": "living_room",
//				},
//				time.Now(),
//			))
//
//		wr.Close()
//	}
//
// writer.New(...) returns a *PointWriter which is composed of multiple other types available in this
// package.
//
// It first wraps the provided client in a *BucketWriter which takes care of ensuring all written metrics
// are written to the underlying client with a specific organisation and bucket. This is not safe for
// concurrent use.
//
// It then wraps this writer in a *BufferedWriter and configures its buffer size accordingly. This type
// implements the buffering of metrics and exposes a Flush method. Once the buffer size is exceeded Flush
// is called automatically. However, Flush() can also be called manually on this type. This is also not safe
// for concurrent use.
//
// Finally, it wraps the buffered writer in a *PointWriter which takes care of ensuring Flush is called
// automatically when it hasn't been called for a configured duration. This final type is safe for concurrent use.
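The outermost layer described in the doc comment (a mutex around a buffered writer plus a background flush loop) could be sketched roughly as follows. This is an illustration under assumptions, not the PR's implementation: `bufferedWriter`, `memBuffer`, `PeriodicWriter` and `NewPeriodic` are hypothetical names, metrics are plain strings, and the sketch simply flushes on every tick rather than tracking the time since the last flush.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// bufferedWriter is an assumed shape for the wrapped buffered writer.
type bufferedWriter interface {
	Write(m ...string) error
	Flush() error
}

// memBuffer is a toy buffered writer that only counts flushed metrics.
type memBuffer struct {
	pending []string
	flushed int
}

func (b *memBuffer) Write(m ...string) error {
	b.pending = append(b.pending, m...)
	return nil
}

func (b *memBuffer) Flush() error {
	b.flushed += len(b.pending)
	b.pending = b.pending[:0]
	return nil
}

// PeriodicWriter guards a bufferedWriter with a mutex and flushes it from a
// background goroutine on every tick.
type PeriodicWriter struct {
	mu   sync.Mutex
	w    bufferedWriter
	stop chan struct{} // closed exactly once by Close, never replaced
	done chan struct{} // closed by the loop when it exits
	once sync.Once
}

// NewPeriodic starts the flush loop; interval must be greater than zero.
func NewPeriodic(w bufferedWriter, interval time.Duration) *PeriodicWriter {
	p := &PeriodicWriter{w: w, stop: make(chan struct{}), done: make(chan struct{})}
	go p.loop(interval)
	return p
}

func (p *PeriodicWriter) loop(interval time.Duration) {
	defer close(p.done)
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			p.flush() // error ignored here; a real writer would report it
		case <-p.stop:
			return
		}
	}
}

func (p *PeriodicWriter) flush() error {
	p.mu.Lock()
	defer p.mu.Unlock()
	return p.w.Flush()
}

// Write is safe for concurrent use because it takes the same lock as flush.
func (p *PeriodicWriter) Write(m ...string) error {
	p.mu.Lock()
	defer p.mu.Unlock()
	return p.w.Write(m...)
}

// Close stops the loop, waits for it to exit, then flushes what is left.
// Calling it more than once is harmless.
func (p *PeriodicWriter) Close() error {
	var err error
	p.once.Do(func() {
		close(p.stop)
		<-p.done
		err = p.flush()
	})
	return err
}

func main() {
	buf := &memBuffer{}
	pw := NewPeriodic(buf, 50*time.Millisecond)
	pw.Write("a", "b")
	pw.Close()
	fmt.Println(buf.flushed) // 2
}
```

Reading `buf.flushed` after `Close` is safe in this sketch because the loop goroutine has already exited by then.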

TODO:

  • implement *BucketWriter
  • test *BucketWriter
  • implement *BufferedWriter
  • test *BufferedWriter
  • implement *PointWriter
  • test *PointWriter

Original Issue:

There is a data race on the stop channel because Stop replaces it entirely:

close(w.stop)
w.wg.Wait()
w.stopTicker()
w.stop = make(chan struct{})

Two things happen next when Stop is called.

  1. Write doesn't obtain the lock before attempting to check whether stop is closed, so there is a race on the reference here. Write could attempt to receive on stop while Stop is replacing it with a new channel (I believe this is what the race detector is demonstrating).

  2. The flush loop started by Start may be busy inside Flush while Stop is replacing the stop channel, so by the time Flush returns the loop may see the new, non-closed channel:

	case <-w.flushChan:
		err := w.Flush(context.Background())
		if err != nil {
			w.onError(err)
		}
	case <-w.stop:
		w.wg.Done()
		return

This means that the call to Stop may block forever as this flush loop may not exit and call Done on the waitgroup.
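One way to avoid both problems is to create the stop channel once, close it at most once, and never reassign it. The sketch below illustrates that idea only; `flushLoop`, `newFlushLoop` and the `flushes` counter are hypothetical and stand in for the real writer, which this sketch does not reproduce.

```go
package main

import (
	"fmt"
	"sync"
)

// flushLoop is a hypothetical, single-use replacement for a Start/Stop pair.
type flushLoop struct {
	flushChan chan struct{}
	stop      chan struct{} // created once, closed once, never replaced
	done      chan struct{} // closed by the loop goroutine when it exits
	closeOnce sync.Once
	flushes   int // written only by the loop; read only after Close returns
}

func newFlushLoop() *flushLoop {
	l := &flushLoop{
		flushChan: make(chan struct{}),
		stop:      make(chan struct{}),
		done:      make(chan struct{}),
	}
	go l.run()
	return l
}

func (l *flushLoop) run() {
	defer close(l.done)
	for {
		select {
		case <-l.flushChan:
			l.flushes++ // stands in for the real Flush call
		case <-l.stop:
			return
		}
	}
}

// Write requests a flush and reports false once the loop has been closed.
// Because l.stop is never reassigned, reading the field needs no lock.
func (l *flushLoop) Write() bool {
	select {
	case <-l.stop:
		return false // already closed
	default:
	}
	select {
	case l.flushChan <- struct{}{}:
		return true
	case <-l.stop:
		return false
	}
}

// Close stops the loop and waits for it to exit. It is safe to call twice.
func (l *flushLoop) Close() {
	l.closeOnce.Do(func() { close(l.stop) })
	<-l.done
}

func main() {
	l := newFlushLoop()
	fmt.Println(l.Write()) // true
	l.Close()
	l.Close()              // second call returns immediately
	fmt.Println(l.Write()) // false
	fmt.Println(l.flushes) // 1
}
```

The trade-off is that the loop cannot be restarted after `Close`, which matches the "use once" Start/Close option in the TODO list below.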

TODO:

  • enable race detector in CI
  • consider redesigning API to not allow Start/Stop but instead use Start/Close and make the buffered writer use once
  • otherwise, rearchitect Stop. Ensure that repeated Start, Stop, Start, Stop sequences behave consistently.

@GeorgeMac GeorgeMac added the bug Something isn't working label Jul 12, 2019
@GeorgeMac GeorgeMac changed the title Data Race On Stop refactor: writers package (previously: Data Race On Stop) Aug 1, 2019
@GeorgeMac GeorgeMac force-pushed the gm/race-on-stop branch 3 times, most recently from aa324b5 to ac25706 Compare August 1, 2019 13:56
@GeorgeMac GeorgeMac force-pushed the gm/race-on-stop branch 2 times, most recently from 7d00f62 to 786d1f2 Compare August 1, 2019 14:46

@docmerlin docmerlin left a comment


I really don't like this. What benefit does making Organization a type give us? I'd like to keep things as simple as possible

@GeorgeMac

GeorgeMac commented Aug 2, 2019

@docmerlin my original intent was that it would enforce compile-time ordering of org before bucket. I hate when you have a function and two arguments next to each other are the same type 😂 Just begging for someone to read the docs wrong and type them in the wrong order.

When I started this PR it was bucket then org and (not that I want to get into a haystack / needle type debate) I figured it makes more sense from a hierarchy perspective that it goes org > bucket.
Particularly for Write, which takes org, bucket and then metric (or measurement). This way the type hierarchy descends the same way our domain model does.

But then I realized if I leave this as strings then when you update to the new version of the client, bam, no compile errors and bucket and org have flipped. Breaking all your stuff.
So I wanted to enforce some compile time ordering.

All that said, you can pass these in as string literals without the retype and Go will infer the type and it still compiles, so this doesn't fix anything 😂

TL;DR

I swapped the org and bucket argument order. Do we:

  1. put them back in the original order
  2. leave them in this new order, put them back to strings and not worry about breaking things
  3. make org and bucket a more complex type, like a struct type with a Name field, to make the compile-time ordering concrete
  4. do nothing

Also open to other ideas.
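For reference, here is a small sketch of the behaviour being discussed. The types below are local stand-ins, not the client's own `influxdb.Organisation` and `influxdb.Bucket`, and `newWriterTyped` / `newWriterStruct` are hypothetical helpers. It shows that defined string types still accept swapped untyped literals, while struct wrappers (option 3) force the caller to name the type.

```go
package main

import "fmt"

// Defined string types, similar in spirit to the ones in this PR.
type Organisation string
type Bucket string

func newWriterTyped(org Organisation, bucket Bucket) string {
	return fmt.Sprintf("org=%s bucket=%s", org, bucket)
}

// Struct wrappers, as in option 3: a bare literal can no longer be passed.
type Org struct{ Name string }
type Bkt struct{ Name string }

func newWriterStruct(org Org, bucket Bkt) string {
	return fmt.Sprintf("org=%s bucket=%s", org.Name, bucket.Name)
}

func main() {
	// Untyped string constants convert implicitly to the parameter types,
	// so arguments written in the wrong order still compile.
	fmt.Println(newWriterTyped("default", "influx")) // org=default bucket=influx

	// A value that already has type string does not convert implicitly:
	//   name := "influx"
	//   newWriterTyped(name, "default") // does not compile

	// With struct types the caller has to spell out Org{...} and Bkt{...},
	// and swapping the two arguments is a compile error.
	fmt.Println(newWriterStruct(Org{Name: "influx"}, Bkt{Name: "default"})) // org=influx bucket=default
}
```

So the defined string types catch swapped variables of type `string`, but not swapped literals, which is the gap described above.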


@docmerlin docmerlin left a comment


I'm going to go ahead and approve, but I found a couple of minor quibbles (a debug println seems to have snuck past).

@GeorgeMac GeorgeMac merged commit 23da33b into develop Aug 5, 2019
@GeorgeMac GeorgeMac deleted the gm/race-on-stop branch August 5, 2019 16:52
Labels
bug Something isn't working team/compute

Successfully merging this pull request may close these issues.

Retries cause metrics payload to duplicate
3 participants