Skip to content

Commit

Permalink
feat(meta): make blocking sink default + support background sink (#16249
Browse files Browse the repository at this point in the history
)
  • Loading branch information
kwannoel committed Apr 17, 2024
1 parent a2694b9 commit ffd4194
Show file tree
Hide file tree
Showing 9 changed files with 75 additions and 43 deletions.
6 changes: 6 additions & 0 deletions e2e_test/backfill/sink/create_sink.slt
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@ SET STREAMING_RATE_LIMIT = 1000;
statement ok
insert into t select * from generate_series(1, 10000);

statement ok
SET BACKGROUND_DDL=true;

statement ok
create sink s as select x.v1 as v1
from t x join t y
Expand All @@ -23,3 +26,6 @@ with (
allow.auto.create.topics=true,
)
FORMAT DEBEZIUM ENCODE JSON;

statement ok
SET BACKGROUND_DDL=false;
6 changes: 6 additions & 0 deletions e2e_test/streaming/rate_limit/snapshot_amplification.slt
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,19 @@ INSERT INTO table select 1 from generate_series(1, 100000);
statement ok
flush;

statement ok
SET BACKGROUND_DDL=true;

statement ok
CREATE SINK sink AS
SELECT x.i1 as i1 FROM table x
JOIN table s1 ON x.i1 = s1.i1
JOIN table s2 ON x.i1 = s2.i1
WITH (connector = 'blackhole');

statement ok
SET BACKGROUND_DDL=false;

# Let sink amplify...
skipif in-memory
sleep 1s
Expand Down
6 changes: 6 additions & 0 deletions e2e_test/streaming/rate_limit/upstream_amplification.slt
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ WITH (
datagen.rows.per.second = '10000'
) FORMAT PLAIN ENCODE JSON;

statement ok
SET BACKGROUND_DDL=true;

statement ok
CREATE SINK sink AS
SELECT x.i1 as i1 FROM source_table x
Expand All @@ -26,6 +29,9 @@ CREATE SINK sink AS
JOIN source_table s3 ON x.i1 = s3.i1
WITH (connector = 'blackhole');

statement ok
SET BACKGROUND_DDL=false;

# The following sequence of FLUSH should be fast, since barrier should be able to bypass sink.
# Otherwise, these FLUSH will take a long time to complete, and trigger timeout.
statement ok
Expand Down
2 changes: 1 addition & 1 deletion src/connector/src/sink/catalog/desc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ impl SinkDesc {
target_table: self.target_table,
created_at_cluster_version: None,
initialized_at_cluster_version: None,
create_type: CreateType::Foreground,
create_type: self.create_type,
}
}

Expand Down
2 changes: 2 additions & 0 deletions src/meta/src/barrier/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use risingwave_common::catalog::TableId;
use risingwave_common::hash::ActorMapping;
use risingwave_connector::source::SplitImpl;
use risingwave_hummock_sdk::HummockEpoch;
use risingwave_pb::catalog::CreateType;
use risingwave_pb::meta::table_fragments::PbActorStatus;
use risingwave_pb::meta::PausedReason;
use risingwave_pb::source::{ConnectorSplit, ConnectorSplits};
Expand Down Expand Up @@ -164,6 +165,7 @@ pub enum Command {
init_split_assignment: SplitAssignment,
definition: String,
ddl_type: DdlType,
create_type: CreateType,
replace_table: Option<ReplaceTablePlan>,
},
/// `CancelStreamingJob` command generates a `Stop` barrier including the actors of the given
Expand Down
91 changes: 50 additions & 41 deletions src/meta/src/barrier/progress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use std::sync::Arc;

use risingwave_common::catalog::TableId;
use risingwave_common::util::epoch::Epoch;
use risingwave_pb::catalog::CreateType;
use risingwave_pb::ddl_service::DdlProgress;
use risingwave_pb::hummock::HummockVersionStats;
use risingwave_pb::stream_service::barrier_complete_response::CreateMviewProgress;
Expand Down Expand Up @@ -430,48 +431,56 @@ impl CreateMviewProgressTracker {
return Some(TrackingJob::New(command));
}

