Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Resolve locks #108

Merged
merged 10 commits into from
Aug 28, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,14 @@ derive-new = "0.5"
failure = "0.1"
futures-preview = { version = "0.3.0-alpha.18", features = ["compat", "async-await", "nightly"] }
grpcio = { version = "0.5.0-alpha", features = [ "secure", "prost-codec" ], default-features = false }
kvproto = { git = "https://github.com/pingcap/kvproto.git", rev = "81e0c4635c2e28462fa0ad82c39f126448976de7", features = [ "prost-codec" ], default-features = false }
kvproto = { git = "https://github.com/pingcap/kvproto.git", rev = "2fc6229ed6097b59dbe51525c7a65b19543a94ca", features = [ "prost-codec" ], default-features = false }
lazy_static = "1"
log = "0.3.9"
regex = "1"
serde = "1.0"
serde_derive = "1.0"
tokio-timer = "0.2"
futures-timer = "0.3"

[dependencies.prometheus]
version = "0.4.2"
Expand All @@ -38,7 +39,8 @@ features = ["push", "process"]
[dev-dependencies]
clap = "2.32"
tempdir = "0.3"
runtime = { version = "0.3.0-alpha.6", default-features = false }
runtime-tokio = "0.3.0-alpha.5"
runtime = { version = "0.3.0-alpha.7", default-features = false }
runtime-tokio = "0.3.0-alpha.6"
proptest = "0.9"
proptest-derive = "0.1.0"
fail = { version = "0.3", features = [ "failpoints" ] }
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ This is an open source (Apache 2) project hosted by the Cloud Native Computing F

The TiKV client is a Rust library (crate). It uses async/await internally and exposes some `async fn` APIs as well.

Async/await is a new feature in Rust and is currently unstable. To use it you'll need to add the feature flag `#![async_await]` to your crate and use a nightly compiler (see below).
Async/await is a new feature in Rust and cannot be used with a stable Rust compiler. To use it now you'll need to use Rust 1.39 (see below).

To use this crate in your project, add it as a dependency in your `Cargo.toml`:

Expand Down Expand Up @@ -52,7 +52,7 @@ To check what version of Rust you are using, run
rustc --version
```

You'll see something like `rustc 1.38.0-nightly (dddb7fca0 2019-07-30)` where the `1.38.0` is the toolchain version, and `nightly` is the channel (stable/beta/nightly). To install another toolchain use
You'll see something like `rustc 1.39.0-nightly (eeba189cf 2019-08-24)` where the `1.39.0` is the toolchain version, and `nightly` is the channel (stable/beta/nightly). To install another toolchain use

```bash
rustup toolchain install nightly
Expand Down
1 change: 0 additions & 1 deletion examples/raw.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
// Copyright 2018 TiKV Project Authors. Licensed under Apache-2.0.

#![feature(async_await, await_macro)]
#![type_length_limit = "8165158"]

mod common;
Expand Down
2 changes: 0 additions & 2 deletions examples/transaction.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
// Copyright 2018 TiKV Project Authors. Licensed under Apache-2.0.

#![feature(async_await, await_macro)]

mod common;

use crate::common::parse_args;
Expand Down
2 changes: 1 addition & 1 deletion rust-toolchain
Original file line number Diff line number Diff line change
@@ -1 +1 @@
nightly-2019-07-31
nightly-2019-08-25
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should only do one thing in one PR. Please pay attention to this rule in the future.

Copy link
Collaborator Author

@sticnarf sticnarf Aug 28, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, just find some dependencies cannot compile because they remove the async/await feature gate.

114 changes: 15 additions & 99 deletions src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,18 +26,18 @@ pub enum ErrorKind {
/// Feature is not implemented.
#[fail(display = "Unimplemented feature")]
Unimplemented,
// No region is found for the given key.
/// No region is found for the given key.
#[fail(display = "Region is not found for key: {:?}", key)]
RegionForKeyNotFound { key: Vec<u8> },
/// The peer is not the leader for the region.
#[fail(display = "Peer is not leader for region {}. {}", region_id, message)]
NotLeader { region_id: u64, message: String },
/// Stale epoch
#[fail(display = "Stale epoch. {}", message)]
StaleEpoch { message: String },
/// Errors caused by changed region information
#[fail(display = "Region error: {:?}", _0)]
RegionError(kvproto::errorpb::Error),
/// No region is found for the given id.
#[fail(display = "Region {} is not found. {}", region_id, message)]
RegionNotFound { region_id: u64, message: String },
#[fail(display = "Region {} is not found", region_id)]
RegionNotFound { region_id: u64 },
/// No region is found for the given id.
#[fail(display = "Leader of region {} is not found", region_id)]
LeaderNotFound { region_id: u64 },
/// Invalid key range to scan. Only left bounded intervals are supported.
#[fail(display = "Only left bounded intervals are supported")]
InvalidKeyRange,
Expand All @@ -53,43 +53,6 @@ pub enum ErrorKind {
/// A string error returned by TiKV server
#[fail(display = "Kv error. {}", message)]
KvError { message: String },
/// Reconstructed `kvproto::errorpb::KeyNotInRegion`
#[fail(
display = "Key {:?} is not in region {}: [{:?}, {:?})",
key, region_id, start_key, end_key
)]
KeyNotInRegion {
key: Vec<u8>,
region_id: u64,
start_key: Vec<u8>,
end_key: Vec<u8>,
},
/// Reconstructed `kvproto::errorpb::ServerIsBusy`
#[fail(display = "Server is busy: {}. Backoff {} ms", reason, backoff_ms)]
ServerIsBusy { reason: String, backoff_ms: u64 },
/// Represents `kvproto::errorpb::StaleCommand` with additional error message
#[fail(display = "Stale command. {}", message)]
StaleCommand { message: String },
/// Represents `kvproto::errorpb::StoreNotMatch` with additional error message
#[fail(
display = "Requesting store {} when actual store is {}. {}",
request_store_id, actual_store_id, message
)]
StoreNotMatch {
request_store_id: u64,
actual_store_id: u64,
message: String,
},
/// Represents `kvproto::errorpb::RaftEntryTooLarge` with additional error message
#[fail(
display = "{} bytes raft entry of region {} is too large. {}",
entry_size, region_id, message
)]
RaftEntryTooLarge {
region_id: u64,
entry_size: u64,
message: String,
},
#[fail(display = "{}", message)]
InternalError { message: String },
}
Expand Down Expand Up @@ -123,24 +86,16 @@ impl Error {
Error::from(ErrorKind::RegionForKeyNotFound { key })
}

