Skip to content

Commit

Permalink
Merge branch 'master' into fix-txn-batch-scan
Browse files Browse the repository at this point in the history
  • Loading branch information
ekexium committed Oct 12, 2020
2 parents 319ec52 + caa869e commit 7c584fb
Show file tree
Hide file tree
Showing 16 changed files with 513 additions and 74 deletions.
104 changes: 93 additions & 11 deletions README.md
Expand Up @@ -8,42 +8,124 @@
This crate provides a clean, ready to use client for [TiKV](https://github.com/tikv/tikv), a
distributed transactional Key-Value database written in Rust.

With this crate you can easily connect to any TiKV deployment, interact with it, and mutate the data it contains.
With this crate you can easily connect to any TiKV deployment, interact with it, and mutate the data it contains. It uses async/await internally and exposes some `async fn` APIs as well.

This is an open source (Apache 2) project hosted by the Cloud Native Computing Foundation (CNCF) and maintained by the TiKV Authors. *We'd love it if you joined us in improving this project.*

## Using the client
## Getting started

The TiKV client is a Rust library (crate). It uses async/await internally and exposes some `async fn` APIs as well.

To use this crate in your project, add it as a dependency in your `Cargo.toml`:
The TiKV client is a Rust library (crate). To use this crate in your project, add following dependencies in your `Cargo.toml`:

```toml
[dependencies]
# ...Your other dependencies...
tikv-client = { git = "https://github.com/tikv/client-rust.git" }

[patch.crates-io]
raft-proto = { git = "https://github.com/tikv/raft-rs", rev = "e624c1d48460940a40d8aa69b5329460d9af87dd" }
```

The client requires a Git dependency until we can [publish it](https://github.com/tikv/client-rust/issues/32).

There are [examples](examples) which show how to use the client in a Rust program.
The client provides two modes to interact with TiKV: raw and transactional.
In the current version (0.0.0), the transactional API only supports optimistic transactions.

Important note: It is **not recommended or supported** to use both the raw and transactional APIs on the same database.

### Code examples

Raw mode:

```rust
let config = Config::new(vec!["127.0.0.1:2379"]);
let client = RawClient::new(config).await?;
client.put("key".to_owned(), "value".to_owned()).await;
let value = client.get("key".to_owned()).await;
```

Transactional mode:

```rust
let config = Config::new(vec!["127.0.0.1:2379"]);
let txn_client = TransactionClient::new(config).await?;
let mut txn = txn_client.begin().await?;
txn.put("key".to_owned(), "value".to_owned()).await?;
let value = txn.get("key".to_owned()).await;
txn.commit().await?;
```

There are some [examples](examples) which show how to use the client in a Rust program.

### API

#### Raw requests

| Request | Main parameter type | Successful result type | Noteworthy Behavior |
| -------------- | ------------------- | ---------------------- | --------------------------------------------- |
| `put` | `KvPair` | `()` | |
| `get` | `Key` | `Option<Value>` | |
| `delete` | `Key` | `()` | |
| `scan` | `BoundRange` | `Vec<KvPair>` | |
| `batch_put` | `Iter<KvPair>` | `()` | |
| `batch_get` | `Iter<Key>` | `Vec<KvPair>` | Skip non-existent keys; Does not retain order |
| `batch_delete` | `Iter<Key>` | `()` | |
| `delete_range` | `BoundRange` | `()` | |

#### Transactional requests

| Request | Main parameter type | Successful result type | Noteworthy Behavior |
| ----------- | ------------------- | ---------------------- | --------------------------------------------- |
| `put` | `KvPair` | `()` | |
| `get` | `Key` | `Option<value>` | |
| `delete` | `Key` | `()` | |
| `scan` | `BoundRange` | `Iter<KvPair>` | |
| `batch_get` | `Iter<Key>` | `Iter<KvPair>` | Skip non-existent keys; Does not retain order |
| `lock_keys` | `KvPair` | `()` | |

For detailed behavior of each request, please refer to the [doc](#Access-the-documentation).

#### Experimental raw requests

You must be careful if you want to use the following request(s). Read the description for reasons.

| Request | Main parameter type | Successful result type |
| -------------- | ------------------- | ---------------------- |
| `batch_scan` | `Iter<BoundRange>` | `Vec<KvPair>` |

The `each_limit` parameter does not work as expected. It does not limit the number of results returned of each range, instead it limits the number of results in each region of each range. As a result, you may get **more than** `each_limit` key-value pairs for each range. But you should not miss any entries.

The results of `batch_scan` are flattened. The order of ranges is retained.

### Useful types

To use the client, there are 4 types you will need.

`Key` is simply a vector of bytes(`Vec<u8>`). `String` and `Vec<u8>` implements `Into<Key>`, so you can directly pass them to clients.

`Value` is just an alias of `Vec<u8>`.

`KvPair` is a tuple consisting of a `Key` and a `Value`. It also provides some convenience methods for conversion to and from other types.

`BoundRange` is used for range related requests like `scan`. It implements `From` for usual ranges so you can just create a range and pass them to the request.For instance, `client.scan("k2".to_owned()..="k5".to_owned(), 5)` or `client.delete_range(vec![]..)`.

## Access the documentation

We recommend using the cargo-generated documentation to browse and understand the API. We've done
our best to include ample, tested, and understandable examples.

You can visit [docs.rs/tikv-client](https://docs.rs/tikv-client/), or build the documentation yourself.

You can access the documentation on your machine by running the following in any project that depends on `tikv-client`.

```bash
cargo doc --package tikv-client --open
# If it didn't work, browse file URL it tried to open with your browser.
```

## Minimal Rust Version
## Known issues

If you use transactional APIs, you'll need to perform GC in TiKV to save storage.
However, current implementation does not provide a direct way to do this. A workaround is described in [#180](https://github.com/tikv/client-rust/issues/180).

## Minimal Rust version

This crate supports Rust 1.40 and above.

For development, a nightly Rust compiler is needed to compile the tests.
For development, a nightly Rust compiler is needed to compile the tests.
6 changes: 1 addition & 5 deletions examples/raw.rs
Expand Up @@ -94,11 +94,7 @@ async fn main() -> Result<()> {
let keys: Vec<_> = pairs.into_iter().map(|p| p.key().clone()).collect();
assert_eq!(
&keys,
&[
Key::from("k1".to_owned()),
Key::from("k2".to_owned()),
Key::from("k3".to_owned())
]
&[Key::from("k1".to_owned()), Key::from("k2".to_owned()),]
);
println!("Scaning from {:?} to {:?} gives: {:?}", start, end, keys);

Expand Down
1 change: 1 addition & 0 deletions src/mock.rs
Expand Up @@ -32,6 +32,7 @@ pub async fn pd_rpc_client() -> PdRpcClient<MockKvConnect, MockCluster> {
MockCluster,
))
},
false,
)
.await
.unwrap()
Expand Down
66 changes: 58 additions & 8 deletions src/pd/client.rs
@@ -1,10 +1,11 @@
// Copyright 2018 TiKV Project Authors. Licensed under Apache-2.0.

use crate::{pd::RetryClient, Config, Key, Region, RegionId};
use crate::{pd::RetryClient, tikv_client_common::kv::codec, Config, Key, Region, RegionId};
use futures::{
future::{ready, BoxFuture, Either},
prelude::*,
stream::BoxStream,
FutureExt, TryFutureExt,
};
use grpcio::{EnvBuilder, Environment};
use std::{
Expand All @@ -23,20 +24,42 @@ use tikv_client_store::{KvClient, KvConnect, Store, TikvConnect};
const CQ_COUNT: usize = 1;
const CLIENT_PREFIX: &str = "tikv-client";

// The PdClient handles all the encoding stuff.
//
// Raw APIs does not require encoding/decoding at all.
// All keys in all places (client, PD, TiKV) are in the same encoding (here called "raw format").
//
// Transactional APIs are a bit complicated.
// We need encode and decode keys when we communicate with PD, but not with TiKV.
// We encode keys before sending requests to PD, and decode keys in the response from PD.
// That's all we need to do with encoding.
//
// client -encoded-> PD, PD -encoded-> client
// client -raw-> TiKV, TiKV -raw-> client
//
// The reason for the behavior is that in transaction mode, TiKV encode keys for MVCC.
// In raw mode, TiKV doesn't encode them.
// TiKV tells PD using its internal representation, whatever the encoding is.
// So if we use transactional APIs, keys in PD are encoded and PD does not know about the encoding stuff.
//
pub trait PdClient: Send + Sync + 'static {
type KvClient: KvClient + Send + Sync + 'static;

// In transactional API, `region` is decoded (keys in raw format).
fn map_region_to_store(
self: Arc<Self>,
region: Region,
) -> BoxFuture<'static, Result<Store<Self::KvClient>>>;

// In transactional API, the key and returned region are both decoded (keys in raw format).
fn region_for_key(&self, key: &Key) -> BoxFuture<'static, Result<Region>>;

// In transactional API, the returned region is decoded (keys in raw format)
fn region_for_id(&self, id: RegionId) -> BoxFuture<'static, Result<Region>>;

fn get_timestamp(self: Arc<Self>) -> BoxFuture<'static, Result<Timestamp>>;

// In transactional API, `key` is in raw format
fn store_for_key(
self: Arc<Self>,
key: &Key,
Expand Down Expand Up @@ -80,7 +103,7 @@ pub trait PdClient: Send + Sync + 'static {
.boxed()
}

// Returns a Steam which iterates over the contexts for each region covered by range.
// Returns a Stream which iterates over the contexts for each region covered by range.
fn stores_for_range(
self: Arc<Self>,
range: BoundRange,
Expand All @@ -92,12 +115,15 @@ pub trait PdClient: Send + Sync + 'static {
Some(sk) => sk,
};
let end_key = end_key.clone();

let this = self.clone();
Either::Left(self.region_for_key(&start_key).and_then(move |region| {
let region_end = region.end_key();
this.map_region_to_store(region).map_ok(move |store| {
if end_key.map(|x| x <= region_end).unwrap_or(false) || region_end.is_empty() {
if end_key
.map(|x| x <= region_end && !x.is_empty())
.unwrap_or(false)
|| region_end.is_empty()
{
return Some((None, store));
}
Some((Some(region_end), store))
Expand Down Expand Up @@ -158,6 +184,14 @@ pub trait PdClient: Send + Sync + 'static {
})
.boxed()
}

fn decode_region(mut region: Region, enable_codec: bool) -> Result<Region> {
if enable_codec {
codec::decode_bytes_in_place(&mut region.region.mut_start_key(), false)?;
codec::decode_bytes_in_place(&mut region.region.mut_end_key(), false)?;
}
Ok(region)
}
}

/// This client converts requests for the logical TiKV cluster into requests
Expand All @@ -167,6 +201,7 @@ pub struct PdRpcClient<KvC: KvConnect + Send + Sync + 'static = TikvConnect, Cl
kv_connect: KvC,
kv_client_cache: Arc<RwLock<HashMap<String, KvC::KvClient>>>,
timeout: Duration,
enable_codec: bool,
}

impl<KvC: KvConnect + Send + Sync + 'static> PdClient for PdRpcClient<KvC> {
Expand All @@ -191,12 +226,24 @@ impl<KvC: KvConnect + Send + Sync + 'static> PdClient for PdRpcClient<KvC> {
}

fn region_for_key(&self, key: &Key) -> BoxFuture<'static, Result<Region>> {
let key: &[u8] = key.into();
self.pd.clone().get_region(key.to_owned()).boxed()
let enable_codec = self.enable_codec;
let key = if enable_codec {
key.to_encoded().into()
} else {
key.clone().into()
};
let region = self.pd.clone().get_region(key).boxed();
region
.ok_and_then(move |region| Self::decode_region(region, enable_codec))
.boxed()
}

fn region_for_id(&self, id: RegionId) -> BoxFuture<'static, Result<Region>> {
self.pd.clone().get_region_by_id(id).boxed()
let region = self.pd.clone().get_region_by_id(id).boxed();
let enable_codec = self.enable_codec;
region
.ok_and_then(move |region| Self::decode_region(region, enable_codec))
.boxed()
}

fn get_timestamp(self: Arc<Self>) -> BoxFuture<'static, Result<Timestamp>> {
Expand All @@ -205,13 +252,14 @@ impl<KvC: KvConnect + Send + Sync + 'static> PdClient for PdRpcClient<KvC> {
}

impl PdRpcClient<TikvConnect, Cluster> {
pub async fn connect(config: &Config) -> Result<PdRpcClient> {
pub async fn connect(config: &Config, enable_codec: bool) -> Result<PdRpcClient> {
PdRpcClient::new(
config,
|env, security_mgr| TikvConnect::new(env, security_mgr),
|env, security_mgr| {
RetryClient::connect(env, &config.pd_endpoints, security_mgr, config.timeout)
},
enable_codec,
)
.await
}
Expand All @@ -222,6 +270,7 @@ impl<KvC: KvConnect + Send + Sync + 'static, Cl> PdRpcClient<KvC, Cl> {
config: &Config,
kv_connect: MakeKvC,
pd: MakePd,
enable_codec: bool,
) -> Result<PdRpcClient<KvC, Cl>>
where
PdFut: Future<Output = Result<RetryClient<Cl>>>,
Expand Down Expand Up @@ -251,6 +300,7 @@ impl<KvC: KvConnect + Send + Sync + 'static, Cl> PdRpcClient<KvC, Cl> {
kv_client_cache,
kv_connect: kv_connect(env, security_mgr),
timeout: config.timeout,
enable_codec,
})
}

