From 1809a4970c48cc214217a013d9e3ff7408c9b8a5 Mon Sep 17 00:00:00 2001 From: Noel Kwan <47273164+kwannoel@users.noreply.github.com> Date: Tue, 29 Aug 2023 22:32:00 +0800 Subject: [PATCH 1/4] feat(streaming): flow control executor (#11919) Co-authored-by: Eric Fu --- Cargo.lock | 30 ++++++ e2e_test/streaming/rate_limit.slt | 26 +++++ proto/stream_plan.proto | 3 + src/common/src/session_config/mod.rs | 23 ++++- .../optimizer/plan_node/stream_table_scan.rs | 7 ++ src/stream/Cargo.toml | 1 + src/stream/src/executor/flow_control.rs | 98 +++++++++++++++++++ src/stream/src/executor/mod.rs | 2 + src/stream/src/from_proto/chain.rs | 10 +- 9 files changed, 197 insertions(+), 3 deletions(-) create mode 100644 e2e_test/streaming/rate_limit.slt create mode 100644 src/stream/src/executor/flow_control.rs diff --git a/Cargo.lock b/Cargo.lock index 6221e760e9e0..b395830f56ab 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3187,6 +3187,23 @@ dependencies = [ "async-trait", ] +[[package]] +name = "governor" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "821239e5672ff23e2a7060901fa622950bbd80b649cdaadd78d1c1767ed14eb4" +dependencies = [ + "cfg-if", + "dashmap", + "futures", + "futures-timer", + "no-std-compat", + "nonzero_ext", + "parking_lot 0.12.1", + "rand", + "smallvec", +] + [[package]] name = "h2" version = "0.3.20" @@ -4440,6 +4457,12 @@ dependencies = [ "static_assertions", ] +[[package]] +name = "no-std-compat" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b93853da6d84c2e3c7d730d6473e8817692dd89be387eb01b94d7f108ecb5b8c" + [[package]] name = "nom" version = "7.1.3" @@ -4450,6 +4473,12 @@ dependencies = [ "minimal-lexical", ] +[[package]] +name = "nonzero_ext" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "38bf9645c8b145698bb0b18a4637dcacbc421ea49bef2317e4fd8065a387cf21" + [[package]] name = "normalize-line-endings" version = 
"0.3.0" @@ -7251,6 +7280,7 @@ dependencies = [ "expect-test", "futures", "futures-async-stream", + "governor", "hytra", "iter-chunks", "itertools 0.11.0", diff --git a/e2e_test/streaming/rate_limit.slt b/e2e_test/streaming/rate_limit.slt new file mode 100644 index 000000000000..95e87b9fd681 --- /dev/null +++ b/e2e_test/streaming/rate_limit.slt @@ -0,0 +1,26 @@ +statement ok +CREATE TABLE t1(v1 int, v2 int); + +statement ok +SET RW_STREAMING_RATE_LIMIT TO 10000; + +statement ok +CREATE MATERIALIZED VIEW m AS SELECT * FROM t1; + +# Should take around 5s due to rate limit. +statement ok +INSERT INTO t1 select * from generate_series(1,50000); + +statement ok +FLUSH; + +query I +select count(*) from m; +---- +50000 + +statement ok +DROP MATERIALIZED VIEW m; + +statement ok +DROP TABLE t1; \ No newline at end of file diff --git a/proto/stream_plan.proto b/proto/stream_plan.proto index 92a5dc89ee70..b5fa5d7074c5 100644 --- a/proto/stream_plan.proto +++ b/proto/stream_plan.proto @@ -449,6 +449,9 @@ message ChainNode { // The upstream materialized view info used by backfill. plan_common.StorageTableDesc table_desc = 7; + + // The rate limit for the chain node. + optional uint32 rate_limit = 8; } // BatchPlanNode is used for mv on mv snapshot read. diff --git a/src/common/src/session_config/mod.rs b/src/common/src/session_config/mod.rs index 0ccf6628898e..79259f751236 100644 --- a/src/common/src/session_config/mod.rs +++ b/src/common/src/session_config/mod.rs @@ -34,7 +34,7 @@ use crate::util::epoch::Epoch; // This is a hack, &'static str is not allowed as a const generics argument. // TODO: refine this using the adt_const_params feature. -const CONFIG_KEYS: [&str; 34] = [ +const CONFIG_KEYS: [&str; 35] = [ "RW_IMPLICIT_FLUSH", "CREATE_COMPACTION_GROUP_FOR_MV", "QUERY_MODE", @@ -69,6 +69,7 @@ const CONFIG_KEYS: [&str; 34] = [ "LOCK_TIMEOUT", "ROW_SECURITY", "STANDARD_CONFORMING_STRINGS", + "RW_STREAMING_RATE_LIMIT", ]; // MUST HAVE 1v1 relationship to CONFIG_KEYS. e.g. 
CONFIG_KEYS[IMPLICIT_FLUSH] = @@ -107,6 +108,7 @@ const STATEMENT_TIMEOUT: usize = 30; const LOCK_TIMEOUT: usize = 31; const ROW_SECURITY: usize = 32; const STANDARD_CONFORMING_STRINGS: usize = 33; +const RW_STREAMING_RATE_LIMIT: usize = 34; trait ConfigEntry: Default + for<'a> TryFrom<&'a [&'a str], Error = RwError> { fn entry_name() -> &'static str; @@ -329,6 +331,7 @@ type StatementTimeout = ConfigI32; type LockTimeout = ConfigI32; type RowSecurity = ConfigBool; type StandardConformingStrings = ConfigString; +type StreamingRateLimit = ConfigU64; /// Report status or notice to caller. pub trait ConfigReporter { @@ -468,6 +471,8 @@ pub struct ConfigMap { expression = "ConfigString::(String::from(\"on\"))" ))] standard_conforming_strings: StandardConformingStrings, + + streaming_rate_limit: StreamingRateLimit, } impl ConfigMap { @@ -579,6 +584,8 @@ impl ConfigMap { self.row_security = val.as_slice().try_into()?; } else if key.eq_ignore_ascii_case(StandardConformingStrings::entry_name()) { self.standard_conforming_strings = val.as_slice().try_into()?; + } else if key.eq_ignore_ascii_case(StreamingRateLimit::entry_name()) { + self.streaming_rate_limit = val.as_slice().try_into()?; } else { return Err(ErrorCode::UnrecognizedConfigurationParameter(key.to_string()).into()); } @@ -660,6 +667,8 @@ impl ConfigMap { Ok(self.row_security.to_string()) } else if key.eq_ignore_ascii_case(StandardConformingStrings::entry_name()) { Ok(self.standard_conforming_strings.to_string()) + } else if key.eq_ignore_ascii_case(StreamingRateLimit::entry_name()) { + Ok(self.streaming_rate_limit.to_string()) } else { Err(ErrorCode::UnrecognizedConfigurationParameter(key.to_string()).into()) } @@ -836,6 +845,11 @@ impl ConfigMap { name: StandardConformingStrings::entry_name().to_lowercase(), setting: self.standard_conforming_strings.to_string(), description: String::from("Unused in RisingWave"), + }, + VariableInfo{ + name: StreamingRateLimit::entry_name().to_lowercase(), + setting: 
self.streaming_rate_limit.to_string(),
+                description: String::from("Set streaming rate limit (rows per second) for each parallelism for mv backfilling"),
             }
         ]
     }
