Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Migrate to futures 0.3 from 0.1 #41

Merged
merged 4 commits into from May 30, 2019
Merged
Changes from 1 commit
Commits
File filter...
Filter file types
Jump to…
Jump to file or symbol
Failed to load files and symbols.

Always

Just for now

Next

Migrate to futures 0.3

Signed-off-by: Nick Cameron <nrc@ncameron.org>
  • Loading branch information...
nrc committed May 2, 2019
commit 6353dbcfe391d66714686aafab9a49e593259dfb
@@ -6,7 +6,8 @@ os:
# - windows # TODO: https://github.com/pingcap/kvproto/issues/355
- osx
rust:
- stable
# Requires nightly for now, stable can be re-enabled when 1.36 is stable.
# - stable
- nightly
env:
global:
@@ -27,14 +28,14 @@ addons:
- go

install:
- if [[ $TRAVIS_RUST_VERSION == "stable" && $TRAVIS_OS_NAME == "linux" ]]; then rustup component add rustfmt; fi
- if [[ $TRAVIS_RUST_VERSION == "stable" && $TRAVIS_OS_NAME == "linux" ]]; then rustup component add clippy; fi
- if [[ $TRAVIS_OS_NAME == "linux" ]]; then rustup component add rustfmt; fi
- if [[ $TRAVIS_OS_NAME == "linux" ]]; then rustup component add clippy; fi
- if [[ $TRAVIS_OS_NAME == "windows" ]]; then choco install golang cmake strawberryperl protoc; fi
- if [[ $TRAVIS_OS_NAME == "windows" ]]; then export PATH="$PATH:/c/Go/bin/:/c/Program Files/CMake/bin"; fi

script:
- if [[ $TRAVIS_RUST_VERSION == "stable" && $TRAVIS_OS_NAME == "linux" ]]; then cargo fmt -- --check; fi
- if [[ $TRAVIS_RUST_VERSION == "stable" && $TRAVIS_OS_NAME == "linux" ]]; then cargo clippy -- -D clippy::all; fi
- if [[ $TRAVIS_OS_NAME == "linux" ]]; then cargo fmt -- --check; fi
- if [[ $TRAVIS_OS_NAME == "linux" ]]; then cargo clippy -- -D clippy::all; fi
- cargo test --all -- --nocapture
# For now we only run full integration tests on Linux. Here's why:
# * Docker on OS X is not supported by Travis.
@@ -19,7 +19,7 @@ name = "tikv_client"

[dependencies]
failure = "0.1"
futures = "0.1"
futures-preview = { version = "0.3.0-alpha.15", features = ["compat"] }
fxhash = "0.2"
grpcio = { version = "0.5.0-alpha", features = [ "secure", "prost-codec" ], default-features = false }
lazy_static = "0.2.1"
@@ -40,3 +40,5 @@ features = ["push", "process"]
[dev-dependencies]
clap = "2.32"
tempdir = "0.3"
runtime = "0.3.0-alpha.3"
runtime-tokio = "0.3.0-alpha.3"
@@ -1,15 +1,17 @@
// Copyright 2018 TiKV Project Authors. Licensed under Apache-2.0.

#![feature(async_await, await_macro)]

mod common;

use crate::common::parse_args;
use futures::future::Future;
use tikv_client::{raw::Client, Config, Key, KvPair, Result, Value};

const KEY: &str = "TiKV";
const VALUE: &str = "Rust";

