Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

coprocessor: port dag executor to endpoint #1975

Merged
merged 32 commits into from Jul 17, 2017
Merged
Show file tree
Hide file tree
Changes from 18 commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
7932a7f
port dag executor to endpoint
Jun 29, 2017
212d2c6
Merge branch 'master' into lishihai/dag-exec-porting
BusyJon Jun 30, 2017
b343cf1
fix ci
Jun 30, 2017
f58a276
address comment
Jun 30, 2017
56a8915
Merge branch 'master' into lishihai/dag-exec-porting
Jul 2, 2017
4b30662
add checkout outdated
Jul 2, 2017
f5a440e
fix partial ci error, waitting for aggregation
Jul 4, 2017
e450ccb
Merge branch 'master' into lishihai/dag-exec-porting
BusyJon Jul 4, 2017
fb4fd13
sync the behavior with dag mode when scan index which has no affect w…
Jul 4, 2017
dde0cea
Merge branch 'lishihai/dag-exec-porting' of github.com:pingcap/tikv i…
Jul 4, 2017
ce3a8e9
fix all ci
Jul 5, 2017
feec170
Merge branch 'master' into lishihai/dag-exec-porting
BusyJon Jul 5, 2017
2dc3c7c
Merge branch 'master' into lishihai/dag-exec-porting
BusyJon Jul 5, 2017
5095c6d
minor fix
Jul 5, 2017
a18647e
Merge branch 'master' into lishihai/dag-exec-porting
BusyJon Jul 5, 2017
2ceefc7
Merge branch 'master' into lishihai/dag-exec-porting
Jul 5, 2017
d9082f6
Merge branch 'lishihai/dag-exec-porting' of github.com:pingcap/tikv i…
Jul 5, 2017
90457fd
Merge branch 'master' into lishihai/dag-exec-porting
Jul 5, 2017
991bf8a
minor fix
Jul 8, 2017
4261403
Merge branch 'master' into lishihai/dag-exec-porting
AndreMouche Jul 9, 2017
b24f43d
address comment
Jul 10, 2017
b62d503
Merge branch 'lishihai/dag-exec-porting' of github.com:pingcap/tikv i…
Jul 10, 2017
ed11c92
address comment
Jul 10, 2017
c7060cc
Merge branch 'master' into lishihai/dag-exec-porting
BusyJon Jul 10, 2017
5160304
Merge branch 'master' into lishihai/dag-exec-porting
Jul 17, 2017
18a2934
Merge branch 'lishihai/dag-exec-porting' of github.com:pingcap/tikv i…
Jul 17, 2017
121bf6f
Merge branch 'master' into lishihai/dag-exec-porting
BusyJon Jul 17, 2017
fd9d96e
fix ci error
Jul 17, 2017
304b306
Merge branch 'lishihai/dag-exec-porting' of github.com:pingcap/tikv i…
Jul 17, 2017
c335ed5
fix ci
Jul 17, 2017
12b3fd0
Merge branch 'master' into lishihai/dag-exec-porting
BusyJon Jul 17, 2017
ef8a544
Merge branch 'master' into lishihai/dag-exec-porting
BusyJon Jul 17, 2017
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
147 changes: 147 additions & 0 deletions src/coprocessor/dag.rs
@@ -0,0 +1,147 @@
// Copyright 2017 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

use std::rc::Rc;
use std::time::Instant;

use tipb::executor::{Executor, ExecType};
use tipb::schema::ColumnInfo;
use tipb::select::{DAGRequest, Chunk};
use kvproto::coprocessor::KeyRange;
use kvproto::kvrpcpb::IsolationLevel;

use storage::{Snapshot, Statistics};
use util::xeval::EvalContext;
use super::{Result, Error};
use super::executor::Executor as DAGExecutor;
use super::executor::table_scan::TableScanExecutor;
use super::executor::index_scan::IndexScanExecutor;
use super::executor::selection::SelectionExecutor;
use super::executor::aggregation::AggregationExecutor;
use super::executor::topn::TopNExecutor;
use super::executor::limit::LimitExecutor;