@@ -960,4 +974,17 @@ impl ConfigMap {
     pub fn get_standard_conforming_strings(&self) -> &str {
         &self.standard_conforming_strings
     }
+
+    /// Returns the streaming rate limit in rows per second, or `None` when the
+    /// setting is `0` (no limit).
+    ///
+    /// The plan node carries the limit as a `u32`, so values above `u32::MAX`
+    /// saturate instead of wrapping: a plain `as u32` cast would turn `1 << 32`
+    /// into `Some(0)` rather than a very large limit.
+    pub fn get_streaming_rate_limit(&self) -> Option<u32> {
+        match self.streaming_rate_limit.0 {
+            0 => None,
+            limit => Some(u32::try_from(limit).unwrap_or(u32::MAX)),
+        }
+    }
 }
diff --git a/src/frontend/src/optimizer/plan_node/stream_table_scan.rs b/src/frontend/src/optimizer/plan_node/stream_table_scan.rs
index fd81436da0aa..561c4fe86e71 100644
--- a/src/frontend/src/optimizer/plan_node/stream_table_scan.rs
+++ b/src/frontend/src/optimizer/plan_node/stream_table_scan.rs
@@ -28,6 +28,7 @@ use super::utils::{childless_record, Distill};
 use super::{generic, ExprRewritable, PlanBase, PlanNodeId, PlanRef, StreamNode};
 use crate::catalog::ColumnId;
 use crate::expr::{ExprRewriter, FunctionCall};
