Refactor how results are returned to the client #8477
Conversation
Force-pushed from ca58b8d to 94cfe05.
This PR is mostly completed, but still a long way from final merge. There are a lot of changes in here and it isn't the cleanest PR. A decent amount of file renaming has to happen. This PR is described by this gist and, before we merge, this gist should be modified to act as internal documentation for the format (and then used to make developer-facing documentation that isn't as coarse).

I also want to start discussion between our various client library implementors to get some feedback on whether this is a decent idea or whether there are other concerns that they have experienced when implementing their own libraries. The motivation behind this has mostly been my own struggles, so I'm interested to hear from @dmke and @aviau. /cc @jackzampolin for feedback on how this might help developers who use InfluxDB or to voice any concerns or improvements that could be made before we would merge this.

This PR will likely be used instead of #7154 since it is more complete and solves more problems. Thanks everyone.
I will look into this later today. Thanks for the heads up!
Force-pushed from 94cfe05 to be29ff1.
Sorry for the delay, my laptop fan blew and I need to wait for replacement parts. Bad timing... Anyhow, here are my initial thoughts: At a first glance, the proposed response format change sounds reasonable. I'm not sure yet what the performance implications on the client side will be. Yes, we'd potentially need (far) less memory, but:
The last point might be just a problem with the JSON stdlib. I don't know how, for example, MsgPack or other streaming-enabled libraries would behave in these situations. I will set up a benchmark when I've replaced my fan and report back (might be next week at the earliest, though).
Thanks for the feedback. MessagePack should also get some of the benefits of a C implementation, since I believe the MessagePack implementation in Ruby is built using C. I'm not sure about JRuby, though. I created a small sample library in Ruby along the lines of what I was thinking of, as a proof of concept that this would be parseable in other languages. Do you want me to push it to a repository on GitHub so you can see the rough outline of the code? It might also help with profiling.
Force-pushed from be29ff1 to 74e52d9.
Indeed, msgpack seems to be JRuby compatible and supports streaming deserialization.
That would be nice to have, maybe as (orphaned) branch in influxdata/influxdb-ruby?
@dmke I've just put the demo here: https://github.com/jsternberg/influxdb-ruby
The implementation for the cursor is in here: https://github.com/jsternberg/influxdb-ruby/blob/master/lib/influxdb/cursor.rb
I'm going to be making a couple of slight modifications to the spec format above (and I'll edit the gist). The first is that the spec example didn't match the actual text in one area. I've updated that. The second is that I would like some way in the format for a reader to know, at the time they are reading the number, whether they are reading the final chunk of a section. I believe we have this information on the server (we can't modify that because the old structure needs it).

The reason I want to make this change is that I want to make it possible for a cursor to give a length hint similar to here: https://github.com/influxdata/influxdb-client/blob/2d000abc81dd6588d94ee5578810e9fc64bec5ab/cursor.go#L52 That way, if you want to make an array out of the iterative results, you get some help with knowing how much memory you have to allocate ahead of time.

To see the differences, please look at the revisions to the gist. If anybody has any better ideas (or thinks this is confusing/shouldn't be done/has any other concerns), please bring them up. Thanks.
Force-pushed from afa80fc to 6a0f6c4.
(FYI: two different suppliers were not able to send me the correct replacement fan. I'm hoping to get the correct one from China now, but the shipment will take another 2-4 weeks or so... 😭)
Force-pushed from 9447b0d to 452daa1.
I thought of another idea I want to add as a part of the new format. It might be nice to have the ability to return information for a query after the query has ended instead of having the entire thing be in the header. The current format doesn't really differentiate between these two because it doesn't need to. Right now, the footer would be empty, but it might be useful in the future for meta information that is only known after the query is finished.

One idea I was thinking of was building some profiling into the query engine. Right now, we have very little insight into how long individual portions of the query take without using pprof and, even then, it can be hard to diagnose noise in the cpuprofile and get it correct. Building in timing for the different stages of the query that gets returned at the end of the response would do a lot of good for understanding these issues. Of course, that infrastructure hasn't been built yet, but having a way to return that data is important to get into the format now so it can be added later without breaking backwards compatibility.
Force-pushed from 3e17fd5 to 23b5d51.
services/httpd/msgp_emitter.go
Outdated
if series.Err != nil {
    err := series.Err.Error()
    header := SeriesError{Error: err}
    header.EncodeMsg(enc)
The client has no way to know whether it should be decoding a `SeriesError` or a `SeriesHeader` (line 89).

This is also inconsistent with result set errors, as there the error is embedded within `ResultHeader`. The `ResultHeader.Error` design seems to be the better way to go.
The reason why I do this is a limitation in `tinylib/msgp`. The library doesn't take kindly to omitting certain fields and will encode a blank string for them instead of just leaving out the entry. So I use two structures that are conceptually one structure to trick msgp into generating the correct code.

The intention is that the `error` key is part of the `SeriesHeader`.
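For illustration, here is a rough sketch of that two-struct workaround; the field names besides `error` are assumptions based on the examples later in this thread, not the PR's actual definitions:

```go
package httpd

//go:generate msgp

// SeriesHeader is written when a series opens normally.
type SeriesHeader struct {
	Name    string   `msg:"name"`
	Columns []string `msg:"columns"`
}

// SeriesError is written in place of SeriesHeader when the series
// failed; on the wire it reads as a header whose only key is "error".
// A client decodes either shape into one logical header type.
type SeriesError struct {
	Error string `msg:"error"`
}
```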
> The reason why I do this is a limitation in tinylib/msgp.

Should we let limitations of the library dictate the protocol? (Other MessagePack libraries don't have this issue and support the JSON-esque `omitempty`.)

> The intention is that the error key is part of the SeriesHeader.

It would need to be encoded as a map entry, or as an item in an array. Here it's just a standalone object which can show up unexpectedly on the wire.
The limitation of the library doesn't dictate the protocol. The protocol has the `error` key in the series header. In this PR, I separate the structs so that the proper output is written to the wire. The protocol states that the series header is written and that the series header contains an optional error key. If the error key is set, there was an error.

The gist describes the full protocol. I should update this PR to add the protocol as part of it.
Oh, ok, I see what you're doing now: using two different `struct`s with the intention that the client decodes them into the same one. Sorry, I didn't realize that until now.
services/httpd/msgp_emitter.go
Outdated
for series := range result.SeriesCh() {
    enc.WriteArrayHeader(2)
    enc.WriteInt(1)
    enc.WriteBool(true)
This little structure seems like it's used before each series, and then at the end to indicate no-more-series. The integer seems to indicate the number of records that follow, and the bool is the no-more-series indicator.

Assuming this to be true, aside from no-more-series (when the integer value is irrelevant), when will the integer ever be non-1? We only ever have a single `SeriesHeader`.

The other question is why an array, if it's of fixed length? Why not just write out an integer (assuming there is a reason to have it) and a bool without the array?

The last question is why not use a defined type and let the code generator handle encoding, such as a struct with the `msgp:tuple` flag?
The reason for encoding it as an array is so that it's a single thing that can be decoded.

The reason why it doesn't use `msgp:tuple` is that I was unaware that existed. Thanks.
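For comparison, a sketch of what the reviewer's `msgp:tuple` suggestion could look like; `SeriesMarker` is a hypothetical name for the `[count, more]` pair written above:

```go
package httpd

//go:generate msgp

//msgp:tuple SeriesMarker

// SeriesMarker precedes each series: Count is the number of series
// headers that follow and More is false once no series remain. The
// tuple directive encodes the struct as a fixed-size MessagePack
// array, replacing the hand-written WriteArrayHeader/WriteInt/
// WriteBool calls.
type SeriesMarker struct {
	Count int
	More  bool
}
```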
services/httpd/msgp_emitter.go
Outdated
enc := msgp.NewWriter(w)
enc.WriteMapHeader(1)
enc.WriteString("results")
enc.WriteInt(header.Results)
Here we have a header which is the count of result sets to follow. But on each series we send an indicator when we've reached the last one, and then we have a new "row chunk" which also uses an indicator (line 95, 114, & 131), and then on rows we're back to a count header (line 94, 113 & 130). It would be nice if this were consistent.
If this is meant to be a streaming protocol, I would propose that we do away with all the count headers, and use EOF type signals.
I'm not sure what you mean by EOF signals. There is a gist in the second comment of the pull request that describes the rationale for the format. And yes, the series count is a bit redundant. The main reason why I didn't choose a different format is I thought parity with the row count was nice and less confusing for those trying to read the format. The series continuation could be a true/false boolean by itself.
There are 2 ways the client knows when to stop reading a sequence of objects. Either the client knows how many objects to read, or it receives something which tells it to stop.
But my point is that having an indicator when to stop reading works for everything. Having a count that tells the client how many to read only works for some things (when you know what that count is). Thus for consistency (and less confusion), the former works better.
There are two advantages to having a count in the protocol. The first is that the client knows what to expect. So if you are trying to create a slice of all of the points, I know before reading any points how many are going to be in the current chunk, and I also know if I'm expecting more. I can allocate an array that's appropriate.

The second advantage is there is less guessing in the protocol. If you have some kind of signal at the end that tells the stream it has finished reading, then you have to determine what that signal is and try to find it. It adds an additional condition to the code and requires that you read at least one byte that you then have to buffer, just in case that byte is not the EOF signal. If you use an actual EOF signal, that isn't very kind to reusing a socket connection. This is made more difficult because we would need multiple EOF signals: one for the end of a series, another for the end of a result, and another for the end of the response itself.

Since we know what we are going to send before we send it, the first approach seems less complicated from the client side and has fewer conditions.
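To make the first advantage concrete, here is a minimal client-side sketch in Go, assuming the `[count, more]` framing discussed above and the `tinylib/msgp` reader API; `readChunk` is a hypothetical helper, not code from this PR:

```go
package client

import "github.com/tinylib/msgp/msgp"

// readChunk decodes one chunk: a 2-element array of [count, more]
// followed by count row objects. Because the count arrives first, the
// slice can be sized before any row is read.
func readChunk(dec *msgp.Reader) (rows []interface{}, more bool, err error) {
	if _, err = dec.ReadArrayHeader(); err != nil {
		return nil, false, err
	}
	n, err := dec.ReadInt()
	if err != nil {
		return nil, false, err
	}
	if more, err = dec.ReadBool(); err != nil {
		return nil, false, err
	}
	rows = make([]interface{}, 0, n) // preallocated: no guessing, no resizing
	for i := 0; i < n; i++ {
		row, err := dec.ReadIntf() // decode exactly one row
		if err != nil {
			return nil, false, err
		}
		rows = append(rows, row)
	}
	return rows, more, nil
}
```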
services/httpd/msgp_emitter.go
Outdated
enc.Flush()
if w, ok := w.(http.Flusher); ok {
    w.Flush()
All the calls to `w.Flush()` are very inefficient in terms of network IO, especially for high-latency / low-throughput links. It's much more efficient to have large packets.
We'll have to figure out a good buffering size for this. For this to be a streaming protocol, it needs to not buffer the entire thing. I do think that we can probably remove some of these flushes though for speed. The first flush that sends the headers is likely necessary though and the flush after sending the result header is another one that seems useful.
Why do we need to flush after a certain type of data is sent, instead of flushing when the buffer is full?

Or alternatively, what is the purpose of flushing headers when there is no data? The client can read the headers, yes, but if it has no data, the headers don't serve much purpose.
It's so the client can see the response headers early. I don't know about other HTTP libraries, but the Go one will block until it receives the headers. Once it receives the headers, you can read the body whenever you want.

So it's mostly to make Ctrl+C functionality possible. If you don't flush the headers immediately, the client won't be able to get to the section of code where handling an interrupt is possible.
A rough (non-compiling) example:

resp, err := http.DefaultClient.Do(req)
if err != nil {
    panic(err)
}
// Does not get here until the headers are sent back.
signals := make(chan os.Signal, 1)
signal.Notify(signals, os.Interrupt)
go func() {
    <-signals
    resp.Body.Close() // unblocks any pending read on the body
}()
for {
    // read points from the stream
}

Something like that; it's only a sketch.
services/httpd/msgp_emitter.go
Outdated
values = values[:0]

err := RowError{Error: row.Err.Error()}
err.EncodeMsg(enc)
Nothing to indicate to the client whether it should be reading a `Row` or a `RowError`.
services/httpd/msgp_emitter.go
Outdated
err := RowError{Error: row.Err.Error()}
err.EncodeMsg(enc)
continue
Unless I'm misinterpreting the intended protocol, the `false` on line 95 is supposed to mean no-more-rows. But the `continue` here will result in more rows being sent as they're read off the channel.
The intention is to drain the result set just in case there's a bug. That should be made explicit.
services/httpd/msgp_emitter.go
Outdated
if len(values) == cap(values) {
    enc.WriteArrayHeader(2)
    enc.WriteInt(len(values))
What is the reason for breaking rows into chunks? Why not just dump them all? If the client wants them in batches of size X, then it can read X rows, go do stuff, then come back and read another X.
For JSON, the old chunking behavior made sense as in JSON the response was considered a single object, and unmarshalled all at once. But in this response the rows aren't being returned in an array or any other object. They are being returned as a stream of independent objects.
My bias against this is that it makes things harder for clients. We now have N fields inside N rows inside N row chunks inside N series inside N result sets. The nesting is rather deep.
Another option would be to marshal as an array. If the intent of chunking is for the client to unmarshal the whole chunk at the same time, using an array would allow the client to use native MessagePack library functions to unmarshal the array instead of manual iteration.
But I still think true streaming is a better idea.
If you can give me an idea what you mean by true streaming, I'm all for that. The old response structure had a problem that it wasn't true streaming and the results were embedded in a weird way that made reading from it as a client impossible.
The reason why this encodes each row is so that it can read one row at a time. If you encode as an array, you have to decode the entire array rather than just the next row. Then you have to buffer the full results from the chunk. While this method requires maintaining state for how much you are going to read, that's just an integer and a boolean rather than an array. And you would have to still maintain state for the boolean (do I need to read more when I finish reading the array?) and for the array itself. So both maintain roughly the same state.
Now reading an array of rows as a chunk may perform better. If it does, we should consider the pros and cons. I haven't performed any testing of that.
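A sketch of the contrast described above, assuming the `tinylib/msgp` reader API; `streamRows` and `chunkRows` are hypothetical helpers, not code from this PR. Row-by-row decoding holds one row at a time, while decoding a whole-chunk array materializes every row before any can be processed:

```go
package client

import (
	"fmt"

	"github.com/tinylib/msgp/msgp"
)

// streamRows reads n individually encoded rows, handing each to the
// caller as soon as it has been decoded.
func streamRows(dec *msgp.Reader, n int, handle func(interface{}) error) error {
	for i := 0; i < n; i++ {
		row, err := dec.ReadIntf() // decode exactly one row
		if err != nil {
			return err
		}
		if err := handle(row); err != nil {
			return err
		}
	}
	return nil
}

// chunkRows would apply if the chunk were encoded as one MessagePack
// array: a single call buffers all rows before returning.
func chunkRows(dec *msgp.Reader) ([]interface{}, error) {
	v, err := dec.ReadIntf() // decodes the entire array at once
	if err != nil {
		return nil, err
	}
	rows, ok := v.([]interface{})
	if !ok {
		return nil, fmt.Errorf("expected array of rows, got %T", v)
	}
	return rows, nil
}
```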
services/httpd/emitter.go
Outdated
func NewEncoder(r *http.Request, config *Config) Encoder {
    epoch := strings.TrimSpace(r.FormValue("epoch"))
    switch r.Header.Get("Accept") {
The parsing of the `Accept` header needs to be more complex. It's possible for the client to accept multiple types. This will be a very likely occurrence once msgpack is introduced, as clients will prefer msgpack but also accept JSON.
For example:
Accept: application/x-msgpack, application/json;q=0.7
It's also technically valid to do:
Accept: application/x-msgpack
Accept: application/json
Though this is rather uncommon.
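A minimal, q-value-aware sketch of what that negotiation could look like, using only the standard library; `negotiate` is a hypothetical helper, not the PR's implementation:

```go
package httpd

import (
	"mime"
	"strconv"
	"strings"
)

// negotiate picks between the two supported types based on the
// client's quality weights, defaulting to JSON when nothing matches.
func negotiate(accept string) string {
	best, bestQ := "application/json", -1.0
	for _, part := range strings.Split(accept, ",") {
		mediaType, params, err := mime.ParseMediaType(strings.TrimSpace(part))
		if err != nil {
			continue
		}
		q := 1.0 // a missing q parameter defaults to 1
		if s, ok := params["q"]; ok {
			if v, err := strconv.ParseFloat(s, 64); err == nil {
				q = v
			}
		}
		if (mediaType == "application/x-msgpack" || mediaType == "application/json") && q > bestQ {
			best, bestQ = mediaType, q
		}
	}
	return best
}
```

With the first example above, this returns application/x-msgpack, since the client weighted it higher than JSON's q=0.7.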
If you make a PR for this, I would definitely review it and accept it.
Force-pushed from 23b5d51 to b52de03.
I have added a formal writeup of the cursor protocol. Please take a look at it and I will then update the code to match the changes I have made.
services/httpd/cursor.md
Outdated
Error string
}

The `RowChunkHeader` contains metadata for the rows that are about to be
I assume you mean RowBatchHeader
Yea, I changed the name several times. I hate all of the names. Feel free to give feedback on them. I'm about ready to name them using `/dev/urandom` at this point.
{"name":"databases","columns":["name"]} | ||
{"length":2} | ||
{"values":["db0"]} | ||
{"values":["db1"]} |
Shouldn't there be a trailing `false`?
Yes. I messed up here.
// Continue is set to true if there is another batch that
// should be read.
Continue bool
One awkward thing about doing it this way is that it means the server has to delay the batch until it knows whether another batch will follow. While this is likely to be an infinitesimally small delay, it just feels weird.
It actually already needs to do this to be backwards compatible with the old format. It is a bit weird, but it's already there and makes it easier in the long run. If we change it in the future so the code doesn't do that, we can always mark one batch as `continue: true` and then have zero length for the next batch when we learn there aren't any more entries.
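A hypothetical wire fragment showing that fallback, in the same JSON-style notation as the earlier example (a batch header combining `length` and `continue` is my assumption here): the server optimistically marks a batch as continuing, then closes the series with a zero-length batch once it learns nothing else is coming.

```
{"length":2,"continue":true}
{"values":["db0"]}
{"values":["db1"]}
{"length":0,"continue":false}
```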
Force-pushed from d4d0cdc to a73db17.
## MIME Type

The MIME type for the InfluxDB Cursor Protocol is
Another possibility for this, which might be better, is `application/vnd.influxdb-cursor+msgpack; version=1.0`. So the version is a parameter instead of part of the accept header.
Force-pushed from a73db17 to 79dcbd0.
Just curious, is there any idea when this change might make it in? If we think this is close to the final iteration, I can do another build on top of a more recent master, update the client to match, and provide more feedback.
I'm going to rebase this and modify the code to match the README. I don't think they currently match anymore. That will allow you to run whatever tests. If this performs better, then that may throw some momentum towards getting this merged. @jwilder can you take a look at the cursor.md file contained in this PR so we can start the process of determining if this proposal is suitable?
This refactors the internal result returning system to match more closely how we iteratively return points between result sets, series, and rows within the series. It uses the updated terminology rather than older terminology that no longer matches how we refer to things in the documentation or within the query engine. The refactor moves the aggregation and chunking behavior from `influxql.Emitter` to the HTTP service so that behavior is isolated to one location rather than sprinkled around in multiple locations.
Force-pushed from 79dcbd0 to 42c5de1.
I have updated this PR so now the code aligns with the specification in cursor.md.
We're going to be revisiting this idea in the future. We're working on a successor to influxql and are not going to be making this change to influxql at the current moment. Sorry for the confusion.
For posterity, msgpack was added over in #8897
This refactors the internal result returning system to match more closely how we iteratively return points between result sets, series, and rows within the series. It uses the updated terminology rather than older terminology that no longer matches how we refer to things in the documentation or within the query engine.

The refactor moves the aggregation and chunking behavior from `influxql.Emitter` to the HTTP service so that behavior is isolated to one location rather than sprinkled around in multiple locations. This refactor also allows us to use different output structures. One of these new output structures is implemented when using `application/x-msgpack` as the output format.
This new format is more suited to client designs than the old JSON
format for a few reasons. First, it maintains floats and integers as
separate types. So, unlike JSON, you do not have to guess if a returned
point is a float or an integer when it is a whole number. That is
returned as part of the format.
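A small illustration of that type fidelity, using the `tinylib/msgp` API already used elsewhere in this PR (a standalone sketch, not part of the change itself):

```go
package main

import (
	"bytes"
	"fmt"

	"github.com/tinylib/msgp/msgp"
)

func main() {
	var buf bytes.Buffer
	w := msgp.NewWriter(&buf)
	w.WriteInt64(42)     // written with an integer type tag
	w.WriteFloat64(42.0) // written with a distinct float64 type tag
	w.Flush()

	r := msgp.NewReader(&buf)
	i, _ := r.ReadInt64()   // round-trips as an integer
	f, _ := r.ReadFloat64() // round-trips as a float
	fmt.Println(i, f)       // prints "42 42", but the types were never conflated
}
```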
Additionally, it removes the current difficult-to-use chunking and creates a new iterative format that shouldn't be as difficult to process. For this new format, there is no non-chunking format.
Fixes #8450.