Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use a new TSO client implementation #92

Merged
merged 9 commits into from Aug 6, 2019

Conversation

@sticnarf
Copy link
Contributor

sticnarf commented Jul 31, 2019

As #89 mentioned, there is a bug in the current TSO client implementation. I feel it difficult to find out where the bug is and the old implementation uses the legacy tokio-core. Therefore, I decide to implement a new TSO client with futures 0.3 and async/await. I hope the implentation is now more efficient (more reliable batch size) and clearer.

Fixes #89

sticnarf added 2 commits Jul 30, 2019
Signed-off-by: Yilin Chen <sticnarf@gmail.com>
Signed-off-by: Yilin Chen <sticnarf@gmail.com>
@sticnarf sticnarf force-pushed the sticnarf:fix-tso branch from 9c73ee5 to e652a6a Jul 31, 2019
@sticnarf sticnarf requested a review from nrc Jul 31, 2019
sticnarf added 2 commits Jul 31, 2019
Signed-off-by: Yilin Chen <sticnarf@gmail.com>
Signed-off-by: Yilin Chen <sticnarf@gmail.com>
Copy link
Contributor

nrc left a comment

Looks like a good move. I have some initial superficial comments, I will do a deep review, but need more coffee first :-)

.travis.yml Show resolved Hide resolved
@@ -94,124 +94,3 @@ pub(crate) trait ClientFutureExt {
}

impl<T: TryFuture> ClientFutureExt for T {}

/// Emulate `send_all`/`SendAll` from futures 0.1 since the 0.3 versions don't

This comment has been minimized.

Copy link
@nrc

nrc Jul 31, 2019

Contributor

\o/


