
feat: kafka-upsert with json,avro format #8111

Merged: 3 commits, merged on Feb 23, 2023
Changes from 2 commits
1 change: 1 addition & 0 deletions Cargo.lock

(Generated file; diff not rendered.)

5 changes: 5 additions & 0 deletions dashboard/proto/gen/catalog.ts

(Generated file; diff not rendered.)

12 changes: 12 additions & 0 deletions dashboard/proto/gen/plan_common.ts

(Generated file; diff not rendered.)

1 change: 1 addition & 0 deletions proto/catalog.proto
@@ -30,6 +30,7 @@ message StreamSourceInfo {
string proto_message_name = 4;
int32 csv_delimiter = 5;
bool csv_has_header = 6;
string upsert_avro_primary_key = 7;
}

message Source {
2 changes: 2 additions & 0 deletions proto/plan_common.proto
@@ -81,4 +81,6 @@ enum RowFormatType {
CSV = 7;
NATIVE = 8;
DEBEZIUM_AVRO = 9;
UPSERT_JSON = 10;
UPSERT_AVRO = 11;
}
2 changes: 1 addition & 1 deletion src/connector/Cargo.toml
@@ -22,6 +22,7 @@ aws-sdk-kinesis = { workspace = true }
aws-sdk-s3 = { workspace = true }
aws-smithy-http = { workspace = true }
aws-types = { workspace = true }
bincode = "1"
byteorder = "1"
bytes = { version = "1", features = ["serde"] }
chrono = { version = "0.4", default-features = false, features = ["clock", "std"] }
@@ -66,7 +67,6 @@ tonic = { version = "0.2", package = "madsim-tonic" }
tracing = "0.1"
url = "2"
urlencoding = "2"

[target.'cfg(not(madsim))'.dependencies]
workspace-hack = { path = "../workspace-hack" }

10 changes: 10 additions & 0 deletions src/connector/src/common.rs
@@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::borrow::Cow;

use aws_sdk_kinesis::Client as KinesisClient;
use http::Uri;
use rdkafka::ClientConfig;
@@ -199,3 +201,11 @@ impl KinesisCommon {
Ok(KinesisClient::from_conf(builder.build()))
}
}

#[derive(Debug, Serialize, Deserialize)]
pub struct UpsertMessage<'a> {
#[serde(borrow)]
pub primary_key: Cow<'a, [u8]>,
#[serde(borrow)]
pub record: Cow<'a, [u8]>,
}
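
A note on the `UpsertMessage` struct added above: it is a bincode-encoded envelope that the upsert-enabled Kafka source apparently uses to hand a message's key and value to the parser as a single payload, with an empty `record` standing in for a Kafka tombstone (delete); see `parse_inner` in `parser.rs` below. The following sketch only illustrates the round trip using the crates already present in the connector (`serde` and the `bincode = "1"` dependency added above); the standalone copy of the struct, the `main` function, and the sample key/value bytes are illustrative assumptions, not part of this PR.

```rust
use std::borrow::Cow;

use serde::{Deserialize, Serialize};

#[derive(Debug, Serialize, Deserialize)]
pub struct UpsertMessage<'a> {
    #[serde(borrow)]
    pub primary_key: Cow<'a, [u8]>,
    #[serde(borrow)]
    pub record: Cow<'a, [u8]>,
}

fn main() {
    // Pack a Kafka key/value pair; an empty `record` would represent a tombstone (delete).
    let msg = UpsertMessage {
        primary_key: Cow::Borrowed(&b"user-42"[..]),
        record: Cow::Borrowed(&br#"{"id": 42, "name": "alice"}"#[..]),
    };

    // bincode 1.x round trip; `#[serde(borrow)]` lets the decoded fields borrow from `bytes`.
    let bytes = bincode::serialize(&msg).expect("serialize UpsertMessage");
    let decoded: UpsertMessage<'_> =
        bincode::deserialize(&bytes).expect("deserialize UpsertMessage");

    assert_eq!(decoded.primary_key, msg.primary_key);
    assert_eq!(decoded.record, msg.record);
}
```

Borrowing via `#[serde(borrow)]` means deserialization does not copy the key or record bytes out of the raw payload.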
160 changes: 134 additions & 26 deletions src/connector/src/parser/avro/parser.rs
@@ -26,6 +26,7 @@ use url::Url;

use super::schema_resolver::*;
use super::util::{extract_inner_field_schema, from_avro_value};
use crate::common::UpsertMessage;
use crate::impl_common_parser_logic;
use crate::parser::avro::util::avro_field_to_column_desc;
use crate::parser::schema_registry::{extract_schema_id, Client};
@@ -38,34 +39,59 @@ impl_common_parser_logic!(AvroParser);
#[derive(Debug)]
pub struct AvroParser {
schema: Arc<Schema>,
key_schema: Option<Arc<Schema>>,
schema_resolver: Option<Arc<ConfluentSchemaResolver>>,
rw_columns: Vec<SourceColumnDesc>,
error_ctx: SourceErrorContext,
upsert_primary_key_column_name: Option<String>,
}

#[derive(Debug, Clone)]
pub struct AvroParserConfig {
pub schema: Arc<Schema>,
pub key_schema: Option<Arc<Schema>>,
pub schema_resolver: Option<Arc<ConfluentSchemaResolver>>,
pub upsert_primary_key_column_name: Option<String>,
}

impl AvroParserConfig {
pub async fn new(
props: &HashMap<String, String>,
schema_location: &str,
use_schema_registry: bool,
enable_upsert: bool,
upsert_primary_key_column_name: Option<String>,
) -> Result<Self> {
let url = Url::parse(schema_location).map_err(|e| {
InternalError(format!("failed to parse url ({}): {}", schema_location, e))
})?;
let (schema, schema_resolver) = if use_schema_registry {
if use_schema_registry {
let kafka_topic = get_kafka_topic(props)?;
let client = Client::new(url, props)?;
let (schema, resolver) =
ConfluentSchemaResolver::new(format!("{}-value", kafka_topic).as_str(), client)
.await?;
(Arc::new(schema), Some(Arc::new(resolver)))
let resolver = ConfluentSchemaResolver::new(client);

Ok(Self {
schema: resolver
.get_by_subject_name(&format!("{}-value", kafka_topic))
.await?,
key_schema: if enable_upsert {
Some(
resolver
.get_by_subject_name(&format!("{}-key", kafka_topic))
.await?,
)
} else {
None
},
schema_resolver: Some(Arc::new(resolver)),
upsert_primary_key_column_name,
})
} else {
if enable_upsert {
return Err(RwError::from(InternalError(
"avro upsert without schema registry is not supported".to_string(),
)));
}
let schema_content = match url.scheme() {
"file" => read_schema_from_local(url.path()),
"s3" => read_schema_from_s3(&url, props).await,
@@ -78,12 +104,28 @@ impl AvroParserConfig {
let schema = Schema::parse_str(&schema_content).map_err(|e| {
RwError::from(InternalError(format!("Avro schema parse error {}", e)))
})?;
(Arc::new(schema), None)
};
Ok(Self {
schema,
schema_resolver,
})
Ok(Self {
schema: Arc::new(schema),
key_schema: None,
schema_resolver: None,
upsert_primary_key_column_name: None,
})
}
}

pub fn extract_pks(&self) -> Result<Vec<ColumnDesc>> {
if let Some(Schema::Record { fields, .. }) = self.key_schema.as_deref() {
let mut index = 0;
let fields = fields
.iter()
.map(|field| avro_field_to_column_desc(&field.name, &field.schema, &mut index))
.collect::<Result<Vec<_>>>()?;
Ok(fields)
} else {
Err(RwError::from(InternalError(
"schema invalid, record required".into(),
)))
}
}

pub fn map_to_columns(&self) -> Result<Vec<ColumnDesc>> {
@@ -112,30 +154,65 @@ impl AvroParser {
) -> Result<Self> {
let AvroParserConfig {
schema,
key_schema,
schema_resolver,
upsert_primary_key_column_name,
} = config;
Ok(Self {
schema,
key_schema,
schema_resolver,
rw_columns,
error_ctx,
upsert_primary_key_column_name,
})
}

/// The presence of a `key_schema` implies that upsert is enabled.
fn is_enable_upsert(&self) -> bool {
self.key_schema.is_some()
}

pub(crate) async fn parse_inner(
&self,
payload: &[u8],
mut writer: SourceStreamChunkRowWriter<'_>,
) -> Result<WriteGuard> {
enum Op {
Insert,
Delete,
}

let (_payload, op) = if self.is_enable_upsert() {
let msg: UpsertMessage<'_> = bincode::deserialize(payload).map_err(|e| {
RwError::from(ProtocolError(format!(
"extract payload err {:?}, you may need to check the 'upsert' parameter",
e
)))
})?;
if !msg.record.is_empty() {
(msg.record, Op::Insert)
} else {
(msg.primary_key, Op::Delete)
}
} else {
(payload.into(), Op::Insert)
};

// parse payload to avro value
// if use confluent schema, get writer schema from confluent schema registry
let avro_value = if let Some(resolver) = &self.schema_resolver {
let (schema_id, mut raw_payload) = extract_schema_id(payload)?;
let (schema_id, mut raw_payload) = extract_schema_id(&_payload)?;
let writer_schema = resolver.get(schema_id).await?;
from_avro_datum(writer_schema.as_ref(), &mut raw_payload, Some(&self.schema))
let reader_schema = if matches!(op, Op::Delete) {
self.key_schema.as_deref()
} else {
Some(&*self.schema)
};
from_avro_datum(writer_schema.as_ref(), &mut raw_payload, reader_schema)
.map_err(|e| RwError::from(ProtocolError(e.to_string())))?
} else {
let mut reader = Reader::with_schema(&self.schema, payload)
let mut reader = Reader::with_schema(&self.schema, payload as &[u8])
.map_err(|e| RwError::from(ProtocolError(e.to_string())))?;
match reader.next() {
Some(Ok(v)) => v,
@@ -148,18 +225,14 @@ impl AvroParser {
}
};

// parse the value to rw value
// the avro can be a key or a value
if let Value::Record(fields) = avro_value {
writer.insert(|column| {
let tuple = fields
.iter()
.find(|val| column.name.eq(&val.0))
.ok_or_else(|| {
RwError::from(InternalError(format!(
"no field named {} in avro msg",
column.name
)))
})?;
let fill = |column: &SourceColumnDesc| {
let tuple = match fields.iter().find(|val| column.name.eq(&val.0)) {
None => return Ok(None),
Some(tup) => tup,
};

let field_schema = extract_inner_field_schema(&self.schema, Some(&column.name))?;
from_avro_value(tuple.1.clone(), field_schema).map_err(|e| {
tracing::error!(
@@ -169,6 +242,41 @@
);
e
})
};
match op {
Op::Insert => writer.insert(fill),
Op::Delete => writer.delete(fill),
}
} else if self.upsert_primary_key_column_name.is_some()
&& matches!(op, Op::Delete)
&& matches!(
avro_value,
Value::Boolean(_)
| Value::String(_)
| Value::Int(_)
| Value::Long(_)
| Value::Float(_)
| Value::Decimal(_)
| Value::Date(_)
| Value::TimestampMillis(_)
| Value::TimestampMicros(_)
| Value::Duration(_)
)
{
writer.delete(|desc| {
if &desc.name != self.upsert_primary_key_column_name.as_ref().unwrap() {
Ok(None)
} else {
from_avro_value(avro_value.clone(), self.key_schema.as_deref().unwrap())
.map_err(|e| {
tracing::error!(
"failed to process value ({}): {}",
String::from_utf8_lossy(payload),
e
);
e
})
}
})
} else {
Err(RwError::from(ProtocolError(
@@ -255,7 +363,7 @@ mod test {

async fn new_avro_conf_from_local(file_name: &str) -> error::Result<AvroParserConfig> {
let schema_path = "file://".to_owned() + &test_data_path(file_name);
AvroParserConfig::new(&HashMap::new(), schema_path.as_str(), false).await
AvroParserConfig::new(&HashMap::new(), schema_path.as_str(), false, false, None).await
}

async fn new_avro_parser_from_local(file_name: &str) -> error::Result<AvroParser> {
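
An aside on `extract_schema_id` (defined in `crate::parser::schema_registry`, not shown in this diff): `parse_inner` now calls it on either the value bytes or, for a delete, the key bytes, so both are expected to carry schema-registry framing. The sketch below assumes the standard Confluent wire format (one zero magic byte, a 4-byte big-endian schema id, then the Avro datum); the function name `split_confluent_payload` and the sample bytes are hypothetical, not the PR's actual implementation.

```rust
/// A minimal sketch of the Confluent wire-format framing that `extract_schema_id`
/// is assumed to handle: 0x00 magic byte + 4-byte big-endian schema id + Avro datum.
fn split_confluent_payload(payload: &[u8]) -> Result<(i32, &[u8]), String> {
    if payload.len() < 5 {
        return Err("payload shorter than the 5-byte Confluent header".to_string());
    }
    if payload[0] != 0 {
        return Err(format!("unexpected magic byte {:#x}", payload[0]));
    }
    let schema_id = i32::from_be_bytes([payload[1], payload[2], payload[3], payload[4]]);
    Ok((schema_id, &payload[5..]))
}

fn main() {
    // 0x00 magic byte, schema id 7 (big-endian), then a dummy Avro datum.
    let framed: &[u8] = &[0x00, 0x00, 0x00, 0x00, 0x07, 0xde, 0xad, 0xbe, 0xef];
    let (schema_id, avro_datum) = split_confluent_payload(framed).unwrap();
    assert_eq!(schema_id, 7);
    assert_eq!(avro_datum, &framed[5..]);
}
```

At configuration time, `AvroParserConfig::new` above resolves the latest `{topic}-key` and `{topic}-value` subjects as reader schemas, while the per-message schema id extracted here is used at parse time to fetch the matching writer schema from the registry.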