Basic token awareness #59
Conversation
Force-pushed from fa29fe1 to 38e6120.
@kbr- There's some bug with compression (#42), but you can disable compression from
scylla/src/transport/session.rs
Outdated
```rust
let conn = match self.pool.write().unwrap().entry(owner) {
    Entry::Vacant(entry) => {
        let conn = Arc::new(new_conn);
        entry.insert(SharedConnection { conn: conn.clone() });
```
I think we should use `to_owned()` instead of `clone()`, for the odd chance that the clone can be optimized out.
You mean `entry.insert(Arc::new(new_conn)).to_owned()`?

I can do that (it even looks more elegant), but I don't see how the clone can be optimized out; I want two shared pointers to the connection after all: one in the pool and one returned to the caller.
I use `to_owned` now.
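To illustrate the point under discussion, here is a minimal, self-contained sketch of the pool-insertion pattern (the `get_or_insert` helper, key type, and stored value are hypothetical simplifications, not the driver's actual API): `VacantEntry::insert` returns a mutable reference to the stored value, so one extra `Arc` handle is produced for the caller while the pool keeps its own.

```rust
use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::sync::Arc;

// Hypothetical simplified pool: insert a new shared connection, or reuse
// the existing one. Either way the caller gets its own Arc handle.
fn get_or_insert(pool: &mut HashMap<String, Arc<String>>, key: String) -> Arc<String> {
    match pool.entry(key) {
        // `insert` returns `&mut Arc<String>`; `to_owned()` clones it
        // into the handle we hand back to the caller.
        Entry::Vacant(entry) => entry.insert(Arc::new("conn".to_string())).to_owned(),
        Entry::Occupied(entry) => entry.get().clone(),
    }
}

fn main() {
    let mut pool = HashMap::new();
    let a = get_or_insert(&mut pool, "node1".into());
    // One reference lives in the pool, one was returned to us.
    assert_eq!(Arc::strong_count(&a), 2);
}
```

Whether spelled `clone()` or `to_owned()`, the reference count must still go up by one, since two live handles are genuinely needed.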
scylla/src/transport/topology.rs
Outdated
```rust
// while continuing waiting on the previous connection
let (peer_addrs, _) =
    match timeout(Duration::from_secs(5), query_peers(conn, false, self.port)).await {
        Err(_) => continue,
```
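The pattern in the snippet above is "run the query with a deadline, and skip this node when the deadline elapses." A std-only sketch of the same idea (the real code uses `tokio::time::timeout` around `query_peers`; the `query_with_deadline` helper and its delays here are invented for illustration):

```rust
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

// Run a slow "query" on another thread and wait for its result with a
// deadline. `None` plays the role of the `Err(_) => continue` arm above.
fn query_with_deadline(delay: Duration, deadline: Duration) -> Option<Vec<u32>> {
    let (tx, rx) = mpsc::channel();
    thread::spawn(move || {
        thread::sleep(delay); // stand-in for the actual peers query
        let _ = tx.send(vec![1, 2, 3]); // receiver may be gone; ignore the error
    });
    rx.recv_timeout(deadline).ok()
}

fn main() {
    // Query finishes well within the deadline: we get the result.
    assert!(query_with_deadline(Duration::from_millis(5), Duration::from_millis(500)).is_some());
    // Deadline elapses first: give up and move on to the next node.
    assert!(query_with_deadline(Duration::from_millis(500), Duration::from_millis(5)).is_none());
}
```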
Suggestion: maybe we should drop/close the connection when an error occurs while querying? Apart from that, we could reorganize the code a little so that the "establish new connections" loop always runs after this one.

If we do this, the code should become resilient to connection drops without changing it too much, although it would sometimes drop the connection unnecessarily, e.g. if some nodes return an UnavailableException or similar. What do you think?
> maybe we should drop/close the connection in case an error occurs while querying?

That would require modifying our errors to use different types, and dropping/closing connections only when appropriate (based on the type). We certainly should not drop a connection when, e.g., a query times out, since that might be just a temporary network partition between the driver and that particular server.
And of course it should be done eventually (and in Session too), but it's out of scope for this PR IMO.
Well, the problem with the current approach - if I understood the implementation correctly - is that if the connection to a node is broken (e.g. the socket is closed), we will never reopen a connection to that node.

I suggest closing the connection from the client side every time we get an error. It will cause us to sometimes close a connection unnecessarily, e.g. when a query times out, but we gain the ability to reconnect in case, e.g., a node restarts.

I agree that we will be able to solve this problem properly once we modify our error types, but I'd argue that reopening connections is more robust and easier to implement right now.
Now I'm dropping connections that return an error, but I don't drop them on timeouts.

Also, I now always try to establish new connections in the refresh function.
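The policy agreed on above (drop on errors, keep on timeouts) can be sketched as a small decision function. The `QueryError` enum and its variants here are hypothetical stand-ins, not the driver's actual error type, which the thread notes would need restructuring to do this properly:

```rust
// Hypothetical error classification for the reconnect policy discussed above.
#[derive(Debug)]
enum QueryError {
    Timeout, // deadline elapsed; connection may still be fine
    Io,      // socket-level failure; connection is likely broken
    Server,  // server-side error (e.g. Unavailable)
}

fn should_drop_connection(err: &QueryError) -> bool {
    match err {
        // A timeout may be a transient network partition; keep the connection.
        QueryError::Timeout => false,
        // I/O errors mean the socket is probably dead; drop and reconnect.
        QueryError::Io => true,
        // Server errors don't invalidate the socket, but the simple policy
        // above drops on any returned error to regain reconnect ability.
        QueryError::Server => true,
    }
}

fn main() {
    assert!(!should_drop_connection(&QueryError::Timeout));
    assert!(should_drop_connection(&QueryError::Io));
    assert!(should_drop_connection(&QueryError::Server));
}
```

The cost, as noted, is an occasional unnecessary reconnect; the benefit is that a node restart no longer permanently loses its connection.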
I addressed the comments. However, I've encountered the following error:

I need to investigate. In the meantime you can re-review.
That says a lot... https://github.com/apache/cassandra/blob/trunk/src/java/org/apache/cassandra/exceptions/ExceptionCode.java#L51
Well, that's just your standard

And rightly so, because in my manual testing I created a table

In any case, it looks like there's another bug to hunt, though:

(and there's an obvious reason why this happens)

But I need to get some sleep first :)
@kbr- this error about prepared statements is most likely a result of the test cases running in parallel and operating on identical table names. One obvious workaround is to use unique names across test cases :) (we should have a utility for that), and you can also always run tests in a single thread. I don't remember the param now; something like `--test-threads 1`.
Oh wait, you didn't add a test case. In that case, the reason is that we currently prepare a statement only on a single connection, while we should do that on each node, since the cache is node-local. It's weird that the driver hasn't simply reprepared the statement, though...
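The node-local-cache point above can be made concrete with a toy model (the `Node` struct, its `prepare` method, and `prepare_on_all` are invented for illustration; the real fix also involves repreparing when a node reports the statement as unprepared):

```rust
use std::collections::HashMap;

// Toy model: each node keeps its own prepared-statement cache, so a
// statement prepared on one node is unknown to the others.
struct Node {
    prepared: HashMap<String, u64>, // query text -> node-local statement id
}

impl Node {
    fn prepare(&mut self, query: &str) -> u64 {
        let next_id = self.prepared.len() as u64;
        *self.prepared.entry(query.to_string()).or_insert(next_id)
    }
}

// Prepare on every node, not just one, since the cache is node-local.
fn prepare_on_all(nodes: &mut [Node], query: &str) -> Vec<u64> {
    nodes.iter_mut().map(|n| n.prepare(query)).collect()
}

fn main() {
    let mut nodes = vec![
        Node { prepared: HashMap::new() },
        Node { prepared: HashMap::new() },
    ];
    let ids = prepare_on_all(&mut nodes, "SELECT * FROM t WHERE pk = ?");
    // One id per node: every node now knows the statement.
    assert_eq!(ids.len(), 2);
}
```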
scylla/src/frame/response/result.rs
Outdated
```rust
@@ -282,6 +294,24 @@ fn deser_cql_value(typ: &ColumnType, buf: &mut &[u8]) -> AResult<CQLValue> {
            CQLValue::BigInt(buf.read_i64::<BigEndian>()?)
        }
        Text => CQLValue::Text(str::from_utf8(buf)?.to_owned()),
        Inet => CQLValue::Inet(match buf.len() {
            4 => {
                let ret = IpAddr::from(<[u8; 4]>::try_from(&buf[0..4])?);
```
This will panic if the frame is too short. Please use `types::read_raw_bytes`, which checks that the slice is long enough first (you will need to re-export it).
If you change it, you should drop the `buf.advance(4)`, because `read_raw_bytes` will do it for you.
But we're in the `match buf.len()` case `4`, so how could the slice be too short?
Argh, you are right. I got confused and thought this was frame parsing code, and the length was read from the frame. I'll send a fix which changes it back.
Well, the additional check doesn't hurt, I think.
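The exchange above boils down to: matching on `buf.len()` already guarantees the slice is long enough, so the indexed `try_from` cannot panic there. A standalone sketch of that deserialization shape (the `deser_inet` function and its error type are simplified stand-ins for the driver's code):

```rust
use std::convert::TryFrom;
use std::net::IpAddr;

// An inet value is either 4 bytes (IPv4) or 16 bytes (IPv6);
// any other length is a protocol error.
fn deser_inet(buf: &[u8]) -> Result<IpAddr, String> {
    match buf.len() {
        // The length match makes these conversions infallible.
        4 => Ok(IpAddr::from(<[u8; 4]>::try_from(buf).unwrap())),
        16 => Ok(IpAddr::from(<[u8; 16]>::try_from(buf).unwrap())),
        n => Err(format!("invalid inet length: {}", n)),
    }
}

fn main() {
    assert_eq!(
        deser_inet(&[127, 0, 0, 1]).unwrap(),
        "127.0.0.1".parse::<IpAddr>().unwrap()
    );
    // A malformed length is reported as an error, never a panic.
    assert!(deser_inet(&[1, 2, 3]).is_err());
}
```

A length-checking reader like `read_raw_bytes` becomes essential only when the length comes from the frame itself rather than from `buf.len()`.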
scylla/src/frame/response/result.rs
Outdated
```rust
                ret
            }
            16 => {
                let ret = IpAddr::from(<[u8; 16]>::try_from(&buf[0..16])?);
```
Ditto
@kbr- I'll allow myself to address the final review comments, so that we can merge it and unblock sending more PRs that would potentially conflict with this one.
@piodul thanks!
This version picks the owner of the token range that the queried primary key's token lies in. The owner is always one of the replicas (irrespective of the replication strategy).
In the end we should calculate the entire set of replicas using the replication strategy of the queried table and if sending the query to one of them fails, try another. But taking just the owner is a start.
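The owner lookup described above (find which token range the key's token falls into) amounts to a search over a sorted ring. A minimal sketch under an assumed model (a hypothetical `owner_of` over a sorted `(token, node)` slice; the real driver works on actual ring metadata and, as noted, should eventually compute the full replica set per the table's replication strategy):

```rust
// Owner of a token: the node with the smallest ring token >= the key's
// token, wrapping around to the first node past the highest token.
fn owner_of<'a>(ring: &[(i64, &'a str)], token: i64) -> &'a str {
    // `ring` must be sorted by token.
    ring.iter()
        .find(|(t, _)| *t >= token)
        .map(|(_, node)| *node)
        .unwrap_or(ring[0].1) // wrap around the ring
}

fn main() {
    let ring = [(-100, "a"), (0, "b"), (100, "c")];
    assert_eq!(owner_of(&ring, -50), "b");
    assert_eq!(owner_of(&ring, 100), "c");
    assert_eq!(owner_of(&ring, 101), "a"); // wraps around
}
```

With replication factor > 1, the owner is just the first of several replicas; falling back to the next replica on failure is the follow-up work this PR leaves open. A production lookup would also use `binary_search` rather than a linear scan.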
Fixes #14
Fixes #15
It is not yet tested whether this actually routes statements to the right replicas. But I checked manually that it does indeed choose between different replicas... just dunno if they are the right ones :)
(I wanted to test it using cqlsh-rs, but it crashes: `CQL frame LZ4 uncompression failure` :()