feat: kafka-upsert with json,avro format
algosday committed Feb 22, 2023
1 parent 229a3c7 commit 1ab68af
Showing 20 changed files with 425 additions and 63 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

9 changes: 9 additions & 0 deletions dashboard/proto/gen/catalog.ts

Some generated files are not rendered by default.

12 changes: 12 additions & 0 deletions dashboard/proto/gen/plan_common.ts

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions proto/catalog.proto
@@ -30,6 +30,7 @@ message StreamSourceInfo {
string proto_message_name = 4;
int32 csv_delimiter = 5;
bool csv_has_header = 6;
string upsert_avro_primary_key = 7;
}

message Source {
2 changes: 2 additions & 0 deletions proto/plan_common.proto
@@ -81,4 +81,6 @@ enum RowFormatType {
CSV = 7;
NATIVE = 8;
DEBEZIUM_AVRO = 9;
UPSERT_JSON = 10;
UPSERT_AVRO = 11;
}
2 changes: 1 addition & 1 deletion src/connector/Cargo.toml
@@ -22,6 +22,7 @@ aws-sdk-kinesis = { workspace = true }
aws-sdk-s3 = { workspace = true }
aws-smithy-http = { workspace = true }
aws-types = { workspace = true }
bincode = "1"
byteorder = "1"
bytes = { version = "1", features = ["serde"] }
chrono = { version = "0.4", default-features = false, features = ["clock", "std"] }
@@ -66,7 +67,6 @@ tonic = { version = "0.2", package = "madsim-tonic" }
tracing = "0.1"
url = "2"
urlencoding = "2"

[target.'cfg(not(madsim))'.dependencies]
workspace-hack = { path = "../workspace-hack" }

10 changes: 10 additions & 0 deletions src/connector/src/common.rs
@@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::borrow::Cow;

use aws_sdk_kinesis::Client as KinesisClient;
use http::Uri;
use rdkafka::ClientConfig;
@@ -199,3 +201,11 @@ impl KinesisCommon {
Ok(KinesisClient::from_conf(builder.build()))
}
}

#[derive(Debug, Serialize, Deserialize)]
pub struct UpsertMessage<'a> {
#[serde(borrow)]
pub primary_key: Cow<'a, [u8]>,
#[serde(borrow)]
pub record: Cow<'a, [u8]>,
}
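
The `UpsertMessage` struct above is the envelope the Kafka source uses to hand a key/value pair to the parser as one byte buffer; an empty `record` stands for a tombstone, which the upsert parsers below turn into a delete. A minimal, self-contained sketch of that round trip, assuming `serde` plus the `bincode = "1"` dependency this commit adds to `Cargo.toml`; the struct is re-declared locally and the key/record bytes are made up for illustration:

```rust
use std::borrow::Cow;

use serde::{Deserialize, Serialize};

// Local copy of the struct from src/connector/src/common.rs so the
// snippet compiles on its own.
#[derive(Debug, Serialize, Deserialize)]
pub struct UpsertMessage<'a> {
    #[serde(borrow)]
    pub primary_key: Cow<'a, [u8]>,
    #[serde(borrow)]
    pub record: Cow<'a, [u8]>,
}

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Hypothetical Kafka key/value bytes; an empty `record` would mark a
    // tombstone and is treated as a delete by `parse_inner`.
    let msg = UpsertMessage {
        primary_key: Cow::Borrowed(&b"user-42"[..]),
        record: Cow::Borrowed(&br#"{"user_id": 42, "name": "alice"}"#[..]),
    };

    // Pack both parts into one buffer, then unpack them again, mirroring
    // what the source reader and the parser do on either side.
    let bytes = bincode::serialize(&msg)?;
    let decoded: UpsertMessage<'_> = bincode::deserialize(&bytes)?;

    assert_eq!(decoded.primary_key, msg.primary_key);
    assert_eq!(decoded.record, msg.record);
    Ok(())
}
```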
140 changes: 124 additions & 16 deletions src/connector/src/parser/avro/parser.rs
@@ -26,6 +26,7 @@ use url::Url;

use super::schema_resolver::*;
use super::util::{extract_inner_field_schema, from_avro_value};
use crate::common::UpsertMessage;
use crate::impl_common_parser_logic;
use crate::parser::avro::util::avro_field_to_column_desc;
use crate::parser::schema_registry::{extract_schema_id, Client};
@@ -38,34 +39,59 @@ impl_common_parser_logic!(AvroParser);
#[derive(Debug)]
pub struct AvroParser {
schema: Arc<Schema>,
key_schema: Option<Arc<Schema>>,
schema_resolver: Option<Arc<ConfluentSchemaResolver>>,
rw_columns: Vec<SourceColumnDesc>,
error_ctx: SourceErrorContext,
upsert_primary_key_column_name: Option<String>,
}

#[derive(Debug, Clone)]
pub struct AvroParserConfig {
pub schema: Arc<Schema>,
pub key_schema: Option<Arc<Schema>>,
pub schema_resolver: Option<Arc<ConfluentSchemaResolver>>,
pub upsert_primary_key_column_name: Option<String>,
}

impl AvroParserConfig {
pub async fn new(
props: &HashMap<String, String>,
schema_location: &str,
use_schema_registry: bool,
enable_upsert: bool,
upsert_primary_key_column_name: Option<String>,
) -> Result<Self> {
let url = Url::parse(schema_location).map_err(|e| {
InternalError(format!("failed to parse url ({}): {}", schema_location, e))
})?;
let (schema, schema_resolver) = if use_schema_registry {
if use_schema_registry {
let kafka_topic = get_kafka_topic(props)?;
let client = Client::new(url, props)?;
let (schema, resolver) =
ConfluentSchemaResolver::new(format!("{}-value", kafka_topic).as_str(), client)
.await?;
(Arc::new(schema), Some(Arc::new(resolver)))
let resolver = ConfluentSchemaResolver::new(client);

Ok(Self {
schema: resolver
.get_by_subject_name(&format!("{}-value", kafka_topic))
.await?,
key_schema: if enable_upsert {
Some(
resolver
.get_by_subject_name(&format!("{}-key", kafka_topic))
.await?,
)
} else {
None
},
schema_resolver: Some(Arc::new(resolver)),
upsert_primary_key_column_name,
})
} else {
if enable_upsert {
return Err(RwError::from(InternalError(
"avro upsert without schema registry is not supported".to_string(),
)));
}
let schema_content = match url.scheme() {
"file" => read_schema_from_local(url.path()),
"s3" => read_schema_from_s3(&url, props).await,
@@ -78,12 +104,28 @@ impl AvroParserConfig {
let schema = Schema::parse_str(&schema_content).map_err(|e| {
RwError::from(InternalError(format!("Avro schema parse error {}", e)))
})?;
(Arc::new(schema), None)
};
Ok(Self {
schema,
schema_resolver,
})
Ok(Self {
schema: Arc::new(schema),
key_schema: None,
schema_resolver: None,
upsert_primary_key_column_name: None,
})
}
}

pub fn extract_pks(&self) -> Result<Vec<ColumnDesc>> {
if let Some(Schema::Record { fields, .. }) = self.key_schema.as_deref() {
let mut index = 0;
let fields = fields
.iter()
.map(|field| avro_field_to_column_desc(&field.name, &field.schema, &mut index))
.collect::<Result<Vec<_>>>()?;
Ok(fields)
} else {
Err(RwError::from(InternalError(
"schema invalid, record required".into(),
)))
}
}

pub fn map_to_columns(&self) -> Result<Vec<ColumnDesc>> {
@@ -112,30 +154,61 @@ impl AvroParser {
) -> Result<Self> {
let AvroParserConfig {
schema,
key_schema,
schema_resolver,
upsert_primary_key_column_name,
} = config;
Ok(Self {
schema,
key_schema,
schema_resolver,
rw_columns,
error_ctx,
upsert_primary_key_column_name,
})
}

/// The presence of a `key_schema` implies that upsert is enabled.
fn is_enable_upsert(&self) -> bool {
self.key_schema.is_some()
}

pub(crate) async fn parse_inner(
&self,
payload: &[u8],
mut writer: SourceStreamChunkRowWriter<'_>,
) -> Result<WriteGuard> {
enum Op {
Insert,
Delete,
}

let (_payload, op) = if self.is_enable_upsert() {
let msg: UpsertMessage<'_> = bincode::deserialize(payload)
.map_err(|e| RwError::from(ProtocolError(e.to_string())))?;
if !msg.record.is_empty() {
(msg.record, Op::Insert)
} else {
(msg.primary_key, Op::Delete)
}
} else {
(payload.into(), Op::Insert)
};

// parse the payload into an Avro value
// if a Confluent schema registry is in use, fetch the writer schema from it
let avro_value = if let Some(resolver) = &self.schema_resolver {
let (schema_id, mut raw_payload) = extract_schema_id(payload)?;
let (schema_id, mut raw_payload) = extract_schema_id(&_payload)?;
let writer_schema = resolver.get(schema_id).await?;
from_avro_datum(writer_schema.as_ref(), &mut raw_payload, Some(&self.schema))
let reader_schema = if matches!(op, Op::Delete) {
self.key_schema.as_deref()
} else {
Some(&*self.schema)
};
from_avro_datum(writer_schema.as_ref(), &mut raw_payload, reader_schema)
.map_err(|e| RwError::from(ProtocolError(e.to_string())))?
} else {
let mut reader = Reader::with_schema(&self.schema, payload)
let mut reader = Reader::with_schema(&self.schema, payload as &[u8])
.map_err(|e| RwError::from(ProtocolError(e.to_string())))?;
match reader.next() {
Some(Ok(v)) => v,
Expand All @@ -150,7 +223,7 @@ impl AvroParser {

// convert the Avro value into RisingWave (rw) column values
if let Value::Record(fields) = avro_value {
writer.insert(|column| {
let fill = |column: &SourceColumnDesc| {
let tuple = fields
.iter()
.find(|val| column.name.eq(&val.0))
@@ -169,6 +242,41 @@
);
e
})
};
match op {
Op::Insert => writer.insert(fill),
Op::Delete => writer.delete(fill),
}
} else if self.upsert_primary_key_column_name.is_some()
&& matches!(op, Op::Delete)
&& matches!(
avro_value,
Value::Boolean(_)
| Value::String(_)
| Value::Int(_)
| Value::Long(_)
| Value::Float(_)
| Value::Decimal(_)
| Value::Date(_)
| Value::TimestampMillis(_)
| Value::TimestampMicros(_)
| Value::Duration(_)
)
{
writer.delete(|desc| {
if &desc.name != self.upsert_primary_key_column_name.as_ref().unwrap() {
Ok(None)
} else {
from_avro_value(avro_value.clone(), self.key_schema.as_deref().unwrap())
.map_err(|e| {
tracing::error!(
"failed to process value ({}): {}",
String::from_utf8_lossy(payload),
e
);
e
})
}
})
} else {
Err(RwError::from(ProtocolError(
@@ -255,7 +363,7 @@ mod test {

async fn new_avro_conf_from_local(file_name: &str) -> error::Result<AvroParserConfig> {
let schema_path = "file://".to_owned() + &test_data_path(file_name);
AvroParserConfig::new(&HashMap::new(), schema_path.as_str(), false).await
AvroParserConfig::new(&HashMap::new(), schema_path.as_str(), false, false, None).await
}

async fn new_avro_parser_from_local(file_name: &str) -> error::Result<AvroParser> {
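
For the upsert path, `AvroParserConfig::new` also fetches the `{topic}-key` subject from the schema registry, and `extract_pks` walks that key schema's record fields to derive the primary-key columns. A rough sketch of that field walk, assuming an `apache-avro` version whose `Schema::Record` is a struct variant (as matched in the diff above); the single-field key schema is made up for illustration, and the real code maps each field through `avro_field_to_column_desc` rather than printing it:

```rust
use apache_avro::Schema;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Hypothetical key schema for an upsert Kafka topic: a record whose
    // single field acts as the primary key.
    let raw = r#"
    {
        "type": "record",
        "name": "key",
        "fields": [
            { "name": "user_id", "type": "long" }
        ]
    }"#;
    let key_schema = Schema::parse_str(raw)?;

    // `extract_pks` matches on `Schema::Record { fields, .. }` the same way
    // and turns each field into a column descriptor; anything other than a
    // record is rejected.
    if let Schema::Record { fields, .. } = &key_schema {
        for field in fields {
            println!("primary-key column: {} ({:?})", field.name, field.schema);
        }
    } else {
        eprintln!("key schema must be an Avro record");
    }
    Ok(())
}
```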