+use crate::optimizer::plan_node::generic::GenericPlanRef;
 use crate::optimizer::plan_node::utils::{IndicesDisplay, TableCatalogBuilder};
 use crate::optimizer::property::{Distribution, DistributionDisplay};
 use crate::stream_fragmenter::BuildFragmentGraphState;
@@ -283,6 +284,12 @@ impl StreamTableScan {
                 // The table desc used by backfill executor
                 table_desc: Some(self.logical.table_desc.to_protobuf()),
                 state_table: Some(catalog),
+                rate_limit: self
+                    .base
+                    .ctx()
+                    .session_ctx()
+                    .config()
+                    .get_streaming_rate_limit(),
             })),
             stream_key,
             operator_id: self.base.id.0 as u64,
diff --git a/src/stream/Cargo.toml b/src/stream/Cargo.toml
index e86b8913a6e7..36b11d22defd 100644
--- a/src/stream/Cargo.toml
+++ b/src/stream/Cargo.toml
@@ -27,6 +27,7 @@ either = "1"
 enum-as-inner = "0.6"
 futures = { version = "0.3", default-features = false, features = ["alloc"] }
 futures-async-stream = { workspace = true }
+governor = { version = "0.6", default-features = false, features = ["std", "dashmap", "jitter"] }
 hytra =
