Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(expr): make evaluation async #8229

Merged
merged 10 commits into from Mar 15, 2023
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions src/expr/Cargo.toml
Expand Up @@ -24,6 +24,7 @@ chrono = { version = "0.4", default-features = false, features = ["clock", "std"
chrono-tz = { version = "0.7", features = ["case-insensitive"] }
dyn-clone = "1"
either = "1"
futures-util = "0.3"
itertools = "0.10"
md5 = "0.7.0"
num-traits = "0.2"
Expand Down
8 changes: 5 additions & 3 deletions src/expr/src/expr/expr_in.rs
Expand Up @@ -16,6 +16,7 @@ use std::collections::HashSet;
use std::fmt::Debug;
use std::sync::Arc;

use futures_util::future::FutureExt;
use risingwave_common::array::{ArrayBuilder, ArrayRef, BoolArrayBuilder, DataChunk};
use risingwave_common::row::OwnedRow;
use risingwave_common::types::{DataType, Datum, Scalar, ToOwnedDatum};
Expand Down Expand Up @@ -112,9 +113,10 @@ impl<'a> TryFrom<&'a ExprNode> for InExpression {
let data_chunk = DataChunk::new_dummy(1);
for child in &children[1..] {
let const_expr = build_from_prost(child)?;
let array = tokio::task::block_in_place(|| {
tokio::runtime::Handle::current().block_on(const_expr.eval(&data_chunk))
})?;
let array = const_expr
.eval(&data_chunk)
.now_or_never()
.expect("constant expression should not be async")?;
let datum = array.value_at(0).to_owned_datum();
data.push(datum);
}
Expand Down
5 changes: 4 additions & 1 deletion src/frontend/src/expr/mod.rs
Expand Up @@ -14,6 +14,7 @@

use enum_as_inner::EnumAsInner;
use fixedbitset::FixedBitSet;
use futures::FutureExt;
use paste::paste;
use risingwave_common::array::ListValue;
use risingwave_common::error::Result as RwResult;
Expand Down Expand Up @@ -240,7 +241,9 @@ impl ExprImpl {
/// Evaluate a constant expression.
pub fn eval_row_const(&self) -> RwResult<Datum> {
assert!(self.is_const());
tokio::runtime::Handle::current().block_on(self.eval_row(&OwnedRow::empty()))
self.eval_row(&OwnedRow::empty())
.now_or_never()
.expect("constant expression should not be async")
Comment on lines +255 to +256
Copy link
Member

@BugenZhao BugenZhao Mar 15, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we need to do this frequently in the future, how about making it an extension to the Expr trait?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's okay. But I don't think this should be encouraged. It should only be used in constant evaluation. 🤔

}
}

Expand Down
11 changes: 4 additions & 7 deletions src/frontend/src/scheduler/local.rs
Expand Up @@ -17,7 +17,6 @@ use std::collections::HashMap;
use std::sync::Arc;

use anyhow::Context;
use futures::executor::block_on;
use futures::StreamExt;
use futures_async_stream::try_stream;
use itertools::Itertools;
Expand All @@ -41,7 +40,6 @@ use risingwave_pb::batch_plan::{
};
use risingwave_pb::common::WorkerNode;
use tokio::sync::mpsc;
use tokio::task::spawn_blocking;
use tokio_stream::wrappers::ReceiverStream;
use tracing::debug;
use uuid::Uuid;
Expand Down Expand Up @@ -138,11 +136,10 @@ impl LocalQueryExecution {
}
};

if cfg!(madsim) {
tokio::spawn(future);
} else {
spawn_blocking(move || block_on(future));
}
#[cfg(madsim)]
tokio::spawn(future);
#[cfg(not(madsim))]
tokio::task::spawn_blocking(move || futures::executor::block_on(future));

ReceiverStream::new(receiver)
}
Expand Down