Skip to content

Commit

Permalink
AVRO-3785: [Rust] Deserialization if reader schema has a namespace an…
Browse files Browse the repository at this point in the history
…d a union with null and a record containing a reference type (apache#2304)

Make use of the enclosing namespace when resolving union schemata

Signed-off-by: Martin Tzvetanov Grigorov <mgrigorov@apache.org>
  • Loading branch information
martin-g authored Jun 26, 2023
1 parent 79065fa commit 76d2d5d
Show file tree
Hide file tree
Showing 3 changed files with 149 additions and 11 deletions.
21 changes: 14 additions & 7 deletions lang/rust/avro/src/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ use std::{
collections::{BTreeMap, HashMap, HashSet},
convert::{TryFrom, TryInto},
fmt,
fmt::Debug,
hash::Hash,
str::FromStr,
};
Expand Down Expand Up @@ -413,14 +414,15 @@ impl<'s> ResolvedSchema<'s> {
/// Those schemata would be used to resolve references if needed.
pub fn new_with_known_schemata<'n>(
schemata_to_resolve: Vec<&'s Schema>,
enclosing_namespace: &Namespace,
known_schemata: &'n NamesRef<'n>,
) -> AvroResult<Self> {
let names = HashMap::new();
let mut rs = ResolvedSchema {
names_ref: names,
schemata: schemata_to_resolve,
};
rs.resolve(rs.get_schemata(), &None, Some(known_schemata))?;
rs.resolve(rs.get_schemata(), enclosing_namespace, Some(known_schemata))?;
Ok(rs)
}

Expand Down Expand Up @@ -775,18 +777,19 @@ impl UnionSchema {
note = "Please use `find_schema_with_known_schemata` instead"
)]
pub fn find_schema(&self, value: &types::Value) -> Option<(usize, &Schema)> {
self.find_schema_with_known_schemata::<Schema>(value, None)
self.find_schema_with_known_schemata::<Schema>(value, None, &None)
}

