From 0613eabdfa28ac18f39f3852bdca2c5ea8badc6d Mon Sep 17 00:00:00 2001
From: Neil Shen
Date: Wed, 12 Apr 2017 22:08:08 +0800
Subject: [PATCH 1/3] *: asynchronous pd client, take 4 (#1753)

This is the fourth part of using an asynchronous PD client. It contains the
following changes:

- Refactor PdWorker as AsyncWorker.
  (Address https://github.com/pingcap/tikv/pull/1712#issuecomment-290102625)
---
 src/raftstore/store/peer.rs      |   2 +-
 src/raftstore/store/store.rs     |   6 +-
 src/raftstore/store/worker/pd.rs | 341 ++++++++++++++++++-------------
 3 files changed, 200 insertions(+), 149 deletions(-)

diff --git a/src/raftstore/store/peer.rs b/src/raftstore/store/peer.rs
index bdf55a06c88d..5fac942b18b1 100644
--- a/src/raftstore/store/peer.rs
+++ b/src/raftstore/store/peer.rs
@@ -39,7 +39,7 @@ use raftstore::store::Config;
 use raftstore::store::worker::{apply, PdTask};
 use raftstore::store::worker::apply::ExecResult;
 
-use util::worker::{Worker, Scheduler};
+use util::worker::{FutureWorker as Worker, Scheduler};
 use raftstore::store::worker::{ApplyTask, ApplyRes};
 use util::{clocktime, Either, HashMap, HashSet, strftimespec};
 
diff --git a/src/raftstore/store/store.rs b/src/raftstore/store/store.rs
index ef44e262acfa..1a67c8e94315 100644
--- a/src/raftstore/store/store.rs
+++ b/src/raftstore/store/store.rs
@@ -41,7 +41,7 @@ use protobuf::Message;
 use raft::{self, SnapshotStatus, INVALID_INDEX};
 use raftstore::{Result, Error};
 use kvproto::metapb;
-use util::worker::{Worker, Scheduler};
+use util::worker::{Worker, Scheduler, FutureWorker};
 use util::transport::SendCh;
 use util::{rocksdb, HashMap, HashSet, RingQueue};
 use storage::{CF_DEFAULT, CF_LOCK, CF_WRITE};
@@ -114,7 +114,7 @@ pub struct Store {
     region_worker: Worker<RegionTask>,
     raftlog_gc_worker: Worker<RaftlogGcTask>,
     compact_worker: Worker<CompactTask>,
-    pd_worker: Worker<PdTask>,
+    pd_worker: FutureWorker<PdTask>,
     consistency_check_worker: Worker<ConsistencyCheckTask>,
 
     pub apply_worker: Worker<ApplyTask>,
@@ -194,7 +194,7 @@ impl<T: Transport, C: PdClient> Store<T, C> {
             region_worker: Worker::new("snapshot worker"),
             raftlog_gc_worker: Worker::new("raft gc worker"),
             compact_worker: Worker::new("compact worker"),
-            pd_worker: Worker::new("pd worker"),
+            pd_worker: FutureWorker::new("pd worker"),
             consistency_check_worker: Worker::new("consistency check worker"),
             apply_worker: Worker::new("apply worker"),
             apply_res_receiver: None,
diff --git a/src/raftstore/store/worker/pd.rs b/src/raftstore/store/worker/pd.rs
index 2dbb60502828..d5525b3269ae 100644
--- a/src/raftstore/store/worker/pd.rs
+++ b/src/raftstore/store/worker/pd.rs
@@ -16,6 +16,7 @@ use std::fmt::{self, Formatter, Display};
 use uuid::Uuid;
 
 use futures::Future;
+use tokio_core::reactor::Handle;
 
 use kvproto::metapb;
 use kvproto::eraftpb::ConfChangeType;
@@ -23,7 +24,7 @@ use kvproto::raft_cmdpb::{RaftCmdRequest, AdminRequest, AdminCmdType};
 use kvproto::raft_serverpb::RaftMessage;
 use kvproto::pdpb;
 
-use util::worker::Runnable;
+use util::worker::FutureRunnable as Runnable;
 use util::escape;
 use util::transport::SendCh;
 use pd::PdClient;
@@ -97,50 +98,41 @@ impl<T: PdClient> Runner<T> {
         }
     }
 
-    fn send_admin_request(&self,
-                          mut region: metapb::Region,
-                          peer: metapb::Peer,
-                          request: AdminRequest) {
-        let region_id = region.get_id();
-        let cmd_type = request.get_cmd_type();
-
-        let mut req = RaftCmdRequest::new();
-        req.mut_header().set_region_id(region_id);
-        req.mut_header().set_region_epoch(region.take_region_epoch());
-        req.mut_header().set_peer(peer);
-        req.mut_header().set_uuid(Uuid::new_v4().as_bytes().to_vec());
-
-        req.set_admin_request(request);
-
-        if let Err(e) = self.ch.try_send(Msg::new_raft_cmd(req, Box::new(|_| {}))) {
-            error!("[region {}] send {:?} request err {:?}",
-                   region_id,
-                   cmd_type,
-                   e);
-        }
-    }
-
-    fn handle_ask_split(&self, region: metapb::Region, split_key: Vec<u8>, peer: metapb::Peer) {
+    fn handle_ask_split(&self,
+                        handle: &Handle,
+                        region: metapb::Region,
+                        split_key: Vec<u8>,
+                        peer: metapb::Peer) {
         PD_REQ_COUNTER_VEC.with_label_values(&["ask split", "all"]).inc();
 
-        match self.pd_client.ask_split(region.clone()).wait() {
-            Ok(mut resp) => {
-                info!("[region {}] try to split with new region id {} for region {:?}",
-                      region.get_id(),
-                      resp.get_new_region_id(),
-                      region);
-                PD_REQ_COUNTER_VEC.with_label_values(&["ask split", "success"]).inc();
-
-                let req = new_split_region_request(split_key,
-                                                   resp.get_new_region_id(),
-                                                   resp.take_new_peer_ids());
-                self.send_admin_request(region, peer, req);
-            }
-            Err(e) => debug!("[region {}] failed to ask split: {:?}", region.get_id(), e),
-        }
+        let ch = self.ch.clone();
+        let f = self.pd_client
+            .ask_split(region.clone())
+            .then(move |resp| {
+                match resp {
+                    Ok(mut resp) => {
+                        info!("[region {}] try to split with new region id {} for region {:?}",
+                              region.get_id(),
+                              resp.get_new_region_id(),
+                              region);
+                        PD_REQ_COUNTER_VEC.with_label_values(&["ask split", "success"]).inc();
+
+                        let req = new_split_region_request(split_key,
+                                                           resp.get_new_region_id(),
+                                                           resp.take_new_peer_ids());
+                        send_admin_request(ch, region, peer, req);
+                    }
+                    Err(e) => {
+                        debug!("[region {}] failed to ask split: {:?}", region.get_id(), e);
+                    }
+                }
+                Ok(())
+            });
+        handle.spawn(f)
     }
 
     fn handle_heartbeat(&self,
+                        handle: &Handle,
                         region: metapb::Region,
                         peer: metapb::Peer,
                         down_peers: Vec<pdpb::PeerStats>,
@@ -148,144 +140,162 @@ impl<T: PdClient> Runner<T> {
                         written_bytes: u64) {
         PD_REQ_COUNTER_VEC.with_label_values(&["heartbeat", "all"]).inc();
 
+        let ch = self.ch.clone();
+
         // Now we use put region protocol for heartbeat.
-        match self.pd_client
+        let f = self.pd_client
             .region_heartbeat(region.clone(),
                               peer.clone(),
                               down_peers,
                               pending_peers,
                               written_bytes)
-            .wait() {
-            Ok(mut resp) => {
-                PD_REQ_COUNTER_VEC.with_label_values(&["heartbeat", "success"]).inc();
-
-                if resp.has_change_peer() {
-                    PD_HEARTBEAT_COUNTER_VEC.with_label_values(&["change peer"]).inc();
-
-                    let mut change_peer = resp.take_change_peer();
-                    info!("[region {}] try to change peer {:?} {:?} for region {:?}",
-                          region.get_id(),
-                          change_peer.get_change_type(),
-                          change_peer.get_peer(),
-                          region);
-                    let req = new_change_peer_request(change_peer.get_change_type().into(),
-                                                      change_peer.take_peer());
-                    self.send_admin_request(region, peer, req);
-                } else if resp.has_transfer_leader() {
-                    PD_HEARTBEAT_COUNTER_VEC.with_label_values(&["transfer leader"]).inc();
-
-                    let mut transfer_leader = resp.take_transfer_leader();
-                    info!("[region {}] try to transfer leader from {:?} to {:?}",
-                          region.get_id(),
-                          peer,
-                          transfer_leader.get_peer());
-                    let req = new_transfer_leader_request(transfer_leader.take_peer());
-                    self.send_admin_request(region, peer, req)
-                }
-            }
-            Err(e) => {
-                debug!("[region {}] failed to send heartbeat: {:?}",
-                       region.get_id(),
-                       e)
-            }
-        }
+            .then(move |resp| {
+                match resp {
+                    Ok(mut resp) => {
+                        PD_REQ_COUNTER_VEC.with_label_values(&["heartbeat", "success"]).inc();
+
+                        if resp.has_change_peer() {
+                            PD_HEARTBEAT_COUNTER_VEC.with_label_values(&["change peer"]).inc();
+
+                            let mut change_peer = resp.take_change_peer();
+                            info!("[region {}] try to change peer {:?} {:?} for region {:?}",
+                                  region.get_id(),
+                                  change_peer.get_change_type(),
+                                  change_peer.get_peer(),
+                                  region);
+                            let req = new_change_peer_request(change_peer.get_change_type().into(),
+                                                              change_peer.take_peer());
+                            send_admin_request(ch, region, peer, req);
+                        } else if resp.has_transfer_leader() {
+                            PD_HEARTBEAT_COUNTER_VEC.with_label_values(&["transfer leader"]).inc();
+
+                            let mut transfer_leader = resp.take_transfer_leader();
+                            info!("[region {}] try to transfer leader from {:?} to {:?}",
+                                  region.get_id(),
+                                  peer,
+                                  transfer_leader.get_peer());
+                            let req = new_transfer_leader_request(transfer_leader.take_peer());
+                            send_admin_request(ch, region, peer, req)
+                        }
+                    }
+                    Err(e) => {
+                        debug!("[region {}] failed to send heartbeat: {:?}",
+                               region.get_id(),
+                               e);
+                    }
+                }
+                Ok(())
+            });
+        handle.spawn(f);
     }
 
-    fn handle_store_heartbeat(&self, stats: pdpb::StoreStats) {
-        if let Err(e) = self.pd_client.store_heartbeat(stats).wait() {
-            error!("store heartbeat failed {:?}", e);
-        }
+    fn handle_store_heartbeat(&self, handle: &Handle, stats: pdpb::StoreStats) {
+        let f = self.pd_client
+            .store_heartbeat(stats)
+            .map_err(|e| {
+                error!("store heartbeat failed {:?}", e);
+            });
+        handle.spawn(f);
     }
 
-    fn handle_report_split(&self, left: metapb::Region, right: metapb::Region) {
+    fn handle_report_split(&self, handle: &Handle, left: metapb::Region, right: metapb::Region) {
         PD_REQ_COUNTER_VEC.with_label_values(&["report split", "all"]).inc();
 
-        if let Err(e) = self.pd_client.report_split(left, right).wait() {
-            error!("report split failed {:?}", e);
-        }
-        PD_REQ_COUNTER_VEC.with_label_values(&["report split", "success"]).inc();
-    }
-
-    // send a raft message to destroy the specified stale peer
-    fn send_destroy_peer_message(&self,
-                                 local_region: metapb::Region,
-                                 peer: metapb::Peer,
-                                 pd_region: metapb::Region) {
-        let mut message = RaftMessage::new();
-        message.set_region_id(local_region.get_id());
-        message.set_from_peer(peer.clone());
-        message.set_to_peer(peer.clone());
-        message.set_region_epoch(pd_region.get_region_epoch().clone());
-        message.set_is_tombstone(true);
-        if let Err(e) = self.ch.try_send(Msg::RaftMessage(message)) {
-            error!("send gc peer request to region {} err {:?}",
-                   local_region.get_id(),
-                   e)
-        }
+        let f = self.pd_client
+            .report_split(left, right)
+            .then(move |resp| {
+                match resp {
+                    Ok(_) => {
+                        PD_REQ_COUNTER_VEC.with_label_values(&["report split", "success"]).inc();
+                    }
+                    Err(e) => {
+                        error!("report split failed {:?}", e);
+                    }
+                }
+                Ok(())
+            });
+        handle.spawn(f);
     }
 
-    fn handle_validate_peer(&self, local_region: metapb::Region, peer: metapb::Peer) {
+    fn handle_validate_peer(&self,
+                            handle: &Handle,
+                            local_region: metapb::Region,
+                            peer: metapb::Peer) {
         PD_REQ_COUNTER_VEC.with_label_values(&["get region", "all"]).inc();
-        match self.pd_client.get_region_by_id(local_region.get_id()).wait() {
-            Ok(Some(pd_region)) => {
-                PD_REQ_COUNTER_VEC.with_label_values(&["get region", "success"]).inc();
-                if is_epoch_stale(pd_region.get_region_epoch(),
-                                  local_region.get_region_epoch()) {
-                    // The local region epoch is fresher than region epoch in PD
-                    // This means the region info in PD is not updated to the latest even
-                    // after max_leader_missing_duration. Something is wrong in the system.
-                    // Just add a log here for this situation.
-                    error!("[region {}] {} the local region epoch: {:?} is greater the region \
-                            epoch in PD: {:?}. Something is wrong!",
-                           local_region.get_id(),
-                           peer.get_id(),
-                           local_region.get_region_epoch(),
-                           pd_region.get_region_epoch());
-                    PD_VALIDATE_PEER_COUNTER_VEC.with_label_values(&["region epoch error"]).inc();
-                    return;
-                }
-                if pd_region.get_peers().into_iter().all(|p| p != &peer) {
-                    // Peer is not a member of this region anymore. Probably it's removed out.
-                    // Send it a raft massage to destroy it since it's obsolete.
-                    info!("[region {}] {} is not a valid member of region {:?}. To be destroyed \
-                           soon.",
-                          local_region.get_id(),
-                          peer.get_id(),
-                          pd_region);
-                    PD_VALIDATE_PEER_COUNTER_VEC.with_label_values(&["peer stale"]).inc();
-                    self.send_destroy_peer_message(local_region, peer, pd_region);
-                    return;
-                }
-                info!("[region {}] {} is still valid in region {:?}",
-                      local_region.get_id(),
-                      peer.get_id(),
-                      pd_region);
-                PD_VALIDATE_PEER_COUNTER_VEC.with_label_values(&["peer valid"]).inc();
-            }
-            Ok(None) => {
-                // split region has not yet report to pd.
-                // TODO: handle merge
-            }
-            Err(e) => error!("get region failed {:?}", e),
-        }
+        let ch = self.ch.clone();
+        let f = self.pd_client.get_region_by_id(local_region.get_id()).then(move |resp| {
+            match resp {
+                Ok(Some(pd_region)) => {
+                    PD_REQ_COUNTER_VEC.with_label_values(&["get region", "success"]).inc();
+                    if is_epoch_stale(pd_region.get_region_epoch(),
+                                      local_region.get_region_epoch()) {
+                        // The local region epoch is fresher than region epoch in PD
+                        // This means the region info in PD is not updated to the latest even
+                        // after max_leader_missing_duration. Something is wrong in the system.
+                        // Just add a log here for this situation.
+                        error!("[region {}] {} the local region epoch: {:?} is greater than the \
+                                region epoch in PD: {:?}. Something is wrong!",
+                               local_region.get_id(),
+                               peer.get_id(),
+                               local_region.get_region_epoch(),
+                               pd_region.get_region_epoch());
+                        PD_VALIDATE_PEER_COUNTER_VEC.with_label_values(&["region epoch error"])
+                            .inc();
+                        return Ok(());
+                    }
+
+                    if pd_region.get_peers().into_iter().all(|p| p != &peer) {
+                        // Peer is not a member of this region anymore. Probably it was removed.
+                        // Send it a raft message to destroy it since it's obsolete.
+                        info!("[region {}] {} is not a valid member of region {:?}. To be \
+                               destroyed soon.",
+                              local_region.get_id(),
+                              peer.get_id(),
+                              pd_region);
+                        PD_VALIDATE_PEER_COUNTER_VEC.with_label_values(&["peer stale"]).inc();
+                        send_destroy_peer_message(ch, local_region, peer, pd_region);
+                        return Ok(());
+                    }
+                    info!("[region {}] {} is still valid in region {:?}",
+                          local_region.get_id(),
+                          peer.get_id(),
+                          pd_region);
+                    PD_VALIDATE_PEER_COUNTER_VEC.with_label_values(&["peer valid"]).inc();
+                }
+                Ok(None) => {
+                    // split region has not yet been reported to pd.
+                    // TODO: handle merge
+                }
+                Err(e) => {
+                    error!("get region failed {:?}", e);
+                }
+            }
+            Ok(())
+        });
+        handle.spawn(f);
     }
 }
 
 impl<T: PdClient> Runnable<Task> for Runner<T> {
-    fn run(&mut self, task: Task) {
+    fn run(&mut self, task: Task, handle: &Handle) {
         debug!("executing task {}", task);
 
         match task {
             Task::AskSplit { region, split_key, peer } => {
-                self.handle_ask_split(region, split_key, peer)
+                self.handle_ask_split(handle, region, split_key, peer)
             }
             Task::Heartbeat { region, peer, down_peers, pending_peers, written_bytes } => {
-                self.handle_heartbeat(region, peer, down_peers, pending_peers, written_bytes)
+                self.handle_heartbeat(handle,
+                                      region,
+                                      peer,
+                                      down_peers,
+                                      pending_peers,
+                                      written_bytes)
             }
-            Task::StoreHeartbeat { stats } => self.handle_store_heartbeat(stats),
-            Task::ReportSplit { left, right } => self.handle_report_split(left, right),
-            Task::ValidatePeer { region, peer } => self.handle_validate_peer(region, peer),
+            Task::StoreHeartbeat { stats } => self.handle_store_heartbeat(handle, stats),
+            Task::ReportSplit { left, right } => self.handle_report_split(handle, left, right),
+            Task::ValidatePeer { region, peer } => self.handle_validate_peer(handle, region, peer),
         };
     }
 }
@@ -316,3 +326,44 @@ fn new_transfer_leader_request(peer: metapb::Peer) -> AdminRequest {
     req.mut_transfer_leader().set_peer(peer);
     req
 }
+
+fn send_admin_request(ch: SendCh<Msg>,
+                      mut region: metapb::Region,
+                      peer: metapb::Peer,
+                      request: AdminRequest) {
+    let region_id = region.get_id();
+    let cmd_type = request.get_cmd_type();
+
+    let mut req = RaftCmdRequest::new();
+    req.mut_header().set_region_id(region_id);
+    req.mut_header().set_region_epoch(region.take_region_epoch());
+    req.mut_header().set_peer(peer);
+    req.mut_header().set_uuid(Uuid::new_v4().as_bytes().to_vec());
+
+    req.set_admin_request(request);
+
+    if let Err(e) = ch.try_send(Msg::new_raft_cmd(req, Box::new(|_| {}))) {
+        error!("[region {}] send {:?} request err {:?}",
+               region_id,
+               cmd_type,
+               e);
+    }
+}
+
+// send a raft message to destroy the specified stale peer
+fn send_destroy_peer_message(ch: SendCh<Msg>,
+                             local_region: metapb::Region,
+                             peer: metapb::Peer,
+                             pd_region: metapb::Region) {
+    let mut message = RaftMessage::new();
+    message.set_region_id(local_region.get_id());
+    message.set_from_peer(peer.clone());
+    message.set_to_peer(peer.clone());
+    message.set_region_epoch(pd_region.get_region_epoch().clone());
+    message.set_is_tombstone(true);
+    if let Err(e) = ch.try_send(Msg::RaftMessage(message)) {
+        error!("send gc peer request to region {} err {:?}",
               local_region.get_id(),
+               e)
+    }
+}
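A note on the pattern above: the heart of this first patch is the new calling
convention. `FutureRunnable::run` receives the reactor's `Handle` and spawns
each PD request as a future, where the old `Runnable` blocked on `.wait()` and
stalled every queued task behind one slow RPC. Below is a minimal,
self-contained sketch of that pattern under futures 0.1 and tokio-core;
`EchoRunner` and the `u64` task type are hypothetical stand-ins, not TiKV's
real types.

    extern crate futures;
    extern crate tokio_core;

    use futures::{future, Future};
    use futures::sync::oneshot;
    use tokio_core::reactor::{Core, Handle};

    // Analogue of util::worker::FutureRunnable: run() gets a Handle to spawn on.
    trait FutureRunnable<T> {
        fn run(&mut self, task: T, handle: &Handle);
    }

    struct EchoRunner {
        // Receivers kept only so main() can wait for the spawned futures.
        done: Vec<oneshot::Receiver<u64>>,
    }

    impl FutureRunnable<u64> for EchoRunner {
        fn run(&mut self, task: u64, handle: &Handle) {
            let (tx, rx) = oneshot::channel();
            self.done.push(rx);
            // A ready future stands in for an in-flight RPC such as ask_split.
            let f = future::ok::<u64, ()>(task).then(move |res| {
                if let Ok(n) = res {
                    let _ = tx.send(n); // report completion; ignore a dropped receiver
                }
                Ok::<(), ()>(()) // spawn() requires Item = () and Error = ()
            });
            handle.spawn(f)
        }
    }

    fn main() {
        let mut core = Core::new().unwrap();
        let handle = core.handle();
        let mut runner = EchoRunner { done: Vec::new() };
        for i in 0..3 {
            runner.run(i, &handle);
        }
        // Drive the event loop until every spawned future has reported back.
        let results = core.run(future::join_all(runner.done)).unwrap();
        println!("completed tasks: {:?}", results);
    }

Since run() never blocks, a slow response only delays its own follow-up work;
the worker keeps draining its queue in the meantime, which is what the pd
worker gains from this refactor.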
From 8fe59e16186f5776083c703babda791f17f3ee06 Mon Sep 17 00:00:00 2001
From: zhangjinpeng1987
Date: Thu, 13 Apr 2017 10:25:16 +0800
Subject: [PATCH 2/3] add test for gc raft log (#1737)

---
 src/raftstore/store/store.rs             |   2 +-
 src/raftstore/store/worker/raftlog_gc.rs | 123 ++++++++++++++++++++++-
 2 files changed, 120 insertions(+), 5 deletions(-)

diff --git a/src/raftstore/store/store.rs b/src/raftstore/store/store.rs
index 1a67c8e94315..e196d17fc003 100644
--- a/src/raftstore/store/store.rs
+++ b/src/raftstore/store/store.rs
@@ -410,7 +410,7 @@ impl<T: Transport, C: PdClient> Store<T, C> {
                                          self.cfg.snap_apply_batch_size);
         box_try!(self.region_worker.start(runner));
 
-        let raftlog_gc_runner = RaftlogGcRunner;
+        let raftlog_gc_runner = RaftlogGcRunner::new(None);
         box_try!(self.raftlog_gc_worker.start(raftlog_gc_runner));
 
         let compact_runner = CompactRunner::new(self.engine.clone());
diff --git a/src/raftstore/store/worker/raftlog_gc.rs b/src/raftstore/store/worker/raftlog_gc.rs
index d907b0ad4f65..9f46b2cce4bd 100644
--- a/src/raftstore/store/worker/raftlog_gc.rs
+++ b/src/raftstore/store/worker/raftlog_gc.rs
@@ -21,6 +21,7 @@ use rocksdb::{DB, WriteBatch, Writable};
 use std::sync::Arc;
 use std::fmt::{self, Formatter, Display};
 use std::error;
+use std::sync::mpsc::Sender;
 
 pub struct Task {
     pub engine: Arc<DB>,
@@ -29,6 +30,10 @@ pub struct Task {
     pub end_idx: u64,
 }
 
+pub struct TaskRes {
+    pub collected: u64,
+}
+
 impl Display for Task {
     fn fmt(&self, f: &mut Formatter) -> fmt::Result {
         write!(f,
@@ -51,9 +56,15 @@ quick_error! {
     }
 }
 
-pub struct Runner;
+pub struct Runner {
+    ch: Option<Sender<TaskRes>>,
+}
 
 impl Runner {
+    pub fn new(ch: Option<Sender<TaskRes>>) -> Runner {
+        Runner { ch: ch }
+    }
+
     /// Do the gc job and return the count of log collected.
     fn gc_raft_log(&mut self,
                    engine: Arc<DB>,
@@ -79,10 +90,17 @@ impl Runner {
         for idx in first_idx..end_idx {
             let key = keys::raft_log_key(region_id, idx);
             box_try!(wb.delete_cf(handle, &key));
         }
-        // It's not safe to disable WAL here. We may lost data after crashed for unknown reason.
+        // TODO: disable WAL here.
         engine.write(wb).unwrap();
         Ok(end_idx - first_idx)
     }
+
+    fn report_collected(&self, collected: u64) {
+        if self.ch.is_none() {
+            return;
+        }
+        self.ch.as_ref().unwrap().send(TaskRes { collected: collected }).unwrap();
+    }
 }
@@ -91,8 +109,105 @@ impl Runnable<Task> for Runner {
               task.region_id,
               task.end_idx);
         match self.gc_raft_log(task.engine, task.region_id, task.start_idx, task.end_idx) {
-            Err(e) => error!("[region {}] failed to gc: {:?}", task.region_id, e),
-            Ok(n) => info!("[region {}] collected {} log entries", task.region_id, n),
+            Err(e) => {
+                error!("[region {}] failed to gc: {:?}", task.region_id, e);
+                self.report_collected(0);
+            }
+            Ok(n) => {
+                info!("[region {}] collected {} log entries", task.region_id, n);
+                self.report_collected(n);
+            }
         }
     }
 }
+
+#[cfg(test)]
+mod test {
+    use std::sync::mpsc;
+    use std::time::Duration;
+    use util::rocksdb::new_engine;
+    use tempdir::TempDir;
+    use storage::{CF_DEFAULT, CF_RAFT};
+    use super::*;
+
+    #[test]
+    fn test_gc_raft_log() {
+        let path = TempDir::new("gc-raft-log-test").unwrap();
+        let db = new_engine(path.path().to_str().unwrap(), &[CF_DEFAULT, CF_RAFT]).unwrap();
+        let db = Arc::new(db);
+
+        let (tx, rx) = mpsc::channel();
+        let mut runner = Runner::new(Some(tx));
+
+        // generate raft logs
+        let raft_handle = rocksdb::get_cf_handle(&db, CF_RAFT).unwrap();
+        let region_id = 1;
+        let wb = WriteBatch::new();
+        for i in 0..100 {
+            let k = keys::raft_log_key(region_id, i);
+            wb.put_cf(raft_handle, &k, b"entry").unwrap();
+        }
+        db.write(wb).unwrap();
+
+        let tbls = vec![(Task {
+                             engine: db.clone(),
+                             region_id: region_id,
+                             start_idx: 0,
+                             end_idx: 10,
+                         },
+                         10,
+                         (0, 10),
+                         (10, 100)),
+                        (Task {
+                             engine: db.clone(),
+                             region_id: region_id,
+                             start_idx: 0,
+                             end_idx: 50,
+                         },
+                         40,
+                         (0, 50),
+                         (50, 100)),
+                        (Task {
+                             engine: db.clone(),
+                             region_id: region_id,
+                             start_idx: 50,
+                             end_idx: 50,
+                         },
+                         0,
+                         (0, 50),
+                         (50, 100)),
+                        (Task {
+                             engine: db.clone(),
+                             region_id: region_id,
+                             start_idx: 50,
+                             end_idx: 60,
+                         },
+                         10,
+                         (0, 60),
+                         (60, 100))];
+
+        for (task, expected_collected, not_exist_range, exist_range) in tbls {
+            runner.run(task);
+            let res = rx.recv_timeout(Duration::from_secs(3)).unwrap();
+            assert_eq!(res.collected, expected_collected);
+            raft_log_must_not_exist(&db, 1, not_exist_range.0, not_exist_range.1);
+            raft_log_must_exist(&db, 1, exist_range.0, exist_range.1);
+        }
+    }
+
+    fn raft_log_must_not_exist(engine: &DB, region_id: u64, start_idx: u64, end_idx: u64) {
+        let raft_handle = rocksdb::get_cf_handle(engine, CF_RAFT).unwrap();
+        for i in start_idx..end_idx {
+            let k = keys::raft_log_key(region_id, i);
+            assert!(engine.get_cf(raft_handle, &k).unwrap().is_none());
+        }
+    }
+
+    fn raft_log_must_exist(engine: &DB, region_id: u64, start_idx: u64, end_idx: u64) {
+        let raft_handle = rocksdb::get_cf_handle(engine, CF_RAFT).unwrap();
+        for i in start_idx..end_idx {
+            let k = keys::raft_log_key(region_id, i);
+            assert!(engine.get_cf(raft_handle, &k).unwrap().is_some());
+        }
+    }
+}
From 885e8b0131255f04e30a79231a5d0182ca4d2662 Mon Sep 17 00:00:00 2001
From: Jay Lee
Date: Thu, 13 Apr 2017 12:59:22 +0800
Subject: [PATCH 3/3] travis: limit compilation concurrency

---
 Makefile             | 4 ++--
 travis-build/test.sh | 7 ++++++-
 2 files changed, 8 insertions(+), 3 deletions(-)

diff --git a/Makefile b/Makefile
index 87c8b1086867..90481b08db73 100644
--- a/Makefile
+++ b/Makefile
@@ -54,8 +54,8 @@ test:
 	export DYLD_LIBRARY_PATH="${DYLD_LIBRARY_PATH}:${LOCAL_DIR}/lib" && \
 	export LOG_LEVEL=DEBUG && \
 	export RUST_BACKTRACE=1 && \
-	cargo test --features "${ENABLE_FEATURES}" ${NO_RUN} -- --nocapture && \
-	cargo test --features "${ENABLE_FEATURES}" --bench benches ${NO_RUN} -- --nocapture
+	cargo test --features "${ENABLE_FEATURES}" ${EXTRA_CARGO_ARGS} -- --nocapture && \
+	cargo test --features "${ENABLE_FEATURES}" --bench benches ${EXTRA_CARGO_ARGS} -- --nocapture
 
 # TODO: remove above target once https://github.com/rust-lang/cargo/issues/2984 is resolved.
 bench:
diff --git a/travis-build/test.sh b/travis-build/test.sh
index d19e33cfafc5..5567ff6e5d92 100755
--- a/travis-build/test.sh
+++ b/travis-build/test.sh
@@ -35,10 +35,15 @@ if [[ "$TRAVIS" = "true" ]]; then
 fi
 
 export RUSTFLAGS=-Dwarnings
+if [[ `uname` == "Linux" ]]; then
+    export EXTRA_CARGO_ARGS="-j 2"
+fi
+
 if [[ "$SKIP_TESTS" != "true" ]]; then
     make test 2>&1 | tee tests.out
 else
-    NO_RUN="--no-run" make test
+    export EXTRA_CARGO_ARGS="$EXTRA_CARGO_ARGS --no-run"
+    make test
     exit $?
 fi
 status=$?
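The test added in the second patch hinges on a small observability hook: the
gc runner owns an Option<Sender<TaskRes>> that stays None in production (note
RaftlogGcRunner::new(None) in store.rs) and carries a channel only under test,
where the test blocks on recv_timeout for each task's result. A condensed,
self-contained sketch of that hook, with a trivial stand-in for the real gc
logic:

    use std::sync::mpsc::{self, Sender};
    use std::time::Duration;

    pub struct TaskRes {
        pub collected: u64,
    }

    struct Runner {
        // None in production; Some(tx) when a test wants to observe results.
        ch: Option<Sender<TaskRes>>,
    }

    impl Runner {
        fn new(ch: Option<Sender<TaskRes>>) -> Runner {
            Runner { ch: ch }
        }

        fn report_collected(&self, collected: u64) {
            if let Some(ref ch) = self.ch {
                ch.send(TaskRes { collected: collected }).unwrap();
            }
        }

        // Stand-in for gc_raft_log: pretend every index in the range is collected.
        fn run(&mut self, start_idx: u64, end_idx: u64) {
            self.report_collected(end_idx - start_idx);
        }
    }

    fn main() {
        let (tx, rx) = mpsc::channel();
        let mut runner = Runner::new(Some(tx));
        runner.run(0, 10);
        // The test side blocks with a timeout, as test_gc_raft_log does above.
        let res = rx.recv_timeout(Duration::from_secs(3)).unwrap();
        assert_eq!(res.collected, 10);
        println!("collected {} entries", res.collected);
    }

Keeping the channel optional means the production path pays nothing for the
hook and the runner never blocks waiting for a consumer.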