Use a new TSO client implementation #92

Merged 9 commits on Aug 6, 2019
@@ -45,4 +45,4 @@ script:
- if [[ $TRAVIS_OS_NAME == "linux" ]]; then docker ps; fi
- if [[ $TRAVIS_OS_NAME == "linux" ]]; then docker logs pd; fi
- if [[ $TRAVIS_OS_NAME == "linux" ]]; then docker logs kv; fi
- if [[ $TRAVIS_OS_NAME == "linux" ]]; then PD_ADDR="127.0.0.1:2379" cargo test --all --features integration-tests -- --nocapture; fi
- if [[ $TRAVIS_OS_NAME == "linux" ]]; then PD_ADDRS="127.0.0.1:2379" cargo test --all --features integration-tests -- --nocapture; fi
This conversation was marked as resolved by nrc

nrc (Contributor) commented on Jul 31, 2019:

Why do we need to take multiple addresses for PD?

sticnarf (Author, Contributor) commented on Aug 1, 2019:

PD is a cluster, so we can configure multiple addresses. In the future, we may run integration tests with multiple PD addresses given (the usual case in practice).

nrc (Contributor) commented on Aug 1, 2019:

👍
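
To make sticnarf's point concrete (an illustration only, not part of this diff): a client pointed at a three-node PD cluster can be handed every endpoint up front. The addresses below are placeholders; the Config::new(Vec<String>) shape matches how the integration tests in this PR call it.

// Hypothetical endpoints for a three-node PD cluster; with all members known,
// the client is not tied to a single PD address.
let pd_endpoints = vec![
    "10.0.0.1:2379".to_string(),
    "10.0.0.2:2379".to_string(),
    "10.0.0.3:2379".to_string(),
];
let config = Config::new(pd_endpoints);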

@@ -20,15 +20,14 @@ name = "tikv_client"
[dependencies]
derive-new = "0.5"
failure = "0.1"
futures-preview = { version = "0.3.0-alpha.17", features = ["compat"] }
futures-preview = { version = "0.3.0-alpha.17", features = ["compat", "async-await", "nightly"] }
grpcio = { version = "0.5.0-alpha", features = [ "secure", "prost-codec" ], default-features = false }
kvproto = { git = "https://github.com/pingcap/kvproto.git", rev = "ec9df5f208a768074c28d04bfb8b90e5321d2f69", features = [ "prost-codec" ], default-features = false }
lazy_static = "1"
log = "0.3.9"
regex = "1"
serde = "1.0"
serde_derive = "1.0"
tokio-core = "0.1"
tokio-timer = "0.2"

[dependencies.prometheus]
@@ -94,124 +94,3 @@ pub(crate) trait ClientFutureExt {
}

impl<T: TryFuture> ClientFutureExt for T {}

/// Emulate `send_all`/`SendAll` from futures 0.1 since the 0.3 versions don't

nrc (Contributor) commented on Jul 31, 2019:

\o/

/// work with Tokio `Handle`s due to ownership differences.
pub(crate) trait SinkCompat<I, E> {
fn send_all_compat<S>(self, stream: S) -> SendAllCompat<Self, S>
where
S: Stream<Item = I> + Unpin,
Self: Sink<I, Error = E> + Sized + Unpin,
{
SendAllCompat::new(self, stream)
}
}

impl<T, E, S: Sink<T, Error = E>> SinkCompat<T, E> for S {}

#[derive(Debug)]
#[must_use = "futures do nothing unless polled"]
pub(crate) struct SendAllCompat<Si, St>
where
Si: Sink<St::Item> + Unpin,
St: Stream + Unpin,
{
sink: Option<Si>,
stream: Option<stream::Fuse<St>>,
buffered: Option<St::Item>,
}

impl<Si, St> Unpin for SendAllCompat<Si, St>
where
Si: Sink<St::Item> + Unpin,
St: Stream + Unpin,
{
}

impl<Si, St> SendAllCompat<Si, St>
where
Si: Sink<St::Item> + Unpin,
St: Stream + Unpin,
{
pub(crate) fn new(sink: Si, stream: St) -> SendAllCompat<Si, St> {
SendAllCompat {
sink: Some(sink),
stream: Some(stream.fuse()),
buffered: None,
}
}

fn sink_mut(&mut self) -> Pin<&mut Si> {
Pin::new(
self.sink
.as_mut()
.take()
.expect("Attempted to poll SendAllCompat after completion"),
)
}

fn stream_mut(&mut self) -> Pin<&mut stream::Fuse<St>> {
Pin::new(
self.stream
.as_mut()
.take()
.expect("Attempted to poll SendAllCompat after completion"),
)
}

fn take_result(&mut self) -> (Si, St) {
let sink = self
.sink
.take()
.expect("Attempted to poll SendAllCompat after completion");
let fuse = self
.stream
.take()
.expect("Attempted to poll SendAllCompat after completion");
(sink, fuse.into_inner())
}

fn try_start_send(
&mut self,
item: St::Item,
cx: &mut Context,
) -> Poll<Result<(), Si::Error>> {
debug_assert!(self.buffered.is_none());
match self.sink_mut().poll_ready(cx) {
Poll::Ready(Ok(())) => Poll::Ready(self.sink_mut().start_send(item)),
Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
Poll::Pending => {
self.buffered = Some(item);
Poll::Pending
}
}
}
}

impl<Si, St> Future for SendAllCompat<Si, St>
where
Si: Sink<St::Item> + Unpin,
St: Stream + Unpin,
{
type Output = Result<(Si, St), Si::Error>;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(Si, St), Si::Error>> {
if let Some(item) = self.buffered.take() {
ready!(self.try_start_send(item, cx))?
}

loop {
match self.stream_mut().poll_next(cx) {
Poll::Ready(Some(item)) => ready!(self.try_start_send(item, cx))?,
Poll::Ready(None) => {
ready!(self.sink_mut().poll_close(cx))?;
return Poll::Ready(Ok(self.take_result()));
}
Poll::Pending => {
ready!(self.sink_mut().poll_flush(cx))?;
return Poll::Pending;
}
}
}
}
}
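
For orientation, here is a minimal sketch of how the shim above could be exercised, assuming futures-preview 0.3 alpha with async/await, that the crate-internal SinkCompat trait is in scope, and that the alpha's UnboundedSender still implements Sink; an in-memory channel stands in for the real gRPC sink, and none of this is in the diff.

// Sketch only: forward a finite stream into an unbounded channel sender via
// send_all_compat, getting both halves back once the stream is exhausted.
use futures::channel::mpsc;
use futures::stream;

async fn forward_numbers() -> Result<(), mpsc::SendError> {
    let (tx, _rx) = mpsc::unbounded::<u32>();
    let items = stream::iter(vec![1u32, 2, 3]);
    let (_tx, _items) = tx.send_all_compat(items).await?;
    Ok(())
}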
@@ -198,8 +198,10 @@ impl Error {
})
}

pub(crate) fn internal_error(message: String) -> Self {
Error::from(ErrorKind::InternalError { message })
pub(crate) fn internal_error(message: impl Into<String>) -> Self {
Error::from(ErrorKind::InternalError {
message: message.into(),
})
}
}
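
As a hypothetical illustration (these call sites are not from the diff), the relaxed impl Into<String> signature lets callers pass either a string literal or an already-built String without an explicit .to_owned():

// Hypothetical call sites: both &str and String satisfy impl Into<String>.
let _from_literal = Error::internal_error("duplicated timestamp stream");
let _from_string = Error::internal_error(format!("unexpected cluster id {}", 42));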

@@ -9,7 +9,7 @@ use std::env::var;

mod raw;

pub(crate) const ENV_PD_ADDR: &str = "PD_ADDR";
pub(crate) const ENV_PD_ADDRS: &str = "PD_ADDRS";
pub(crate) const PROPTEST_KEY_MAX: usize = 1024 * 2; // 2 KB
pub(crate) const PROPTEST_VALUE_MAX: usize = 1024 * 16; // 16 KB
pub(crate) const PROPTEST_BATCH_SIZE_MAX: usize = 16;
@@ -22,9 +22,9 @@ pub fn arb_batch<T: core::fmt::Debug>(
proptest::collection::vec(single_strategy, 0..max_batch_size)
}

pub fn pd_addr() -> Vec<String> {
var(ENV_PD_ADDR)
.expect(&format!("Expected {}:", ENV_PD_ADDR))
pub fn pd_addrs() -> Vec<String> {
var(ENV_PD_ADDRS)
.expect(&format!("Expected {}:", ENV_PD_ADDRS))
.split(",")
.map(From::from)
.collect()
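
A rough usage note (not part of the diff): PD_ADDRS now carries a comma-separated list, and pd_addrs() splits it into one String per endpoint. A hypothetical check:

// Hypothetical: two endpoints in PD_ADDRS produce two entries.
std::env::set_var("PD_ADDRS", "127.0.0.1:2379,127.0.0.1:3379");
assert_eq!(
    pd_addrs(),
    vec!["127.0.0.1:2379".to_string(), "127.0.0.1:3379".to_string()]
);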
@@ -1,6 +1,6 @@
// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.

use super::{arb_batch, pd_addr};
use super::{arb_batch, pd_addrs};
use crate::{raw::Client, Config, KvPair, Value};
use futures::executor::block_on;
use proptest::{arbitrary::any, proptest};
@@ -12,7 +12,7 @@ proptest! {
fn point(
pair in any::<KvPair>(),
) {
let client = block_on(Client::connect(Config::new(pd_addr()))).unwrap();
let client = block_on(Client::connect(Config::new(pd_addrs()))).unwrap();

block_on(
client.put(pair.key().clone(), pair.value().clone())
@@ -36,7 +36,7 @@ proptest! {
fn batch(
kvs in arb_batch(any::<KvPair>(), None),
) {
let client = block_on(Client::connect(Config::new(pd_addr()))).unwrap();
let client = block_on(Client::connect(Config::new(pd_addrs()))).unwrap();
let keys = kvs.iter().map(|kv| kv.key()).cloned().collect::<Vec<_>>();

block_on(
@@ -226,7 +226,7 @@ impl<PdC: PdClient> RpcClient<PdC> {
}

pub fn get_timestamp(self: Arc<Self>) -> impl Future<Output = Result<Timestamp>> {
Arc::clone(&self.pd).get_timestamp()
self.pd.clone().get_timestamp()
}

// Returns a Stream which iterates over the contexts for each region covered by the range.
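
For orientation, a sketch (not part of the diff) of a caller that holds the Arc-wrapped client and awaits the timestamp future; it reuses the RpcClient, PdClient, Timestamp, Result, and Arc names already used in this module.

// Sketch only: get_timestamp takes self: Arc<Self>, so the caller hands over
// its Arc (cloning it first if it still needs the handle) and awaits the future.
async fn current_ts<PdC: PdClient>(client: Arc<RpcClient<PdC>>) -> Result<Timestamp> {
    client.get_timestamp().await
}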
@@ -16,7 +16,7 @@ use kvproto::{metapb, pdpb};
use crate::{
rpc::{
pd::{
context::request_context, request::retry_request, timestamp::PdReactor, Region,
context::request_context, request::retry_request, timestamp::TimestampOracle, Region,
RegionId, StoreId, Timestamp,
},
security::SecurityManager,
@@ -108,6 +108,7 @@ impl PdClient for RetryClient {
}

fn get_timestamp(self: Arc<Self>) -> BoxFuture<'static, Result<Timestamp>> {
// FIXME: retry or reconnect on error
Box::pin(self.cluster.read().unwrap().get_timestamp())
}
}
@@ -142,7 +143,7 @@ pub struct Cluster {
pub id: u64,
pub(super) client: pdpb::PdClient,
members: pdpb::GetMembersResponse,
reactor: Arc<RwLock<PdReactor>>,
tso: TimestampOracle,
}

// These methods make a single attempt to make a request.
@@ -256,7 +257,7 @@ impl Cluster {
}

fn get_timestamp(&self) -> impl Future<Output = Result<Timestamp>> {
self.reactor.write().unwrap().get_timestamp()
self.tso.clone().get_timestamp()
}
}

@@ -281,14 +282,13 @@ impl Connection {
let (client, members) = self.try_connect_leader(&members, timeout)?;

let id = members.get_header().get_cluster_id();
let tso = TimestampOracle::new(id, &client)?;
let cluster = Cluster {
id,
members,
client,
reactor: Arc::new(RwLock::new(PdReactor::new())),
tso,
};

PdReactor::start(cluster.reactor.clone(), &cluster);
Ok(cluster)
}

@@ -307,14 +307,14 @@ impl Connection {
warn!("updating pd client, blocking the tokio core");
let start = Instant::now();
let (client, members) = self.try_connect_leader(&old_cluster.members, timeout)?;
let tso = TimestampOracle::new(old_cluster.id, &client)?;

nrc (Contributor) commented on Aug 2, 2019:

Do we need to create a whole new oracle (including the worker, etc.)? Could we keep the same worker and just clone the part which lives on the main thread?

AndreMouche (Member) commented on Aug 6, 2019:

The background thread that handles TSO requests is started during creation, so we need to create a whole new oracle. Maybe we can do that refactor in the future? @nrc


let cluster = Cluster {
id: old_cluster.id,
client,
members,
reactor: old_cluster.reactor.clone(),
tso,
};
PdReactor::start(cluster.reactor.clone(), &cluster);
*self.last_update.write().unwrap() = Instant::now();

warn!("updating PD client done, spent {:?}", start.elapsed());