/// Optionally returns a reference to the schema matched by this value, as well as its position
/// within this union.
///
/// Extra arguments:
/// - `known_schemata` - mapping between `Name` and `Schema` - if passed, additional external schemas would be used to resolve references.
pub fn find_schema_with_known_schemata<S: Borrow<Schema>>(
pub fn find_schema_with_known_schemata<S: Borrow<Schema> + Debug>(
&self,
value: &types::Value,
known_schemata: Option<&HashMap<Name, S>>,
enclosing_namespace: &Namespace,
) -> Option<(usize, &Schema)> {
let schema_kind = SchemaKind::from(value);
if let Some(&i) = self.variant_index.get(&schema_kind) {
Expand All @@ -806,15 +809,19 @@ impl UnionSchema {
.unwrap_or_default();

self.schemas.iter().enumerate().find(|(_, schema)| {
let resolved_schema =
ResolvedSchema::new_with_known_schemata(vec![*schema], &collected_names)
.expect("Schema didn't successfully parse");
let resolved_schema = ResolvedSchema::new_with_known_schemata(
vec![*schema],
enclosing_namespace,
&collected_names,
)
.expect("Schema didn't successfully parse");
let resolved_names = resolved_schema.names_ref;

// extend known schemas with just resolved names
collected_names.extend(resolved_names);
let namespace = &schema.namespace().or_else(|| enclosing_namespace.clone());
value
.validate_internal(schema, &collected_names, &schema.namespace())
.validate_internal(schema, &collected_names, namespace)
.is_none()
})
}
Expand Down
8 changes: 4 additions & 4 deletions lang/rust/avro/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ use serde_json::{Number, Value as JsonValue};
use std::{
collections::{BTreeMap, HashMap},
convert::TryFrom,
fmt::Debug,
hash::BuildHasher,
str::FromStr,
};
Expand Down Expand Up @@ -376,7 +377,7 @@ impl Value {
}
}

pub(crate) fn validate_internal<S: std::borrow::Borrow<Schema>>(
pub(crate) fn validate_internal<S: std::borrow::Borrow<Schema> + Debug>(
&self,
schema: &Schema,
names: &HashMap<Name, S>,
Expand Down Expand Up @@ -476,7 +477,7 @@ impl Value {
.map(|schema| value.validate_internal(schema, names, enclosing_namespace))
.unwrap_or_else(|| Some(format!("No schema in the union at position '{i}'"))),
(v, Schema::Union(inner)) => {
match inner.find_schema_with_known_schemata(v, Some(names)) {
match inner.find_schema_with_known_schemata(v, Some(names), enclosing_namespace) {
Some(_) => None,
None => Some("Could not find matching type in union".to_string()),
}
Expand Down Expand Up @@ -896,9 +897,8 @@ impl Value {
// Reader is a union, but writer is not.
v => v,
};

let (i, inner) = schema
.find_schema_with_known_schemata(&v, Some(names))
.find_schema_with_known_schemata(&v, Some(names), enclosing_namespace)
.ok_or(Error::FindUnionVariant)?;

Ok(Value::Union(
Expand Down
131 changes: 131 additions & 0 deletions lang/rust/avro/tests/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1409,3 +1409,134 @@ fn avro_old_issue_47() -> TestResult {
let _ = to_avro_datum(&schema, to_value(record)?)?;
Ok(())
}

#[test]
fn test_avro_3785_deserialize_namespace_with_nullable_type_containing_reference_type() -> TestResult
{
use apache_avro::{from_avro_datum, to_avro_datum, types::Value};
use serde::{Deserialize, Serialize};

#[derive(Debug, PartialEq, Eq, Clone, Deserialize, Serialize)]
pub struct BarUseParent {
#[serde(rename = "barUse")]
pub bar_use: Bar,
}

#[derive(Debug, PartialEq, Eq, Hash, PartialOrd, Ord, Clone, Deserialize, Serialize)]
pub enum Bar {
#[serde(rename = "bar0")]
Bar0,
#[serde(rename = "bar1")]
Bar1,
#[serde(rename = "bar2")]
Bar2,
}

#[derive(Debug, PartialEq, Eq, Clone, Deserialize, Serialize)]
pub struct Foo {
#[serde(rename = "barInit")]
pub bar_init: Bar,
#[serde(rename = "barUseParent")]
pub bar_use_parent: Option<BarUseParent>,
}

let writer_schema = r#"{
"type": "record",
"name": "Foo",
"namespace": "name.space",
"fields":
[
{
"name": "barInit",
"type":
{
"type": "enum",
"name": "Bar",
"symbols":
[
"bar0",
"bar1",
"bar2"
]
}
},
{
"name": "barUseParent",
"type": [
"null",
{
"type": "record",
"name": "BarUseParent",
"fields": [
{
"name": "barUse",
"type": "Bar"
}
]
}
]
}
]
}"#;

let reader_schema = r#"{
"type": "record",
"name": "Foo",
"namespace": "name.space",
"fields":
[
{
"name": "barInit",
"type":
{
"type": "enum",
"name": "Bar",
"symbols":
[
"bar0",
"bar1"
]
}
},
{
"name": "barUseParent",
"type": [
"null",
{
"type": "record",
"name": "BarUseParent",
"fields": [
{
"name": "barUse",
"type": "Bar"
}
]
}
]
}
]
}"#;

let writer_schema = Schema::parse_str(writer_schema)?;
let foo1 = Foo {
bar_init: Bar::Bar0,
bar_use_parent: Some(BarUseParent { bar_use: Bar::Bar1 }),
};
let avro_value = crate::to_value(foo1)?;
assert!(
avro_value.validate(&writer_schema),
"value is valid for schema",
);
let datum = to_avro_datum(&writer_schema, avro_value)?;
let mut x = &datum[..];
let reader_schema = Schema::parse_str(reader_schema)?;
let deser_value = from_avro_datum(&writer_schema, &mut x, Some(&reader_schema))?;
match deser_value {
Value::Record(fields) => {
assert_eq!(fields.len(), 2);
}
_ => panic!("Expected Value::Record"),
}

Ok(())
}

0 comments on commit 76d2d5d

Please sign in to comment.