Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(sink): support new syntax FORMAT ... ENCODE ... similar to source #12556

Merged
merged 12 commits into from
Oct 4, 2023
10 changes: 9 additions & 1 deletion proto/catalog.proto
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,13 @@ enum SinkType {
UPSERT = 3;
}

// Similar to `StreamSourceInfo`, and may replace `SinkType` later.
message SinkFormatDesc {
plan_common.FormatType format = 1;
plan_common.EncodeType encode = 2;
map<string, string> options = 3;
}

// the catalog of the sink. There are two kind of schema here. The full schema is all columns
// stored in the `column` which is the sink executor/fragment's output schema. The visible
// schema contains the columns whose `is_hidden` is false, which is the columns sink out to the
Expand All @@ -122,7 +129,7 @@ message Sink {
repeated int32 distribution_key = 8;
// User-defined primary key indices for the upsert sink.
repeated int32 downstream_pk = 9;
SinkType sink_type = 10;
SinkType sink_type = 10; // to be deprecated
uint32 owner = 11;
map<string, string> properties = 12;
string definition = 13;
Expand All @@ -132,6 +139,7 @@ message Sink {
string db_name = 17;
string sink_from_name = 18;
StreamJobStatus stream_job_status = 19;
SinkFormatDesc format_desc = 20;
}

message Connection {
Expand Down
3 changes: 2 additions & 1 deletion proto/connector_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,10 @@ message SinkParam {
uint32 sink_id = 1;
map<string, string> properties = 2;
TableSchema table_schema = 3;
catalog.SinkType sink_type = 4;
catalog.SinkType sink_type = 4; // to be deprecated
string db_name = 5;
string sink_from_name = 6;
catalog.SinkFormatDesc format_desc = 7;
}

enum SinkPayloadFormat {
Expand Down
3 changes: 2 additions & 1 deletion proto/stream_plan.proto
Original file line number Diff line number Diff line change
Expand Up @@ -174,12 +174,13 @@ message SinkDesc {
repeated uint32 downstream_pk = 6;
repeated uint32 distribution_key = 7;
map<string, string> properties = 8;
catalog.SinkType sink_type = 9;
catalog.SinkType sink_type = 9; // to be deprecated
repeated plan_common.ColumnCatalog column_catalogs = 10;
string db_name = 11;
// If the sink is from table or mv, this is name of the table/mv. Otherwise
// it is the name of the sink itself.
string sink_from_name = 12;
catalog.SinkFormatDesc format_desc = 13;
}

enum SinkLogStoreType {
Expand Down
7 changes: 6 additions & 1 deletion src/connector/src/sink/catalog/desc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use risingwave_common::catalog::{
use risingwave_common::util::sort_util::ColumnOrder;
use risingwave_pb::stream_plan::PbSinkDesc;

use super::{SinkCatalog, SinkId, SinkType};
use super::{SinkCatalog, SinkFormatDesc, SinkId, SinkType};

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct SinkDesc {
Expand Down Expand Up @@ -55,6 +55,9 @@ pub struct SinkDesc {
// options in `properties`.
pub sink_type: SinkType,

// The format and encode of the sink.
pub format_desc: Option<SinkFormatDesc>,

/// Name of the database
pub db_name: String,

Expand Down Expand Up @@ -86,6 +89,7 @@ impl SinkDesc {
dependent_relations,
properties: self.properties.into_iter().collect(),
sink_type: self.sink_type,
format_desc: self.format_desc,
connection_id,
created_at_epoch: None,
initialized_at_epoch: None,
Expand All @@ -109,6 +113,7 @@ impl SinkDesc {
distribution_key: self.distribution_key.iter().map(|k| *k as _).collect_vec(),
properties: self.properties.clone().into_iter().collect(),
sink_type: self.sink_type.to_proto() as i32,
format_desc: self.format_desc.as_ref().map(|f| f.to_proto()),
db_name: self.db_name.clone(),
sink_from_name: self.sink_from_name.clone(),
}
Expand Down
118 changes: 116 additions & 2 deletions src/connector/src/sink/catalog/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,17 @@

pub mod desc;

use std::collections::HashMap;
use std::collections::{BTreeMap, HashMap};

use itertools::Itertools;
use risingwave_common::catalog::{
ColumnCatalog, ConnectionId, DatabaseId, Field, Schema, SchemaId, TableId, UserId,
};
use risingwave_common::util::epoch::Epoch;
use risingwave_common::util::sort_util::ColumnOrder;
use risingwave_pb::catalog::{PbSink, PbSinkType, PbStreamJobStatus};
use risingwave_pb::catalog::{PbSink, PbSinkFormatDesc, PbSinkType, PbStreamJobStatus};

use super::{SINK_TYPE_APPEND_ONLY, SINK_TYPE_DEBEZIUM, SINK_TYPE_OPTION, SINK_TYPE_UPSERT};

#[derive(Clone, Copy, Debug, Default, Hash, PartialOrd, PartialEq, Eq)]
pub struct SinkId {
Expand Down Expand Up @@ -96,6 +98,106 @@ impl SinkType {
}
}

/// May replace [`SinkType`].
///
/// TODO: consolidate with [`crate::source::SourceStruct`] and [`crate::parser::SpecificParserConfig`].
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct SinkFormatDesc {
pub format: SinkFormat,
pub encode: SinkEncode,
pub options: BTreeMap<String, String>,
}

/// TODO: consolidate with [`crate::source::SourceFormat`] and [`crate::parser::ProtocolProperties`].
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum SinkFormat {
AppendOnly,
Upsert,
Debezium,
}

/// TODO: consolidate with [`crate::source::SourceEncode`] and [`crate::parser::EncodingProperties`].
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum SinkEncode {
Json,
Protobuf,
Avro,
}

impl SinkFormatDesc {
pub fn from_legacy_type(t: &str) -> Option<Self> {
let format = match t {
SINK_TYPE_APPEND_ONLY => SinkFormat::AppendOnly,
SINK_TYPE_UPSERT => SinkFormat::Upsert,
SINK_TYPE_DEBEZIUM => SinkFormat::Debezium,
_ => return None,
};
Some(Self {
format,
encode: SinkEncode::Json,
options: Default::default(),
})
}

pub fn to_proto(&self) -> PbSinkFormatDesc {
use risingwave_pb::plan_common::{EncodeType as E, FormatType as F};

let format = match self.format {
SinkFormat::AppendOnly => F::Plain,
SinkFormat::Upsert => F::Upsert,
SinkFormat::Debezium => F::Debezium,
};
let encode = match self.encode {
SinkEncode::Json => E::Json,
SinkEncode::Protobuf => E::Protobuf,
SinkEncode::Avro => E::Avro,
};
let options = self
.options
.iter()
.map(|(k, v)| (k.clone(), v.clone()))
.collect();

PbSinkFormatDesc {
format: format.into(),
encode: encode.into(),
options,
}
}
}

impl TryFrom<PbSinkFormatDesc> for SinkFormatDesc {
type Error = String;

fn try_from(value: PbSinkFormatDesc) -> Result<Self, Self::Error> {
use risingwave_pb::plan_common::{EncodeType as E, FormatType as F};

let format = match value.format() {
F::Plain => SinkFormat::AppendOnly,
F::Upsert => SinkFormat::Upsert,
F::Debezium => SinkFormat::Debezium,
f @ (F::Unspecified | F::Native | F::DebeziumMongo | F::Maxwell | F::Canal) => {
return Err(format!("sink format unsupported: {}", f.as_str_name()))
}
};
let encode = match value.encode() {
E::Json => SinkEncode::Json,
E::Protobuf => SinkEncode::Protobuf,
E::Avro => SinkEncode::Avro,
e @ (E::Unspecified | E::Native | E::Csv | E::Bytes) => {
return Err(format!("sink encode unsupported: {}", e.as_str_name()))
}
};
let options = value.options.into_iter().collect();

Ok(Self {
format,
encode,
options,
})
}
}

/// the catalog of the sink. There are two kind of schema here. The full schema is all columns
/// stored in the `column` which is the sink executor/fragment's output schema. The visible
/// schema contains the columns whose `is_hidden` is false, which is the columns sink out to the
Expand Down Expand Up @@ -144,6 +246,9 @@ pub struct SinkCatalog {
// options in `properties`.
pub sink_type: SinkType,

// The format and encode of the sink.
pub format_desc: Option<SinkFormatDesc>,

/// Sink may use a privatelink connection to connect to the downstream system.
pub connection_id: Option<ConnectionId>,

Expand Down Expand Up @@ -186,6 +291,7 @@ impl SinkCatalog {
owner: self.owner.into(),
properties: self.properties.clone(),
sink_type: self.sink_type.to_proto() as i32,
format_desc: self.format_desc.as_ref().map(|f| f.to_proto()),
connection_id: self.connection_id.map(|id| id.into()),
initialized_at_epoch: self.initialized_at_epoch.map(|e| e.0),
created_at_epoch: self.created_at_epoch.map(|e| e.0),
Expand Down Expand Up @@ -233,6 +339,13 @@ impl SinkCatalog {
impl From<PbSink> for SinkCatalog {
fn from(pb: PbSink) -> Self {
let sink_type = pb.get_sink_type().unwrap();
let format_desc = match pb.format_desc {
Some(f) => f.try_into().ok(),
None => pb
.properties
.get(SINK_TYPE_OPTION)
.and_then(|t| SinkFormatDesc::from_legacy_type(t)),
};
SinkCatalog {
id: pb.id.into(),
name: pb.name,
Expand Down Expand Up @@ -263,6 +376,7 @@ impl From<PbSink> for SinkCatalog {
.map(TableId::from)
.collect_vec(),
sink_type: SinkType::from_proto(sink_type),
format_desc,
connection_id: pb.connection_id.map(ConnectionId),
created_at_epoch: pb.created_at_epoch.map(Epoch::from),
initialized_at_epoch: pb.initialized_at_epoch.map(Epoch::from),
Expand Down
13 changes: 12 additions & 1 deletion src/connector/src/sink/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ use risingwave_rpc_client::{ConnectorClient, MetaClient};
use thiserror::Error;
pub use tracing;

use self::catalog::SinkType;
use self::catalog::{SinkFormatDesc, SinkType};
use crate::sink::catalog::{SinkCatalog, SinkId};
use crate::sink::log_store::LogReader;
use crate::sink::writer::SinkWriter;
Expand Down Expand Up @@ -132,13 +132,21 @@ pub struct SinkParam {
pub columns: Vec<ColumnDesc>,
pub downstream_pk: Vec<usize>,
pub sink_type: SinkType,
pub format_desc: Option<SinkFormatDesc>,
pub db_name: String,
pub sink_from_name: String,
}

impl SinkParam {
pub fn from_proto(pb_param: PbSinkParam) -> Self {
let table_schema = pb_param.table_schema.expect("should contain table schema");
let format_desc = match pb_param.format_desc {
Some(f) => f.try_into().ok(),
None => pb_param
.properties
.get(SINK_TYPE_OPTION)
.and_then(|t| SinkFormatDesc::from_legacy_type(t)),
};
Self {
sink_id: SinkId::from(pb_param.sink_id),
properties: pb_param.properties,
Expand All @@ -151,6 +159,7 @@ impl SinkParam {
sink_type: SinkType::from_proto(
PbSinkType::from_i32(pb_param.sink_type).expect("should be able to convert"),
),
format_desc,
db_name: pb_param.db_name,
sink_from_name: pb_param.sink_from_name,
}
Expand All @@ -165,6 +174,7 @@ impl SinkParam {
pk_indices: self.downstream_pk.iter().map(|i| *i as u32).collect(),
}),
sink_type: self.sink_type.to_proto().into(),
format_desc: self.format_desc.as_ref().map(|f| f.to_proto()),
db_name: self.db_name.clone(),
sink_from_name: self.sink_from_name.clone(),
}
Expand All @@ -189,6 +199,7 @@ impl From<SinkCatalog> for SinkParam {
columns,
downstream_pk: sink_catalog.downstream_pk,
sink_type: sink_catalog.sink_type,
format_desc: sink_catalog.format_desc,
db_name: sink_catalog.db_name,
sink_from_name: sink_catalog.sink_from_name,
}
Expand Down
5 changes: 3 additions & 2 deletions src/connector/src/source/external.rs
Original file line number Diff line number Diff line change
Expand Up @@ -536,8 +536,8 @@ mod tests {
use risingwave_common::catalog::{ColumnDesc, ColumnId};
use risingwave_common::types::DataType;

use crate::sink::catalog::SinkType;
use crate::sink::SinkParam;
use crate::sink::catalog::{SinkFormatDesc, SinkType};
use crate::sink::{SinkParam, SINK_TYPE_APPEND_ONLY};
use crate::source::external::{
CdcOffset, ExternalTableReader, MySqlExternalTableReader, MySqlOffset, SchemaTableName,
};
Expand Down Expand Up @@ -587,6 +587,7 @@ mod tests {
],
downstream_pk: vec![0],
sink_type: SinkType::AppendOnly,
format_desc: SinkFormatDesc::from_legacy_type(SINK_TYPE_APPEND_ONLY),
db_name: "db".into(),
sink_from_name: "table".into(),
};
Expand Down
2 changes: 2 additions & 0 deletions src/frontend/planner_test/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -777,13 +777,15 @@ impl TestCase {
options.insert("connector".to_string(), "blackhole".to_string());
options.insert("type".to_string(), "append-only".to_string());
let options = WithOptions::new(options);
let format_desc = (&options).into();
match logical_plan.gen_sink_plan(
sink_name.to_string(),
format!("CREATE SINK {sink_name} AS {}", stmt),
options,
false,
"test_db".into(),
"test_table".into(),
format_desc,
) {
Ok(sink_plan) => {
ret.sink_plan = Some(explain_plan(&sink_plan.into()));
Expand Down
Loading
Loading