Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(sink): support new syntax FORMAT ... ENCODE ... similar to source #12556

Merged
merged 12 commits into from
Oct 4, 2023
2 changes: 1 addition & 1 deletion e2e_test/sink/kafka/create_sink.slt
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ create sink si_kafka_debezium from t_kafka with (
primary_key = 'id',
);

statement error primary key not defined for debezium kafka sink
statement error primary key not defined
create sink debezium_without_pk from t_kafka with (
connector = 'kafka',
properties.bootstrap.server = '127.0.0.1:29092',
Expand Down
10 changes: 9 additions & 1 deletion proto/catalog.proto
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,13 @@ enum SinkType {
UPSERT = 3;
}

// Similar to `StreamSourceInfo`, and may replace `SinkType` later.
message SinkFormatDesc {
  // Data change semantics of the sink, from the `FORMAT ...` clause
  // (e.g. plain/append-only, upsert, debezium).
  plan_common.FormatType format = 1;
  // Payload encoding of the sink, from the `ENCODE ...` clause (e.g. JSON).
  plan_common.EncodeType encode = 2;
  // Extra key-value options attached to the `FORMAT ... ENCODE ...` clause.
  map<string, string> options = 3;
}

// the catalog of the sink. There are two kind of schema here. The full schema is all columns
// stored in the `column` which is the sink executor/fragment's output schema. The visible
// schema contains the columns whose `is_hidden` is false, which is the columns sink out to the
Expand All @@ -122,7 +129,7 @@ message Sink {
repeated int32 distribution_key = 8;
// User-defined primary key indices for the upsert sink.
repeated int32 downstream_pk = 9;
SinkType sink_type = 10;
SinkType sink_type = 10; // to be deprecated
uint32 owner = 11;
map<string, string> properties = 12;
string definition = 13;
Expand All @@ -132,6 +139,7 @@ message Sink {
string db_name = 17;
string sink_from_name = 18;
StreamJobStatus stream_job_status = 19;
SinkFormatDesc format_desc = 20;
}

message Connection {
Expand Down
3 changes: 2 additions & 1 deletion proto/connector_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,10 @@ message SinkParam {
uint32 sink_id = 1;
map<string, string> properties = 2;
TableSchema table_schema = 3;
catalog.SinkType sink_type = 4;
catalog.SinkType sink_type = 4; // to be deprecated
string db_name = 5;
string sink_from_name = 6;
catalog.SinkFormatDesc format_desc = 7;
}

enum SinkPayloadFormat {
Expand Down
3 changes: 2 additions & 1 deletion proto/stream_plan.proto
Original file line number Diff line number Diff line change
Expand Up @@ -174,12 +174,13 @@ message SinkDesc {
repeated uint32 downstream_pk = 6;
repeated uint32 distribution_key = 7;
map<string, string> properties = 8;
catalog.SinkType sink_type = 9;
catalog.SinkType sink_type = 9; // to be deprecated
repeated plan_common.ColumnCatalog column_catalogs = 10;
string db_name = 11;
// If the sink is from table or mv, this is name of the table/mv. Otherwise
// it is the name of the sink itself.
string sink_from_name = 12;
catalog.SinkFormatDesc format_desc = 13;
}

enum SinkLogStoreType {
Expand Down
7 changes: 6 additions & 1 deletion src/connector/src/sink/catalog/desc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use risingwave_common::catalog::{
use risingwave_common::util::sort_util::ColumnOrder;
use risingwave_pb::stream_plan::PbSinkDesc;

use super::{SinkCatalog, SinkId, SinkType};
use super::{SinkCatalog, SinkFormatDesc, SinkId, SinkType};

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct SinkDesc {
Expand Down Expand Up @@ -55,6 +55,9 @@ pub struct SinkDesc {
// options in `properties`.
pub sink_type: SinkType,

// The format and encode of the sink.
pub format_desc: Option<SinkFormatDesc>,

/// Name of the database
pub db_name: String,

Expand Down Expand Up @@ -86,6 +89,7 @@ impl SinkDesc {
dependent_relations,
properties: self.properties.into_iter().collect(),
sink_type: self.sink_type,
format_desc: self.format_desc,
connection_id,
created_at_epoch: None,
initialized_at_epoch: None,
Expand All @@ -109,6 +113,7 @@ impl SinkDesc {
distribution_key: self.distribution_key.iter().map(|k| *k as _).collect_vec(),
properties: self.properties.clone().into_iter().collect(),
sink_type: self.sink_type.to_proto() as i32,
format_desc: self.format_desc.as_ref().map(|f| f.to_proto()),
db_name: self.db_name.clone(),
sink_from_name: self.sink_from_name.clone(),
}
Expand Down
148 changes: 146 additions & 2 deletions src/connector/src/sink/catalog/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,21 @@

pub mod desc;

use std::collections::HashMap;
use std::collections::{BTreeMap, HashMap};

use anyhow::anyhow;
use itertools::Itertools;
use risingwave_common::catalog::{
ColumnCatalog, ConnectionId, DatabaseId, Field, Schema, SchemaId, TableId, UserId,
};
use risingwave_common::util::epoch::Epoch;
use risingwave_common::util::sort_util::ColumnOrder;
use risingwave_pb::catalog::{PbSink, PbSinkType, PbStreamJobStatus};
use risingwave_pb::catalog::{PbSink, PbSinkFormatDesc, PbSinkType, PbStreamJobStatus};

use super::{
SinkError, CONNECTOR_TYPE_KEY, SINK_TYPE_APPEND_ONLY, SINK_TYPE_DEBEZIUM, SINK_TYPE_OPTION,
SINK_TYPE_UPSERT,
};

#[derive(Clone, Copy, Debug, Default, Hash, PartialOrd, PartialEq, Eq)]
pub struct SinkId {
Expand Down Expand Up @@ -96,6 +102,128 @@ impl SinkType {
}
}

/// May replace [`SinkType`].
///
/// TODO: consolidate with [`crate::source::SourceStruct`] and [`crate::parser::SpecificParserConfig`].
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct SinkFormatDesc {
    /// Data change semantics of the sink output (append-only / upsert / debezium).
    pub format: SinkFormat,
    /// Payload encoding of the sink output (JSON / Protobuf / Avro).
    pub encode: SinkEncode,
    /// Extra options from the `FORMAT ... ENCODE ... (...)` clause.
    /// `BTreeMap` (rather than `HashMap`) so the struct can derive `Hash`.
    pub options: BTreeMap<String, String>,
}

/// TODO: consolidate with [`crate::source::SourceFormat`] and [`crate::parser::ProtocolProperties`].
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum SinkFormat {
    /// Insert-only stream; maps to `FormatType::Plain` in protobuf.
    AppendOnly,
    /// Upsert stream; maps to `FormatType::Upsert`.
    Upsert,
    /// Debezium-style change log; maps to `FormatType::Debezium`.
    Debezium,
}

/// TODO: consolidate with [`crate::source::SourceEncode`] and [`crate::parser::EncodingProperties`].
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum SinkEncode {
    /// JSON payload; maps to `EncodeType::Json` in protobuf.
    Json,
    /// Protobuf payload; maps to `EncodeType::Protobuf`.
    Protobuf,
    /// Avro payload; maps to `EncodeType::Avro`.
    Avro,
}

impl SinkFormatDesc {
pub fn from_legacy_type(connector: &str, r#type: &str) -> Result<Option<Self>, SinkError> {
use crate::sink::kafka::KafkaSink;
use crate::sink::kinesis::KinesisSink;
use crate::sink::pulsar::PulsarSink;
use crate::sink::Sink as _;

let format = match r#type {
SINK_TYPE_APPEND_ONLY => SinkFormat::AppendOnly,
SINK_TYPE_UPSERT => SinkFormat::Upsert,
SINK_TYPE_DEBEZIUM => SinkFormat::Debezium,
_ => {
return Err(SinkError::Config(anyhow!(
"sink type unsupported: {}",
r#type
)))
}
};
let encode = match connector {
KafkaSink::SINK_NAME | KinesisSink::SINK_NAME | PulsarSink::SINK_NAME => {
SinkEncode::Json
}
_ => return Ok(None),
};
Ok(Some(Self {
format,
encode,
options: Default::default(),
}))
}

pub fn to_proto(&self) -> PbSinkFormatDesc {
use risingwave_pb::plan_common::{EncodeType as E, FormatType as F};

let format = match self.format {
SinkFormat::AppendOnly => F::Plain,
SinkFormat::Upsert => F::Upsert,
SinkFormat::Debezium => F::Debezium,
};
let encode = match self.encode {
SinkEncode::Json => E::Json,
SinkEncode::Protobuf => E::Protobuf,
SinkEncode::Avro => E::Avro,
};
let options = self
.options
.iter()
.map(|(k, v)| (k.clone(), v.clone()))
.collect();

PbSinkFormatDesc {
format: format.into(),
encode: encode.into(),
options,
}
}
}

impl TryFrom<PbSinkFormatDesc> for SinkFormatDesc {
type Error = SinkError;

fn try_from(value: PbSinkFormatDesc) -> Result<Self, Self::Error> {
use risingwave_pb::plan_common::{EncodeType as E, FormatType as F};

let format = match value.format() {
F::Plain => SinkFormat::AppendOnly,
F::Upsert => SinkFormat::Upsert,
F::Debezium => SinkFormat::Debezium,
f @ (F::Unspecified | F::Native | F::DebeziumMongo | F::Maxwell | F::Canal) => {
return Err(SinkError::Config(anyhow!(
"sink format unsupported: {}",
f.as_str_name()
)))
}
};
let encode = match value.encode() {
E::Json => SinkEncode::Json,
E::Protobuf => SinkEncode::Protobuf,
E::Avro => SinkEncode::Avro,
e @ (E::Unspecified | E::Native | E::Csv | E::Bytes) => {
return Err(SinkError::Config(anyhow!(
"sink encode unsupported: {}",
e.as_str_name()
)))
}
};
let options = value.options.into_iter().collect();

Ok(Self {
format,
encode,
options,
})
}
}

/// the catalog of the sink. There are two kind of schema here. The full schema is all columns
/// stored in the `column` which is the sink executor/fragment's output schema. The visible
/// schema contains the columns whose `is_hidden` is false, which is the columns sink out to the
Expand Down Expand Up @@ -144,6 +272,9 @@ pub struct SinkCatalog {
// options in `properties`.
pub sink_type: SinkType,

// The format and encode of the sink.
pub format_desc: Option<SinkFormatDesc>,

/// Sink may use a privatelink connection to connect to the downstream system.
pub connection_id: Option<ConnectionId>,

Expand Down Expand Up @@ -186,6 +317,7 @@ impl SinkCatalog {
owner: self.owner.into(),
properties: self.properties.clone(),
sink_type: self.sink_type.to_proto() as i32,
format_desc: self.format_desc.as_ref().map(|f| f.to_proto()),
connection_id: self.connection_id.map(|id| id.into()),
initialized_at_epoch: self.initialized_at_epoch.map(|e| e.0),
created_at_epoch: self.created_at_epoch.map(|e| e.0),
Expand Down Expand Up @@ -233,6 +365,17 @@ impl SinkCatalog {
impl From<PbSink> for SinkCatalog {
fn from(pb: PbSink) -> Self {
let sink_type = pb.get_sink_type().unwrap();
let format_desc = match pb.format_desc {
Some(f) => f.try_into().ok(),
None => {
let connector = pb.properties.get(CONNECTOR_TYPE_KEY);
let r#type = pb.properties.get(SINK_TYPE_OPTION);
match (connector, r#type) {
(Some(c), Some(t)) => SinkFormatDesc::from_legacy_type(c, t).ok().flatten(),
_ => None,
}
}
};
SinkCatalog {
id: pb.id.into(),
name: pb.name,
Expand Down Expand Up @@ -263,6 +406,7 @@ impl From<PbSink> for SinkCatalog {
.map(TableId::from)
.collect_vec(),
sink_type: SinkType::from_proto(sink_type),
format_desc,
connection_id: pb.connection_id.map(ConnectionId),
created_at_epoch: pb.created_at_epoch.map(Epoch::from),
initialized_at_epoch: pb.initialized_at_epoch.map(Epoch::from),
Expand Down
Loading
Loading