Merge branch 'master' into Roundxxx_func
shizy818 committed Sep 23, 2018
2 parents 6b7c5c2 + eae3f9d commit 9a8117a
Showing 44 changed files with 1,684 additions and 1,178 deletions.
6 changes: 3 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default.

21 changes: 8 additions & 13 deletions components/test_coprocessor/fixture.rs
@@ -16,12 +16,12 @@ use super::*;
use kvproto::kvrpcpb::Context;

use tikv::coprocessor::codec::Datum;
use tikv::coprocessor::{EndPointHost, EndPointTask, ReadPoolContext};
use tikv::coprocessor::{Endpoint, ReadPoolContext};
use tikv::server::readpool::{self, ReadPool};
use tikv::server::Config;
use tikv::storage::engine::{self, RocksEngine};
use tikv::storage::{Engine, ALL_CFS, TEMP_DIR};
use tikv::util::worker::{Builder as WorkerBuilder, FutureWorker, Worker};
use tikv::util::worker::FutureWorker;

/// An example table for test purpose.
#[derive(Clone)]
@@ -68,7 +68,7 @@ pub fn init_data_with_engine_and_commit<E: Engine>(
tbl: &ProductTable,
vals: &[(i64, Option<&str>, i64)],
commit: bool,
) -> (Store<E>, Worker<EndPointTask<E>>) {
) -> (Store<E>, Endpoint<E>) {
init_data_with_details(
ctx,
engine,
@@ -88,7 +88,7 @@ pub fn init_data_with_details<E: Engine>(
commit: bool,
cfg: &Config,
read_pool_cfg: &readpool::Config,
) -> (Store<E>, Worker<EndPointTask<E>>) {
) -> (Store<E>, Endpoint<E>) {
let mut store = Store::new(engine);

store.begin();
@@ -107,20 +107,15 @@
let pool = ReadPool::new("readpool", read_pool_cfg, || {
|| ReadPoolContext::new(pd_worker.scheduler())
});
let mut end_point = WorkerBuilder::new("test-select-worker")
.batch_size(5)
.create();
let runner = EndPointHost::new(store.get_engine(), end_point.scheduler(), cfg, pool);
end_point.start(runner).unwrap();

(store, end_point)
let cop = Endpoint::new(cfg, store.get_engine(), pool);
(store, cop)
}

pub fn init_data_with_commit(
tbl: &ProductTable,
vals: &[(i64, Option<&str>, i64)],
commit: bool,
) -> (Store<RocksEngine>, Worker<EndPointTask<RocksEngine>>) {
) -> (Store<RocksEngine>, Endpoint<RocksEngine>) {
let engine = engine::new_local_engine(TEMP_DIR, ALL_CFS).unwrap();
init_data_with_engine_and_commit(Context::new(), engine, tbl, vals, commit)
}
@@ -129,6 +124,6 @@ pub fn init_data_with_commit(
pub fn init_with_data(
tbl: &ProductTable,
vals: &[(i64, Option<&str>, i64)],
) -> (Store<RocksEngine>, Worker<EndPointTask<RocksEngine>>) {
) -> (Store<RocksEngine>, Endpoint<RocksEngine>) {
init_data_with_commit(tbl, vals, true)
}
39 changes: 9 additions & 30 deletions components/test_coprocessor/util.rs
@@ -13,74 +13,53 @@

use std::sync::atomic::{AtomicUsize, Ordering};

use futures::sync::{mpsc, oneshot};
use futures::{Future, Stream};
use protobuf::Message;

use kvproto::coprocessor::{Request, Response};
use tipb::schema::ColumnInfo;
use tipb::select::{SelectResponse, StreamResponse};

use tikv::coprocessor::{EndPointTask, RequestTask};
use tikv::server::OnResponse;
use tikv::coprocessor::Endpoint;
use tikv::storage::Engine;
use tikv::util::worker::Scheduler;

static ID_GENERATOR: AtomicUsize = AtomicUsize::new(1);

pub fn next_id() -> i64 {
ID_GENERATOR.fetch_add(1, Ordering::Relaxed) as i64
}

pub fn handle_request<E>(end_point_scheduler: &Scheduler<EndPointTask<E>>, req: Request) -> Response
pub fn handle_request<E>(cop: &Endpoint<E>, req: Request) -> Response
where
E: Engine,
{
let (tx, rx) = oneshot::channel();
let on_resp = OnResponse::Unary(tx);
let req = RequestTask::new(String::from("127.0.0.1"), req, on_resp, 100).unwrap();
end_point_scheduler
.schedule(EndPointTask::Request(req))
.unwrap();
rx.wait().unwrap()
cop.parse_and_handle_unary_request(req, None)
.wait()
.unwrap()
}

pub fn handle_select<E>(
end_point_scheduler: &Scheduler<EndPointTask<E>>,
req: Request,
) -> SelectResponse
pub fn handle_select<E>(cop: &Endpoint<E>, req: Request) -> SelectResponse
where
E: Engine,
{
let resp = handle_request(end_point_scheduler, req);
let resp = handle_request(cop, req);
assert!(!resp.get_data().is_empty(), "{:?}", resp);
let mut sel_resp = SelectResponse::new();
sel_resp.merge_from_bytes(resp.get_data()).unwrap();
sel_resp
}

pub fn handle_streaming_select<E, F>(
end_point_scheduler: &Scheduler<EndPointTask<E>>,
cop: &Endpoint<E>,
req: Request,
mut check_range: F,
) -> Vec<StreamResponse>
where
E: Engine,
F: FnMut(&Response) + Send + 'static,
{
let (stream_tx, stream_rx) = mpsc::channel(10);
let req = RequestTask::new(
String::from("127.0.0.1"),
req,
OnResponse::Streaming(stream_tx),
100,
).unwrap();
end_point_scheduler
.schedule(EndPointTask::Request(req))
.unwrap();
stream_rx
cop.parse_and_handle_stream_request(req, None)
.wait()
.into_iter()
.map(|resp| {
let resp = resp.unwrap();
check_range(&resp);
3 changes: 2 additions & 1 deletion components/test_raftstore/server.rs
@@ -163,13 +163,14 @@ impl Simulator for ServerCluster {
let cop_read_pool = ReadPool::new("cop", &cfg.readpool.coprocessor.build_config(), || {
|| coprocessor::ReadPoolContext::new(pd_worker.scheduler())
});
let cop = coprocessor::Endpoint::new(&server_cfg, store.get_engine(), cop_read_pool);
let mut server = None;
for _ in 0..100 {
server = Some(Server::new(
&server_cfg,
&security_mgr,
store.clone(),
cop_read_pool.clone(),
cop.clone(),
sim_router.clone(),
resolver.clone(),
snap_mgr.clone(),
48 changes: 48 additions & 0 deletions docs/tools/pd-recover.md
@@ -0,0 +1,48 @@
---
title: PD Recover User Guide
summary: Use PD Recover to recover a PD cluster which cannot start or provide services normally.
category: tools
---

# PD Recover User Guide

PD Recover is a disaster recovery tool for PD, used to recover a PD cluster that cannot start or provide services normally.

## Compiling from source code

1. Install [Go](https://golang.org/) version 1.9 or later.
2. In the root directory of the [PD project](https://github.com/pingcap/pd), use the `make` command to compile and generate `bin/pd-recover` (an example build sequence is sketched below).
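
A minimal sketch of the build steps, assuming `git` and `make` are available; the exact commands are illustrative rather than part of the original guide:

```
# Fetch the PD source and build pd-recover
git clone https://github.com/pingcap/pd.git
cd pd
make
# The compiled binary is placed at bin/pd-recover
```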

## Usage

This section describes how to recover a PD cluster that cannot start or provide services normally.

### Flags description

```
-alloc-id uint
Specify a number larger than the allocated ID of the original cluster
-cacert string
Specify the path to the trusted CA certificate file in PEM format
-cert string
Specify the path to the SSL certificate file in PEM format
-key string
Specify the path to the SSL certificate key file in PEM format, which is the private key of the certificate specified by `--cert`
-cluster-id uint
Specify the Cluster ID of the original cluster
-endpoints string
Specify the PD address (default: "http://127.0.0.1:2379")
```

### Recovery flow

1. Obtain the Cluster ID and the Alloc ID from the current cluster.

- Obtain the Cluster ID from the PD and TiKV logs.
- Obtain the allocated Alloc ID from either the PD log or the `Metadata Information` in the PD monitoring panel.

The number specified for `alloc-id` must be larger than the current largest Alloc ID. If you cannot obtain the Alloc ID, you can estimate a sufficiently large number based on the number of Regions and stores in the cluster; generally, you can specify a number that is several orders of magnitude larger.

2. Stop the whole cluster, clear the PD data directory, and restart the PD cluster.
3. Use PD Recover to recover the cluster, making sure that you use the correct `cluster-id` and an appropriate `alloc-id` (see the example command below).
4. When the recovery success message is displayed, restart the whole cluster.
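
For illustration, the recovery command in step 3 might look like the following. This is a sketch: the `cluster-id` and `alloc-id` values are placeholders and must be replaced with the values obtained in step 1.

```
./bin/pd-recover -endpoints http://127.0.0.1:2379 \
    -cluster-id 6610407477209733358 \
    -alloc-id 100000000
```

Once the command reports success, restart the whole cluster as described in step 4.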
16 changes: 14 additions & 2 deletions src/bin/tikv-ctl.rs
@@ -1798,8 +1798,7 @@ fn get_module_type(module: &str) -> MODULE {
}

fn from_hex(key: &str) -> Result<Vec<u8>, FromHexError> {
const HEX_PREFIX: &str = "0x";
if key.starts_with(HEX_PREFIX) {
if key.starts_with("0x") || key.starts_with("0X") {
return key[2..].from_hex();
}
key.from_hex()
@@ -1982,3 +1981,16 @@ fn read_fail_file(path: &str) -> Vec<(String, String)> {
}
list
}

#[cfg(test)]
mod tests {
use super::from_hex;

#[test]
fn test_from_hex() {
let result = vec![0x74];
assert_eq!(from_hex("74").unwrap(), result);
assert_eq!(from_hex("0x74").unwrap(), result);
assert_eq!(from_hex("0X74").unwrap(), result);
}
}
1 change: 0 additions & 1 deletion src/bin/tikv-importer.rs
@@ -12,7 +12,6 @@
// limitations under the License.

#![feature(slice_patterns)]
#![feature(use_extern_macros)]
#![feature(proc_macro_non_items)]

extern crate chrono;
4 changes: 2 additions & 2 deletions src/bin/tikv-server.rs
@@ -12,7 +12,6 @@
// limitations under the License.

#![feature(slice_patterns)]
#![feature(use_extern_macros)]
#![feature(proc_macro_non_items)]

extern crate chrono;
@@ -196,11 +195,12 @@ fn run_raft_server(pd_client: RpcClient, cfg: &TiKvConfig, security_mgr: Arc<Sec
let pd_sender = pd_sender.clone();
move || coprocessor::ReadPoolContext::new(pd_sender.clone())
});
let cop = coprocessor::Endpoint::new(&server_cfg, storage.get_engine(), cop_read_pool);
let mut server = Server::new(
&server_cfg,
&security_mgr,
storage.clone(),
cop_read_pool,
cop,
raft_router,
resolver,
snap_mgr.clone(),