"0.1.2" iter-chunks = "0.1" itertools = "0.11" diff --git a/src/stream/src/executor/flow_control.rs b/src/stream/src/executor/flow_control.rs new file mode 100644 index 000000000000..1790f212566b --- /dev/null +++ b/src/stream/src/executor/flow_control.rs @@ -0,0 +1,98 @@ +// Copyright 2023 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::fmt::{Debug, Formatter}; +use std::num::NonZeroU32; + +use governor::clock::MonotonicClock; +use governor::{Quota, RateLimiter}; +use risingwave_common::catalog::Schema; + +use super::*; + +/// Flow Control Executor is used to control the rate of the input executor. +/// +/// Currently it is placed after the `BackfillExecutor`: +/// upstream `MaterializeExecutor` -> `BackfillExecutor` -> `FlowControlExecutor` +/// +/// The rate limit is set statically at the moment, and cannot be changed in a running +/// stream graph. +/// +/// It is used to throttle problematic MVs that are consuming too much resources. 
+pub struct FlowControlExecutor {
+    /// Upstream executor whose messages are forwarded.
+    input: BoxedExecutor,
+    /// Maximum number of rows emitted per second; `0` disables throttling.
+    rate_limit: u32,
+}
+
+impl FlowControlExecutor {
+    /// Wraps `input` so that its chunks are throttled to `rate_limit` rows per second.
+    ///
+    /// A `rate_limit` of `0` disables throttling.
+    pub fn new(input: Box<dyn Executor>, rate_limit: u32) -> Self {
+        #[cfg(madsim)]
+        println!("FlowControlExecutor rate limiter is disabled in madsim as it will spawn system threads");
+        Self { input, rate_limit }
+    }
+
+    /// Forwards every message of `input`, delaying each chunk until the rate limiter has
+    /// granted one permit per row counted by `chunk.cardinality()`. All other messages
+    /// are forwarded without waiting.
+    #[try_stream(ok = Message, error = StreamExecutorError)]
+    async fn execute_inner(self) {
+        // `None` when `rate_limit` is 0: no valid quota exists, so chunks pass through
+        // unthrottled instead of panicking on a zero `NonZeroU32`.
+        let burst = NonZeroU32::new(self.rate_limit);
+        let clock = MonotonicClock;
+        let rate_limiter =
+            burst.map(|limit| RateLimiter::direct_with_clock(Quota::per_second(limit), &clock));
+        #[for_await]
+        for msg in self.input.execute() {
+            let msg = msg?;
+            match msg {
+                Message::Chunk(chunk) => {
+                    #[cfg(not(madsim))]
+                    {
+                        if let (Some(burst), Some(rate_limiter)) = (burst, rate_limiter.as_ref()) {
+                            // Acquire permits in batches of at most `burst` rows: a single
+                            // request larger than the burst size can never be granted, and
+                            // an empty chunk needs no permits at all.
+                            let mut remaining =
+                                u32::try_from(chunk.cardinality()).unwrap_or(u32::MAX);
+                            while let Some(batch) = NonZeroU32::new(remaining.min(burst.get())) {
+                                rate_limiter
+                                    .until_n_ready(batch)
+                                    .await
+                                    .expect("a batch never exceeds the burst size of the quota");
+                                remaining -= batch.get();
+                            }
+                        }
+                    }
+                    yield Message::Chunk(chunk);
+                }
+                _ => yield msg,
+            }
+        }
+    }
+}
+
+impl Debug for FlowControlExecutor {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("FlowControlExecutor")
+            .field("rate_limit", &self.rate_limit)
+            .finish()
+    }
+}
+
+impl Executor for FlowControlExecutor {
+    fn execute(self: Box<Self>) -> BoxedMessageStream {
+        self.execute_inner().boxed()
+    }
+
+    fn schema(&self) -> &Schema {
+        self.input.schema()
+    }
+
+    fn pk_indices(&self) -> PkIndicesRef<'_> {
+        self.input.pk_indices()
+    }
+
+    fn identity(&self) -> &str {
+        "FlowControlExecutor"
+    }
+}
diff --git a/src/stream/src/executor/mod.rs b/src/stream/src/executor/mod.rs
index 0351a5d3ca6e..4e4ff92caf17 100644
--- a/src/stream/src/executor/mod.rs
+++ b/src/stream/src/executor/mod.rs
@@ -65,6 +65,7 @@ mod dynamic_filter;
 mod error;
 mod expand;
 mod filter;
+mod flow_control;
 mod hash_agg;
 pub mod hash_join;
 mod hop_window;
@@ -113,6 +114,7 @@ pub use dynamic_filter::DynamicFilterExecutor;
 pub use
