More engine abstraction, work toward Peekable and Iterable for Snapshot #5901

Merged
merged 51 commits into from Nov 22, 2019
Changes from 40 commits
b1c31b1
engine_*: Add IOLimiter, IOLimiterExt traits
brson Nov 4, 2019
63b6aa7
engine_traits: Define LimitReader and LimitWriter
brson Nov 4, 2019
67bb896
engine_rocks: Add limiter tests
brson Nov 4, 2019
fe73763
sst_importer: Use limiter from engine_traits
brson Nov 4, 2019
1bbfbfa
sst_importer: Remove rocks dependency
brson Nov 4, 2019
b22f3c4
tikv: Use IOLimiter from engine_traits
brson Nov 4, 2019
1aa78cd
tikv: Use IOLimiter from engine_traits
brson Nov 4, 2019
16d02f3
engine: remove IOLimiter, LimitReader, LimitWriter
brson Nov 4, 2019
a1aaf0c
tikv: Use engine_traits for IOLimiter
brson Nov 5, 2019
2f43c58
backup: use IOLimiter from engine_traits
brson Nov 5, 2019
f5f7837
*: rustfmt
brson Nov 5, 2019
9317187
Merge remote-tracking branch 'origin/master' into engine-traits-limiter
brson Nov 11, 2019
743ff9a
engine_*: use i64 for rate limiter bytes_per_sec
brson Nov 11, 2019
9c677c2
Merge branch 'master' into engine-traits-limiter
kennytm Nov 12, 2019
cb0f2f0
Merge branch 'master' into engine-traits-limiter
brson Nov 14, 2019
36c6855
engine_rocks: Add extension method for creating RocksEngine references
brson Nov 5, 2019
7515f1c
engine_rocks: Rename Snapshot to RocksSnapshot for consistency
brson Nov 11, 2019
efb52a0
engine_rocks: Add conversion from &RawSnapshot to &RocksSnapshot
brson Nov 11, 2019
b9bfa1a
engine_rocks: Add static_assertions dep
brson Nov 11, 2019
457bab5
engine_rocks: Assert some properties of RocksSnapshot
brson Nov 11, 2019
9450572
engine: Add engine_traits dep
brson Nov 11, 2019
f25b541
engine_traits: Move SeekMode into options.rs
brson Nov 11, 2019
1cdcc7d
engine: Move IterOption into engine_traits
brson Nov 11, 2019
4d5f8a3
engine_traits: Add Iterable bound to Snapshot
brson Nov 11, 2019
15de487
engine_rocks: Add Compat implementation for snapshots
brson Nov 12, 2019
6fe421c
engine_traits: Add From<&[u8]> for SeekKey
brson Nov 12, 2019
f46bdf4
engine_traits: Remove Result return from key/value methods
brson Nov 13, 2019
8b6d955
engine_traits: Take IterOptions by value
brson Nov 13, 2019
6f309ca
engine: Migrate Snapshot::db_iterator to engine_traits
brson Nov 13, 2019
34f46d8
engine: Remove unused db_iterator_cf method
brson Nov 13, 2019
06f2633
engine_traits: Reorder arguments to iterator_cf_opt
brson Nov 13, 2019
5c3e33c
engine_traits: Rename some Peekable methods for consistency
brson Nov 13, 2019
b7369ed
engine_rocks add Compat for SyncSnapshot
brson Nov 13, 2019
c8f7c52
engine_traits: Add DBVector
brson Nov 13, 2019
b97931b
engine_traits: Modify Peekable to use DBVector
brson Nov 13, 2019
34acfff
tikv: Generalize IntoProtoError to IntoOther
brson Nov 13, 2019
df2c45d
tikv: Define IntoOther<raft::Error> for engine_traits::Error
brson Nov 14, 2019
5e7d308
engine_traits: Add more crate documentation
brson Nov 14, 2019
f616eeb
*: rustfmt
brson Nov 14, 2019
70f8844
Merge branch 'master' into engine-traits-limiter
brson Nov 18, 2019
ec6346b
Merge branch 'master' into engine-traits-snapshot
brson Nov 18, 2019
9be001d
Merge remote-tracking branch 'origin/master' into engine-traits-limiter
brson Nov 19, 2019
148c559
*: rustfmt
brson Nov 19, 2019
7cd0a4d
Merge remote-tracking branch 'brson/engine-traits-limiter' into engin…
brson Nov 19, 2019
ae0bc5d
tikv: clippy
brson Nov 19, 2019
ce7005e
engine_traits: Change titan_key_only to key_only
brson Nov 19, 2019
76dff85
tikv: rustfmt
brson Nov 19, 2019
7ca2c30
Merge branch 'master' into engine-traits-snapshot
brson Nov 20, 2019
ff7e21f
Merge remote-tracking branch 'origin/master' into engine-traits-snapshot
brson Nov 21, 2019
14e8e5d
engine_rocks: Fix repr of SyncSnapshot
brson Nov 21, 2019
576573e
Merge branch 'master' into engine-traits-snapshot
zhangjinpeng87 Nov 22, 2019
9 changes: 5 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 5 additions & 1 deletion cmd/src/server.rs
@@ -12,6 +12,7 @@ use kvproto::deadlock::create_deadlock;
 use kvproto::debugpb::create_debug;
 use kvproto::import_sstpb::create_import_sst;
 use pd_client::{PdClient, RpcClient};
+use std::convert::TryFrom;
 use std::fs::File;
 use std::path::Path;
 use std::sync::{Arc, Mutex};
@@ -205,9 +206,12 @@ fn run_raft_server(pd_client: RpcClient, cfg: &TiKvConfig, security_mgr: Arc<Sec
     )
     .unwrap_or_else(|e| fatal!("failed to create raft storage: {}", e));

+    let bps = i64::try_from(cfg.server.snap_max_write_bytes_per_sec.0)
+        .unwrap_or_else(|_| fatal!("snap_max_write_bytes_per_sec > i64::max_value"));
+
     // Create snapshot manager, server.
     let snap_mgr = SnapManagerBuilder::default()
-        .max_write_bytes_per_sec(cfg.server.snap_max_write_bytes_per_sec.0)
+        .max_write_bytes_per_sec(bps)
         .max_total_size(cfg.server.snap_max_total_size.0)
         .build(
             snap_path.as_path().to_str().unwrap().to_owned(),
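The hunk above converts the configured `u64` rate to the `i64` that the limiter now takes, and aborts if the value does not fit. A minimal sketch of that checked conversion follows; `to_limiter_rate` is a hypothetical helper, not a function in this PR, and it returns an error where the PR calls `fatal!`.

```rust
use std::convert::TryFrom;

/// Hypothetical helper: convert a configured byte rate (u64) into the
/// signed rate expected by a limiter, rejecting values above i64::MAX
/// instead of silently wrapping as an `as i64` cast would.
pub fn to_limiter_rate(configured: u64) -> Result<i64, String> {
    i64::try_from(configured)
        .map_err(|_| format!("bytes_per_sec {} exceeds i64::MAX", configured))
}
```

A plain `configured as i64` would turn very large values into negative rates, which is presumably why the change uses `try_from`.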
9 changes: 5 additions & 4 deletions components/backup/src/endpoint.rs
@@ -7,8 +7,9 @@ use std::sync::atomic::*;
 use std::sync::*;
 use std::time::*;

-use engine::rocks::util::io_limiter::IOLimiter;
 use engine::DB;
+use engine_rocks::RocksIOLimiter;
+use engine_traits::IOLimiter;
 use external_storage::*;
 use futures::lazy;
 use futures::prelude::Future;
@@ -64,7 +65,7 @@ impl fmt::Debug for Task {

 #[derive(Clone)]
 struct LimitedStorage {
-    limiter: Option<Arc<IOLimiter>>,
+    limiter: Option<Arc<RocksIOLimiter>>,
     storage: Arc<dyn ExternalStorage>,
 }

@@ -77,7 +78,7 @@ impl Task {
         let cancel = Arc::new(AtomicBool::new(false));

         let limiter = if req.get_rate_limit() != 0 {
-            Some(Arc::new(IOLimiter::new(req.get_rate_limit() as _)))
+            Some(Arc::new(RocksIOLimiter::new(req.get_rate_limit() as _)))
         } else {
             None
         };
@@ -839,7 +840,7 @@ pub mod tests {
         }

         // TODO: check key number for each snapshot.
-        let limiter = Arc::new(IOLimiter::new(10 * 1024 * 1024 /* 10 MB/s */));
+        let limiter = Arc::new(RocksIOLimiter::new(10 * 1024 * 1024 /* 10 MB/s */));
         for (ts, len) in backup_tss {
             let mut req = BackupRequest::new();
             req.set_start_key(vec![]);
13 changes: 9 additions & 4 deletions components/backup/src/writer.rs
@@ -3,9 +3,10 @@
 use std::sync::Arc;
 use std::time::Instant;

-use engine::rocks::util::io_limiter::{IOLimiter, LimitReader};
 use engine::{CF_DEFAULT, CF_WRITE, DB};
+use engine_rocks::RocksIOLimiter;
 use engine_rocks::{RocksEngine, RocksSstWriter, RocksSstWriterBuilder};
+use engine_traits::LimitReader;
 use engine_traits::{SstWriter, SstWriterBuilder};
 use external_storage::ExternalStorage;
 use kvproto::backup::File;
@@ -60,7 +61,7 @@ impl Writer {
         name: &str,
         cf: &'static str,
         buf: &mut Vec<u8>,
-        limiter: Option<Arc<IOLimiter>>,
+        limiter: Option<Arc<RocksIOLimiter>>,
         storage: &dyn ExternalStorage,
     ) -> Result<File> {
         buf.reserve(self.writer.file_size() as _);
@@ -93,12 +94,16 @@ pub struct BackupWriter {
     name: String,
     default: Writer,
     write: Writer,
-    limiter: Option<Arc<IOLimiter>>,
+    limiter: Option<Arc<RocksIOLimiter>>,
 }

 impl BackupWriter {
     /// Create a new BackupWriter.
-    pub fn new(db: Arc<DB>, name: &str, limiter: Option<Arc<IOLimiter>>) -> Result<BackupWriter> {
+    pub fn new(
+        db: Arc<DB>,
+        name: &str,
+        limiter: Option<Arc<RocksIOLimiter>>,
+    ) -> Result<BackupWriter> {
         let default = RocksSstWriterBuilder::new()
             .set_in_memory(true)
             .set_cf(CF_DEFAULT)
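The backup writer now holds an `Option<Arc<RocksIOLimiter>>` and imports `LimitReader` from `engine_traits`. As a rough illustration of how a reader wrapper can consult an optional shared limiter, here is a self-contained sketch. Everything in it (`Limiter`, `ChunkLimiter`, `LimitedReader`, `read_sizes`) is hypothetical and only models the idea; the real `engine_traits::IOLimiter` and `LimitReader` signatures are not shown in this diff and may differ.

```rust
use std::io::{self, Cursor, Read};
use std::sync::Arc;

/// Hypothetical limiter interface (an assumption, not the engine_traits API).
pub trait Limiter {
    /// Ask to transfer `bytes`; returns how many bytes are granted right now.
    fn request(&self, bytes: usize) -> usize;
}

/// Toy limiter that grants at most `max_chunk` bytes per request and never sleeps.
/// `max_chunk` must be non-zero, otherwise a read would look like end-of-file.
pub struct ChunkLimiter {
    pub max_chunk: usize,
}

impl Limiter for ChunkLimiter {
    fn request(&self, bytes: usize) -> usize {
        bytes.min(self.max_chunk)
    }
}

/// Reader wrapper that asks the optional limiter before every read.
pub struct LimitedReader<R, L> {
    limiter: Option<Arc<L>>,
    inner: R,
}

impl<R: Read, L: Limiter> LimitedReader<R, L> {
    pub fn new(limiter: Option<Arc<L>>, inner: R) -> Self {
        LimitedReader { limiter, inner }
    }
}

impl<R: Read, L: Limiter> Read for LimitedReader<R, L> {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        // Without a limiter the full buffer may be filled.
        let granted = match &self.limiter {
            Some(limiter) => limiter.request(buf.len()),
            None => buf.len(),
        };
        self.inner.read(&mut buf[..granted])
    }
}

/// Drain `data` through a `LimitedReader` with an 8-byte buffer and
/// record the size of every successful read.
pub fn read_sizes(data: &[u8], limiter: Option<Arc<ChunkLimiter>>) -> Vec<usize> {
    let mut reader = LimitedReader::new(limiter, Cursor::new(data.to_vec()));
    let mut buf = [0u8; 8];
    let mut sizes = Vec::new();
    loop {
        let n = reader.read(&mut buf).expect("in-memory read cannot fail");
        if n == 0 {
            break;
        }
        sizes.push(n);
    }
    sizes
}
```

Keeping the limiter behind `Option<Arc<_>>`, as the diff does, lets several writers share one budget while unlimited callers pass `None`.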
1 change: 1 addition & 0 deletions components/engine/Cargo.toml
@@ -26,6 +26,7 @@ serde_derive = "1.0"
 toml = "0.4"
 hex = "0.3"
 tikv_util = { path = "../tikv_util" }
+engine_traits = { path = "../engine_traits" }

 [dependencies.prometheus]
 git = "https://github.com/pingcap/rust-prometheus.git"
133 changes: 14 additions & 119 deletions components/engine/src/iterable.rs
@@ -5,141 +5,36 @@ pub use crate::rocks::{DBIterator, ReadOptions, DB};
 use crate::Result;
 use tikv_util::keybuilder::KeyBuilder;

-#[derive(Clone, PartialEq)]
-enum SeekMode {
-    TotalOrder,
-    Prefix,
-}
+pub use engine_traits::IterOptions as IterOption;
+pub use engine_traits::SeekMode;

-pub struct IterOption {
-    lower_bound: Option<KeyBuilder>,
-    upper_bound: Option<KeyBuilder>,
-    prefix_same_as_start: bool,
-    fill_cache: bool,
-    // only supported when Titan enabled, otherwise it doesn't take effect.
-    titan_key_only: bool,
-    seek_mode: SeekMode,
+pub trait IterOptionsExt {
+    fn build_read_opts(self) -> ReadOptions;
 }

-impl IterOption {
-    pub fn new(
-        lower_bound: Option<KeyBuilder>,
-        upper_bound: Option<KeyBuilder>,
-        fill_cache: bool,
-    ) -> IterOption {
-        IterOption {
-            lower_bound,
-            upper_bound,
-            prefix_same_as_start: false,
-            fill_cache,
-            titan_key_only: false,
-            seek_mode: SeekMode::TotalOrder,
-        }
-    }
-
-    #[inline]
-    pub fn use_prefix_seek(mut self) -> IterOption {
-        self.seek_mode = SeekMode::Prefix;
-        self
-    }
-
-    #[inline]
-    pub fn total_order_seek_used(&self) -> bool {
-        self.seek_mode == SeekMode::TotalOrder
-    }
-
-    #[inline]
-    pub fn fill_cache(&mut self, v: bool) {
-        self.fill_cache = v;
-    }
-
-    #[inline]
-    pub fn titan_key_only(&mut self, v: bool) {
-        self.titan_key_only = v;
-    }
-
-    #[inline]
-    pub fn lower_bound(&self) -> Option<&[u8]> {
-        self.lower_bound.as_ref().map(|v| v.as_slice())
-    }
-
-    #[inline]
-    pub fn set_lower_bound(&mut self, bound: &[u8], reserved_prefix_len: usize) {
-        let builder = KeyBuilder::from_slice(bound, reserved_prefix_len, 0);
-        self.lower_bound = Some(builder);
-    }
-
-    pub fn set_vec_lower_bound(&mut self, bound: Vec<u8>) {
-        self.lower_bound = Some(KeyBuilder::from_vec(bound, 0, 0));
-    }
-
-    pub fn set_lower_bound_prefix(&mut self, prefix: &[u8]) {
-        if let Some(ref mut builder) = self.lower_bound {
-            builder.set_prefix(prefix);
-        }
-    }
-
-    #[inline]
-    pub fn upper_bound(&self) -> Option<&[u8]> {
-        self.upper_bound.as_ref().map(|v| v.as_slice())
-    }
-
-    #[inline]
-    pub fn set_upper_bound(&mut self, bound: &[u8], reserved_prefix_len: usize) {
-        let builder = KeyBuilder::from_slice(bound, reserved_prefix_len, 0);
-        self.upper_bound = Some(builder);
-    }
-
-    pub fn set_vec_upper_bound(&mut self, bound: Vec<u8>) {
-        self.upper_bound = Some(KeyBuilder::from_vec(bound, 0, 0));
-    }
-
-    pub fn set_upper_bound_prefix(&mut self, prefix: &[u8]) {
-        if let Some(ref mut builder) = self.upper_bound {
-            builder.set_prefix(prefix);
-        }
-    }
-
-    #[inline]
-    pub fn set_prefix_same_as_start(mut self, enable: bool) -> IterOption {
-        self.prefix_same_as_start = enable;
-        self
-    }
-
-    pub fn build_read_opts(self) -> ReadOptions {
+impl IterOptionsExt for IterOption {
+    fn build_read_opts(self) -> ReadOptions {
         let mut opts = ReadOptions::new();
-        opts.fill_cache(self.fill_cache);
-        if self.titan_key_only {
+        opts.fill_cache(self.fill_cache());
+        if self.titan_key_only() {
             opts.set_titan_key_only(true);
         }
         if self.total_order_seek_used() {
             opts.set_total_order_seek(true);
-        } else if self.prefix_same_as_start {
+        } else if self.prefix_same_as_start() {
             opts.set_prefix_same_as_start(true);
         }
-        if let Some(builder) = self.lower_bound {
-            opts.set_iterate_lower_bound(builder.build());
+        let (lower, upper) = self.build_bounds();
+        if let Some(lower) = lower {
+            opts.set_iterate_lower_bound(lower);
         }
-        if let Some(builder) = self.upper_bound {
-            opts.set_iterate_upper_bound(builder.build());
+        if let Some(upper) = upper {
+            opts.set_iterate_upper_bound(upper);
         }
         opts
     }
 }

-impl Default for IterOption {
-    fn default() -> IterOption {
-        IterOption {
-            lower_bound: None,
-            upper_bound: None,
-            prefix_same_as_start: false,
-            fill_cache: true,
-            titan_key_only: false,
-            seek_mode: SeekMode::TotalOrder,
-        }
-    }
-}
-
 // TODO: refactor this trait into rocksdb trait.
 pub trait Iterable {
     fn new_iterator(&self, iter_opt: IterOption) -> DBIterator<&DB>;
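Because `IterOption` is now a re-export of a type owned by `engine_traits`, the `engine` crate can no longer add inherent methods to it, so `build_read_opts` moves into the `IterOptionsExt` extension trait and reads fields through accessors. A minimal sketch of that pattern follows; the `upstream` module and the two-field `ReadOptions` struct are stand-ins invented for illustration, not the real `engine_traits` or RocksDB types.

```rust
/// Stand-in for the crate that owns the options type (assumption for illustration).
mod upstream {
    pub struct IterOptions {
        fill_cache: bool,
        key_only: bool,
    }

    impl IterOptions {
        pub fn new(fill_cache: bool, key_only: bool) -> Self {
            IterOptions { fill_cache, key_only }
        }

        // Fields are private, so downstream code must use accessors.
        pub fn fill_cache(&self) -> bool {
            self.fill_cache
        }

        pub fn key_only(&self) -> bool {
            self.key_only
        }
    }
}

/// Stand-in for the engine-specific read options (assumption for illustration).
#[derive(Debug, PartialEq)]
pub struct ReadOptions {
    pub fill_cache: bool,
    pub key_only: bool,
}

/// Extension trait: adds an engine-specific conversion to a type
/// defined elsewhere without touching its definition.
pub trait IterOptionsExt {
    fn build_read_opts(self) -> ReadOptions;
}

impl IterOptionsExt for upstream::IterOptions {
    fn build_read_opts(self) -> ReadOptions {
        ReadOptions {
            fill_cache: self.fill_cache(),
            key_only: self.key_only(),
        }
    }
}
```

Callers need the trait in scope to use the method, which matches the `use crate::iterable::IterOptionsExt;` lines added in `rocks/db.rs` and `rocks/snapshot.rs`.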
1 change: 1 addition & 0 deletions components/engine/src/rocks/db.rs
@@ -3,6 +3,7 @@
 use std::option::Option;

 use super::{util, DBIterator, DBVector, WriteBatch, DB};
+use crate::iterable::IterOptionsExt;
 use crate::{IterOption, Iterable, Mutable, Peekable, Result};

 impl Peekable for DB {
19 changes: 2 additions & 17 deletions components/engine/src/rocks/snapshot.rs
@@ -6,8 +6,10 @@ use std::option::Option;
 use std::sync::Arc;

 use super::{CFHandle, DBVector, ReadOptions, UnsafeSnap, DB};
+use crate::iterable::IterOptionsExt;
 use crate::{DBIterator, Error, IterOption, Iterable, Peekable, Result};

+#[repr(C)] // Guarantee same representation as in engine_rocks
 pub struct Snapshot {
     db: Arc<DB>,
     snap: UnsafeSnap,
@@ -43,23 +45,6 @@ impl Snapshot {
     pub fn get_db(&self) -> Arc<DB> {
         Arc::clone(&self.db)
     }
-
-    pub fn db_iterator(&self, iter_opt: IterOption) -> DBIterator<Arc<DB>> {
-        let mut opt = iter_opt.build_read_opts();
-        unsafe {
-            opt.set_snapshot(&self.snap);
-        }
-        DBIterator::new(Arc::clone(&self.db), opt)
-    }
-
-    pub fn db_iterator_cf(&self, cf: &str, iter_opt: IterOption) -> Result<DBIterator<Arc<DB>>> {
-        let handle = super::util::get_cf_handle(&self.db, cf)?;
-        let mut opt = iter_opt.build_read_opts();
-        unsafe {
-            opt.set_snapshot(&self.snap);
-        }
-        Ok(DBIterator::new_cf(Arc::clone(&self.db), handle, opt))
-    }
 }

 impl Debug for Snapshot {
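The `#[repr(C)]` attribute added to `Snapshot` pairs with the commit "engine_rocks: Add conversion from &RawSnapshot to &RocksSnapshot": reinterpreting a reference to one struct as a reference to another is only defensible when both have a guaranteed, identical layout. The sketch below shows the general technique with two hypothetical structs (`OldSnapshot`, `NewSnapshot`); the field types are placeholders, and the layout check is a plain function rather than the `static_assertions` macros the commit list mentions.

```rust
use std::mem::{align_of, size_of};

/// Hypothetical "legacy" snapshot type; fields are placeholders.
#[repr(C)]
pub struct OldSnapshot {
    pub db_id: u64,
    pub snap_ptr: usize,
}

/// Hypothetical "new" snapshot type with the same fields in the same order.
#[repr(C)]
pub struct NewSnapshot {
    pub db_id: u64,
    pub snap_ptr: usize,
}

/// Sanity check that the two types agree on size and alignment.
/// This is necessary but not sufficient; the field-by-field match
/// under `repr(C)` is what actually justifies the cast below.
pub fn same_layout() -> bool {
    size_of::<OldSnapshot>() == size_of::<NewSnapshot>()
        && align_of::<OldSnapshot>() == align_of::<NewSnapshot>()
}

impl NewSnapshot {
    /// View a borrowed `OldSnapshot` as a `NewSnapshot` without copying.
    pub fn from_ref(old: &OldSnapshot) -> &NewSnapshot {
        // SAFETY: both structs are `repr(C)` with identical field types in
        // identical order, so they share one layout; the returned reference
        // borrows from `old` and cannot outlive it.
        unsafe { &*(old as *const OldSnapshot as *const NewSnapshot) }
    }
}
```

Without `#[repr(C)]` the compiler is free to reorder fields of the default representation, so two structurally identical types are not guaranteed to match.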