Navigation Menu

Skip to content

Commit

Permalink
batchput set ttl to every key (#10891)
Browse files Browse the repository at this point in the history
* rebase code to make other's commit history be eliminated

Signed-off-by: tangjk <tangjiankun1226@gmail.com>

* eliminate idx loop by zipping iter of ttls to kvs in atomic store

Signed-off-by: tangjk <tangjiankun1226@gmail.com>

* modify enum name make it's meaning clearer

Signed-off-by: tangjk <tangjiankun1226@gmail.com>

* remove nouseful code

Signed-off-by: tangjk <tangjiankun1226@gmail.com>

* make enum more readable

Signed-off-by: tangjk <tangjiankun1226@gmail.com>

* 1. add units test for batch_put ttls 2. using enable_ttl in atomic_store instead of option<ttl>

Signed-off-by: tangjk <tangjiankun1226@gmail.com>

* just declare local varibale when it is used

Signed-off-by: tangjk <tangjiankun1226@gmail.com>

Co-authored-by: Andy Lok <andylokandy@hotmail.com>
  • Loading branch information
daimashusheng and andylokandy committed Sep 23, 2021
1 parent ea1499f commit f983827
Show file tree
Hide file tree
Showing 13 changed files with 169 additions and 66 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion components/error_code/src/sst_importer.rs
Expand Up @@ -18,5 +18,6 @@ define_error_codes!(
WRONG_KEY_PREFIX => ("WrongKeyPrefix", "", ""),
BAD_FORMAT => ("BadFormat", "", ""),
FILE_CONFLICT => ("FileConflict", "", ""),
TTL_NOT_ENABLED => ("TTLNotEnabled", "", "")
TTL_NOT_ENABLED => ("TTLNotEnabled", "", ""),
TTLS_LEN_NOT_EQUALS_TO_PAIRS => ("TTLsLenNotEqualsToPairs", "", "")
);
1 change: 1 addition & 0 deletions components/error_code/src/storage.rs
Expand Up @@ -12,6 +12,7 @@ define_error_codes!(
KEY_TOO_LARGE => ("KeyTooLarge", "", ""),
INVALID_CF => ("InvalidCF", "", ""),
TTL_NOT_ENABLED => ("TTLNotEnabled", "", ""),
TTLS_LEN_NOT_EQUALS_TO_PAIRS => ("TTLsLenNotEqualsToPairs", "", ""),
PROTOBUF => ("Protobuf", "", ""),
INVALID_TXN_TSO => ("INVALIDTXNTSO", "", ""),
INVALID_REQ_RANGE => ("InvalidReqRange", "", ""),
Expand Down
6 changes: 6 additions & 0 deletions components/sst_importer/src/errors.rs
Expand Up @@ -111,6 +111,9 @@ pub enum Error {

#[error("ttl is not enabled")]
TTLNotEnabled,

#[error("The length of ttls does not equal to the length of pairs")]
TTLsLenNotEqualsToPairs,
}

impl From<String> for Error {
Expand Down Expand Up @@ -153,6 +156,9 @@ impl ErrorCodeExt for Error {
Error::CodecError(e) => e.error_code(),
Error::FileConflict => error_code::sst_importer::FILE_CONFLICT,
Error::TTLNotEnabled => error_code::sst_importer::TTL_NOT_ENABLED,
Error::TTLsLenNotEqualsToPairs => {
error_code::sst_importer::TTLS_LEN_NOT_EQUALS_TO_PAIRS
}
}
}
}
4 changes: 2 additions & 2 deletions components/test_storage/src/sync_storage.rs
Expand Up @@ -371,9 +371,9 @@ impl<E: Engine> SyncTestStorage<E> {
ctx: Context,
cf: String,
pairs: Vec<KvPair>,
ttl: u64,
ttls: Vec<u64>,
) -> Result<()> {
wait_op!(|cb| self.store.raw_batch_put_atomic(ctx, cf, pairs, ttl, cb)).unwrap()
wait_op!(|cb| self.store.raw_batch_put_atomic(ctx, cf, pairs, ttls, cb)).unwrap()
}

pub fn raw_batch_delete_atomic(
Expand Down
4 changes: 2 additions & 2 deletions components/txn_types/src/lib.rs
Expand Up @@ -16,8 +16,8 @@ use std::io;
pub use lock::{Lock, LockType};
pub use timestamp::{TimeStamp, TsSet};
pub use types::{
is_short_value, Key, KvPair, Mutation, MutationType, OldValue, OldValues, TxnExtra,
TxnExtraScheduler, Value, WriteBatchFlags, SHORT_VALUE_MAX_LEN,
is_short_value, Key, KvPair, Mutation, MutationType, OldValue, OldValues, RawMutation,
TxnExtra, TxnExtraScheduler, Value, WriteBatchFlags, SHORT_VALUE_MAX_LEN,
};
pub use write::{Write, WriteRef, WriteType};

Expand Down
22 changes: 22 additions & 0 deletions components/txn_types/src/types.rs
Expand Up @@ -253,6 +253,28 @@ pub enum MutationType {
Other,
}

/// A row mutation.
#[derive(Debug, Clone)]
pub enum RawMutation {
/// Put `Value` into `Key` with TTL. The TTL will overwrite the existing TTL value.
Put { key: Key, value: Value, ttl: u64 },
/// Delete `Key`.
Delete { key: Key },
}

impl RawMutation {
pub fn key(&self) -> &Key {
match self {
RawMutation::Put {
ref key,
value: _,
ttl: _,
} => key,
RawMutation::Delete { ref key } => key,
}
}
}

/// A row mutation.
#[derive(Debug, Clone)]
pub enum Mutation {
Expand Down
4 changes: 2 additions & 2 deletions src/coprocessor_v2/raw_storage_impl.rs
Expand Up @@ -97,10 +97,10 @@ impl<E: Engine, L: LockManager> RawStorage for RawStorageImpl<'_, E, L> {
async fn batch_put(&self, kv_pairs: Vec<KvPair>) -> PluginResult<()> {
let ctx = self.context.clone();
let cf = engine_traits::CF_DEFAULT.to_string();
let ttl = 0; // unlimited
let ttls = vec![0; kv_pairs.len()]; // unlimited
let (cb, f) = paired_future_callback();

let res = self.storage.raw_batch_put(ctx, cf, kv_pairs, ttl, cb);
let res = self.storage.raw_batch_put(ctx, cf, kv_pairs, ttls, cb);

match res {
Err(e) => Err(e),
Expand Down
19 changes: 16 additions & 3 deletions src/server/service/kv.rs
Expand Up @@ -1524,7 +1524,7 @@ fn future_raw_put<E: Engine, L: LockManager>(
req.take_context(),
req.take_cf(),
vec![(req.take_key(), req.take_value())],
req.get_ttl(),
vec![req.get_ttl()],
cb,
)
} else {
Expand Down Expand Up @@ -1558,6 +1558,19 @@ fn future_raw_batch_put<E: Engine, L: LockManager>(
mut req: RawBatchPutRequest,
) -> impl Future<Output = ServerResult<RawBatchPutResponse>> {
let cf = req.take_cf();
let pairs_len = req.get_pairs().len();
// The TTL for each key in seconds.
//
// In some TiKV of old versions, only one TTL can be provided and the TTL will be applied to all keys in
// the request. For compatibility reasons, if the length of `ttls` is exactly one, then the TTL will be applied
// to all keys. Otherwise, the length mismatch between `ttls` and `pairs` will return an error.
let ttls = if req.get_ttls().is_empty() {
vec![0; pairs_len]
} else if req.get_ttls().len() == 1 {
vec![req.get_ttls()[0]; pairs_len]
} else {
req.take_ttls()
};
let pairs = req
.take_pairs()
.into_iter()
Expand All @@ -1567,9 +1580,9 @@ fn future_raw_batch_put<E: Engine, L: LockManager>(
let (cb, f) = paired_future_callback();
let for_atomic = req.get_for_cas();
let res = if for_atomic {
storage.raw_batch_put_atomic(req.take_context(), cf, pairs, req.get_ttl(), cb)
storage.raw_batch_put_atomic(req.take_context(), cf, pairs, ttls, cb)
} else {
storage.raw_batch_put(req.take_context(), cf, pairs, req.get_ttl(), cb)
storage.raw_batch_put(req.take_context(), cf, pairs, ttls, cb)
};

async move {
Expand Down
6 changes: 6 additions & 0 deletions src/storage/errors.rs
Expand Up @@ -55,6 +55,9 @@ pub enum ErrorInner {

#[error("Deadline is exceeded")]
DeadlineExceeded,

#[error("The length of ttls does not equal to the length of pairs")]
TTLsLenNotEqualsToPairs,
}

impl From<DeadlineError> for ErrorInner {
Expand Down Expand Up @@ -97,6 +100,9 @@ impl ErrorCodeExt for Error {
ErrorInner::InvalidCf(_) => error_code::storage::INVALID_CF,
ErrorInner::TTLNotEnabled => error_code::storage::TTL_NOT_ENABLED,
ErrorInner::DeadlineExceeded => error_code::storage::DEADLINE_EXCEEDED,
ErrorInner::TTLsLenNotEqualsToPairs => {
error_code::storage::TTLS_LEN_NOT_EQUALS_TO_PAIRS
}
}
}
}
Expand Down

0 comments on commit f983827

Please sign in to comment.