Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce Vectorized Evaluation framework #4322

Merged
merged 7 commits into from Apr 2, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
304 changes: 299 additions & 5 deletions src/coprocessor/dag/rpn_expr/function.rs
Expand Up @@ -11,6 +11,14 @@
// See the License for the specific language governing permissions and
// limitations under the License.

// TODO
#![allow(dead_code)]

use super::types::RpnFnCallPayload;
use crate::coprocessor::codec::data_type::{Evaluable, ScalarValue, VectorValue};
use crate::coprocessor::dag::expr::EvalContext;
use crate::coprocessor::Result;

/// A trait for all RPN functions.
pub trait RpnFunction: std::fmt::Debug + Send + Sync + 'static {
/// The display name of the function.
Expand All @@ -20,6 +28,15 @@ pub trait RpnFunction: std::fmt::Debug + Send + Sync + 'static {
///
/// Currently we do not support variable arguments.
fn args_len(&self) -> usize;

/// Evaluates the function according to given raw arguments. A raw argument contains the
/// argument value and the argument field type.
fn eval(
&self,
rows: usize,
context: &mut EvalContext,
payload: RpnFnCallPayload<'_>,
) -> Result<VectorValue>;
}

impl<T: RpnFunction + ?Sized> RpnFunction for Box<T> {
Expand All @@ -32,6 +49,267 @@ impl<T: RpnFunction + ?Sized> RpnFunction for Box<T> {
fn args_len(&self) -> usize {
(**self).args_len()
}

#[inline]
fn eval(
&self,
rows: usize,
context: &mut EvalContext,
payload: RpnFnCallPayload<'_>,
) -> Result<VectorValue> {
(**self).eval(rows, context, payload)
}
}

pub struct Helper;

impl Helper {
/// Evaluates a function without argument to produce a vector value.
///
/// The function will be called multiple times to fill the vector.
#[inline(always)]
pub fn eval_0_arg<Ret, F>(
rows: usize,
mut f: F,
context: &mut EvalContext,
payload: RpnFnCallPayload<'_>,
) -> Result<VectorValue>
where
Ret: Evaluable,
F: FnMut(&mut EvalContext, RpnFnCallPayload<'_>) -> Result<Ret>,
{
assert_eq!(payload.args_len(), 0);

let mut result = Vec::with_capacity(rows);
for _ in 0..rows {
result.push(f(context, payload)?);
}
Ok(Ret::into_vector_value(result))
}

/// Evaluates a function with 1 scalar or vector argument to produce a vector value.
///
/// The function will be called multiple times to fill the vector.
#[inline(always)]
pub fn eval_1_arg<Arg0, Ret, F>(
rows: usize,
mut f: F,
context: &mut EvalContext,
payload: RpnFnCallPayload<'_>,
) -> Result<VectorValue>
where
Arg0: Evaluable,
Ret: Evaluable,
F: FnMut(&mut EvalContext, RpnFnCallPayload<'_>, &Arg0) -> Result<Ret>,
{
assert_eq!(payload.args_len(), 1);

let mut result = Vec::with_capacity(rows);
if payload.raw_arg_at(0).is_scalar() {
let v = Arg0::borrow_scalar_value(payload.raw_arg_at(0).scalar_value().unwrap());
for _ in 0..rows {
result.push(f(context, payload, v)?);
}
} else {
let v = Arg0::borrow_vector_value(payload.raw_arg_at(0).vector_value().unwrap());
assert_eq!(rows, v.len());
for i in 0..rows {
result.push(f(context, payload, &v[i])?);
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for value in v {
    result.push(f(context, payload, value)?);
}

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmmm. I think this kind of style (including your comment in other unary / binary function) is not as easy to understand as current ones, because they use different syntax for functions serve as the same purpose? While if we access it using index, the syntax looks to be very unified.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't see what this syntax is unifying with, could you elaborate? The for loop with iterator (including .zip) is used quite a lot within TiKV, so I don't think it is really harder to understand.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The syntax now looks like:

unary function scalar value: for _ in ... { ... f(arg0) }
unary function vector value: for i in ... { ... f(arg0[i]) }
binary function scalar scalar value: for _ in ... { ... f(arg0, arg1) }
binary function scalar vector value: for i in ... { ... f(arg0, arg1[i]) }
binary function vector scalar value: for i in ... { ... f(arg0[i], arg1) }
binary function vector vector value: for i in ... { ... f(arg0[i], arg1[i]) }

which looks pretty much the same and is quite easy to understand the all by only looking at one.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's better to not use for i in 0..rows
See https://rust-lang.github.io/rust-clippy/master/index.html#needless_range_loop

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually we disabled this clippy rule for the reason that this rule does not always produce more readable code :)

}
Ok(Ret::into_vector_value(result))
}

/// Evaluates a function with 2 scalar or vector arguments to produce a vector value.
///
/// The function will be called multiple times to fill the vector.
#[inline(always)]
pub fn eval_2_args<Arg0, Arg1, Ret, F>(
rows: usize,
f: F,
context: &mut EvalContext,
payload: RpnFnCallPayload<'_>,
) -> Result<VectorValue>
where
Arg0: Evaluable,
Arg1: Evaluable,
Ret: Evaluable,
F: FnMut(&mut EvalContext, RpnFnCallPayload<'_>, &Arg0, &Arg1) -> Result<Ret>,
{
assert_eq!(payload.args_len(), 2);

if payload.raw_arg_at(0).is_scalar() {
breezewish marked this conversation as resolved.
Show resolved Hide resolved
if payload.raw_arg_at(1).is_scalar() {
Self::eval_2_args_scalar_scalar(
rows,
f,
context,
payload,
payload.raw_arg_at(0).scalar_value().unwrap(),
payload.raw_arg_at(1).scalar_value().unwrap(),
)
} else {
Self::eval_2_args_scalar_vector(
rows,
f,
context,
payload,
payload.raw_arg_at(0).scalar_value().unwrap(),
payload.raw_arg_at(1).vector_value().unwrap(),
)
}
} else {
if payload.raw_arg_at(1).is_scalar() {
Self::eval_2_args_vector_scalar(
rows,
f,
context,
payload,
payload.raw_arg_at(0).vector_value().unwrap(),
payload.raw_arg_at(1).scalar_value().unwrap(),
)
} else {
Self::eval_2_args_vector_vector(
rows,
f,
context,
payload,
payload.raw_arg_at(0).vector_value().unwrap(),
payload.raw_arg_at(1).vector_value().unwrap(),
)
}
}
}

#[inline(always)]
fn eval_2_args_scalar_scalar<Arg0, Arg1, Ret, F>(
rows: usize,
mut f: F,
context: &mut EvalContext,
payload: RpnFnCallPayload<'_>,
lhs: &ScalarValue,
rhs: &ScalarValue,
) -> Result<VectorValue>
where
Arg0: Evaluable,
Arg1: Evaluable,
Ret: Evaluable,
F: FnMut(&mut EvalContext, RpnFnCallPayload<'_>, &Arg0, &Arg1) -> Result<Ret>,
{
let mut result = Vec::with_capacity(rows);
let lhs = Arg0::borrow_scalar_value(lhs);
let rhs = Arg1::borrow_scalar_value(rhs);
for _ in 0..rows {
result.push(f(context, payload, lhs, rhs)?);
}
Ok(Ret::into_vector_value(result))
}

#[inline(always)]
fn eval_2_args_scalar_vector<Arg0, Arg1, Ret, F>(
rows: usize,
mut f: F,
context: &mut EvalContext,
payload: RpnFnCallPayload<'_>,
lhs: &ScalarValue,
rhs: &VectorValue,
) -> Result<VectorValue>
where
Arg0: Evaluable,
Arg1: Evaluable,
Ret: Evaluable,
F: FnMut(&mut EvalContext, RpnFnCallPayload<'_>, &Arg0, &Arg1) -> Result<Ret>,
{
assert_eq!(rows, rhs.len());
let mut result = Vec::with_capacity(rows);
let lhs = Arg0::borrow_scalar_value(lhs);
let rhs = Arg1::borrow_vector_value(rhs);
breezewish marked this conversation as resolved.
Show resolved Hide resolved
for i in 0..rows {
result.push(f(context, payload, lhs, &rhs[i])?);
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(ditto)

Ok(Ret::into_vector_value(result))
}

#[inline(always)]
fn eval_2_args_vector_scalar<Arg0, Arg1, Ret, F>(
rows: usize,
mut f: F,
context: &mut EvalContext,
payload: RpnFnCallPayload<'_>,
lhs: &VectorValue,
rhs: &ScalarValue,
) -> Result<VectorValue>
where
Arg0: Evaluable,
Arg1: Evaluable,
Ret: Evaluable,
F: FnMut(&mut EvalContext, RpnFnCallPayload<'_>, &Arg0, &Arg1) -> Result<Ret>,
{
assert_eq!(rows, lhs.len());
let mut result = Vec::with_capacity(rows);
let lhs = Arg0::borrow_vector_value(lhs);
let rhs = Arg1::borrow_scalar_value(rhs);
for i in 0..rows {
result.push(f(context, payload, &lhs[i], rhs)?);
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(ditto)

Ok(Ret::into_vector_value(result))
}

#[inline(always)]
fn eval_2_args_vector_vector<Arg0, Arg1, Ret, F>(
rows: usize,
mut f: F,
context: &mut EvalContext,
payload: RpnFnCallPayload<'_>,
lhs: &VectorValue,
rhs: &VectorValue,
) -> Result<VectorValue>
where
Arg0: Evaluable,
Arg1: Evaluable,
Ret: Evaluable,
F: FnMut(&mut EvalContext, RpnFnCallPayload<'_>, &Arg0, &Arg1) -> Result<Ret>,
{
assert_eq!(rows, lhs.len());
assert_eq!(rows, rhs.len());
let mut result = Vec::with_capacity(rows);
let lhs = Arg0::borrow_vector_value(lhs);
let rhs = Arg1::borrow_vector_value(rhs);
for i in 0..rows {
result.push(f(context, payload, &lhs[i], &rhs[i])?);
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for (left, right) in lhs.iter().zip(rhs) {

Ok(Ret::into_vector_value(result))
}

/// Evaluates a function with 3 scalar or vector arguments to produce a vector value.
///
/// The function will be called multiple times to fill the vector. For each function call,
/// there will be one indirection to support both scalar and vector arguments.
#[inline(always)]
pub fn eval_3_args<Arg0, Arg1, Arg2, Ret, F>(
rows: usize,
mut f: F,
context: &mut EvalContext,
payload: RpnFnCallPayload<'_>,
) -> Result<VectorValue>
where
Arg0: Evaluable,
Arg1: Evaluable,
Arg2: Evaluable,
Ret: Evaluable,
F: FnMut(&mut EvalContext, RpnFnCallPayload<'_>, &Arg0, &Arg1, &Arg2) -> Result<Ret>,
{
assert_eq!(payload.args_len(), 3);

let mut result = Vec::with_capacity(rows);
let arg0 = Arg0::borrow_vector_like_specialized(payload.raw_arg_at(0).as_vector_like());
let arg1 = Arg1::borrow_vector_like_specialized(payload.raw_arg_at(1).as_vector_like());
let arg2 = Arg2::borrow_vector_like_specialized(payload.raw_arg_at(2).as_vector_like());
for i in 0..rows {
result.push(f(context, payload, &arg0[i], &arg1[i], &arg2[i])?);
}
Ok(Ret::into_vector_value(result))
}
}

/// Implements `RpnFunction` automatically for structure that accepts 0, 1, 2 or 3 arguments.
Expand All @@ -41,18 +319,18 @@ impl<T: RpnFunction + ?Sized> RpnFunction for Box<T> {
#[macro_export]
macro_rules! impl_template_fn {
(0 arg @ $name:ident) => {
impl_template_fn! { @inner $name, 0 }
impl_template_fn! { @inner $name, 0, eval_0_arg }
};
(1 arg @ $name:ident) => {
impl_template_fn! { @inner $name, 1 }
impl_template_fn! { @inner $name, 1, eval_1_arg }
};
(2 arg @ $name:ident) => {
impl_template_fn! { @inner $name, 2 }
impl_template_fn! { @inner $name, 2, eval_2_args }
};
(3 arg @ $name:ident) => {
impl_template_fn! { @inner $name, 3 }
impl_template_fn! { @inner $name, 3, eval_3_args }
};
(@inner $name:ident, $args:expr) => {
(@inner $name:ident, $args:expr, $eval_fn:ident) => {
impl $crate::coprocessor::dag::rpn_expr::RpnFunction for $name {
#[inline]
fn name(&self) -> &'static str {
Expand All @@ -63,6 +341,22 @@ macro_rules! impl_template_fn {
fn args_len(&self) -> usize {
$args
}

#[inline]
fn eval(
&self,
rows: usize,
context: &mut $crate::coprocessor::dag::expr::EvalContext,
payload: $crate::coprocessor::dag::rpn_expr::types::RpnFnCallPayload<'_>,
) -> $crate::coprocessor::Result<$crate::coprocessor::codec::data_type::VectorValue>
{
$crate::coprocessor::dag::rpn_expr::function::Helper::$eval_fn(
rows,
Self::call,
context,
payload,
)
}
}
};
}
2 changes: 1 addition & 1 deletion src/coprocessor/dag/rpn_expr/mod.rs
Expand Up @@ -16,7 +16,7 @@ mod function;
mod types;

pub use self::function::RpnFunction;
pub use self::types::RpnExpression;
pub use self::types::{RpnExpression, RpnExpressionBuilder};

use tipb::expression::ScalarFuncSig;

Expand Down