error::{StreamExecutorError, StreamExecutorResult}; pub use expand::ExpandExecutor; pub use filter::FilterExecutor; +pub use flow_control::FlowControlExecutor; pub use hash_agg::HashAggExecutor; pub use hash_join::*; pub use hop_window::HopWindowExecutor; diff --git a/src/stream/src/from_proto/chain.rs b/src/stream/src/from_proto/chain.rs index 858eeb895491..d1a971a5cbb4 100644 --- a/src/stream/src/from_proto/chain.rs +++ b/src/stream/src/from_proto/chain.rs @@ -23,7 +23,9 @@ use risingwave_storage::table::Distribution; use super::*; use crate::common::table::state_table::StateTable; -use crate::executor::{BackfillExecutor, ChainExecutor, RearrangedChainExecutor}; +use crate::executor::{ + BackfillExecutor, ChainExecutor, FlowControlExecutor, RearrangedChainExecutor, +}; pub struct ChainExecutorBuilder; @@ -172,6 +174,10 @@ impl ExecutorBuilder for ChainExecutorBuilder { } ChainType::ChainUnspecified => unreachable!(), }; - Ok(executor) + if let Ok(rate_limit) = node.get_rate_limit() { + Ok(FlowControlExecutor::new(executor, *rate_limit).boxed()) + } else { + Ok(executor) + } } } From 2e4a448c5db1bde49666ed95c91e88e8e46fd728 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Wed, 30 Aug 2023 09:02:39 +0800 Subject: [PATCH 2/4] chore(deps): bump memchr from 2.5.0 to 2.6.1 (#11947) Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> --- Cargo.lock | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index b395830f56ab..2186fafbb0f2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4246,9 +4246,9 @@ checksum = "490cc448043f947bae3cbee9c203358d62dbee0db12107a74be5c30ccfd09771" [[package]] name = "memchr" -version = "2.5.0" +version = "2.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2dffe52ecf27772e601905b7522cb4ef790d2cc203488bbd0e2fe85fcb74566d" +checksum = 
"f478948fd84d9f8e86967bf432640e46adfb5a4bd4f14ef7e864ab38220534ae" [[package]] name = "memcomparable" From 1d0bf76cd36ef1fa7b11384353e451a0ec0f12b9 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Wed, 30 Aug 2023 01:03:13 +0000 Subject: [PATCH 3/4] chore(deps): bump serde from 1.0.185 to 1.0.188 (#11946) Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> --- Cargo.lock | 8 ++++---- src/tests/simulation/Cargo.toml | 2 +- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 2186fafbb0f2..dbf61ce31ee2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7715,9 +7715,9 @@ checksum = "a3f0bf26fd526d2a95683cd0f87bf103b8539e2ca1ef48ce002d67aad59aa0b4" [[package]] name = "serde" -version = "1.0.185" +version = "1.0.188" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "be9b6f69f1dfd54c3b568ffa45c310d6973a5e5148fd40cf515acaf38cf5bc31" +checksum = "cf9e0fcba69a370eed61bcf2b728575f726b50b55cba78064753d708ddc7549e" dependencies = [ "serde_derive", ] @@ -7755,9 +7755,9 @@ dependencies = [ [[package]] name = "serde_derive" -version = "1.0.185" +version = "1.0.188" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dc59dfdcbad1437773485e0367fea4b090a2e0a16d9ffc46af47764536a298ec" +checksum = "4eca7ac642d82aa35b60049a6eccb4be6be75e599bd2e9adb5f875a737654af2" dependencies = [ "proc-macro2", "quote", diff --git a/src/tests/simulation/Cargo.toml b/src/tests/simulation/Cargo.toml index 0b5534fc06b0..2e819327962a 100644 --- a/src/tests/simulation/Cargo.toml +++ b/src/tests/simulation/Cargo.toml @@ -40,7 +40,7 @@ risingwave_pb = { workspace = true } risingwave_rpc_client = { workspace = true } risingwave_sqlparser = { workspace = true } risingwave_sqlsmith = { workspace = true } -serde = "1.0.185" +serde = "1.0.188" serde_derive = "1.0.183" serde_json = "1.0.105" sqllogictest = 
"0.15.2" From d1fe176adc3e32f813891584c5076b574df99dfe Mon Sep 17 00:00:00 2001 From: Noel Kwan <47273164+kwannoel@users.noreply.github.com> Date: Wed, 30 Aug 2023 09:43:12 +0800 Subject: [PATCH 4/4] test(streaming): test hdr approach for `approx_percentile` (#11931) --- .../aggregate/hdr_approx_percentile.slt | 289 ++++++++++++++++++ 1 file changed, 289 insertions(+) create mode 100755 e2e_test/streaming/aggregate/hdr_approx_percentile.slt diff --git a/e2e_test/streaming/aggregate/hdr_approx_percentile.slt b/e2e_test/streaming/aggregate/hdr_approx_percentile.slt new file mode 100755 index 000000000000..b0969139e92e --- /dev/null +++ b/e2e_test/streaming/aggregate/hdr_approx_percentile.slt @@ -0,0 +1,289 @@ +# -------- +# Overview +# -------- +# +# This is a demo test to show how to use hdr_histogram (http://hdrhistogram.org/) +# to compute `approx_percentile` in the RisingWave stream engine. +# It supports non-append-only streams, i.e. streams with updates and deletes are supported. +# +# The way it works is that hdr_histogram stores N significant digits of each value, +# thereby reducing the number of buckets needed to store the distribution. +# +# ---------------------- +# Implementation details +# ---------------------- +# +# First we create the histogram itself, where each value is encoded as a triple: +# 1. The sign of the value (1 or -1) +# 2. The exponent of the value (the power of 10, truncated to an integer) +# 3. The significand of the value (the digits after the decimal point, truncated to $PRECISION digits) +# +# With the following parameters: +# precision=2 (significand is 2 digits) +# value=123456 +# +# The exponent will be 5 (log10(123456) = 5.0915... ~ 5) +# The significand will be 23 (123456 / 10^5 - 1 = 0.23... ). +# Then the histogram will store the triple (1, 5, 23) +# +# NOTE(kwannoel): We don't have to use base-10 for the exponent and significand. +# But RisingWave currently only supports log10, ln. 
+# So we use base-10 for convenience, and because it's more intuitive for the examples here. +# +# The bucket is computed as: +# (sign*(1.0+mantissa/pow(10.0, 2))*pow(10.0,exponent))::int +# (1 * (1.0 + 23 / 10^2) * 10^5) = 123000 +# +# Next, we use window sum aggregation to compute the cumulative frequency of each bucket. +# | Bucket | 4 | 311 | 400 | 521 | +# | Counts | 3 | 6 | 7 | 8 | +# | C. Frequency | 3 | 9 | 16 | 24 | +# +# Finally we can compute the approximate percentile +# 1. Compute the sum of all frequencies. (24) +# 2. Select a percentile (90%) +# 3. Compute the frequency corresponding to the percentile (24 * 0.9 = 21.6) +# 4. Find the first bucket with the cumulative frequency >= 21.6 (521) +# +# Step 3 and 4 are required for us to use dynamic filter, +# since we don't support complex expressions in the dynamic filter. +# In the future when our optimizer can simplify the dynamic filter, we can avoid it. +# +# ---------- +# Test Notes +# ---------- +# +# We will test the following parameters: +# - 90 percentile. +# - 50 percentile. +# - Deletes. +# - Compare with percentile_disc. The significand should match. + +statement ok +CREATE TABLE input (value BIGINT); + +statement ok +INSERT INTO input SELECT * FROM generate_series(1,10); + +statement ok +INSERT INTO input SELECT * FROM generate_series(1000, 1100); + +statement ok +INSERT INTO input SELECT * FROM generate_series(10000, 11000); + +statement ok +INSERT INTO input SELECT * FROM generate_series(100000, 110000); + +statement ok +flush; + +# Here `pow(10.0, 4)` is responsible for setting the precision of the histogram. +# "4" is the precision.
+statement ok +CREATE MATERIALIZED VIEW hdr_histogram AS +SELECT + CASE WHEN value<0 THEN -1 ELSE 1 END AS sign, + trunc(log10(value))::int AS exponent, + trunc( + pow(10.0, 4) + * (value / pow(10.0, trunc(log10(value))::int) - 1.0))::int AS mantissa, + count(*) AS frequency, + 1 as dummy +FROM input +GROUP BY sign, exponent, mantissa; + +statement ok +CREATE MATERIALIZED VIEW hdr_sum AS +SELECT + sum(frequency) AS sum_frequency +FROM hdr_histogram; + +# Here `pow(10.0, 4)` is responsible for recovering the position of the mantissa, with the precision. +# "4" is the precision. +statement ok +CREATE MATERIALIZED VIEW hdr_distribution AS +SELECT + (sign*(1.0+mantissa/pow(10.0, 4))*pow(10.0,exponent))::int AS bucket, + (sum(frequency) + OVER (PARTITION BY NULL + ORDER BY (sign, exponent, mantissa) + ROWS UNBOUNDED PRECEDING) + ) AS cumulative_frequency +FROM hdr_histogram g +GROUP BY sign, exponent, mantissa, frequency +ORDER BY cumulative_frequency; + +statement ok +CREATE INDEX hdr_distribution_idx ON hdr_distribution (cumulative_frequency); + +########## 90-percentile ########## + +statement ok +CREATE MATERIALIZED VIEW frequency_at_90 AS +SELECT sum_frequency * 0.9 AS scaled_sum_freq FROM hdr_sum; + +statement ok +CREATE MATERIALIZED VIEW approx_percentile_90_percent AS +SELECT bucket AS approximate_percentile +FROM hdr_distribution x, frequency_at_90 y +WHERE x.cumulative_frequency >= y.scaled_sum_freq +ORDER BY cumulative_frequency +LIMIT 1; + +query I +SELECT * FROM approx_percentile_90_percent; +---- +108880 + +query I +SELECT percentile_disc(0.9) WITHIN GROUP (ORDER BY value) FROM input; +---- +108889 + +query B +SELECT + (abs(percentile_disc - approximate_percentile) / percentile_disc) <= 0.0001 +FROM approx_percentile_90_percent, (SELECT percentile_disc(0.9) WITHIN GROUP (ORDER BY value) FROM input); +---- +t + +statement ok +DROP MATERIALIZED VIEW approx_percentile_90_percent; + +statement ok +DROP MATERIALIZED VIEW frequency_at_90; + +##########
50-percentile ########## + +statement ok +CREATE MATERIALIZED VIEW frequency_at_50 AS +SELECT sum_frequency * 0.5 AS scaled_sum_freq FROM hdr_sum; + +statement ok +CREATE MATERIALIZED VIEW approx_percentile_50_percent AS +SELECT bucket AS approximate_percentile +FROM hdr_distribution x, frequency_at_50 y +WHERE x.cumulative_frequency >= y.scaled_sum_freq +ORDER BY cumulative_frequency +LIMIT 1; + +query I +SELECT * FROM approx_percentile_50_percent; +---- +104440 + +query I +SELECT percentile_disc(0.5) WITHIN GROUP (ORDER BY value) FROM input; +---- +104444 + +query B +SELECT + (abs(percentile_disc - approximate_percentile) / percentile_disc) <= 0.0001 +FROM approx_percentile_50_percent, (SELECT percentile_disc(0.5) WITHIN GROUP (ORDER BY value) FROM input); +---- +t + +### Delete > 100000 + +statement ok +DELETE FROM input WHERE value > 100000; + +statement ok +flush; + +query I +SELECT * FROM approx_percentile_50_percent; +---- +10445 + +query I +SELECT percentile_disc(0.5) WITHIN GROUP (ORDER BY value) FROM input; +---- +10445 + +query B +SELECT + (abs(percentile_disc - approximate_percentile) / percentile_disc) <= 0.0001 +FROM approx_percentile_50_percent, (SELECT percentile_disc(0.5) WITHIN GROUP (ORDER BY value) FROM input); +---- +t + +### Delete > 10000 + +statement ok +DELETE FROM input WHERE value > 10000; + +statement ok +flush; + +query I +SELECT * FROM approx_percentile_50_percent; +---- +1045 + +query I +SELECT percentile_disc(0.5) WITHIN GROUP (ORDER BY value) FROM input; +---- +1045 + +query B +SELECT + (abs(percentile_disc - approximate_percentile) / percentile_disc) <= 0.0001 +FROM approx_percentile_50_percent, (SELECT percentile_disc(0.5) WITHIN GROUP (ORDER BY value) FROM input); +---- +t + + +### Delete > 100 + +statement ok +DELETE FROM input WHERE value > 100; + +statement ok +flush; + +query I +SELECT * FROM approx_percentile_50_percent; +---- +5 + +query I +SELECT percentile_disc(0.5) WITHIN GROUP (ORDER BY value) FROM input; +---- +5 + 
+query B +SELECT + percentile_disc = (approximate_percentile::int) +FROM approx_percentile_50_percent, (SELECT percentile_disc(0.5) WITHIN GROUP (ORDER BY value) FROM input); +---- +t + +### Cleanup 50-percentile + +statement ok +DROP MATERIALIZED VIEW approx_percentile_50_percent; + +statement ok +DROP MATERIALIZED VIEW frequency_at_50; + +########## Cleanup ########## + +statement ok +DELETE FROM input WHERE value < 1400; + +statement ok +DROP INDEX hdr_distribution_idx; + +statement ok +DROP MATERIALIZED VIEW hdr_distribution; + +statement ok +DROP MATERIALIZED VIEW hdr_sum; + +statement ok +DROP MATERIALIZED VIEW hdr_histogram; + +statement ok +DROP TABLE input; \ No newline at end of file