feat(meta): make blocking sink default + support background sink #16249

Merged (9 commits) on Apr 17, 2024
Changes from all commits
6 changes: 6 additions & 0 deletions e2e_test/backfill/sink/create_sink.slt
@@ -11,6 +11,9 @@ SET STREAMING_RATE_LIMIT = 1000;
 statement ok
 insert into t select * from generate_series(1, 10000);
 
+statement ok
+SET BACKGROUND_DDL=true;
+
 statement ok
 create sink s as select x.v1 as v1
 from t x join t y
@@ -23,3 +26,6 @@ with (
     allow.auto.create.topics=true,
 )
 FORMAT DEBEZIUM ENCODE JSON;
+
+statement ok
+SET BACKGROUND_DDL=false;
6 changes: 6 additions & 0 deletions e2e_test/streaming/rate_limit/snapshot_amplification.slt
@@ -17,13 +17,19 @@ INSERT INTO table select 1 from generate_series(1, 100000);
 statement ok
 flush;
 
+statement ok
+SET BACKGROUND_DDL=true;
+
 statement ok
 CREATE SINK sink AS
 SELECT x.i1 as i1 FROM table x
 JOIN table s1 ON x.i1 = s1.i1
 JOIN table s2 ON x.i1 = s2.i1
 WITH (connector = 'blackhole');
 
+statement ok
+SET BACKGROUND_DDL=false;
+
 # Let sink amplify...
 skipif in-memory
 sleep 1s
6 changes: 6 additions & 0 deletions e2e_test/streaming/rate_limit/upstream_amplification.slt
@@ -18,6 +18,9 @@ WITH (
     datagen.rows.per.second = '10000'
 ) FORMAT PLAIN ENCODE JSON;
 
+statement ok
+SET BACKGROUND_DDL=true;
+
 statement ok
 CREATE SINK sink AS
 SELECT x.i1 as i1 FROM source_table x
@@ -26,6 +29,9 @@ CREATE SINK sink AS
 JOIN source_table s3 ON x.i1 = s3.i1
 WITH (connector = 'blackhole');
 
+statement ok
+SET BACKGROUND_DDL=false;
+
 # The following sequence of FLUSH should be fast, since barrier should be able to bypass sink.
 # Otherwise, these FLUSH will take a long time to complete, and trigger timeout.
 statement ok
2 changes: 1 addition & 1 deletion src/connector/src/sink/catalog/desc.rs
@@ -107,7 +107,7 @@ impl SinkDesc {
             target_table: self.target_table,
             created_at_cluster_version: None,
             initialized_at_cluster_version: None,
-            create_type: CreateType::Foreground,
+            create_type: self.create_type,
         }
     }
 
2 changes: 2 additions & 0 deletions src/meta/src/barrier/command.rs
@@ -23,6 +23,7 @@ use risingwave_common::catalog::TableId;
 use risingwave_common::hash::ActorMapping;
 use risingwave_connector::source::SplitImpl;
 use risingwave_hummock_sdk::HummockEpoch;
+use risingwave_pb::catalog::CreateType;
 use risingwave_pb::meta::table_fragments::PbActorStatus;
 use risingwave_pb::meta::PausedReason;
 use risingwave_pb::source::{ConnectorSplit, ConnectorSplits};
@@ -164,6 +165,7 @@ pub enum Command {
         init_split_assignment: SplitAssignment,
         definition: String,
         ddl_type: DdlType,
+        create_type: CreateType,
         replace_table: Option<ReplaceTablePlan>,
     },
     /// `CancelStreamingJob` command generates a `Stop` barrier including the actors of the given
91 changes: 50 additions & 41 deletions src/meta/src/barrier/progress.rs
@@ -18,6 +18,7 @@ use std::sync::Arc;
 
 use risingwave_common::catalog::TableId;
 use risingwave_common::util::epoch::Epoch;
+use risingwave_pb::catalog::CreateType;
 use risingwave_pb::ddl_service::DdlProgress;
 use risingwave_pb::hummock::HummockVersionStats;
 use risingwave_pb::stream_service::barrier_complete_response::CreateMviewProgress;
@@ -430,48 +431,56 @@ impl CreateMviewProgressTracker {
             return Some(TrackingJob::New(command));
         }
 
-        let (creating_mv_id, upstream_mv_count, upstream_total_key_count, definition, ddl_type) =
-            if let Command::CreateStreamingJob {
-                table_fragments,
-                dispatchers,
-                upstream_root_actors,
-                definition,
-                ddl_type,
-                ..
-            } = &command.context.command
-            {
-                // Keep track of how many times each upstream MV appears.
-                let mut upstream_mv_count = HashMap::new();
-                for (table_id, actors) in upstream_root_actors {
-                    assert!(!actors.is_empty());
-                    let dispatch_count: usize = dispatchers
-                        .iter()
-                        .filter(|(upstream_actor_id, _)| actors.contains(upstream_actor_id))
-                        .map(|(_, v)| v.len())
-                        .sum();
-                    upstream_mv_count.insert(*table_id, dispatch_count / actors.len());
-                }
-
-                let upstream_total_key_count: u64 = upstream_mv_count
-                    .iter()
-                    .map(|(upstream_mv, count)| {
-                        *count as u64
-                            * version_stats
-                                .table_stats
-                                .get(&upstream_mv.table_id)
-                                .map_or(0, |stat| stat.total_key_count as u64)
-                    })
-                    .sum();
-                (
-                    table_fragments.table_id(),
-                    upstream_mv_count,
-                    upstream_total_key_count,
-                    definition.to_string(),
-                    ddl_type,
-                )
-            } else {
-                unreachable!("Must be CreateStreamingJob.");
-            };
+        let (
+            creating_mv_id,
+            upstream_mv_count,
+            upstream_total_key_count,
+            definition,
+            ddl_type,
+            create_type,
+        ) = if let Command::CreateStreamingJob {
+            table_fragments,
+            dispatchers,
+            upstream_root_actors,
+            definition,
+            ddl_type,
+            create_type,
+            ..
+        } = &command.context.command
+        {
+            // Keep track of how many times each upstream MV appears.
+            let mut upstream_mv_count = HashMap::new();
+            for (table_id, actors) in upstream_root_actors {
+                assert!(!actors.is_empty());
+                let dispatch_count: usize = dispatchers
+                    .iter()
+                    .filter(|(upstream_actor_id, _)| actors.contains(upstream_actor_id))
+                    .map(|(_, v)| v.len())
+                    .sum();
+                upstream_mv_count.insert(*table_id, dispatch_count / actors.len());
+            }
+
+            let upstream_total_key_count: u64 = upstream_mv_count
+                .iter()
+                .map(|(upstream_mv, count)| {
+                    *count as u64
+                        * version_stats
+                            .table_stats
+                            .get(&upstream_mv.table_id)
+                            .map_or(0, |stat| stat.total_key_count as u64)
+                })
+                .sum();
+            (
+                table_fragments.table_id(),
+                upstream_mv_count,
+                upstream_total_key_count,
+                definition.to_string(),
+                ddl_type,
+                create_type,
+            )
+        } else {
+            unreachable!("Must be CreateStreamingJob.");
+        };
 
         for &actor in &actors {
             self.actor_map.insert(actor, creating_mv_id);
@@ -483,7 +492,7 @@ impl CreateMviewProgressTracker {
             upstream_total_key_count,
             definition,
         );
-        if *ddl_type == DdlType::Sink {
+        if *ddl_type == DdlType::Sink && *create_type == CreateType::Background {
            // We return the original tracking job immediately.
            // This is because sink can be decoupled with backfill progress.
            // We don't need to wait for sink to finish backfill.
Comment on lines +495 to 498

Member commented:

Is this path still necessary? Can we now simply treat sinks like materialized views?

@kwannoel (Contributor, Author) replied on Apr 16, 2024:

I suppose so, but it would require more refactoring of the code.

A sink differs from a materialized view in that its output is immediately visible, since it is external to the stream job.
For materialized views, we have to do something extra: hide the output until backfill is complete.

As such, we persist materialized views twice when they undergo background DDL:

  1. On initial creation (marked as Creating, i.e. invisible but recoverable).
  2. After creation succeeds (marked as Finished, i.e. visible).

For a sink we can immediately mark it as Finished (i.e. just step 2); we can't do that for MVs.

Indexes also have to go through the same flow as materialized views. When I add support for indexes, I will review whether sinks can be handled the same way and unify more branches of the code.

For now I think this is an acceptable approach: it requires the fewest changes and keeps the concept simple.

Member replied:

> For a sink we can immediately mark it as Finished (i.e. just step 2); we can't do that for MVs.

This is interesting to realize. 😄
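
A minimal sketch of the two persistence flows described in this thread. The JobKind and CatalogState enums and the persist() helper are hypothetical stand-ins, not the actual RisingWave meta-service API; only the Creating/Finished distinction mirrors the discussion above.

```rust
// Hypothetical sketch: one-step (sink) vs. two-step (MV) catalog
// persistence under background DDL. Not RisingWave code.

#[derive(Debug, Clone, Copy)]
enum JobKind {
    MaterializedView,
    Sink,
}

#[derive(Debug, Clone, Copy)]
enum CatalogState {
    Creating, // persisted but hidden from users; recoverable after restart
    Finished, // persisted and visible to users
}

// Stand-in for writing a catalog entry to the meta store.
fn persist(job: &str, state: CatalogState) {
    println!("persist {job} as {state:?}");
}

// Background-DDL creation flow: an MV needs two persistence steps because its
// output must stay hidden until backfill completes; a sink's output is
// external and immediately visible, so it can be marked Finished right away.
fn create_background_job(job: &str, kind: JobKind, wait_for_backfill: impl FnOnce()) {
    match kind {
        JobKind::MaterializedView => {
            persist(job, CatalogState::Creating); // step 1: invisible but recoverable
            wait_for_backfill();
            persist(job, CatalogState::Finished); // step 2: make it visible
        }
        JobKind::Sink => {
            // Just step 2; backfill progress is tracked separately.
            persist(job, CatalogState::Finished);
        }
    }
}

fn main() {
    create_background_job("mv_orders", JobKind::MaterializedView, || {});
    create_background_job("sink_orders", JobKind::Sink, || {});
}
```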

1 change: 1 addition & 0 deletions src/meta/src/manager/streaming_job.rs
@@ -290,6 +290,7 @@ impl StreamingJob {
             Self::MaterializedView(table) => {
                 table.get_create_type().unwrap_or(CreateType::Foreground)
             }
+            Self::Sink(s, _) => s.get_create_type().unwrap_or(CreateType::Foreground),
             _ => CreateType::Foreground,
         }
     }
1 change: 1 addition & 0 deletions src/meta/src/stream/stream_manager.rs
@@ -461,6 +461,7 @@ impl GlobalStreamManager {
             definition: definition.to_string(),
             ddl_type,
             replace_table: replace_table_command,
+            create_type,
         };
         tracing::debug!("sending Command::CreateStreamingJob");
         if let Err(err) = self.barrier_scheduler.run_command(command).await {
@@ -440,7 +440,7 @@ async fn test_foreground_index_cancel() -> Result<()> {
 }
 
 #[tokio::test]
-async fn test_sink_create() -> Result<()> {
+async fn test_background_sink_create() -> Result<()> {
     init_logger();
     let mut cluster = Cluster::start(Configuration::for_background_ddl()).await?;
     let mut session = cluster.start_session();
@@ -450,6 +450,7 @@ async fn test_sink_create() -> Result<()> {
 
     let mut session2 = cluster.start_session();
     tokio::spawn(async move {
+        session2.run(SET_BACKGROUND_DDL).await.unwrap();
         session2.run(SET_RATE_LIMIT_2).await.unwrap();
         session2
             .run("CREATE SINK s FROM t WITH (connector='blackhole');")