Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

debug: add scan_mvcc support. #2335

Merged
merged 37 commits into from Oct 12, 2017
Merged

debug: add scan_mvcc support. #2335

merged 37 commits into from Oct 12, 2017

Conversation

hicqu
Copy link
Contributor

@hicqu hicqu commented Sep 25, 2017

This PR add support for debug ScanMvcc API.

@hicqu
Copy link
Contributor Author

hicqu commented Sep 25, 2017

@overvenus , PTAL, thanks.

Ok(_) => Ok(dealer.0.unwrap()),
Err(e) => Err((dealer.0.unwrap(), e)),
};
future::result(result)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think result is enough.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

pool.spawn_fn requires a closure returning Future, so I believe we need future::result(...).

fn mvcc_kvs(&self, cf: CfName, from: Vec<u8>, to: Vec<u8>) -> Receiver<Kv> {
let (tx, rx) = channel::<Kv>();
let db = self.engines.kv_engine.clone();
thread::spawn(move || -> Result<()> {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please spawn threads with names.

let db = self.engines.kv_engine.clone();
thread::spawn(move || -> Result<()> {
for kv in &mut try!(gen_mvcc_iter(db.as_ref(), cf, &from, &to)) {
box_try!(tx.send(kv));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about if let Err(_) = tx.send() { return; }?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We also need to call gen_mvcc_iter in the thread closure, which also returns a Result.
So let the closure return a Result is better?

mvcc_resp.set_info(mvcc);

let sink = self.0.take().unwrap();
match sink.send((mvcc_resp, WriteFlags::default())).wait() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Attach an RpcContext to MvccKVToSink and spawn the sink.send(), so that we dont need to call wait().

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because sink.send is called in other threads in pool and RpcContext can't Send, I think we can't do this.

Ok(mut sink) => {
let f = future::poll_fn(move || sink.close())
.map_err(|e| Service::on_grpc_error("scan_mvcc", e));
Box::new(f) as Box<Future<Item = (), Error = ()> + Send>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think spawning f here is fine.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tried but the RpcContext is not Send, so I can't call ctx.spawn(f) here.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I mean call ctx.spwan() here, so f does not need to cast a BoxFuture .

let (tx, rx) = channel::<Kv>();
let db = self.engines.kv_engine.clone();
thread::Builder::new()
.name(format!("scan_mvcc_{}", cf))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you can use thd_name!().

@@ -151,6 +166,206 @@ impl Debugger {
Err(e) => Err(box_err!(e)),
}
}

pub fn scan_mvcc<D, E>(&self, req: ScanMvccRequest, dealer: &mut D) -> Result<()>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about passing a futures::sync::mpsc::UnboundedSender<MvccInfo>?

let (tx, rx) = unbounded();
self
    .pool
    .spwan(
        ok(self.debugger.clone())
            .and_then(|debugger| debugger.scan_mvcc(req, tx)
    )
    .forgot();
let f = sink.send_all(rx).and_then(|sink| poll_fn(sink.close()));
ctx.spawn(f);

#[test]
fn test_scan_mvcc() {
struct MvccCounter(usize);
impl MvccKVDealer for MvccCounter {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please also add a test in test_servics.rs .

@hicqu
Copy link
Contributor Author

hicqu commented Sep 27, 2017

@overvenus , Now we don't need useless MvccKVDealer trait. PTAL again, thanks.

@hicqu
Copy link
Contributor Author

hicqu commented Sep 27, 2017

@BusyJay , PTAL, thanks.

@siddontang
Copy link
Contributor

PTAL @BusyJay @overvenus

if limit != 0 && count >= limit {
break;
}
if count >= limit {}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove it.


let mut mvcc_infos: BTreeMap<Vec<u8>, (MvccInfo, bool, bool, bool)> = BTreeMap::new();
let (mut want_lock, mut want_default, mut want_write) = (true, true, true);
let (mut m_l, mut m_d, mut m_w) = (None, None, None);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we need a better name.

break;
}
if want_lock {
if let Ok(kv) = lock_rx.recv() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we can use select!() [1].

[1] https://doc.rust-lang.org/std/macro.select.html

}
want_write = false;
}
for key in mvcc_infos.keys().cloned().collect::<Vec<_>>() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

want_write = false;
}
for key in mvcc_infos.keys().cloned().collect::<Vec<_>>() {
let mvcc_and_flags = mvcc_infos.remove(&key).unwrap();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See also OccupiedEntry::remove_entry() [1].

[1] https://doc.rust-lang.org/std/collections/struct.BTreeMap.html#method.entry

}
m_w = Some(key);
} else {
m_w = Some(vec![0xffu8]);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What does '0xffu8' mean?

@hicqu
Copy link
Contributor Author

hicqu commented Sep 29, 2017

/run-all-test

@hicqu
Copy link
Contributor Author

hicqu commented Sep 29, 2017

@BusyJay , PTAL, thanks.

Copy link
Member

@overvenus overvenus left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@hicqu
Copy link
Contributor Author

hicqu commented Oct 5, 2017

@BusyJay PTAL thanks.

@siddontang
Copy link
Contributor

PTAL @BusyJay

/// Compact the cf[start..end) in the db **by manual**.
pub fn scan_mvcc(&self, mut req: ScanMvccRequest) -> Result<MvccInfoIterator> {
let from_key = req.take_from_key();
let to_key = Some(req.take_to_key()).into_iter().find(|k| !k.is_empty());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use if else instead.

.map(|_| ())
})
.map_err(|e| Service::on_grpc_error("scan_mvcc", &e))
.wait();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why wait?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because DBIterator is !Send so that we can't use it directly in spawn.

.wait();
result
};
self.pool.spawn_fn(future).forget();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can use spawn instead of spawn_fn.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use spawn needs a Future instead of Closure returns Future. But if the future contains any DBIterator, it can't be Send, so that it can't spawn.

@hicqu
Copy link
Contributor Author

hicqu commented Oct 11, 2017

@BusyJay , PTAL thanks.

@hicqu
Copy link
Contributor Author

hicqu commented Oct 11, 2017

@BusyJay , PTAL, thanks.

@BusyJay
Copy link
Member

BusyJay commented Oct 11, 2017

LGTM, though I think MvccInfoIterator can still be improved.

@BusyJay
Copy link
Member

BusyJay commented Oct 11, 2017

/run-integration-tests

@siddontang
Copy link
Contributor

PTAL @overvenus

Copy link
Member

@BusyJay BusyJay left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

rest LGTM

mvcc_info.set_values(values);
},
Ordering::Greater => {}
_ => unreachable!(),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If there is a bug, it's reachable. So should return all those keys instead of panic.

@@ -199,6 +200,11 @@ pub fn origin_key(key: &[u8]) -> &[u8] {
&key[DATA_PREFIX_KEY.len()..]
}

/// The caller should ensure the key is a timestamped key.
pub fn truncate_ts(key: &[u8]) -> &[u8] {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The function should be put in types.rs. All function in this file has nothing related to MVCC.

@hicqu
Copy link
Contributor Author

hicqu commented Oct 12, 2017

@overvenus , PTAL again, thanks.

},
Ordering::Greater => {}
_ => {
let err_msg = format!(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

/cc @disksing, is an error message or listing all the related kvs better help with debuging?

@overvenus
Copy link
Member

/run-all-test

@hicqu hicqu merged commit 464c161 into master Oct 12, 2017
@hicqu hicqu deleted the qupeng/debug-scan-mvcc branch October 12, 2017 05:10
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

4 participants