Introduce a generic gRPC span sink #386
Conversation
Force-pushed from a0a17cd to 538d136
sinks/grpsink/grpc.go
Outdated
)

// GRPCStreamingSpanSink is a sink that streams spans to a configurable target
// service over gRPC. The sink is only tied to the grpc_sink.proto defintion of
defintion
One thing i've not quite been able to figure out - my
Force-pushed from 9e12bc0 to a7f54b0
OK, finally fixed those silly errors, tests now passing. r? @stripe/observability
Nice start! I have a few comments / questions about structure (:
sinks/grpsink/grpc_stream.go
Outdated
// Name returns this sink's name.
func (gs *GRPCStreamingSpanSink) Name() string {
	return gs.name
Hm, we use the `Name` a ton in metric tags, which means that every grpc sink gets its own timeseries; I guess this is one of those blessing/curse things, but we should definitely think through if we want to time things by grpc sink implementation, or by sink target (currently, we measure only by sink implementation)
My thought was that it seems unlikely that the grpc sink gets used directly - instead, i can imagine e.g. a very lightweight falconer sink that just wraps this one. That way we'd still have stability on this property, even though it isn't hardwired into the struct.
Alternatively, we could enforce that things work that way by dropping the `Name()` method entirely, causing `GRPCStreamingSpanSink` to not fulfill the interface anymore, and requiring some other struct (e.g. falconer) to embed it in order to have it actually be useful. That smacks of inheritance, but not so much that it wouldn't be idiomatic.
So, alternative: what if the expectation is that a value in a configuration file is what sets the actual name, here?
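To make the "thin wrapper sink" option concrete, a rough sketch under assumptions: the falconer package, import path, and fixed name below are illustrative and not part of this PR.

```go
package falconer

import "github.com/stripe/veneur/sinks/grpsink" // import path assumed from the repo layout

// SpanSink embeds the generic gRPC streaming sink and pins down the name used
// in metric tags, so the timeseries stays stable per implementation rather
// than per configured gRPC target.
type SpanSink struct {
	*grpsink.GRPCStreamingSpanSink
}

// Name returns a fixed, implementation-specific name.
func (s *SpanSink) Name() string {
	return "falconer"
}
```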
sinks/grpsink/grpc_stream.go
Outdated
if err != nil {
	gs.log.WithError(err).WithFields(logrus.Fields{
		"name": gs.name,
	}).Warn("Error streaming traces to gRPC server")
Increment an error counter here or track a metric right away?
Why are we doing `log.WithError.Warn` instead of `log.WithError.Error`? The latter would automatically get us a Sentry event.
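For reference, the suggested change is just the log level; a minimal sketch, reusing the fields from the excerpt above:

```go
// Logging at Error level (rather than Warn) means logrus hooks that fire on
// error-level entries - such as a Sentry hook - will pick this up.
gs.log.WithError(err).WithFields(logrus.Fields{
	"name": gs.name,
}).Error("Error streaming traces to gRPC server")
```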
sinks/grpsink/grpc_stream.go
Outdated
func (gs *GRPCStreamingSpanSink) Start(cl *trace.Client) error {
	// TODO this API is experimental - perhaps we shouldn't use it yet
	if !gs.grpcConn.WaitForStateChange(context.TODO(), connectivity.Ready) {
Hrm, maybe we should pass a `ctx` arg into Start methods too.
i definitely think we should, but i didn't want to expand scope on this PR to modifying the interface.
}

// Ingest takes in a span and streams it over gRPC to the connected server.
func (gs *GRPCStreamingSpanSink) Ingest(ssfSpan *ssf.SSFSpan) error {
We definitely ought to start passing a `ctx` into `Ingest`.
💯 💯 💯
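A sketch of what the context-aware signature being discussed could look like; this is not the interface in this PR, and the early return on a cancelled context is just one possible behavior:

```go
// Ingest takes in a span and streams it over gRPC to the connected server,
// respecting cancellation from the caller via ctx.
func (gs *GRPCStreamingSpanSink) Ingest(ctx context.Context, ssfSpan *ssf.SSFSpan) error {
	if err := ctx.Err(); err != nil {
		// The caller has already given up; don't bother sending.
		return err
	}
	if err := protocol.ValidateTrace(ssfSpan); err != nil {
		return err
	}
	return gs.stream.Send(ssfSpan)
}
```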
sinks/grpsink/grpc_stream.go
Outdated
}

// TODO validation of span, e.g. time bounds like in datadog span sink?
err := gs.stream.Send(ssfSpan)
How does this deal with disconnections? Will sending to a dead socket re-establish the connection? (A cursory glance at the grpc docs didn't tell me)
i don't know yet. there's a whole lot of connection handling things that i just haven't explored.
This question, at least, is one that we can explore via tests.
sinks/grpsink/grpc_stream.go
Outdated
}

// TODO How do we allow grpc.CallOption to be injected in here?
// TODO Setting this up here (probably) means one stream per lifecycle of this object. Is that the right design?
I think that TODO sums it up: We don't restart veneur all that often, and `Start` gets called only on startup; if the connection should drop, does that mean we have no way of re-establishing?
i shall investigate 😄
(testing)
Force-pushed from f0be0df to b36a560
Had to fix the CHANGELOG, it adds the 4.0.0 section at the top. Maybe this isn't ideal, but we can sort that out... Anyway, this is actually ready for review - @asf-stripe? The only two files that ACTUALLY need review are
Note that we've elected NOT to expose config controls in this PR, so this code isn't actually reachable yet by users. (Maybe that means it's premature to add it to the CHANGELOG...?)
Uuuugh, so, we've got some potentially flaky tests here - a lot of dependence on sleep timings, because there aren't really good ways of peeking into internal grpc state. Not actually sure how avoidable this is going to be on shared infra like Travis... 😢
Agh also, unrelated note: I realized why this is not building in our internal CI: You have to make that branch in the
Hm, yeah, I don't love the tests either, and I'm somewhat confused about what the locks do and don't cover. I think since this code takes the lock a bunch of times and does some not-super-easy-to-grok things with it, maybe this can be structured in a different way? (Is there a reasonable way to have a single goroutine manage the connection?)
sinks/grpsink/grpc_stream_test.go
Outdated
err = sink.Ingest(testSpan)
// This should be enough to make it through loopback TCP. Bump up if flaky.
time.Sleep(time.Millisecond)
wow, yeah, I really don't love this; can this maybe notify a channel or invoke a callback or something instead of appending to that spans array?
i have some WIP locally that i'd abandoned, but it at least achieves that bit. that's easy enough to pull up 👍
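A sketch of the channel-based mock being discussed. The handler method name and the generated stream type are assumptions standing in for whatever grpc_sink.proto actually produces:

```go
// MockSpanSinkServer records spans on a channel so tests can block on receipt
// instead of sleeping.
type MockSpanSinkServer struct {
	got chan *ssf.SSFSpan
}

// SendSpans is the streaming handler; each received span is pushed onto the
// channel. (Method and stream type names are illustrative.)
func (m *MockSpanSinkServer) SendSpans(stream SpanSink_SendSpansServer) error {
	for {
		span, err := stream.Recv()
		if err != nil {
			return err
		}
		m.got <- span
	}
}

// Inside the test, the sleep then becomes a bounded wait:
//
//	err = sink.Ingest(testSpan)
//	require.NoError(t, err)
//	select {
//	case got := <-mock.got:
//		assert.Equal(t, testSpan, got)
//	case <-time.After(time.Second):
//		t.Fatal("timed out waiting for span to reach the mock server")
//	}
```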
sinks/grpsink/grpc_stream_test.go
Outdated
assert.Equal(t, testSpan, mock.firstSpan())
require.Equal(t, mock.spanCount(), 1)

srv.Stop()
hm (recalling our vc before), did it not work to close the connection out-of-band?
i spent a bit trying to make it work, but it ended up being even more complicated wrt adding more icky timing dependencies. then, i looked at the actual implementation of `Server.Stop()`, and saw that the first really effectual action it takes is just closing the listener. So i decided that testing this directly wasn't worth the extra trouble. (sorry, forgot to mention it in issue comments!)
sinks/grpsink/grpc_stream_test.go
Outdated
cf()

require.Equal(t, connectivity.Ready, sink.grpcConn.GetState())
time.Sleep(5 * time.Millisecond)
same for the ingestion channel suggestion above
sinks/grpsink/grpc_stream_test.go
Outdated
// channel, but this test ensures we handle it properly in the event that it
// does. It can be flaky, though (it's too dependent on sleep timings), so
// it's disabled, but preserved for future debugging purposes.
func testClientIdleRecovery(t *testing.T) {
is it intentional that this function doesn't get invoked?
Yes, per the comment above it
sinks/sinks.go
Outdated
@@ -56,6 +56,11 @@ const MetricKeySpanFlushDuration = "sink.span_flush_total_duration_ns"
// place to do this.
const MetricKeyTotalSpansFlushed = "sink.spans_flushed_total"

// MetricKeyTotalSpansDropped should be emitted as a counter by a SpanSink
// if possible. Tagged with `sink:sink.Name()`. The `Flush` function is a great
// place to do this.
I think this doc comment doesn't quite answer the thing I most hoped it would: What concrete thing does that metric indicate? (:
i was following the pattern from above :) i'll flesh it out.
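A possible expansion, assuming the metric is meant to count spans the sink accepted via Ingest but then failed to deliver downstream; the constant's value here is illustrative, not the one in the PR:

```go
// MetricKeyTotalSpansDropped should be emitted as a counter by a SpanSink if
// possible. It counts spans that the sink accepted but could not deliver to
// its destination (for example, a failed stream Send), so a nonzero value
// indicates data loss at that sink. Tagged with `sink:sink.Name()`. The
// `Flush` function is a great place to do this.
const MetricKeyTotalSpansDropped = "sink.spans_dropped_total" // value illustrative
```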
sinks/grpsink/grpc_stream.go
Outdated
	logrus.ErrorKey: err,
}).Error("Failed to set up a new stream after gRPC channel recovered")
time.Sleep(1 * time.Second)
gs.streamMut.Unlock()
is it necessary to hold the lock while sleeping there?
Also, wonder if instead of sleeping this could use `WaitForStateChange` with a longer timeout.
Ah no, those can be switched around - unlock before sleep.
We don't want another `WaitForStateChange()` here, though, because we shouldn't expect to see one. The state change is in the underlying channel state, not the stream - and being here means that the channel is already back in READY state. If we were to wait again, it might be for a long time - until the next network event disrupts the connection.
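The agreed reordering would look roughly like this, mirroring the excerpt above (the enclosing retry loop is omitted):

```go
gs.log.WithFields(logrus.Fields{
	logrus.ErrorKey: err,
}).Error("Failed to set up a new stream after gRPC channel recovered")
// Release the stream mutex before backing off, so Ingest isn't blocked for
// the full duration of the sleep.
gs.streamMut.Unlock()
time.Sleep(1 * time.Second)
```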
if err := protocol.ValidateTrace(ssfSpan); err != nil {
	return err
}
// TODO(sdboyer) validation of span, e.g. time bounds like in datadog sink?
what do you mean by that TODO? (`ValidateTrace` should take care of identifying only trace spans...)
veneur/sinks/datadog/datadog.go, lines 334 to 342 in a634bb6:

if ssfSpan.StartTimestamp < tooEarly {
	tags["type"] = "tooEarly"
}
if ssfSpan.StartTimestamp > tooLate {
	tags["type"] = "tooLate"
}
if timeErr != "" {
	samples.Add(ssf.Count("worker.trace.sink.timestamp_error", 1, tags))
}
sinks/grpsink/grpc_stream.go
Outdated
	return err
}

// Flush reports metrics about the number of failed and successful
doc comment sentence breaks off (:
oooooops
sinks/grpsink/grpc_stream.go
Outdated
	sinks.MetricKeyTotalSpansDropped,
	float32(atomic.LoadUint32(&gs.errCount)),
	map[string]string{"sink": gs.Name()},
))
you can pass varargs to `samples.Add` - probably easier to skim that way
👍
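A sketch of the varargs style, assuming the sink also tracks a success counter; the `successCount` field name is an assumption standing in for whatever tracks successfully sent spans:

```go
// Pass both samples in a single varargs call to samples.Add.
samples.Add(
	ssf.Count(
		sinks.MetricKeyTotalSpansFlushed,
		float32(atomic.LoadUint32(&gs.successCount)), // successCount is illustrative
		map[string]string{"sink": gs.Name()},
	),
	ssf.Count(
		sinks.MetricKeyTotalSpansDropped,
		float32(atomic.LoadUint32(&gs.errCount)),
		map[string]string{"sink": gs.Name()},
	),
)
```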
sinks/grpsink/grpc_stream.go
Outdated
	map[string]string{"sink": gs.Name()},
))

metrics.Report(gs.traceClient, samples)
I've been doing `defer metrics.Report(gs.traceClient, samples)` throughout the code - guess this is fine too (:
yeah this is just my thing about `defer`s again :)
There really isn't, unfortunately. We need a self-contained bundle of logic where its sole job is watching the state of the underlying channel in the background, then recreating the stream when it comes back into a good state. That's what's difficult here - while gRPC handles keeping the "channel" (basically, the connection) working automatically, we still have to keep our stream working, as it dies whenever the connection itself has a problem. Gotta have its own goroutine for that. Or at least, i haven't been able to figure out an alternative that isn't more complicated.
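To make the shape of that background goroutine concrete, a rough sketch under assumptions about field names (`sinkClient`, `stream`, `streamMut`) and the generated `SendSpans` client method - none of which are necessarily what the PR actually uses:

```go
// maintainStream watches the gRPC channel's connectivity state machine and
// rebuilds the client stream whenever the channel comes back to READY.
func (gs *GRPCStreamingSpanSink) maintainStream(ctx context.Context) {
	for {
		state := gs.grpcConn.GetState()
		if state == connectivity.Ready {
			// The channel is healthy again; recreate the stream, which dies
			// along with any connection-level failure.
			stream, err := gs.sinkClient.SendSpans(ctx) // generated method name assumed
			if err == nil {
				gs.streamMut.Lock()
				gs.stream = stream
				gs.streamMut.Unlock()
			}
		}
		// Block until the channel leaves its current state, or stop entirely
		// when the context is cancelled.
		if !gs.grpcConn.WaitForStateChange(ctx, state) {
			return
		}
	}
}
```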
i've added some more commits that should remove the test flakiness entirely (no more sleeps to get timings right) - about 200 or so successful runs in a row on my laptop. (we'll see how it fares in CI, ofc). Also simplified a few things, and added a bit more in the way of explanatory documentation. i think this should be pretty much ready.
Force-pushed from 89175a1 to 283ab27
No final judgment yet! The tests look good (\o/ yay less raciness), I have found a few nits though (:
sinks/grpsink/grpc_stream.go
Outdated
	continue
}
// CAS inside the mutex means that it's impossible for for the same
miiiinor nit: for for
sinks/grpsink/state_helper_test19.go
Outdated
t.Helper()

first, current := seq[0], sink.grpcConn.GetState()
if !assert.ObjectsAreEqual(first, current) {
this smells super strongly like it wants to be `require.Equal` (:
comment added - it doesn't use `require.Equal` because then the use of `t.Helper()` above would be pointless, because those libraries haven't yet been updated to use `t.Helper()`.
sinks/grpsink/state_helper_test19.go
Outdated
waitThroughStateSequence(t, sink, dur, seq[:len(seq)-1]...)

last, current := seq[len(seq)-1], sink.grpcConn.GetState()
if !assert.ObjectsAreEqual(last, current) {
ditto for require.Equal
switch state {
case connectivity.Ready:
	// Spin on the internal state marker that indicates the stream state is bad
	for !atomic.CompareAndSwapUint32(&sink.bad, 0, 0) {
shouldn't this check that the context timeout has been exceeded too? (Otherwise it'll spin forever/until the test timeout)
(now done, but github didn't notice)
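The fix amounts to giving the spin a way out; a sketch mirroring the excerpt above, assuming `ctx` carries the test's deadline:

```go
case connectivity.Ready:
	// Spin on the internal state marker, but bail out once the test's
	// context deadline has passed rather than spinning forever.
	for !atomic.CompareAndSwapUint32(&sink.bad, 0, 0) {
		if err := ctx.Err(); err != nil {
			t.Fatalf("timed out waiting for stream recovery: %v", err)
		}
		runtime.Gosched() // yield between checks instead of hot-spinning
	}
```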
Needed for some (experimental) features currently accessed in grpsink. Plus, might as well upgrade anyway.
This is a rudimentary but effective test of the sink's essential behavior.
All of my efforts at simulating failure modes thus far have either resulted in deadlocks, or error messages. Honestly not sure how one is supposed to simulate a failure, even after looking at grpc's own tests in this area.
This logic handles adaptive stream recreation on a background goroutine, which operates by keeping tabs on the underlying gRPC connection state machine.
This now simulates a server shutdown effectively, demonstrating the automated stream reconstruction capabilities of the sink's background goroutine.
We now handle idle connections correctly, in the very remote case that they should occur.
This should obviate the possibility of flaky failures in these tests. Hopefully. 200 test runs in a row without a failure, so here's hoping.
Previous code wasn't treating io.EOF as a special value in the way that gRPC expects it to be. This simplifies the error handling a bit, as well as allowing us to continue to log potentially interesting non-EOF errors after the first error has been seen on a given stream instance.
Avoid a possible, albeit highly unlikely, spinlock.
Separating this call into its own method helps clarify its role on the critical path, as well as the relationship to the state machine manager.
Force-pushed from fcab67d to 6b34cb3
\o/
WEEEEE!!!
Summary
This introduces a generic gRPC sink for spans.
Motivation
It'll allow us to use, say, Falconer as a sink target. And arbitrary "other things" (as yet undefined) that may want to be span sinks.
Test plan
Automated end-to-end tests of service creation and ingesting a span.
Rollout/monitoring/revert plan
Strictly an addition, so no changes are required for rollouts.