Expand Down
1 change: 1 addition & 0 deletions src/pd/retry.rs
Expand Up @@ -96,6 +96,7 @@ impl RetryClient<Cluster> {
}

// These get_* functions will try multiple times to make a request, reconnecting as necessary.
// It does not know about encoding. Caller should take care of it.
pub async fn get_region(self: Arc<Self>, key: Vec<u8>) -> Result<Region> {
retry!(self, "get_region", |cluster| {
let key = key.clone();
Expand Down
7 changes: 5 additions & 2 deletions src/raw/client.rs
Expand Up @@ -26,7 +26,7 @@ impl Client {
/// # });
/// ```
pub async fn new(config: Config) -> Result<Client> {
let rpc = Arc::new(PdRpcClient::connect(&config).await?);
let rpc = Arc::new(PdRpcClient::connect(&config, false).await?);
Ok(Client {
rpc,
cf: None,
Expand Down Expand Up @@ -103,7 +103,8 @@ impl Client {
/// Create a new 'batch get' request.
///
/// Once resolved this request will result in the fetching of the values associated with the
/// given keys, skipping non-existent entries.
/// given keys
/// Non-existent entries will be skipped. The order of the keys is not retained.
///
/// ```rust,no_run
/// # use tikv_client::{KvPair, Config, RawClient};
Expand Down Expand Up @@ -173,6 +174,7 @@ impl Client {
/// Create a new 'delete' request.
///
/// Once resolved this request will result in the deletion of the given key.
/// It does not return an error if the key does not exist.
///
/// ```rust,no_run
/// # use tikv_client::{Key, Config, RawClient};
Expand All @@ -193,6 +195,7 @@ impl Client {
/// Create a new 'batch delete' request.
///
/// Once resolved this request will result in the deletion of the given keys.
/// It does not return an error if some of the keys do not exist and will delete the others.
///
/// ```rust,no_run
/// # use tikv_client::{Config, RawClient};
Expand Down

0 comments on commit 7c584fb

Please sign in to comment.