Skip to content

Commit

Permalink
feat: support maxwell format (#6057)
Browse files Browse the repository at this point in the history
* impl maxwell cdc format

Signed-off-by: tabVersion <tabvision@bupt.icu>

* format

Signed-off-by: tabVersion <tabvision@bupt.icu>

* frontend

Signed-off-by: tabVersion <tabvision@bupt.icu>

* format

Signed-off-by: tabVersion <tabvision@bupt.icu>

Signed-off-by: tabVersion <tabvision@bupt.icu>
Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com>
  • Loading branch information
tabVersion and mergify[bot] authored Oct 27, 2022
1 parent bd8fb46 commit 263112a
Show file tree
Hide file tree
Showing 9 changed files with 170 additions and 1 deletion.
1 change: 1 addition & 0 deletions proto/plan_common.proto
Original file line number Diff line number Diff line change
Expand Up @@ -72,4 +72,5 @@ enum RowFormatType {
PROTOBUF = 2;
DEBEZIUM_JSON = 3;
AVRO = 4;
MAXWELL = 5;
}
17 changes: 17 additions & 0 deletions src/frontend/src/handler/create_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,23 @@ pub async fn handle_create_source(
row_schema_location: "".to_string(),
},
),
SourceSchema::Maxwell => {
// return err if user has not specified a pk
if row_id_index.is_some() {
return Err(RwError::from(ProtocolError(
"Primary key must be specified when creating source with row format debezium."
.to_string(),
)));
}
(
columns,
StreamSourceInfo {
row_format: RowFormatType::Maxwell as i32,
row_schema_location: "".to_string(),
},
)
}