let (creating_mv_id, upstream_mv_count, upstream_total_key_count, definition, ddl_type) =
if let Command::CreateStreamingJob {
table_fragments,
dispatchers,
upstream_root_actors,
definition,
ddl_type,
..
} = &command.context.command
{
// Keep track of how many times each upstream MV appears.
let mut upstream_mv_count = HashMap::new();
for (table_id, actors) in upstream_root_actors {
assert!(!actors.is_empty());
let dispatch_count: usize = dispatchers
.iter()
.filter(|(upstream_actor_id, _)| actors.contains(upstream_actor_id))
.map(|(_, v)| v.len())
.sum();
upstream_mv_count.insert(*table_id, dispatch_count / actors.len());
}

let upstream_total_key_count: u64 = upstream_mv_count
let (
creating_mv_id,
upstream_mv_count,
upstream_total_key_count,
definition,
ddl_type,
create_type,
) = if let Command::CreateStreamingJob {
table_fragments,
dispatchers,
upstream_root_actors,
definition,
ddl_type,
create_type,
..
} = &command.context.command
{
// Keep track of how many times each upstream MV appears.
let mut upstream_mv_count = HashMap::new();
for (table_id, actors) in upstream_root_actors {
assert!(!actors.is_empty());
let dispatch_count: usize = dispatchers
.iter()
.map(|(upstream_mv, count)| {
*count as u64
* version_stats
.table_stats
.get(&upstream_mv.table_id)
.map_or(0, |stat| stat.total_key_count as u64)
})
.filter(|(upstream_actor_id, _)| actors.contains(upstream_actor_id))
.map(|(_, v)| v.len())
.sum();
(
table_fragments.table_id(),
upstream_mv_count,
upstream_total_key_count,
definition.to_string(),
ddl_type,
)
} else {
unreachable!("Must be CreateStreamingJob.");
};
upstream_mv_count.insert(*table_id, dispatch_count / actors.len());
}

let upstream_total_key_count: u64 = upstream_mv_count
.iter()
.map(|(upstream_mv, count)| {
*count as u64
* version_stats
.table_stats
.get(&upstream_mv.table_id)
.map_or(0, |stat| stat.total_key_count as u64)
})
.sum();
(
table_fragments.table_id(),
upstream_mv_count,
upstream_total_key_count,
definition.to_string(),
ddl_type,
create_type,
)
} else {
unreachable!("Must be CreateStreamingJob.");
};

for &actor in &actors {
self.actor_map.insert(actor, creating_mv_id);
Expand All @@ -483,7 +492,7 @@ impl CreateMviewProgressTracker {
upstream_total_key_count,
definition,
);
if *ddl_type == DdlType::Sink {
if *ddl_type == DdlType::Sink && *create_type == CreateType::Background {
// We return the original tracking job immediately.
// This is because sink can be decoupled with backfill progress.
// We don't need to wait for sink to finish backfill.
Expand Down
1 change: 1 addition & 0 deletions src/meta/src/manager/streaming_job.rs
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,7 @@ impl StreamingJob {
Self::MaterializedView(table) => {
table.get_create_type().unwrap_or(CreateType::Foreground)
}
Self::Sink(s, _) => s.get_create_type().unwrap_or(CreateType::Foreground),
_ => CreateType::Foreground,
}
}
Expand Down
1 change: 1 addition & 0 deletions src/meta/src/stream/stream_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -461,6 +461,7 @@ impl GlobalStreamManager {
definition: definition.to_string(),
ddl_type,
replace_table: replace_table_command,
create_type,
};
tracing::debug!("sending Command::CreateStreamingJob");
if let Err(err) = self.barrier_scheduler.run_command(command).await {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -440,7 +440,7 @@ async fn test_foreground_index_cancel() -> Result<()> {
}

#[tokio::test]
async fn test_sink_create() -> Result<()> {
async fn test_background_sink_create() -> Result<()> {
init_logger();
let mut cluster = Cluster::start(Configuration::for_background_ddl()).await?;
let mut session = cluster.start_session();
Expand All @@ -450,6 +450,7 @@ async fn test_sink_create() -> Result<()> {

let mut session2 = cluster.start_session();
tokio::spawn(async move {
session2.run(SET_BACKGROUND_DDL).await.unwrap();
session2.run(SET_RATE_LIMIT_2).await.unwrap();
session2
.run("CREATE SINK s FROM t WITH (connector='blackhole');")
Expand Down

0 comments on commit ffd4194

Please sign in to comment.