
Streaming SDK proposal #2
Closed · rnburn opened this issue Aug 15, 2019 · 9 comments

@rnburn (Contributor) commented Aug 15, 2019

Hello Everyone,

I work on the OpenTracing C++ specification and the instrumentations for NGINX and Envoy. Recently, I've been working with LightStep to improve the performance of their C++ tracer. Our efforts to reduce instrumentation cost and improve collection throughput led us to a more efficient and streamlined design that I propose we adopt as one of the SDKs for OpenTelemetry. The key components of the design are: (1) we remove intermediate storage on span objects and instead serialize eagerly as methods are called, and (2) we use a domain-specific load-balancing algorithm built upon non-blocking sockets, vectored I/O, and I/O multiplexing.

Note: This only discusses the design as it relates to tracing. I plan to update it to include metrics as the proposal progresses.

Design

At a high level, the design looks like this

  1. To each span we associate a chain of buffers that represents the span's protobuf serialization, and we build up that serialization as methods on the span are called.
  2. When a span is finished, its serialization chain gets moved into a lock-free multiple-producer, single-consumer circular buffer (or discarded if the buffer is full). (See the sketch after this list for steps 1 and 2.)
  3. A separate recorder thread regularly flushes the serializations in the circular buffer and streams them to multiple endpoints using HTTP/1.1 chunked encoding (one chunk per span), with domain-specific load balancing that works as follows:
    i. When the recorder flushes, it picks an available endpoint connection and makes a non-blocking vectored-I/O write system call to the connection's socket with the entire contents of the circular buffer.
    ii. If the amount of data written is less than the size of the circular buffer, the recorder binds any remaining partially written span to the connection (to be written later when its socket is available for writing again) and repeats (i) with one of the other available connections.
    iii. If no connections are available for writing, it blocks with epoll (or the target platform's equivalent) until one of the connections is available for writing again.
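Roughly, the span side of steps 1 and 2 might look like this (a simplified sketch; the class and method names are illustrative rather than the actual LightStep ones, and the wire-format encoding and circular buffer are sketched further down):

#include <memory>
#include <string>
#include <utility>
#include <vector>

// Chain of already-serialized protobuf fragments; no SpanData-like
// intermediate structure is ever kept.
class BufferChain {
 public:
  void Append(std::string fragment) { fragments_.push_back(std::move(fragment)); }
  const std::vector<std::string>& fragments() const { return fragments_; }

 private:
  std::vector<std::string> fragments_;
};

// Lock-free multiple-producer, single-consumer handoff point
// (push side sketched under "Rationale" below).
class CircularBuffer {
 public:
  bool TryPush(std::unique_ptr<BufferChain> chain);  // false when full
};

class Span {
 public:
  explicit Span(CircularBuffer& sink)
      : sink_(sink), chain_(std::make_unique<BufferChain>()) {}

  // Step 1: serialize eagerly as the method is called.
  void SetAttribute(const std::string& key, const std::string& value) {
    chain_->Append(EncodeKeyValue(key, value));  // wire-format bytes, see below
  }

  // Step 2: move the finished chain into the circular buffer, or drop it.
  void Finish() {
    if (!sink_.TryPush(std::move(chain_))) {
      // Buffer full: the span is discarded.
    }
  }

 private:
  static std::string EncodeKeyValue(const std::string& key,
                                    const std::string& value);  // sketched below

  CircularBuffer& sink_;
  std::unique_ptr<BufferChain> chain_;
};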

Here's a diagram of the architecture.

LightStep's C++ tracer provides an example implementation of this design. These are the main components:

  1. The serialization chain for a span's protobuf serialization.
  2. The span class with no intermediate data storage and eager serialization.
  3. The lock-free circular buffer for buffering spans in the recorder.
  4. The streaming recorder with domain-specific load balancing.

Rationale

Serializing eagerly in (1), instead of storing data in intermediary structures that get serialized later when the span is finished, eliminates unnecessary copying and avoids many small heap allocations. This leads to much lower instrumentation cost. As part of my work on the LightStep tracer, I developed microbenchmarks that compare the eager-serialization approach to a more traditional approach that stores data in protobuf-generated classes and serializes later when traces are uploaded. For a span with 10 small key-values attached, I got these measurements for the cost of starting and then finishing a span, which show much better performance for the eager-serialization approach:

-----------------------------------------------------------------------------
Benchmark                                      Time           CPU Iterations
-----------------------------------------------------------------------------
BM_SpanSetTag10/late-serialization           5979 ns       4673 ns     148715
BM_SpanSetTag10/eager-serialization           845 ns        845 ns     833609
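The eager approach relies on hand-written wire-format encoding instead of the protobuf-generated classes. A minimal sketch of what that encoding could look like for a string key-value (the field numbers here are illustrative, not the actual opentelemetry-proto or LightStep ones):

#include <cstdint>
#include <string>

// Append a base-128 varint (standard protobuf wire format).
void AppendVarint(std::string& out, uint64_t value) {
  while (value >= 0x80) {
    out.push_back(static_cast<char>((value & 0x7F) | 0x80));
    value >>= 7;
  }
  out.push_back(static_cast<char>(value));
}

// Append a length-delimited field: tag = (field_number << 3) | 2.
void AppendLengthDelimited(std::string& out, uint32_t field_number,
                           const std::string& bytes) {
  AppendVarint(out, (static_cast<uint64_t>(field_number) << 3) | 2);
  AppendVarint(out, bytes.size());
  out.append(bytes);
}

// Encode a string key-value pair as a nested message, e.g.
//   message KeyValue { string key = 1; string value = 2; }
std::string EncodeKeyValue(const std::string& key, const std::string& value) {
  std::string kv;
  AppendLengthDelimited(kv, 1, key);
  AppendLengthDelimited(kv, 2, value);
  return kv;
}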

Using a lock-free circular buffer in (2) allows the tracer to remain performant in high-concurrency scenarios. We've found that mutex-protected span buffering causes significant contention when multiple threads finish spans concurrently. I benchmarked the creation of 4000 spans partitioned evenly across a varying number of threads and got these results for a mutex-protected buffer:

-----------------------------------------------------------------------------
Benchmark                                      Time           CPU Iterations
-----------------------------------------------------------------------------
BM_SpanCreationThreaded/mutex/1 thread   4714869 ns      57876 ns       1000
BM_SpanCreationThreaded/mutex/2 threads  7351604 ns     104345 ns       1000
BM_SpanCreationThreaded/mutex/4 threads  8019629 ns     185328 ns       1000
BM_SpanCreationThreaded/mutex/8 threads  8582282 ns     328126 ns       1000

The mutex-protected buffer shows slower performance as soon as we use more than a single thread. By comparison, lock-free buffering doesn't show any such degradation:

-----------------------------------------------------------------------------
Benchmark                                      Time           CPU Iterations
-----------------------------------------------------------------------------
BM_SpanCreationThreaded/stream/1 thread  1120340 ns      55628 ns      11704
BM_SpanCreationThreaded/stream/2 threads 1375883 ns     111743 ns       7542
BM_SpanCreationThreaded/stream/4 threads 1672191 ns     182614 ns       3970
BM_SpanCreationThreaded/stream/8 threads 1007622 ns     265792 ns       2606
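For reference, a bounded lock-free MPSC ring can be sketched like this (illustrative only; the actual implementation differs in detail):

#include <atomic>
#include <cstddef>
#include <cstdint>
#include <memory>
#include <vector>

// Minimal bounded multiple-producer single-consumer ring with per-slot
// ready flags. Producers are threads finishing spans; the single consumer
// is the recorder thread.
template <class T>
class MpscRing {
 public:
  explicit MpscRing(size_t capacity) : slots_(capacity) {}

  // Called by many producer threads.
  bool TryPush(std::unique_ptr<T> item) {
    uint64_t head = head_.load(std::memory_order_relaxed);
    while (true) {
      if (head - tail_.load(std::memory_order_acquire) >= slots_.size())
        return false;  // full: caller drops the span
      if (head_.compare_exchange_weak(head, head + 1,
                                      std::memory_order_acq_rel))
        break;  // this producer now owns slot `head`
    }
    Slot& slot = slots_[head % slots_.size()];
    slot.item = std::move(item);
    slot.ready.store(true, std::memory_order_release);
    return true;
  }

  // Called only by the single recorder thread.
  std::unique_ptr<T> TryPop() {
    uint64_t tail = tail_.load(std::memory_order_relaxed);
    Slot& slot = slots_[tail % slots_.size()];
    if (!slot.ready.load(std::memory_order_acquire)) return nullptr;
    std::unique_ptr<T> item = std::move(slot.item);
    slot.ready.store(false, std::memory_order_release);
    tail_.store(tail + 1, std::memory_order_release);
    return item;
  }

 private:
  struct Slot {
    std::atomic<bool> ready{false};
    std::unique_ptr<T> item;
  };
  std::vector<Slot> slots_;
  std::atomic<uint64_t> head_{0};
  std::atomic<uint64_t> tail_{0};
};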

By using multiple load-balanced endpoint connections, the transport in (3) allows spans to be uploaded at a high rate without dropping data. The domain-specific load balancer takes advantage of the property that spans can be routed to any collection endpoint, and it naturally adapts to back-pressure from the endpoints so that data is routed to wherever it is most capable of being received. Consider what happens as a collection endpoint starts to reach its capacity to process spans (a simplified flush loop is sketched after this list):

  1. At the network level, the endpoint ACKs the TCP packets of span data at a slower rate.
  2. On the recorder side, the vectored-write system calls for that endpoint send less data and the socket eventually blocks.
  3. The recorder then naturally sends spans to the other endpoints until epoll reports that the overloaded endpoint's socket is writable again and it's capable of receiving data.
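A single flush pass, putting (i)-(iii) from the design above together, might look roughly like this on Linux (simplified: error handling and the bookkeeping that keeps a partially written span bound to its original connection are omitted):

#include <sys/epoll.h>
#include <sys/uio.h>
#include <unistd.h>
#include <cerrno>
#include <vector>

// `fds` are non-blocking, connected endpoint sockets; `chunks` are the
// serialized spans currently sitting in the circular buffer.
bool FlushOnce(int epoll_fd, const std::vector<int>& fds,
               std::vector<iovec> chunks) {
  size_t next_fd = 0;
  while (!chunks.empty()) {
    if (next_fd == fds.size()) {
      // (iii) No connection can take more data right now: block until
      // epoll reports that some socket is writable again.
      epoll_event events[8];
      int n = epoll_wait(epoll_fd, events, 8, /*timeout_ms=*/-1);
      if (n < 0 && errno != EINTR) return false;
      next_fd = 0;
      continue;
    }
    // (i) Try to write the whole remaining buffer to one connection.
    ssize_t written = writev(fds[next_fd], chunks.data(),
                             static_cast<int>(chunks.size()));
    if (written < 0) {
      if (errno == EAGAIN || errno == EWOULDBLOCK) { ++next_fd; continue; }
      return false;
    }
    // (ii) Drop the fully written chunks, trim any partially written one,
    // and try the remainder on the next available connection.
    while (!chunks.empty() &&
           written >= static_cast<ssize_t>(chunks.front().iov_len)) {
      written -= chunks.front().iov_len;
      chunks.erase(chunks.begin());
    }
    if (!chunks.empty() && written > 0) {
      chunks.front().iov_base =
          static_cast<char*>(chunks.front().iov_base) + written;
      chunks.front().iov_len -= written;
    }
    ++next_fd;
  }
  return true;
}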

Dependencies

A goal of the SDK is to have as minimal a set of dependencies as possible. Because the eager-serialization approach uses manual serialization code instead of the protobuf-generated classes, we can avoid requiring protobuf as a dependency. LightStep's current implementation uses libevent and c-ares for portable asynchronous networking and DNS resolution, but those parts of the code could be hidden behind an interface in a way that would allow alternative libraries or platform-specific implementations to be used.
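For example, the networking pieces could sit behind something like this (a hypothetical interface, not the actual LightStep one):

#include <functional>
#include <string>

// Hypothetical abstraction over the pieces that currently use libevent and
// c-ares; alternative libraries or platform-specific implementations could
// be plugged in behind it.
class EventLoop {
 public:
  virtual ~EventLoop() = default;
  virtual void ResolveAsync(const std::string& host,
                            std::function<void(std::string /*ip*/)> on_done) = 0;
  virtual void OnWritable(int socket_fd, std::function<void()> callback) = 0;
  virtual void Run() = 0;
};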

Customization Points

Because this approach never generates a SpanData-like structure with accessors, the customization points are different from those of the traditional exporter approach. The main customization point would be the serialization functions, where a vendor could provide alternative implementations to write to a different wire format.
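That customization point might look something like this (hypothetical naming, shown only to illustrate the shape of the interface):

#include <cstdint>
#include <string>

// Hypothetical serializer interface: a vendor supplies functions that turn
// span events directly into its wire format, so no SpanData-like structure
// is ever materialized.
class SpanSerializer {
 public:
  virtual ~SpanSerializer() = default;
  virtual void SerializeOperationName(std::string& out,
                                      const std::string& name) = 0;
  virtual void SerializeAttribute(std::string& out, const std::string& key,
                                  const std::string& value) = 0;
  virtual void SerializeFinishTimestamp(std::string& out,
                                        uint64_t nanos_since_epoch) = 0;
};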

Next steps

The LightStep implementation could be adapted into a default OpenTelemetry tracer and SDK that uses the opentelemetry-proto format but provides a customization point for the serialization. This SDK would prioritize efficiency and high throughput. While it might not be the right choice for all use cases, I would expect the OpenTelemetry C++ API to be flexible enough to support a variety of different implementations, so other use cases could be serviced by either an alternative implementation of the OpenTelemetry API or a different SDK (if we decide to support more than one).

@tigrannajaryan (Member)

This is an interesting proposal. A few questions:

  • How is partial serialization implemented in a way that allows reconstructing a full span from the parts? What does each part carry? (I assume this is answered by the implementation; I didn't check it, but it would be good to describe it here in this proposal.)
  • What happens with parts that are overwritten, i.e. I set an attribute, it gets streamed out, and then I reset the value of the same attribute? Does it get streamed out again, thus making Span attributes not a map but a multi-map?
  • Backpressure signalling via TCP flow control makes it impossible for the sender to distinguish overloaded servers from bad networks (packet losses). This distinction is typically important for observability reasons. Any thoughts on this?
  • Since this is all about performance, have you considered using faster serialization instead of protobufs (e.g. capnproto)?
  • It is typically important to track data losses. Is there any provision for the server to acknowledge received data? I don't remember off the top of my head whether chunked encoding allows acknowledging chunks from server to client; if it does, then this may be one of the ways, otherwise perhaps use something like WebSockets or HTTP pipelining.

@rnburn (Contributor, Author) commented Aug 15, 2019

How is partial serialization implemented in a way that allows reconstructing a full span from the parts? What does each part carry? (I assume this is answered by the implementation; I didn't check it, but it would be good to describe it here in this proposal.)

The protobuf encoding allows a lot of flexibility in the ordering of the fields, so for the most part you can write the constituent parts in any order and have them parsed out to correctly form the full message.

For an overwritten field, it serializes the field twice. Protobuf will take the last field encoded:

Normally, an encoded message would never have more than one instance of a non-repeated field. However, parsers are expected to handle the case in which they do. For numeric types and strings, if the same field appears multiple times, the parser accepts the last value it sees.
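Concretely, on the wire the two writes are just two copies of the same field, and the last one wins (the field number is illustrative):

  0x0A 0x01 0x61   <- field 1 (tag 0x0A = (1 << 3) | 2, length-delimited), value "a"
  0x0A 0x01 0x62   <- the same field written again, value "b"; the parser keeps this one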

Backpressure signalling via TCP flow control makes it impossible for the sender to distinguish overloaded servers from bad networks (packet losses). This distinction is typically important for observability reasons. Any thoughts on this?

I'm not sure I understand why this distinction is important. If the collection point is overloaded, it should stop reading data from client connections. From a client's perspective, why does it need to know whether the throughput for a socket is limited by a bad network or by endpoint overloading? Either way, it needs to redirect output to other sockets or start dropping spans if the total capacity is exceeded.

Since this is all about performance, have you considered using faster serialization instead of protobufs (e.g. capnproto)?

That would be interesting, but I didn't explore it because it would have required us to expose a new protocol from the backend at LightStep.

@tigrannajaryan (Member) commented Aug 15, 2019

The protobuf encoding allows a lot of flexibility in the ordering of the fields, so for the most part you can write the constituent parts in any order and have them parsed out to correctly form the full message.

Nice!

For an overwritten field, it serializes the field twice. Protobuf will take the last field encoded:

I had to look up the Protobuf docs for this. There is a nuance for embedded fields:

Normally, an encoded message would never have more than one instance of a non-repeated field. However, parsers are expected to handle the case in which they do. For numeric types and strings, if the same field appears multiple times, the parser accepts the last value it sees. For embedded message fields, the parser merges multiple instances of the same field, as if with the Message::MergeFrom method – that is, all singular scalar fields in the latter instance replace those in the former, singular embedded messages are merged, and repeated fields are concatenated. The effect of these rules is that parsing the concatenation of two encoded messages produces exactly the same result as if you had parsed the two messages separately and merged the resulting objects.

Since Span attributes are scalar values, this should be fine. I don't know whether this merging logic may cause issues with other fields of the Span; hopefully you have taken care of this.

I'm not sure I understand why this distinction is important. If the collection point is overloaded, it should stop reading data from client connections. From a client's perspective, why does it need to know whether the throughput for a socket is limited by a bad network or by endpoint overloading? Either way, it needs to redirect output to other sockets or start dropping spans if the total capacity is exceeded.

Depends on what the client is. For example, the OpenTelemetry Service itself is a client. It can send to an upstream Service or to a backend. In production we usually monitor the OpenTelemetry Service for drops and need to be able to tell why the drops happen. "Server is overloaded" requires a different action from "Network latency is high" or "Network loses packets".

You are right that the client itself does not need to react differently to these conditions, but people who monitor the client sometimes do need to react differently.

Any thoughts regarding reliability of delivery / acks / etc?

In any case I like the proposal and the initiative. I haven't looked at the implementation but conceptually your description looks good to me.

@rnburn (Contributor, Author) commented Aug 16, 2019

Once the HTTP/1.1 streaming session is finished, the server has the opportunity to send a response -- that could be used to indicate overload.

Any thoughts regarding reliability of delivery / acks / etc?

Being over TCP, you'll get a write error on the socket if the packets aren't acked by the destination. Would that be sufficient, or are you looking for something more like application-level acking of each individual span?

@g-easy (Contributor) commented Aug 16, 2019

faster serialization instead of protobufs

How about language-native structs? I realize this means the LightStep exporter will need to convert that data into protobuf fragments, but not all exporters produce protobufs.

Also, doesn't this add a dependency on protobuf from OpenTelemetry core? As opposed to e.g. just exporters that use protobuf.

@tigrannajaryan (Member)

@rnburn:

Once the HTTP/1.1 streaming session is finished, the server has the opportunity to send a response -- that could be used to indicate overload.

Sounds good. I'd return HTTP 429 or 503 with an optional Retry-After header.

Being over TCP, you'll get a write error on the socket if the packets aren't acked by the destination. Would that be sufficient, or are you looking for something more like application-level acking of each individual span?

How would the client interpret a socket error? The one thing the client can reliably infer from it is that "possibly some data was not delivered". That is not strong enough for my needs. As a person who operates a monitoring system, I usually need stronger guarantees. I typically want a guarantee that no data is lost, given that there is a reasonable way to prevent the losses, or I want to see counters/metrics about how much data was lost and why, if the loss could not be prevented. This can be better achieved by tracking what is not delivered and re-sending it. For this to be feasible, the client needs more granular knowledge about what was and was not delivered. If all we have is one error per stream, we would have to re-send the entire stream from the very beginning to guarantee that nothing is lost. That is obviously not practical; we cannot keep a duplicate of the entire stream around just in case the client needs to re-send it.

I am working on a general-purpose OpenTelemetry protocol, and my proposal uses acks and a limited pool of unacknowledged data that is re-sent in certain cases, providing stronger delivery guarantees (at the cost of possible duplicates).

The protocol you propose is more specialized, so this may be a non-issue for you (although it would be a no-go for any production environment I am involved with). I'd want to see this specifically called out (particularly, whether reliability is a goal or not). You may also be interested in reading the goals and requirements that I suggested earlier for the general-purpose OpenTelemetry protocol: open-telemetry/opentelemetry-specification#193

@rnburn (Contributor, Author) commented Aug 19, 2019

Also, doesn't this add a dependency on protobuf from OpenTelemetry core? As opposed to e.g. just exporters that use protobuf.

The SDK does manual protobuf serialization, so it can be done without requiring protobuf as a dependency.

And the plan is to have the serialization functions as a customization point, so the SDK could be used with any format that supports the eager-serialization approach.

@rnburn (Contributor, Author) commented Aug 19, 2019

@tigrannajaryan I think it would be possible to have a mode that supports stronger acking while still keeping many of the benefits, by using bidirectional streaming where the collection point sends regular acks back and the client only removes spans from the buffer once they've been acked.

Chunked HTTP doesn't look like it supports bidirectional streaming (https://stackoverflow.com/a/28309215/4447365), but perhaps a WebSocket would work.
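Very roughly, that mode might track in-flight spans like this (an entirely hypothetical sketch, not something in the current implementation):

#include <cstdint>
#include <deque>
#include <string>
#include <utility>

// Hypothetical ack tracking for a bidirectional (e.g. WebSocket) transport:
// spans stay in flight until the collector acknowledges a sequence number,
// and anything still unacknowledged when the connection drops is re-sent
// (possibly producing duplicates).
struct InFlightSpan {
  uint64_t sequence_number;
  std::string serialized_bytes;
};

class AckWindow {
 public:
  void OnSent(uint64_t seq, std::string bytes) {
    in_flight_.push_back({seq, std::move(bytes)});
  }
  // Collector acks everything up to and including `seq`.
  void OnAck(uint64_t seq) {
    while (!in_flight_.empty() && in_flight_.front().sequence_number <= seq)
      in_flight_.pop_front();
  }
  // On reconnect, whatever is left gets re-sent.
  const std::deque<InFlightSpan>& Unacknowledged() const { return in_flight_; }

 private:
  std::deque<InFlightSpan> in_flight_;
};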

@lalitb (Member) commented Dec 16, 2020

Closing, as this proposal has been finalized and implemented.