SourceSchema::DebeziumJson => {
// return err if user has not specified a pk
if row_id_index.is_some() {
Expand Down
1 change: 1 addition & 0 deletions src/source/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ pub enum SourceFormat {
Protobuf,
DebeziumJson,
Avro,
Maxwell,
}

#[derive(Debug, EnumAsInner)]
Expand Down
1 change: 1 addition & 0 deletions src/source/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,7 @@ impl SourceDescBuilder {
RowFormatType::Protobuf => SourceFormat::Protobuf,
RowFormatType::DebeziumJson => SourceFormat::DebeziumJson,
RowFormatType::Avro => SourceFormat::Avro,
RowFormatType::Maxwell => SourceFormat::Maxwell,
RowFormatType::RowUnspecified => unreachable!(),
};

Expand Down
123 changes: 123 additions & 0 deletions src/source/src/parser/maxwell/json.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
// Copyright 2022 Singularity Data
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::BTreeMap;
use std::fmt::Debug;

use risingwave_common::error::ErrorCode::ProtocolError;
use risingwave_common::error::{Result, RwError};
use serde_derive::{Deserialize, Serialize};
use serde_json::Value;

use crate::parser::common::json_parse_value;
use crate::{SourceParser, SourceStreamChunkRowWriter, WriteGuard};

const MAXWELL_INSERT_OP: &str = "insert";
const MAXWELL_UPDATE_OP: &str = "update";
const MAXWELL_DELETE_OP: &str = "delete";

#[derive(Default, Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct MaxwellEvent {
pub data: Option<BTreeMap<String, Value>>,
pub old: Option<BTreeMap<String, Value>>,
#[serde(rename = "type")]
pub op: String,
#[serde(rename = "ts")]
pub ts_ms: i64,
}

#[derive(Debug)]
pub struct MaxwellParser;

impl SourceParser for MaxwellParser {
fn parse(&self, payload: &[u8], writer: SourceStreamChunkRowWriter<'_>) -> Result<WriteGuard> {
let event: MaxwellEvent = serde_json::from_slice(payload)
.map_err(|e| RwError::from(ProtocolError(e.to_string())))?;

match event.op.as_str() {
MAXWELL_INSERT_OP => {
let after = event.data.as_ref().ok_or_else(|| {
RwError::from(ProtocolError(
"data is missing for creating event".to_string(),
))
})?;
writer.insert(|column| {
json_parse_value(&column.data_type, after.get(&column.name)).map_err(Into::into)
})
}
MAXWELL_UPDATE_OP => {
let after = event.data.as_ref().ok_or_else(|| {
RwError::from(ProtocolError(
"data is missing for creating event".to_string(),
))
})?;
let before = event.old.as_ref().ok_or_else(|| {
RwError::from(ProtocolError(
"old is missing for creating event".to_string(),
))
})?;

// old only contains the changed columns but data contains all columns.
// we copy data columns here and overwrite with change ones to get the original row.
let mut before_full = after.clone();
before_full.extend(before.clone());

writer.update(|column| {
let before = json_parse_value(&column.data_type, before.get(&column.name))?;
let after = json_parse_value(&column.data_type, after.get(&column.name))?;
Ok((before, after))
})
}
MAXWELL_DELETE_OP => {
let before = event.data.as_ref().ok_or_else(|| {
RwError::from(ProtocolError(
"old is missing for creating event".to_string(),
))
})?;
writer.delete(|column| {
json_parse_value(&column.data_type, before.get(&column.name))
.map_err(Into::into)
})
}
other => Err(RwError::from(ProtocolError(format!(
"unknown Maxwell op: {}",
other
)))),
}
}
}

mod tests {
#[allow(unused_imports)]
use super::*;

#[test]
fn test_event_deserialize() {
let payload = "{\"database\":\"test\",\"table\":\"e\",\"type\":\"update\",\"ts\":1477053234,\"data\":{\"id\":1,\"m\":5.444,\"c\":\"2016-10-21 05:33:54.631000\",\"comment\":\"I am a creature of light.\"},\"old\":{\"m\":4.2341,\"c\":\"2016-10-21 05:33:37.523000\"}}";
let event: MaxwellEvent = serde_json::from_slice(payload.as_ref()).unwrap();
println!("event: {:?}", event);

assert_eq!(event.op, MAXWELL_UPDATE_OP.to_string());

let mut after = event.data.unwrap();
let before = event.old.unwrap();

println!("{:?}", after);
after.extend(before.clone());
println!("{:?}", after);

assert_eq!(after.get("c"), before.get("c"));
}
}
17 changes: 17 additions & 0 deletions src/source/src/parser/maxwell/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
// Copyright 2022 Singularity Data
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

mod json;

pub use json::*;
6 changes: 5 additions & 1 deletion src/source/src/parser/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,15 @@ use risingwave_common::error::ErrorCode::ProtocolError;
use risingwave_common::error::{Result, RwError};
use risingwave_common::types::Datum;

use crate::parser::maxwell::MaxwellParser;
use crate::{SourceColumnDesc, SourceFormat};

mod avro_parser;
mod common;
mod debezium;
mod json_parser;
mod maxwell;
mod pb_parser;
// mod protobuf_parser;

/// A builder for building a [`StreamChunk`] from [`SourceColumnDesc`].
pub struct SourceStreamChunkBuilder {
Expand Down Expand Up @@ -269,6 +270,7 @@ pub enum SourceParserImpl {
Protobuf(ProtobufParser),
DebeziumJson(DebeziumJsonParser),
Avro(AvroParser),
Maxwell(MaxwellParser),
}

impl SourceParserImpl {
Expand All @@ -282,6 +284,7 @@ impl SourceParserImpl {
Self::Protobuf(parser) => parser.parse(payload, writer),
Self::DebeziumJson(parser) => parser.parse(payload, writer),
Self::Avro(avro_parser) => avro_parser.parse(payload, writer),
Self::Maxwell(maxwell_parser) => maxwell_parser.parse(payload, writer),
}
}

Expand All @@ -308,6 +311,7 @@ impl SourceParserImpl {
SourceFormat::Avro => {
SourceParserImpl::Avro(AvroParser::new(schema_location, properties.clone()).await?)
}
SourceFormat::Maxwell => SourceParserImpl::Maxwell(MaxwellParser),
_ => {
return Err(RwError::from(ProtocolError(
"format not support".to_string(),
Expand Down
4 changes: 4 additions & 0 deletions src/sqlparser/src/ast/statement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ pub enum SourceSchema {
Json, // Keyword::JSON
DebeziumJson, // Keyword::DEBEZIUM_JSON
Avro(AvroSchema), // Keyword::AVRO
Maxwell,
}

impl ParseTo for SourceSchema {
Expand All @@ -102,6 +103,8 @@ impl ParseTo for SourceSchema {
} else if p.parse_keywords(&[Keyword::AVRO]) {
impl_parse_to!(avro_schema: AvroSchema, p);
SourceSchema::Avro(avro_schema)
} else if p.parse_keywords(&[Keyword::MAXWELL]) {
SourceSchema::Maxwell
} else {
return Err(ParserError::ParserError(
"expected JSON | PROTOBUF | DEBEZIUM JSON | AVRO after ROW FORMAT".to_string(),
Expand All @@ -116,6 +119,7 @@ impl fmt::Display for SourceSchema {
match self {
SourceSchema::Protobuf(protobuf_schema) => write!(f, "PROTOBUF {}", protobuf_schema),
SourceSchema::Json => write!(f, "JSON"),
SourceSchema::Maxwell => write!(f, "MAXWELL"),
SourceSchema::DebeziumJson => write!(f, "DEBEZIUM JSON"),
SourceSchema::Avro(avro_schema) => write!(f, "AVRO {}", avro_schema),
}
Expand Down
1 change: 1 addition & 0 deletions src/sqlparser/src/keywords.rs
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,7 @@ define_keywords!(
MATCH,
MATERIALIZED,
MAX,
MAXWELL,
MEMBER,
MERGE,
MESSAGE,
Expand Down

0 comments on commit 263112a

Please sign in to comment.