pub(crate) fn not_leader(region_id: u64, message: Option<String>) -> Self {
Error::from(ErrorKind::NotLeader {
region_id,
message: message.unwrap_or_default(),
})
pub(crate) fn region_error(error: kvproto::errorpb::Error) -> Self {
Error::from(ErrorKind::RegionError(error))
}

pub(crate) fn stale_epoch(message: Option<String>) -> Self {
Error::from(ErrorKind::StaleEpoch {
message: message.unwrap_or_default(),
})
pub(crate) fn region_not_found(region_id: u64) -> Self {
Error::from(ErrorKind::RegionNotFound { region_id })
}

pub(crate) fn region_not_found(region_id: u64, message: Option<String>) -> Self {
Error::from(ErrorKind::RegionNotFound {
region_id,
message: message.unwrap_or_default(),
})
pub(crate) fn leader_not_found(region_id: u64) -> Self {
Error::from(ErrorKind::LeaderNotFound { region_id })
}

pub(crate) fn invalid_key_range() -> Self {
Expand All @@ -155,45 +110,6 @@ impl Error {
Error::from(ErrorKind::KvError { message })
}

pub(crate) fn key_not_in_region(mut e: kvproto::errorpb::KeyNotInRegion) -> Self {
Error::from(ErrorKind::KeyNotInRegion {
key: e.take_key(),
region_id: e.get_region_id(),
start_key: e.take_start_key(),
end_key: e.take_end_key(),
})
}

pub(crate) fn server_is_busy(mut e: kvproto::errorpb::ServerIsBusy) -> Self {
Error::from(ErrorKind::ServerIsBusy {
reason: e.take_reason(),
backoff_ms: e.get_backoff_ms(),
})
}

pub(crate) fn stale_command(message: String) -> Self {
Error::from(ErrorKind::StaleCommand { message })
}

pub(crate) fn store_not_match(e: kvproto::errorpb::StoreNotMatch, message: String) -> Self {
Error::from(ErrorKind::StoreNotMatch {
request_store_id: e.get_request_store_id(),
actual_store_id: e.get_actual_store_id(),
message,
})
}

pub(crate) fn raft_entry_too_large(
e: kvproto::errorpb::RaftEntryTooLarge,
message: String,
) -> Self {
Error::from(ErrorKind::RaftEntryTooLarge {
region_id: e.get_region_id(),
entry_size: e.get_entry_size(),
message,
})
}

pub(crate) fn internal_error(message: impl Into<String>) -> Self {
Error::from(ErrorKind::InternalError {
message: message.into(),
Expand Down
30 changes: 2 additions & 28 deletions src/kv_client/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,34 +12,8 @@ pub trait HasError: HasRegionError {
}

impl From<errorpb::Error> for Error {
fn from(mut e: errorpb::Error) -> Error {
let message = e.take_message();
if e.has_not_leader() {
let e = e.get_not_leader();
let message = format!("{}. Leader: {:?}", message, e.get_leader());
Error::not_leader(e.get_region_id(), Some(message))
} else if e.has_region_not_found() {
Error::region_not_found(e.get_region_not_found().get_region_id(), Some(message))
} else if e.has_key_not_in_region() {
let e = e.take_key_not_in_region();
Error::key_not_in_region(e)
} else if e.has_epoch_not_match() {
Error::stale_epoch(Some(format!(
"{}. New epoch: {:?}",
message,
e.get_epoch_not_match().get_current_regions()
)))
} else if e.has_server_is_busy() {
Error::server_is_busy(e.take_server_is_busy())
} else if e.has_stale_command() {
Error::stale_command(message)
} else if e.has_store_not_match() {
Error::store_not_match(e.take_store_not_match(), message)
} else if e.has_raft_entry_too_large() {
Error::raft_entry_too_large(e.take_raft_entry_too_large(), message)
} else {
Error::internal_error(message)
}
fn from(e: errorpb::Error) -> Error {
Error::region_error(e)
}
}

Expand Down
10 changes: 1 addition & 9 deletions src/kv_client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,15 +92,7 @@ where
Compat01As03::new(fut.unwrap())
.map(|r| match r {
Err(e) => Err(ErrorKind::Grpc(e).into()),
Ok(mut r) => {
if let Some(e) = r.region_error() {
Err(e)
} else if let Some(e) = r.error() {
Err(e)
} else {
Ok(r)
}
}
Ok(r) => Ok(r),
})
.map(move |r| context.done(r))
}
Expand Down
5 changes: 2 additions & 3 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
#![type_length_limit = "16777216"]
#![allow(clippy::redundant_closure)]
#![allow(clippy::type_complexity)]
#![feature(async_await)]
#![cfg_attr(test, feature(specialization))]

//! This crate provides a clean, ready to use client for [TiKV](https://github.com/tikv/tikv), a
Expand Down Expand Up @@ -56,7 +55,6 @@
//! ([raw](raw::Client), [transactional](transaction::Client)).
//!
//! ```rust
//! # #![feature(async_await)]
//! # use tikv_client::*;
//! # use futures::prelude::*;
//!
Expand All @@ -79,6 +77,8 @@

#[macro_use]
mod util;
#[macro_use]
pub mod transaction;

mod compat;
mod config;
Expand All @@ -90,7 +90,6 @@ pub mod raw;
mod request;
mod security;
mod stats;
pub mod transaction;

#[cfg(test)]
mod mock;
Expand Down
28 changes: 21 additions & 7 deletions src/mock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,15 @@
use crate::{
kv_client::{KvClient, KvConnect, Store},
pd::{PdClient, PdRpcClient, Region, RegionId, RetryClient},
request::KvRequest,
request::{DispatchHook, KvRequest},
transaction::Timestamp,
Config, Error, Key, Result,
};

use futures::future::ready;
use futures::future::BoxFuture;
use fail::fail_point;
use futures::future::{ready, BoxFuture, FutureExt};
use grpcio::CallOption;
use kvproto::metapb;
use kvproto::{errorpb, kvrpcpb, metapb};
use std::{sync::Arc, time::Duration};

/// Create a `PdRpcClient` with it's internals replaced with mocks so that the
Expand Down Expand Up @@ -68,7 +68,7 @@ impl KvConnect for MockKvConnect {
}

impl MockPdClient {
fn region1() -> Region {
pub fn region1() -> Region {
let mut region = Region::default();
region.region.id = 1;
region.region.set_start_key(vec![0]);
Expand All @@ -81,7 +81,7 @@ impl MockPdClient {
region
}

fn region2() -> Region {
pub fn region2() -> Region {
let mut region = Region::default();
region.region.id = 2;
region.region.set_start_key(vec![10]);
Expand Down Expand Up @@ -126,7 +126,7 @@ impl PdClient for MockPdClient {
let result = match id {
1 => Ok(Self::region1()),
2 => Ok(Self::region2()),
_ => Err(Error::region_not_found(id, None)),
_ => Err(Error::region_not_found(id)),
};

Box::pin(ready(result))
Expand All @@ -136,3 +136,17 @@ impl PdClient for MockPdClient {
unimplemented!()
}
}

impl DispatchHook for kvrpcpb::ResolveLockRequest {
fn dispatch_hook(
&self,
_opt: CallOption,
) -> Option<BoxFuture<'static, Result<kvrpcpb::ResolveLockResponse>>> {
fail_point!("region-error", |_| {
let mut resp = kvrpcpb::ResolveLockResponse::default();
resp.region_error = Some(errorpb::Error::default());
Some(ready(Ok(resp)).boxed())
});
Some(ready(Ok(kvrpcpb::ResolveLockResponse::default())).boxed())
}
}
2 changes: 1 addition & 1 deletion src/pd/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ impl Cluster {
.get_error()
.get_message())));
}
let region = resp.region.ok_or_else(|| Error::region_not_found(id, None));
let region = resp.region.ok_or_else(|| Error::region_not_found(id));
let leader = resp.leader;
future::ready(region.map(move |r| Region::new(r, leader)))
})
Expand Down
4 changes: 2 additions & 2 deletions src/pd/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ impl Region {
pub fn context(&self) -> Result<kvrpcpb::Context> {
self.leader
.as_ref()
.ok_or_else(|| Error::not_leader(self.region.get_id(), None))
.ok_or_else(|| Error::leader_not_found(self.region.get_id()))
.map(|l| {
let mut ctx = kvrpcpb::Context::default();
ctx.set_region_id(self.region.get_id());
Expand Down Expand Up @@ -87,7 +87,7 @@ impl Region {
self.leader
.as_ref()
.cloned()
.ok_or_else(|| Error::stale_epoch(None))
.ok_or_else(|| Error::leader_not_found(self.id()))
.map(|s| s.get_store_id())
}
}
Expand Down
Loading