Skip to content

Commit

Permalink
fix corner case
Browse files Browse the repository at this point in the history
  • Loading branch information
waruto210 authored and algosday committed Feb 23, 2023
1 parent 3a2ecd1 commit eec5182
Showing 1 changed file with 12 additions and 12 deletions.
24 changes: 12 additions & 12 deletions src/connector/src/parser/avro/parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -184,8 +184,12 @@ impl AvroParser {
}

let (_payload, op) = if self.is_enable_upsert() {
let msg: UpsertMessage<'_> = bincode::deserialize(payload)
.map_err(|e| RwError::from(ProtocolError(e.to_string())))?;
let msg: UpsertMessage<'_> = bincode::deserialize(payload).map_err(|e| {
RwError::from(ProtocolError(format!(
"extract payload err {:?}, you may need to check the 'upsert' parameter",
e
)))
})?;
if !msg.record.is_empty() {
(msg.record, Op::Insert)
} else {
Expand Down Expand Up @@ -221,18 +225,14 @@ impl AvroParser {
}
};

// parse the value to rw value
// the avro can be a key or a value
if let Value::Record(fields) = avro_value {
let fill = |column: &SourceColumnDesc| {
let tuple = fields
.iter()
.find(|val| column.name.eq(&val.0))
.ok_or_else(|| {
RwError::from(InternalError(format!(
"no field named {} in avro msg",
column.name
)))
})?;
let tuple = match fields.iter().find(|val| column.name.eq(&val.0)) {
None => return Ok(None),
Some(tup) => tup,
};

let field_schema = extract_inner_field_schema(&self.schema, Some(&column.name))?;
from_avro_value(tuple.1.clone(), field_schema).map_err(|e| {
tracing::error!(
Expand Down

0 comments on commit eec5182

Please sign in to comment.