Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
remove index

fix

fix
  • Loading branch information
xxhZs committed Mar 21, 2024
1 parent cb3c3d2 commit eaba3b6
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 25 deletions.
49 changes: 26 additions & 23 deletions src/connector/src/sink/big_query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -345,8 +345,7 @@ impl BigQuerySinkWriter {
.iter()
.map(|f| (f.name.as_str(), &f.data_type)),
config.common.table.clone(),
1,
);
)?;

if !is_append_only {
let field = FieldDescriptorProto {
Expand Down Expand Up @@ -579,33 +578,33 @@ fn build_protobuf_descriptor_pool(desc: &DescriptorProto) -> prost_reflect::Desc
fn build_protobuf_schema<'a>(
fields: impl Iterator<Item = (&'a str, &'a DataType)>,
name: String,
index: i32,
) -> DescriptorProto {
) -> Result<DescriptorProto> {
let mut proto = DescriptorProto {
name: Some(name),
..Default::default()
};
let mut index_mut = index;
let mut field_vec = vec![];
let mut struct_vec = vec![];
for (name, data_type) in fields {
let (field, des_proto) = build_protobuf_field(data_type, index_mut, name.to_string());
field_vec.push(field);
if let Some(sv) = des_proto {
struct_vec.push(sv);
}
index_mut += 1;
}
let field_vec = fields
.enumerate()
.map(|(index, (name, data_type))| {
let (field, des_proto) =
build_protobuf_field(data_type, (index + 1) as i32, name.to_string())?;
if let Some(sv) = des_proto {
struct_vec.push(sv);
}
Ok(field)
})
.collect::<Result<Vec<_>>>()?;
proto.field = field_vec;
proto.nested_type = struct_vec;
proto
Ok(proto)
}

fn build_protobuf_field(
data_type: &DataType,
index: i32,
name: String,
) -> (FieldDescriptorProto, Option<DescriptorProto>) {
) -> Result<(FieldDescriptorProto, Option<DescriptorProto>)> {
let mut field = FieldDescriptorProto {
name: Some(name.clone()),
number: Some(index),
Expand All @@ -628,21 +627,25 @@ fn build_protobuf_field(
DataType::Struct(s) => {
field.r#type = Some(field_descriptor_proto::Type::Message.into());
let name = format!("Struct{}", name);
let sub_proto = build_protobuf_schema(s.iter(), name.clone(), 1);
let sub_proto = build_protobuf_schema(s.iter(), name.clone())?;
field.type_name = Some(name);
return (field, Some(sub_proto));
return Ok((field, Some(sub_proto)));
}
DataType::List(l) => {
let (mut field, proto) = build_protobuf_field(l.as_ref(), index, name.clone());
let (mut field, proto) = build_protobuf_field(l.as_ref(), index, name.clone())?;
field.label = Some(field_descriptor_proto::Label::Repeated.into());
return (field, proto);
return Ok((field, proto));
}
DataType::Bytea => field.r#type = Some(field_descriptor_proto::Type::Bytes.into()),
DataType::Jsonb => field.r#type = Some(field_descriptor_proto::Type::String.into()),
DataType::Serial => field.r#type = Some(field_descriptor_proto::Type::Int64.into()),
DataType::Float32 | DataType::Int256 => todo!(),
DataType::Float32 | DataType::Int256 => {
return Err(SinkError::BigQuery(anyhow::anyhow!(
"Don't support Float32 and Int256"
)))
}
}
(field, None)
Ok((field, None))
}

#[cfg(test)]
Expand Down Expand Up @@ -701,7 +704,7 @@ mod test {
.fields()
.iter()
.map(|f| (f.name.as_str(), &f.data_type));
let desc = build_protobuf_schema(fields, "t1".to_string(), 1);
let desc = build_protobuf_schema(fields, "t1".to_string()).unwrap();
let pool = build_protobuf_descriptor_pool(&desc);
let t1_message = pool.get_message_by_name("t1").unwrap();
assert_matches!(
Expand Down
3 changes: 1 addition & 2 deletions src/connector/src/sink/encoder/proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -422,7 +422,7 @@ fn encode_field<D: MaybeData>(
* Group C: experimental */
},
DataType::Int16 => match (expect_list, proto_field.kind()) {
(false, Kind::String) if is_big_query => {
(false, Kind::Int64) if is_big_query => {
maybe.on_base(|s| Ok(Value::I64(s.into_int16() as i64)))?
}
_ => return no_match_err(),
Expand Down Expand Up @@ -486,7 +486,6 @@ mod tests {
let pool_bytes = std::fs::read(pool_path).unwrap();
let pool = prost_reflect::DescriptorPool::decode(pool_bytes.as_ref()).unwrap();
let descriptor = pool.get_message_by_name("recursive.AllTypes").unwrap();
println!("a{:?}", descriptor.descriptor_proto());
let schema = Schema::new(vec![
Field::with_name(DataType::Boolean, "bool_field"),
Field::with_name(DataType::Varchar, "string_field"),
Expand Down

0 comments on commit eaba3b6

Please sign in to comment.