/// The timestamp oracle which provides monotonically increasing timestamps.
#[derive(Clone)]
pub struct Tso {

This comment has been minimized.

Copy link
@nrc

nrc Jul 31, 2019

Contributor

Could you use a more self-explanatory name than Tso please? TimestampOracle or TimestampServer or something better if you have an idea.

This comment has been minimized.

Copy link
@sticnarf

sticnarf Aug 1, 2019

Author Contributor

Sure

/// The timestamp oracle which provides monotonically increasing timestamps.
#[derive(Clone)]
pub struct Tso {
/// The transmitter of a bounded channel which transports the sender of an oneshot channel to

This comment has been minimized.

Copy link
@nrc

nrc Jul 31, 2019

Contributor
Suggested change
/// The transmitter of a bounded channel which transports the sender of an oneshot channel to
/// The transmitter of a bounded channel which transports the sender of a oneshot channel to

Nit. English is a terrible language, sorry :-(

.unbounded_send(task)
.expect("unbounded send should never fail");
}
struct TsoContext {

This comment has been minimized.

Copy link
@nrc

nrc Jul 31, 2019

Contributor

Can you find a better name here too? Context is not very descriptive.

This comment has been minimized.

Copy link
@sticnarf

sticnarf Aug 1, 2019

Author Contributor

Sorry but I cannot think of an appropriate name. Could you help me?
The struct is just what I want to share in both the send_requests future and the receive_and_handle_response future. The send_requests future stores its waker in it and pushes timestamp senders into the queue. The receive_and_handle_response future dequeues the queue and wakes up the former future.

This comment has been minimized.

Copy link
@nrc

nrc Aug 2, 2019

Contributor

So, it seems like this is a Task or a WorkItem? Or perhaps it is a TimestampRequestQueue? I think it is a good idea to think about what it is and the name should follow. If we don't have a clear idea of what it is and does, then that suggests that it might not be the right abstraction.

This comment has been minimized.

Copy link
@sticnarf

sticnarf Aug 2, 2019

Author Contributor

Right. I have the same feeling too. It is not something that can be clearly described. It is created according to what we need.
They are something that both futures need to access and mutate. If we don't have such a struct, we have to make two Rc<RefCell>s (one for setting the waker, one for mutating the queue).
However, I will try and see if I can do an ideal refactoring.

@@ -0,0 +1,41 @@
#![cfg(feature = "integration-tests")]
#![feature(async_await)]

This comment has been minimized.

Copy link
@nrc

nrc Jul 31, 2019

Contributor

Could you add unit tests to test the Tso? It really seems like something where we should be able to mock out the pd rpc stuff and test the logic in the Tso itself. (Fine to have an integration test as well, but they don't run without a cluster and tend to test the cluster more than the client itself).

This comment has been minimized.

Copy link
@sticnarf

sticnarf Aug 1, 2019

Author Contributor

Of course, we need a unit test for TSO. An integration test is simper so I make it first.

This comment has been minimized.

Copy link
@sticnarf

sticnarf Aug 2, 2019

Author Contributor

I don't think we must add unit tests for it so early. I've created #93 to track it.

sticnarf added 2 commits Aug 1, 2019
Signed-off-by: Yilin Chen <sticnarf@gmail.com>
Signed-off-by: Yilin Chen <sticnarf@gmail.com>
Copy link
Contributor

nrc left a comment

I left some more comments inline. I admit I am struggling to get my head around the details here. It seems to me that there are more moving parts than necessary, but I can't quite identify how to improve. Perhaps thinking about exactly what roles the various structs play will illuminate improvements to the structure? I'll finish my review on Monday.

.unbounded_send(task)
.expect("unbounded send should never fail");
}
struct TsoContext {

This comment has been minimized.

Copy link
@nrc

nrc Aug 2, 2019

Contributor

So, it seems like this is a Task or a WorkItem? Or perhaps it is a TimestampRequestQueue? I think it is a good idea to think about what it is and the name should follow. If we don't have a clear idea of what it is and does, then that suggests that it might not be the right abstraction.

@@ -3,226 +3,185 @@
//! This module is the low-level mechanisms for getting timestamps from a PD
//! cluster. It should be used via the `get_timestamp` API in `PdClient`.

This comment has been minimized.

Copy link
@nrc

nrc Aug 2, 2019

Contributor

Could you document at a high level how it works here please?

This comment has been minimized.

Copy link
@sticnarf

sticnarf Aug 2, 2019

Author Contributor

I can document how it is implemented. But I am not clear what you mean "high level"?

This comment has been minimized.

Copy link
@nrc

nrc Aug 5, 2019

Contributor

I mean things like what happens on each thread, the responsibility of each piece, and how the pieces communicate with each other (as opposed to what the fields and methods are).

cluster_id: u64,
result_sender_rx: mpsc::Receiver<oneshot::Sender<Timestamp>>,
rpc_sender: Compat01As03Sink<ClientDuplexSender<TsoRequest>, (TsoRequest, WriteFlags)>,
rpc_receiver: Compat01As03<ClientDuplexReceiver<TsoResponse>>,

This comment has been minimized.

Copy link
@nrc

nrc Aug 2, 2019

Contributor

Why do we have these things instead of a TsoRequestStream?

@@ -307,14 +307,14 @@ impl Connection {
warn!("updating pd client, blocking the tokio core");
let start = Instant::now();
let (client, members) = self.try_connect_leader(&old_cluster.members, timeout)?;
let tso = TimestampOracle::new(old_cluster.id, &client)?;

This comment has been minimized.

Copy link
@nrc

nrc Aug 2, 2019

Contributor

Do we need to create a whole new oracle (including the worker, etc.?). Could we keep the same worker and just clone the part which lives on the main thread?

This comment has been minimized.

Copy link
@AndreMouche

AndreMouche Aug 6, 2019

Member

We will start the background thread to handle TSO requests in the creation, so we need to create a whole new oracle. Maybe we can do the refactor in the future? @nrc

/// The transmitter of a bounded channel which transports the sender of a oneshot channel to
/// the TSO working thread.
/// In the working thread, the `oneshot::Sender` is used to send back timestamp results.
result_sender_tx: mpsc::Sender<oneshot::Sender<Timestamp>>,

This comment has been minimized.

Copy link
@nrc

nrc Aug 2, 2019

Contributor

I think it would be good to either wrap or alias oneshot::Sender<Timestamp> as a TimestampRequest or something, since just sending a channel is a bit weird-looking. Could you rename result_sender_tx too? I think it is a confusing name, since it is the channel for sending requests, not for sending results.

This comment has been minimized.

Copy link
@sticnarf

sticnarf Aug 2, 2019

Author Contributor

Good idea. I think I can improve this.

@sticnarf

This comment has been minimized.

Copy link
Contributor Author

sticnarf commented Aug 2, 2019

@nrc I would answer other comments here. Inside PdClient, there is no async environment now. So the timestamp oracle needs to create its own thread and futures executor. TSO is lightweight, so I think we don't need a thread pool and all operations in TSO don't need to cross thread boundaries (that's also why we can use Rc and RefCell here).
When PD needs reconnecting, the stream and sink inside TSO are invalid too. It's easy to create a new TSO using the newly connected PD. The reconnecting mechanisms can be improved a lot in the future. I want to keep it simple for now.

sticnarf added 2 commits Aug 5, 2019
Signed-off-by: Yilin Chen <sticnarf@gmail.com>
Signed-off-by: Yilin Chen <sticnarf@gmail.com>
Copy link
Member

AndreMouche left a comment

LGTM, I think we can make it work as the first step, then make it better.

@@ -307,14 +307,14 @@ impl Connection {
warn!("updating pd client, blocking the tokio core");
let start = Instant::now();
let (client, members) = self.try_connect_leader(&old_cluster.members, timeout)?;
let tso = TimestampOracle::new(old_cluster.id, &client)?;

This comment has been minimized.

Copy link
@AndreMouche

AndreMouche Aug 6, 2019

Member

We will start the background thread to handle TSO requests in the creation, so we need to create a whole new oracle. Maybe we can do the refactor in the future? @nrc

@sticnarf sticnarf requested a review from MyonKeminta Aug 6, 2019
pending_requests.push_back(sender);
count += 1;
}
Poll::Ready(None) => return Poll::Ready(None),

This comment has been minimized.

Copy link
@MyonKeminta

MyonKeminta Aug 6, 2019

If count > 0 now, may we drop the received requests?

This comment has been minimized.

Copy link
@sticnarf

sticnarf Aug 6, 2019

Author Contributor

Yes and it is expected. As we don't close the channel manually, the only possibility we go into this branch is that all senders are dropped. And now, this only happens when a reconnection occurs and the old TimestampOracle in
the RetryClient is replaced with a new one. Then, the old channel to the PD server is no longer usable. We can simply drop them here and finally the receiver will get an error.

.ok_or_else(|| Error::internal_error("No timestamp in TsoResponse"))?;

let mut offset = i64::from(resp.count);
while offset > 0 {

This comment has been minimized.

Copy link
@MyonKeminta

MyonKeminta Aug 6, 2019

Is it possible that PD returns less timestamps than requested? What will happen then?

This comment has been minimized.

Copy link
@sticnarf

sticnarf Aug 6, 2019

Author Contributor

I am not sure about it. If it happens, there will be more and more pending requests and the timestamp oracle will finally block forever. A robust client should handle it but I don't want to check it now. I can add an issue to track that.

This comment has been minimized.

Copy link
@sticnarf

sticnarf Aug 6, 2019

Author Contributor

Tracked in #95

Copy link

MyonKeminta left a comment

LGTM

@sticnarf sticnarf merged commit 2733e28 into tikv:master Aug 6, 2019
2 checks passed
2 checks passed
DCO All commits are signed off!
Details
continuous-integration/travis-ci/pr The Travis CI build passed
Details
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
4 participants
You can’t perform that action at this time.