Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: Add initial remote-write-receive component #811

Merged
merged 1 commit into from Mar 12, 2019

Conversation

brancz
Copy link
Member

@brancz brancz commented Feb 5, 2019

Changes

This adds the very first pieces of the proposal to add receiving remote-write support to Thanos. The features are just having a single tsdb that remote write requests are written into and that the component joins a mesh network to enable querying. All other features are still outstanding but can be worked on in parallel once this lands.

Verification

As this is very small and primarily hooks up existing components, the extend of my verification is that I added this new component to the quickstart script and verified that indeed data replicated to the endpoint receiving remote-write calls can be queried via the Thanos querier. Happy to add this to the e2e test suite though.

@bwplotka @domgreen

cc @metalmatze @mxinden @squat @s-urbaniak @ant31

@bwplotka
Copy link
Member

bwplotka commented Feb 5, 2019

Finally <3

First of all, as you can see our CI does not like NOT checked errors (: and both:

fmt.Fprintf	fmt.Fprintf(w, "Service Unavailable")
(*net/http.Server).Shutdown	httpSrv.Shutdown(ctx)

looks like it would be nice at least log them.

@brancz
Copy link
Member Author

brancz commented Feb 5, 2019

Looks like that made CI happy 🙂 .

@brancz
Copy link
Member Author

brancz commented Feb 5, 2019

I think having a simple e2e test added would be good. I'll look into adding one.

Copy link
Member

@bwplotka bwplotka left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Generally LGTM, just super small nits, mostly around style. (:

Nice work!

)

func registerRemoteWriteReceive(m map[string]setupFunc, app *kingpin.Application, name string) {
cmd := app.Command(name, "accept Prometheus remote write API requests (EXPERIMENTAL, this may change drastically without notice)")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
cmd := app.Command(name, "accept Prometheus remote write API requests (EXPERIMENTAL, this may change drastically without notice)")
cmd := app.Command(name, "Accept Prometheus remote write API requests (EXPERIMENTAL, this may change drastically without notice)")

return nil
},
func(err error) {
level.Debug(logger).Log("msg", "initial load group errored", "err", err)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hmm, I don't get this log messages here and on every group. This code is invoked even though this group did not errof (but e.g other group errored). I think we can remove those debug logs. We will print error for problematic group via g.Run anyway.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, this is a leftover from some debugging, I'll remove it.

},
func(err error) {
level.Debug(logger).Log("msg", "tsdb group errored", "err", err)
if err := localStorage.Close(); err != nil {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just checking .. localStorage.Close closes db as well?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it does

func(err error) {
level.Debug(logger).Log("msg", "tsdb group errored", "err", err)
if err := localStorage.Close(); err != nil {
level.Error(logger).Log("msg", "Error stopping storage", "err", err)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

all log lines should start lowercase


level.Debug(logger).Log("msg", "setting up metric http listen-group")
if err := metricHTTPListenGroup(g, logger, reg, httpMetricsBindAddr); err != nil {
level.Debug(logger).Log("msg", "metric listener errored", "err", err)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Replicated error handling. (rule "Handle error just once") (:

I think error wrap is the way to go (and removal of this log line).


var (
m = cmux.New(listener)
httpl = m.Match(cmux.HTTP1Fast())
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you comment what this is for? It might be not straightforward.

return
}

err = h.receiver.Receive(&wreq)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we can do if err := ... ; err != nil as well

package receive

import (
"github.com/go-kit/kit/log"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

make format needed (:

@@ -0,0 +1,10 @@
# When the Thanos remote-write-receive component is started,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hm.. can we add this to remote-write-receive command description instead? Might be easier to discover

@@ -136,4 +142,22 @@ do
--cluster.peers 127.0.0.1:19391 &
done

sleep 0.5

if [ -n "${REMOTE_WRITE_ENABLED}" ]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wow I did not know that this script still works (:

@bwplotka
Copy link
Member

bwplotka commented Feb 6, 2019

And yea, some tests would be nice. (:


var metadata = &promMetadata{
// Start out with the full time range. The shipper will constrain it later.
// TODO(fabxc): minimum timestamp is never adjusted if shipping is disabled.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Was this code copied from somewhere? Slightly confusing to see TODO(fabxc) in here, but not a big deal I guess.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

haha yea was surprised as well at the beginning (: It is copied indeed.

// Options for the web Handler.
type Options struct {
Receiver *ReceiveWriter
Context context.Context
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This can be removed as it is not used.

Copy link
Member

@metalmatze metalmatze left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Only a few minor things found next to what @bwplotka already discovered.
Overall really good stuff! 👍

// Handler serves various HTTP endpoints of the Prometheus server
type Handler struct {
readyStorage *tsdb.ReadyStorage
context context.Context
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This can be removed as it is not used.

ReadTimeout: h.options.ReadTimeout,
}

errCh := make(chan error)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, this should be a run.Group.

func (h *Handler) receive(w http.ResponseWriter, req *http.Request) {
defer func() {
if r := recover(); r != nil {
fmt.Println("panic recovered:", r, string(debug.Stack()))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we want to have a counter for how many panics were recovered?

Appender() (storage.Appender, error)
}

type ReceiveWriter struct {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is spelled receive.ReceiveWriter and should be renamed to receive.Writer to be more idiomatic.

@brancz brancz force-pushed the remote-write-receive branch 6 times, most recently from d7765cc to abf0029 Compare March 6, 2019 16:33
@brancz
Copy link
Member Author

brancz commented Mar 6, 2019

@bwplotka @metalmatze I believe I addressed everything or answered the comments and added the receiver to the e2e test setup (the test is that a prometheus is successfully pushing an up metric of a "down" node-exporter the the remote-write-receiver).

Copy link
Contributor

@domgreen domgreen left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍 Looks solid overall just a few nits. 👯‍♂️

Sorry again for taking so long to get around to this :(

@@ -78,6 +78,7 @@ func main() {
registerCompact(cmds, app, "compact")
registerBucket(cmds, app, "bucket")
registerDownsample(cmds, app, "downsample")
registerRemoteWriteReceive(cmds, app, "remote-write-receive")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit - would shorten this to just receive/remote /receiver

cancel()
peer.Close(2 * time.Second)
})
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we remove this whole block? Will receive need to peer via gossip? If we are removing gossip in the near future I think we should not add it to any new components and instead help people migrate to the new approach.

level.Debug(logger).Log("msg", "setting up grpc server")
{
var (
logger = log.With(logger, "component", "receiver")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

receiver > receive? or the verbose name? Let's keep it consistant.

Rule = sourceStoreAPI{component: component{name: "rule"}}
Sidecar = sourceStoreAPI{component: component{name: "sidecar"}}
Store = sourceStoreAPI{component: component{name: "store"}}
RemoteWrite = sourceStoreAPI{component: component{name: "remote_write"}}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

consistency


reqBuf, err := snappy.Decode(nil, compressed)
if err != nil {
fmt.Println("snappy decode error")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fmt > logger?


if [ -n "${REMOTE_WRITE_ENABLED}" ]
then
./thanos remote-write-receive \
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍 this is great taht were adding it here as well.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

either we leave this or remove gossip 🙂

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@bwplotka what do you think on this ... personally, I think we should phase out gossip so (in a different PR) change this file to work via the new approach.
So for this PR I would remove receive from this and once the above work has been done add it back in.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As we have the e2e tests, that works for me.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As there is no git-history of this, I'm going to comment this out for now, but leave it. Does that sound like an ok compromise?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm happy with that ... can you make sure a comment about why its commented is included 😄

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good to me

dataDir string,
peer cluster.Peer,
) error {
level.Info(logger).Log("msg", "setting up receiver")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it worth adding a warn log here saying that it is currently experimental?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

very good idea 👍

var (
requestDuration = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Name: "prometheus_http_request_duration_seconds",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We might want thanos prefix for those metrics? Or do we want to keep it similar to get advantage of common dashboards?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry that was a copy paste mistake. Yes lets make it thanos_http_....

return func(w http.ResponseWriter, r *http.Request) {
if h.isReady() {
f(w, r)
} else {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just return here, no need for else

ReadTimeout: h.options.ReadTimeout,
}

errCh := make(chan error)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

still valid?

Copy link
Member

@bwplotka bwplotka left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Still some unaddressed bit, plus some suggestions.. Overall LGTM, but bit major is not versioned remote read receive endpoint, thoughts? (:

IF all will be addressed -> LGTM from me.

Default("./data").String()

m[name] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, _ bool) error {
peer, err := newPeerFn(logger, reg, false, "", false)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can actualy remove gossip stuff if you want, up to you - making sure deprecated stuff is in new component is quite not necessary. (: Not a blocker

dataDir string,
peer cluster.Peer,
) error {
level.Info(logger).Log("msg", "setting up receive")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's kill this message, below as setting up receive; this component is ...

select {
case <-dbOpen:
break
// In case a shutdown is initiated before the dbOpen is released
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing trailing period in comment (:

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also I would remove this, I think this is quite clear.

level.Debug(logger).Log("msg", "setting up grpc server")
{
var (
logger = log.With(logger, "component", "receiver")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not doing this in very top?

@@ -204,6 +204,7 @@ github.com/onsi/ginkgo v1.6.0 h1:Ix8l273rp3QzYgXSR+c8d1fTG7UPgYkOSELPhiY/YGw=
github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/gomega v1.4.1 h1:PZSj/UFNaVp3KxrzHOcS7oyuWA7LoOY/77yCTEFu21U=
github.com/onsi/gomega v1.4.1/go.mod h1:C1qb7wdrVGGVU+Z6iS04AVkA3Q65CEZX59MT0QO5uiA=
github.com/opentracing-contrib/go-stdlib v0.0.0-20170113013457-1de4cc2120e7 h1:8KbikWulLUcMM96hBxjgoo6gTmCkG6HYSDohv/WygYU=
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

modules ❤️

}

readyf := h.testReady
router.Post("/receive", readyf(h.receive))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, so this starts to be some API. We already keep Prometheus like API to guide people and be intuitive. This means that remote receiver would be nice to use the same or similar... does Prometheus serves remote write endpoint? What endpoint looks like? api/v1/write ? As a basics something like v1 in path is useful.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

tend to agree but flexible on the url ... api/v1/receive feels more correct to me

Copy link
Member Author

@brancz brancz Mar 12, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

agreed with the prefix, going with /api/v1/receive, no strong opinion though

ReadTimeout: h.options.ReadTimeout,
}

errCh := make(chan error)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not addressed

@brancz brancz force-pushed the remote-write-receive branch 8 times, most recently from 1ab4011 to 1a7ea0c Compare March 12, 2019 08:35
@brancz brancz force-pushed the remote-write-receive branch 4 times, most recently from 57c0542 to d459673 Compare March 12, 2019 12:30
@brancz
Copy link
Member Author

brancz commented Mar 12, 2019

I believe I addressed everything:

  • Added run group handling for the http handler
  • Removed gossip
  • Took the freedom to convert the quickstart script from gossip to static --store flag configuration

Copy link
Member

@bwplotka bwplotka left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Awesome, all LGTM from my side.

@@ -17,25 +17,37 @@ import (
"gopkg.in/alecthomas/kingpin.v2"
)

func regCommonServerFlags(cmd *kingpin.CmdClause) (
func regGRPCFlags(cmd *kingpin.CmdClause) (
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hahah I was starring at monitor 1m and thinking why you removed Gossip for everyone in this PR... then seen that actually you added this function.... Github troll =D

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

4 participants