Skip to content

Commit

Permalink
feat(sink): support new syntax FORMAT ... ENCODE ... similar to sou…
Browse files Browse the repository at this point in the history
…rce (#12556)
  • Loading branch information
xiangjinwu committed Oct 4, 2023
1 parent 8db10db commit a296a06
Show file tree
Hide file tree
Showing 22 changed files with 521 additions and 171 deletions.
2 changes: 1 addition & 1 deletion e2e_test/sink/kafka/create_sink.slt
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ create sink si_kafka_debezium from t_kafka with (
primary_key = 'id',
);

statement error primary key not defined for debezium kafka sink
statement error primary key not defined
create sink debezium_without_pk from t_kafka with (
connector = 'kafka',
properties.bootstrap.server = '127.0.0.1:29092',
Expand Down
10 changes: 9 additions & 1 deletion proto/catalog.proto
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,13 @@ enum SinkType {
UPSERT = 3;
}

// Similar to `StreamSourceInfo`, and may replace `SinkType` later.
message SinkFormatDesc {
plan_common.FormatType format = 1;
plan_common.EncodeType encode = 2;
map<string, string> options = 3;
}

// the catalog of the sink. There are two kind of schema here. The full schema is all columns
// stored in the `column` which is the sink executor/fragment's output schema. The visible
// schema contains the columns whose `is_hidden` is false, which is the columns sink out to the
Expand All @@ -122,7 +129,7 @@ message Sink {
repeated int32 distribution_key = 8;
// User-defined primary key indices for the upsert sink.
repeated int32 downstream_pk = 9;
SinkType sink_type = 10;
SinkType sink_type = 10; // to be deprecated
uint32 owner = 11;
map<string, string> properties = 12;
string definition = 13;
Expand All @@ -132,6 +139,7 @@ message Sink {
string db_name = 17;
string sink_from_name = 18;
StreamJobStatus stream_job_status = 19;
SinkFormatDesc format_desc = 20;
}

message Connection {
Expand Down
3 changes: 2 additions & 1 deletion proto/connector_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,10 @@ message SinkParam {
uint32 sink_id = 1;
map<string, string> properties = 2;
TableSchema table_schema = 3;
catalog.SinkType sink_type = 4;
catalog.SinkType sink_type = 4; // to be deprecated
string db_name = 5;
string sink_from_name = 6;
catalog.SinkFormatDesc format_desc = 7;
}

enum SinkPayloadFormat {
Expand Down
3 changes: 2 additions & 1 deletion proto/stream_plan.proto
Original file line number Diff line number Diff line change
Expand Up @@ -174,12 +174,13 @@ message SinkDesc {
repeated uint32 downstream_pk = 6;
repeated uint32 distribution_key = 7;
map<string, string> properties = 8;
catalog.SinkType sink_type = 9;
catalog.SinkType sink_type = 9; // to be deprecated
repeated plan_common.ColumnCatalog column_catalogs = 10;
string db_name = 11;
// If the sink is from table or mv, this is name of the table/mv. Otherwise
// it is the name of the sink itself.
string sink_from_name = 12;
catalog.SinkFormatDesc format_desc = 13;
}

enum SinkLogStoreType {
Expand Down
7 changes: 6 additions & 1 deletion src/connector/src/sink/catalog/desc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use risingwave_common::catalog::{
use risingwave_common::util::sort_util::ColumnOrder;
use risingwave_pb::stream_plan::PbSinkDesc;

use super::{SinkCatalog, SinkId, SinkType};
use super::{SinkCatalog, SinkFormatDesc, SinkId, SinkType};

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct SinkDesc {
Expand Down Expand Up @@ -55,6 +55,9 @@ pub struct SinkDesc {
// options in `properties`.
pub sink_type: SinkType,

// The format and encode of the sink.
pub format_desc: Option<SinkFormatDesc>,

/// Name of the database
pub db_name: String,

Expand Down Expand Up @@ -86,6 +89,7 @@ impl SinkDesc {
dependent_relations,
properties: self.properties.into_iter().collect(),
sink_type: self.sink_type,
format_desc: self.format_desc,
connection_id,
created_at_epoch: None,
initialized_at_epoch: None,
Expand All @@ -109,6 +113,7 @@ impl SinkDesc {
distribution_key: self.distribution_key.iter().map(|k| *k as _).collect_vec(),
properties: self.properties.clone().into_iter().collect(),
sink_type: self.sink_type.to_proto() as i32,
format_desc: self.format_desc.as_ref().map(|f| f.to_proto()),
db_name: self.db_name.clone(),
sink_from_name: self.sink_from_name.clone(),
}
Expand Down
148 changes: 146 additions & 2 deletions src/connector/src/sink/catalog/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,21 @@

pub mod desc;

use std::collections::HashMap;
use std::collections::{BTreeMap, HashMap};

use anyhow::anyhow;
use itertools::Itertools;
use risingwave_common::catalog::{
ColumnCatalog, ConnectionId, DatabaseId, Field, Schema, SchemaId, TableId, UserId,
};
use risingwave_common::util::epoch::Epoch;
use risingwave_common::util::sort_util::ColumnOrder;
use risingwave_pb::catalog::{PbSink, PbSinkType, PbStreamJobStatus};
use risingwave_pb::catalog::{PbSink, PbSinkFormatDesc, PbSinkType, PbStreamJobStatus};

use super::{
SinkError, CONNECTOR_TYPE_KEY, SINK_TYPE_APPEND_ONLY, SINK_TYPE_DEBEZIUM, SINK_TYPE_OPTION,
SINK_TYPE_UPSERT,
};

#[derive(Clone, Copy, Debug, Default, Hash, PartialOrd, PartialEq, Eq)]
pub struct SinkId {
Expand Down Expand Up @@ -96,6 +102,128 @@ impl SinkType {
}
}

/// May replace [`SinkType`].
///
/// TODO: consolidate with [`crate::source::SourceStruct`] and [`crate::parser::SpecificParserConfig`].
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct SinkFormatDesc {
pub format: SinkFormat,
pub encode: SinkEncode,
pub options: BTreeMap<String, String>,
}

/// TODO: consolidate with [`crate::source::SourceFormat`] and [`crate::parser::ProtocolProperties`].
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum SinkFormat {
AppendOnly,
Upsert,
Debezium,
}

/// TODO: consolidate with [`crate::source::SourceEncode`] and [`crate::parser::EncodingProperties`].
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum SinkEncode {
Json,
Protobuf,
Avro,
}

impl SinkFormatDesc {
pub fn from_legacy_type(connector: &str, r#type: &str) -> Result<Option<Self>, SinkError> {
use crate::sink::kafka::KafkaSink;
use crate::sink::kinesis::KinesisSink;
use crate::sink::pulsar::PulsarSink;
use crate::sink::Sink as _;

let format = match r#type {
SINK_TYPE_APPEND_ONLY => SinkFormat::AppendOnly,
SINK_TYPE_UPSERT => SinkFormat::Upsert,
SINK_TYPE_DEBEZIUM => SinkFormat::Debezium,
_ => {
return Err(SinkError::Config(anyhow!(
"sink type unsupported: {}",
r#type
)))
}
};
let encode = match connector {
KafkaSink::SINK_NAME | KinesisSink::SINK_NAME | PulsarSink::SINK_NAME => {
SinkEncode::Json
}
_ => return Ok(None),
};
Ok(Some(Self {
format,
encode,
options: Default::default(),
}))
}

pub fn to_proto(&self) -> PbSinkFormatDesc {
use risingwave_pb::plan_common::{EncodeType as E, FormatType as F};

let format = match self.format {
SinkFormat::AppendOnly => F::Plain,
SinkFormat::Upsert => F::Upsert,
SinkFormat::Debezium => F::Debezium,
};
let encode = match self.encode {
SinkEncode::Json => E::Json,
SinkEncode::Protobuf => E::Protobuf,
SinkEncode::Avro => E::Avro,
};
let options = self
.options
.iter()
.map(|(k, v)| (k.clone(), v.clone()))
.collect();

PbSinkFormatDesc {
format: format.into(),
encode: encode.into(),
options,
}
}
}

impl TryFrom<PbSinkFormatDesc> for SinkFormatDesc {
type Error = SinkError;

fn try_from(value: PbSinkFormatDesc) -> Result<Self, Self::Error> {
use risingwave_pb::plan_common::{EncodeType as E, FormatType as F};

let format = match value.format() {
F::Plain => SinkFormat::AppendOnly,
F::Upsert => SinkFormat::Upsert,
F::Debezium => SinkFormat::Debezium,
f @ (F::Unspecified | F::Native | F::DebeziumMongo | F::Maxwell | F::Canal) => {
return Err(SinkError::Config(anyhow!(
"sink format unsupported: {}",
f.as_str_name()
)))
}
};
let encode = match value.encode() {
E::Json => SinkEncode::Json,
E::Protobuf => SinkEncode::Protobuf,
E::Avro => SinkEncode::Avro,
e @ (E::Unspecified | E::Native | E::Csv | E::Bytes) => {
return Err(SinkError::Config(anyhow!(
"sink encode unsupported: {}",
e.as_str_name()
)))
}
};
let options = value.options.into_iter().collect();

Ok(Self {
format,
encode,
options,
})
}
}

/// the catalog of the sink. There are two kind of schema here. The full schema is all columns
/// stored in the `column` which is the sink executor/fragment's output schema. The visible
/// schema contains the columns whose `is_hidden` is false, which is the columns sink out to the
Expand Down Expand Up @@ -144,6 +272,9 @@ pub struct SinkCatalog {
// options in `properties`.
pub sink_type: SinkType,

// The format and encode of the sink.
pub format_desc: Option<SinkFormatDesc>,

/// Sink may use a privatelink connection to connect to the downstream system.
pub connection_id: Option<ConnectionId>,

Expand Down Expand Up @@ -186,6 +317,7 @@ impl SinkCatalog {
owner: self.owner.into(),
properties: self.properties.clone(),
sink_type: self.sink_type.to_proto() as i32,
format_desc: self.format_desc.as_ref().map(|f| f.to_proto()),
connection_id: self.connection_id.map(|id| id.into()),
initialized_at_epoch: self.initialized_at_epoch.map(|e| e.0),
created_at_epoch: self.created_at_epoch.map(|e| e.0),
Expand Down Expand Up @@ -233,6 +365,17 @@ impl SinkCatalog {
impl From<PbSink> for SinkCatalog {
fn from(pb: PbSink) -> Self {
let sink_type = pb.get_sink_type().unwrap();
let format_desc = match pb.format_desc {
Some(f) => f.try_into().ok(),
None => {
let connector = pb.properties.get(CONNECTOR_TYPE_KEY);
let r#type = pb.properties.get(SINK_TYPE_OPTION);
match (connector, r#type) {
(Some(c), Some(t)) => SinkFormatDesc::from_legacy_type(c, t).ok().flatten(),
_ => None,
}
}
};
SinkCatalog {
id: pb.id.into(),
name: pb.name,
Expand Down Expand Up @@ -263,6 +406,7 @@ impl From<PbSink> for SinkCatalog {
.map(TableId::from)
.collect_vec(),
sink_type: SinkType::from_proto(sink_type),
format_desc,
connection_id: pb.connection_id.map(ConnectionId),
created_at_epoch: pb.created_at_epoch.map(Epoch::from),
initialized_at_epoch: pb.initialized_at_epoch.map(Epoch::from),
Expand Down
Loading

0 comments on commit a296a06

Please sign in to comment.