pub struct DAGContext<'s> {
req: DAGRequest,
pub deadline: Instant,
pub columns: Vec<ColumnInfo>,
ranges: Vec<KeyRange>,
snap: &'s Snapshot,
pub has_aggr: bool,
eval_ctx: Rc<EvalContext>,
pub chunks: Vec<Chunk>,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please reorder the members to make the public members together.

}

impl<'s> DAGContext<'s> {
pub fn new(req: DAGRequest,
deadline: Instant,
ranges: Vec<KeyRange>,
snap: &'s Snapshot,
eval_ctx: Rc<EvalContext>)
-> DAGContext<'s> {
DAGContext {
req: req,
deadline: deadline,
columns: vec![],
ranges: ranges,
snap: snap,
has_aggr: false,
eval_ctx: eval_ctx,
chunks: vec![],
}
}

pub fn validate_dag(&mut self) -> Result<()> {
let execs = self.req.get_executors();
let first = try!(execs.first()
.ok_or_else(|| Error::Other(box_err!("dag copr has not executor"))));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

has no executor

// check whether first exec is *scan and get the column info
match first.get_tp() {
ExecType::TypeTableScan => {
self.columns = first.get_tbl_scan().get_columns().to_vec();
}
ExecType::TypeIndexScan => {
self.columns = first.get_idx_scan().get_columns().to_vec();
}
_ => {
return Err(box_err!("first exec type should be *Scan, but get {:?}",
first.get_tp()))
}
}
// check whether dag has a aggregation action and take a flag
if execs.iter().rev().any(|ref exec| exec.get_tp() == ExecType::TypeAggregation) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

needless borrow for exec

self.has_aggr = true;
}
Ok(())
}

// FIXME seperate first exec build action from `build_dag`
// since it will generte some mutable conflict when putting together
pub fn build_first(&'s self,
mut first: Executor,
statistics: &'s mut Statistics)
-> Box<DAGExecutor + 's> {
match first.get_tp() {
ExecType::TypeTableScan => {
Box::new(TableScanExecutor::new(first.take_tbl_scan(),
self.ranges.clone(),
self.snap,
statistics,
self.req.get_start_ts(),
IsolationLevel::SI))
}
ExecType::TypeIndexScan => {
Box::new(IndexScanExecutor::new(first.take_idx_scan(),
self.ranges.clone(),
self.snap,
statistics,
self.req.get_start_ts(),
IsolationLevel::SI))
}
_ => unreachable!(),
}
}

pub fn build_dag(&'s self, statistics: &'s mut Statistics) -> Result<Box<DAGExecutor + 's>> {
let mut execs = self.req.get_executors().to_vec().into_iter();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it possible to avoid copying here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

use reference here will generate a mut borrow conflict in the env which used this func.

let mut src = self.build_first(execs.next().unwrap(), statistics);
for mut exec in execs {
let curr: Box<DAGExecutor> = match exec.get_tp() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What if set src directly here?

ExecType::TypeTableScan | ExecType::TypeIndexScan => {
return Err(box_err!("got too much *scan exec, should be only one"))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

tablescan and indexscan should only be in the head of the queue?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, it's guaranteed by upper layer

}
ExecType::TypeSelection => {
Box::new(try!(SelectionExecutor::new(exec.take_selection(),
self.eval_ctx.clone(),
&self.columns,
src)))
}
ExecType::TypeAggregation => {
Box::new(try!(AggregationExecutor::new(exec.take_aggregation(),
self.eval_ctx.clone(),
&self.columns,
src)))
}
ExecType::TypeTopN => {
Box::new(try!(TopNExecutor::new(exec.take_topN(),
self.eval_ctx.clone(),
&self.columns,
src)))
}
ExecType::TypeLimit => Box::new(LimitExecutor::new(exec.take_limit(), src)),
};
src = curr;
}
Ok(src)
}
}