fn main() -> Result<()> {
#[runtime::main(runtime_tokio::Tokio)]
async fn main() -> Result<()> {
// You can try running this example by passing your pd endpoints
// (and SSL options if necessary) through command line arguments.
let args = parse_args("raw");
@@ -25,15 +27,14 @@ fn main() -> Result<()> {
// When we first create a client we receive a `Connect` structure which must be resolved before
// the client is actually connected and usable.
let unconnnected_client = Client::new(config);
let client = unconnnected_client.wait()?;
let client = await!(unconnnected_client)?;

// Requests are created from the connected client. These calls return structures which
// implement `Future`. This means the `Future` must be resolved before the action ever takes
// place.
//
// Here we set the key `TiKV` to have the value `Rust` associated with it.
let put_request = client.put(KEY, VALUE);
put_request.wait()?; // Returns a `tikv_client::Error` on failure.
await!(client.put(KEY, VALUE)).unwrap(); // Returns a `tikv_client::Error` on failure.
println!("Put key {:?}, value {:?}.", KEY, VALUE);

// Unlike a standard Rust HashMap all calls take owned values. This is because under the hood
@@ -45,20 +46,17 @@ fn main() -> Result<()> {
//
// It is best to pass a `Vec<u8>` in terms of explictness and speed. `String`s and a few other
// types are supported as well, but it all ends up as `Vec<u8>` in the end.
let value: Option<Value> = client.get(KEY).wait()?;
let value: Option<Value> = await!(client.get(KEY))?;
assert_eq!(value, Some(Value::from(VALUE)));
println!("Get key {:?} returned value {:?}.", Key::from(KEY), value);

// You can also set the `ColumnFamily` used by the request.
// This is *advanced usage* and should have some special considerations.
client.delete(KEY).wait().expect("Could not delete value");
await!(client.delete(KEY)).expect("Could not delete value");
println!("Key: {:?} deleted", Key::from(KEY));

// Here we check if the key has been deleted from the key-value store.
let value: Option<Value> = client
.get(KEY)
.wait()
.expect("Could not get just deleted entry");
let value: Option<Value> = await!(client.get(KEY)).expect("Could not get just deleted entry");
assert!(value.is_none());

// You can ask to write multiple key-values at the same time, it is much more
@@ -68,25 +66,18 @@ fn main() -> Result<()> {
KvPair::from(("k2", "v2")),
KvPair::from(("k3", "v3")),
];
client.batch_put(pairs).wait().expect("Could not put pairs");
await!(client.batch_put(pairs)).expect("Could not put pairs");

// Same thing when you want to retrieve multiple values.
let keys = vec![Key::from("k1"), Key::from("k2")];
let values = client
.batch_get(keys.clone())
.wait()
.expect("Could not get values");
let values = await!(client.batch_get(keys.clone())).expect("Could not get values");
println!("Found values: {:?} for keys: {:?}", values, keys);

// Scanning a range of keys is also possible giving it two bounds
// it will returns all entries between these two.
let start = "k1";
let end = "k2";
let pairs = client
.scan(start..=end, 10)
.key_only()
.wait()
.expect("Could not scan");
let pairs = await!(client.scan(start..=end, 10).key_only()).expect("Could not scan");

let keys: Vec<_> = pairs.into_iter().map(|p| p.key().clone()).collect();
assert_eq!(&keys, &[Key::from("k1"), Key::from("k2")]);
@@ -1,66 +1,72 @@
// Copyright 2018 TiKV Project Authors. Licensed under Apache-2.0.

#![feature(async_await, await_macro)]

mod common;

use crate::common::parse_args;
use futures::{future, Future, Stream};
use futures::{
future,
prelude::{StreamExt, TryStreamExt},
stream, TryFutureExt,
};
use std::ops::RangeBounds;
use tikv_client::{
transaction::{Client, IsolationLevel},
Config, Key, KvPair, Value,
};

fn puts(client: &Client, pairs: impl IntoIterator<Item = impl Into<KvPair>>) {
async fn puts(client: &Client, pairs: impl IntoIterator<Item = impl Into<KvPair>>) {
let mut txn = client.begin();
let _: Vec<()> = future::join_all(
await!(future::join_all(
pairs
.into_iter()
.map(Into::into)
.map(|p| txn.set(p.key().clone(), p.value().clone())),
)
.wait()
.map(|p| txn.set(p.key().clone(), p.value().clone()))
))
.into_iter()
.collect::<Result<Vec<()>, _>>()
.expect("Could not set key value pairs");
txn.commit().wait().expect("Could not commit transaction");
await!(txn.commit()).expect("Could not commit transaction");
}

fn get(client: &Client, key: Key) -> Value {
async fn get(client: &Client, key: Key) -> Value {
let txn = client.begin();
txn.get(key).wait().expect("Could not get value")
await!(txn.get(key)).expect("Could not get value")
}

fn scan(client: &Client, range: impl RangeBounds<Key>, mut limit: usize) {
client
// Ignore a spurious warning from rustc (https://github.com/rust-lang/rust/issues/60566).
#[allow(unused_mut)]
async fn scan(client: &Client, range: impl RangeBounds<Key>, mut limit: usize) {
await!(client
.begin()
.scan(range)
.take_while(move |_| {
Ok(if limit == 0 {
.into_stream()
.take_while(move |r| {
assert!(r.is_ok(), "Could not scan keys");
future::ready(if limit == 0 {
false
} else {
limit -= 1;
true
})
})
.for_each(|pair| {
println!("{:?}", pair);
Ok(())
})
.wait()
.expect("Could not scan keys");
.for_each(|pair| { future::ready(println!("{:?}", pair)) }));
}

fn dels(client: &Client, keys: impl IntoIterator<Item = Key>) {
async fn dels(client: &Client, keys: impl IntoIterator<Item = Key>) {
let mut txn = client.begin();
txn.set_isolation_level(IsolationLevel::ReadCommitted);
let _: Vec<()> = keys
.into_iter()
.map(|p| {
txn.delete(p).wait().expect("Could not delete key");
})
.collect();
txn.commit().wait().expect("Could not commit transaction");
let _: Vec<()> = await!(stream::iter(keys.into_iter())
.then(|p| txn
.delete(p)
.unwrap_or_else(|e| panic!("error in delete: {:?}", e)))
.collect());
await!(txn.commit()).expect("Could not commit transaction");
}

fn main() {
#[runtime::main(runtime_tokio::Tokio)]
async fn main() {
// You can try running this example by passing your pd endpoints
// (and SSL options if necessary) through command line arguments.
let args = parse_args("txn");
@@ -73,28 +79,26 @@ fn main() {
Config::new(args.pd)
};

let txn = Client::new(config)
.wait()
.expect("Could not connect to tikv");
let txn = await!(Client::new(config)).expect("Could not connect to tikv");

// set
let key1: Key = b"key1".to_vec().into();
let value1: Value = b"value1".to_vec().into();
let key2: Key = b"key2".to_vec().into();
let value2: Value = b"value2".to_vec().into();
puts(&txn, vec![(key1, value1), (key2, value2)]);
await!(puts(&txn, vec![(key1, value1), (key2, value2)]));

// get
let key1: Key = b"key1".to_vec().into();
let value1 = get(&txn, key1.clone());
let value1 = await!(get(&txn, key1.clone()));
println!("{:?}", (key1, value1));

// scan
let key1: Key = b"key1".to_vec().into();
scan(&txn, key1.., 10);
await!(scan(&txn, key1.., 10));

// delete
let key1: Key = b"key1".to_vec().into();
let key2: Key = b"key2".to_vec().into();
dels(&txn, vec![key1, key2]);
await!(dels(&txn, vec![key1, key2]));
}
ProTip! Use n and p to navigate between commits in a pull request.
You can’t perform that action at this time.