feat: kafka-upsert with json,avro format
algosday committed Feb 22, 2023 · 1 parent e16e26d · commit 625a64e
Showing 17 changed files with 317 additions and 64 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

2 changes: 2 additions & 0 deletions proto/plan_common.proto
@@ -81,4 +81,6 @@ enum RowFormatType {
CSV = 7;
NATIVE = 8;
DEBEZIUM_AVRO = 9;
UPSERT_JSON = 10;
UPSERT_AVRO = 11;
}
2 changes: 1 addition & 1 deletion src/connector/Cargo.toml
@@ -22,6 +22,7 @@ aws-sdk-kinesis = { workspace = true }
aws-sdk-s3 = { workspace = true }
aws-smithy-http = { workspace = true }
aws-types = { workspace = true }
bincode = "1"
byteorder = "1"
bytes = { version = "1", features = ["serde"] }
chrono = { version = "0.4", default-features = false, features = ["clock", "std"] }
@@ -66,7 +67,6 @@ tonic = { version = "0.2", package = "madsim-tonic" }
tracing = "0.1"
url = "2"
urlencoding = "2"

[target.'cfg(not(madsim))'.dependencies]
workspace-hack = { path = "../workspace-hack" }

10 changes: 10 additions & 0 deletions src/connector/src/common.rs
@@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::borrow::Cow;

use aws_sdk_kinesis::Client as KinesisClient;
use http::Uri;
use rdkafka::ClientConfig;
@@ -199,3 +201,11 @@ impl KinesisCommon {
Ok(KinesisClient::from_conf(builder.build()))
}
}

#[derive(Debug, Serialize, Deserialize)]
pub struct UpsertMessage<'a> {
#[serde(borrow)]
pub primary_key: Cow<'a, [u8]>,
#[serde(borrow)]
pub record: Cow<'a, [u8]>,
}
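
The `UpsertMessage` above is the frame in which the upsert source hands each Kafka key/value pair to the parser: the reading side of the connector (not shown in these hunks) packs key and value into one bincode buffer, and a Kafka tombstone (null value) arrives as an empty `record`. A minimal round-trip sketch, runnable on its own against `bincode = "1"` and `serde` with the `derive` feature; the sample key and record bytes are invented for illustration:

use std::borrow::Cow;

use serde::{Deserialize, Serialize};

#[derive(Debug, Serialize, Deserialize)]
struct UpsertMessage<'a> {
    #[serde(borrow)]
    primary_key: Cow<'a, [u8]>,
    #[serde(borrow)]
    record: Cow<'a, [u8]>,
}

fn main() {
    // Source side: pack the Kafka key and value into a single buffer.
    let msg = UpsertMessage {
        primary_key: Cow::Borrowed(b"user-42".as_slice()),
        record: Cow::Borrowed(br#"{"id":42,"name":"alice"}"#.as_slice()),
    };
    let bytes = bincode::serialize(&msg).unwrap();

    // Parser side: `#[serde(borrow)]` lets the fields borrow straight from
    // `bytes` instead of copying.
    let decoded: UpsertMessage<'_> = bincode::deserialize(&bytes).unwrap();
    assert_eq!(decoded.primary_key, msg.primary_key);
    assert!(!decoded.record.is_empty()); // an empty record would mean delete
}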
104 changes: 87 additions & 17 deletions src/connector/src/parser/avro/parser.rs
@@ -26,6 +26,7 @@ use url::Url;

use super::schema_resolver::*;
use super::util::{extract_inner_field_schema, from_avro_value};
use crate::common::UpsertMessage;
use crate::impl_common_parser_logic;
use crate::parser::avro::util::avro_field_to_column_desc;
use crate::parser::schema_registry::{extract_schema_id, Client};
@@ -38,13 +39,15 @@ impl_common_parser_logic!(AvroParser);
#[derive(Debug)]
pub struct AvroParser {
schema: Arc<Schema>,
key_schema: Option<Arc<Schema>>,
schema_resolver: Option<Arc<ConfluentSchemaResolver>>,
rw_columns: Vec<SourceColumnDesc>,
}

#[derive(Debug, Clone)]
pub struct AvroParserConfig {
pub schema: Arc<Schema>,
pub key_schema: Option<Arc<Schema>>,
pub schema_resolver: Option<Arc<ConfluentSchemaResolver>>,
}

@@ -53,18 +56,37 @@ impl AvroParserConfig {
props: &HashMap<String, String>,
schema_location: &str,
use_schema_registry: bool,
enable_upsert: bool,
) -> Result<Self> {
let url = Url::parse(schema_location).map_err(|e| {
InternalError(format!("failed to parse url ({}): {}", schema_location, e))
})?;
let (schema, schema_resolver) = if use_schema_registry {
if use_schema_registry {
let kafka_topic = get_kafka_topic(props)?;
let client = Client::new(url, props)?;
let (schema, resolver) =
ConfluentSchemaResolver::new(format!("{}-value", kafka_topic).as_str(), client)
.await?;
(Arc::new(schema), Some(Arc::new(resolver)))
let resolver = ConfluentSchemaResolver::new(client);

Ok(Self {
schema: resolver
.get_by_subject_name(&format!("{}-value", kafka_topic))
.await?,
key_schema: if enable_upsert {
Some(
resolver
.get_by_subject_name(&format!("{}-key", kafka_topic))
.await?,
)
} else {
None
},
schema_resolver: Some(Arc::new(resolver)),
})
} else {
if enable_upsert {
return Err(RwError::from(InternalError(
"avro upsert without schema registry is not supported".to_string(),
)));
}
let schema_content = match url.scheme() {
"file" => read_schema_from_local(url.path()),
"s3" => read_schema_from_s3(&url, props).await,
@@ -77,12 +99,27 @@
let schema = Schema::parse_str(&schema_content).map_err(|e| {
RwError::from(InternalError(format!("Avro schema parse error {}", e)))
})?;
(Arc::new(schema), None)
};
Ok(Self {
schema,
schema_resolver,
})
Ok(Self {
schema: Arc::new(schema),
key_schema: None,
schema_resolver: None,
})
}
}

pub fn extract_pks(&self) -> Result<Vec<ColumnDesc>> {
if let Some(Schema::Record { fields, .. }) = self.key_schema.as_deref() {
let mut index = 0;
let fields = fields
.iter()
.map(|field| avro_field_to_column_desc(&field.name, &field.schema, &mut index))
.collect::<Result<Vec<_>>>()?;
Ok(fields)
} else {
Err(RwError::from(InternalError(
"schema invalid, record required".into(),
)))
}
}

pub fn map_to_columns(&self) -> Result<Vec<ColumnDesc>> {
@@ -107,29 +144,58 @@ impl AvroParser {
pub fn new(rw_columns: Vec<SourceColumnDesc>, config: AvroParserConfig) -> Result<Self> {
let AvroParserConfig {
schema,
key_schema,
schema_resolver,
} = config;
Ok(Self {
schema,
key_schema,
schema_resolver,
rw_columns,
})
}

/// The presence of a `key_schema` implies that upsert is enabled.
fn is_enable_upsert(&self) -> bool {
self.key_schema.is_some()
}

pub(crate) async fn parse_inner(
&self,
payload: &[u8],
mut writer: SourceStreamChunkRowWriter<'_>,
) -> Result<WriteGuard> {
enum Op {
Insert,
Delete,
}

let (_payload, op) = if self.is_enable_upsert() {
let msg: UpsertMessage<'_> = bincode::deserialize(payload)
.map_err(|e| RwError::from(ProtocolError(e.to_string())))?;
if !msg.record.is_empty() {
(msg.record, Op::Insert)
} else {
(msg.primary_key, Op::Delete)
}
} else {
(payload.into(), Op::Insert)
};

// Parse the payload into an Avro value.
// If a Confluent schema registry is in use, resolve the writer schema by the
// schema id embedded at the start of the payload.
let avro_value = if let Some(resolver) = &self.schema_resolver {
let (schema_id, mut raw_payload) = extract_schema_id(payload)?;
let (schema_id, mut raw_payload) = extract_schema_id(&_payload)?;
let writer_schema = resolver.get(schema_id).await?;
from_avro_datum(writer_schema.as_ref(), &mut raw_payload, Some(&self.schema))
let reader_schema = if matches!(op, Op::Delete) {
self.key_schema.as_deref()
} else {
Some(&*self.schema)
};
from_avro_datum(writer_schema.as_ref(), &mut raw_payload, reader_schema)
.map_err(|e| RwError::from(ProtocolError(e.to_string())))?
} else {
let mut reader = Reader::with_schema(&self.schema, payload)
let mut reader = Reader::with_schema(&self.schema, payload as &[u8])
.map_err(|e| RwError::from(ProtocolError(e.to_string())))?;
match reader.next() {
Some(Ok(v)) => v,
@@ -144,7 +210,7 @@

// parse the value to rw value
if let Value::Record(fields) = avro_value {
writer.insert(|column| {
let fill = |column: &SourceColumnDesc| {
let tuple = fields
.iter()
.find(|val| column.name.eq(&val.0))
@@ -163,7 +229,11 @@
);
e
})
})
};
match op {
Op::Insert => writer.insert(fill),
Op::Delete => writer.delete(fill),
}
} else {
Err(RwError::from(ProtocolError(
"avro parse unexpected value".to_string(),
@@ -249,7 +319,7 @@ mod test {

async fn new_avro_conf_from_local(file_name: &str) -> error::Result<AvroParserConfig> {
let schema_path = "file://".to_owned() + &test_data_path(file_name);
AvroParserConfig::new(&HashMap::new(), schema_path.as_str(), false).await
AvroParserConfig::new(&HashMap::new(), schema_path.as_str(), false, false).await
}

async fn new_avro_parser_from_local(file_name: &str) -> error::Result<AvroParser> {
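The heart of the upsert path in `parse_inner` above is a two-way dispatch: a non-empty `record` is an insert parsed against the value schema, while an empty `record` means the remaining bytes are the encoded key, handled as a delete against the key schema. A stripped-down sketch of just that decision, using toy types rather than the actual parser API:

/// Which row operation an upsert payload encodes, and the bytes to decode.
enum Op<'a> {
    Insert(&'a [u8]), // decode with the value (reader) schema
    Delete(&'a [u8]), // decode with the key schema
}

fn classify<'a>(primary_key: &'a [u8], record: &'a [u8]) -> Op<'a> {
    if !record.is_empty() {
        Op::Insert(record)
    } else {
        // Kafka tombstone: only the key is left, so delete by primary key.
        Op::Delete(primary_key)
    }
}

fn main() {
    // A populated record is an upsert (insert) of the new row image.
    assert!(matches!(classify(b"k1", b"v1"), Op::Insert(_)));
    // An empty record deletes whatever row the key identifies.
    assert!(matches!(classify(b"k1", b""), Op::Delete(_)));
}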
42 changes: 22 additions & 20 deletions src/connector/src/parser/avro/schema_resolver.rs
@@ -23,7 +23,7 @@ use risingwave_common::error::{Result, RwError};
use url::Url;

use crate::aws_utils::{default_conn_config, s3_client, AwsConfigV2};
use crate::parser::schema_registry::Client;
use crate::parser::schema_registry::{Client, ConfluentSchema};
use crate::parser::util::download_from_http;

const AVRO_SCHEMA_LOCATION_S3_REGION: &str = "region";
@@ -89,20 +89,30 @@ pub struct ConfluentSchemaResolver {
}

impl ConfluentSchemaResolver {
// return the reader schema and a new `SchemaResolver`
pub async fn new(subject_name: &str, client: Client) -> Result<(Schema, Self)> {
let raw_schema = client.get_schema_by_subject(subject_name).await?;
async fn parse_and_cache_schema(&self, raw_schema: ConfluentSchema) -> Result<Arc<Schema>> {
let schema = Schema::parse_str(&raw_schema.content)
.map_err(|e| RwError::from(ProtocolError(format!("Avro schema parse error {}", e))))?;
let resolver = ConfluentSchemaResolver {
let schema = Arc::new(schema);
self.writer_schemas
.insert(raw_schema.id, Arc::clone(&schema))
.await;
Ok(schema)
}

/// Create a new `ConfluentSchemaResolver`
pub fn new(client: Client) -> Self {
ConfluentSchemaResolver {
writer_schemas: Cache::new(u64::MAX),
confluent_client: client,
};
resolver
.writer_schemas
.insert(raw_schema.id, Arc::new(schema.clone()))
.await;
Ok((schema, resolver))
}
}

pub async fn get_by_subject_name(&self, subject_name: &str) -> Result<Arc<Schema>> {
let raw_schema = self
.confluent_client
.get_schema_by_subject(subject_name)
.await?;
self.parse_and_cache_schema(raw_schema).await
}

// get the writer schema by id
@@ -111,15 +121,7 @@ impl ConfluentSchemaResolver {
Ok(schema)
} else {
let raw_schema = self.confluent_client.get_schema_by_id(schema_id).await?;

let schema = Schema::parse_str(&raw_schema.content).map_err(|e| {
RwError::from(ProtocolError(format!("Avro schema parse error {}", e)))
})?;
let schema = Arc::new(schema);
self.writer_schemas
.insert(schema_id, Arc::clone(&schema))
.await;
Ok(schema)
self.parse_and_cache_schema(raw_schema).await
}
}
}
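
The refactor above separates construction from lookup: `new` now just wires the registry client to an empty cache, and both `get` (by writer schema id) and the new `get_by_subject_name` funnel through `parse_and_cache_schema`, so each schema is parsed once and shared as an `Arc`. The same get-or-fill shape reduced to a self-contained toy, with a `Mutex<HashMap>` standing in for the async cache and a `String` for a parsed `Schema`:

use std::collections::HashMap;
use std::sync::{Arc, Mutex};

// Toy registry client; the real one does an async HTTP round-trip.
struct Client;
impl Client {
    fn get_schema_by_id(&self, id: i32) -> String {
        format!("schema body for id {id}")
    }
}

struct Resolver {
    client: Client,
    writer_schemas: Mutex<HashMap<i32, Arc<String>>>,
}

impl Resolver {
    fn new(client: Client) -> Self {
        Resolver {
            client,
            writer_schemas: Mutex::new(HashMap::new()),
        }
    }

    // One fill path shared by every lookup, like `parse_and_cache_schema`.
    fn parse_and_cache(&self, id: i32, raw: String) -> Arc<String> {
        let schema = Arc::new(raw); // the real code runs Schema::parse_str here
        self.writer_schemas
            .lock()
            .unwrap()
            .insert(id, Arc::clone(&schema));
        schema
    }

    fn get(&self, id: i32) -> Arc<String> {
        if let Some(hit) = self.writer_schemas.lock().unwrap().get(&id).cloned() {
            return hit; // cache hit: no re-fetch, no re-parse
        }
        let raw = self.client.get_schema_by_id(id);
        self.parse_and_cache(id, raw)
    }
}

fn main() {
    let resolver = Resolver::new(Client);
    let first = resolver.get(42);
    let second = resolver.get(42);
    assert!(Arc::ptr_eq(&first, &second)); // same parsed schema, shared
}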
8 changes: 5 additions & 3 deletions src/connector/src/parser/debezium/avro_parser.rs
@@ -75,12 +75,14 @@ impl DebeziumAvroParserConfig {
let key_schema = Schema::parse_str(&raw_schema.content)
.map_err(|e| RwError::from(ProtocolError(format!("Avro schema parse error {}", e))))?;

let (outer_schema, resolver) =
ConfluentSchemaResolver::new(format!("{}-value", kafka_topic).as_str(), client).await?;
let resolver = ConfluentSchemaResolver::new(client);
let outer_schema = resolver
.get_by_subject_name(&format!("{}-value", kafka_topic))
.await?;
let inner_schema = Self::extract_inner_schema(&outer_schema)?;
Ok(Self {
key_schema: Arc::new(key_schema),
outer_schema: Arc::new(outer_schema),
outer_schema,
inner_schema: Arc::new(inner_schema),
schema_resolver: Arc::new(resolver),
})
