From 3ad17bfead45ec215f10f7c7fbde58ce13c49299 Mon Sep 17 00:00:00 2001 From: Yann Kerherve Date: Sat, 5 Jun 2010 19:51:06 -0700 Subject: [PATCH] Adds decode_record test and skip() for most types Still working on schema resolution --- lib/Avro/BinaryDecoder.pm | 114 ++++++++++++++++++++++++++++++++++++-- lib/Avro/BinaryEncoder.pm | 4 +- lib/Avro/Schema.pm | 10 ++++ t/03_bin_decode.t | 49 +++++++++++++++- 4 files changed, 166 insertions(+), 11 deletions(-) diff --git a/lib/Avro/BinaryDecoder.pm b/lib/Avro/BinaryDecoder.pm index f507b26..4f89ff8 100644 --- a/lib/Avro/BinaryDecoder.pm +++ b/lib/Avro/BinaryDecoder.pm @@ -55,11 +55,20 @@ sub decode { return $class->$meth($writer_schema, $reader_schema, $reader); } +sub skip { + my $class = shift; + my ($schema, $reader) = @_; + my $type = ref $schema ? $schema->type : $schema; + my $meth = "skip_$type"; + return $class->$meth($schema, $reader); +} + sub resolve_schema { } sub decode_null { undef } +sub skip_boolean { &decode_boolean } sub decode_boolean { my $class = shift; my $reader = pop; @@ -67,17 +76,20 @@ sub decode_boolean { return $bool ? 
1 : 0; } +sub skip_int { &decode_int } sub decode_int { my $class = shift; my $reader = pop; return zigzag(unsigned_varint($reader)); } +sub skip_long { &decode_long }; sub decode_long { my $class = shift; return decode_int($class, @_); } +sub skip_float { &decode_float } sub decode_float { my $class = shift; my $reader = pop; @@ -85,6 +97,7 @@ sub decode_float { return unpack "f<", $buf; } +sub skip_double { &decode_double } sub decode_double { my $class = shift; my $reader = pop; @@ -92,6 +105,14 @@ sub decode_double { return pack "d<", $buf, } +sub skip_bytes { + my $class = shift; + my $reader = pop; + my $size = decode_long($class, undef, undef, $reader); # length prefix of the bytes value + $reader->seek($size, 1); # whence 1 (SEEK_CUR): advance $size bytes from the current position; whence 0 would jump to absolute offset $size + return; +} + sub decode_bytes { my $class = shift; my $reader = pop; @@ -100,6 +121,7 @@ sub decode_bytes { return $buf; } +sub skip_string { &skip_bytes } sub decode_string { my $class = shift; my $reader = pop; @@ -107,6 +129,14 @@ sub decode_string { return Encode::decode_utf8($bytes); } +sub skip_record { + my $class = shift; + my ($schema, $reader) = @_; + for my $field (@{ $schema->fields }){ + skip($class, $field->{type}, $reader); + } +} + ## 1.3.2 A record is encoded by encoding the values of its fields in the order ## that they are declared. In other words, a record is encoded as just the ## concatenation of the encodings of its fields. Field values are encoded per @@ -115,20 +145,45 @@ sub decode_record { my $class = shift; my ($writer_schema, $reader_schema, $reader) = @_; my $record; + + my %extra_fields = %{ $reader_schema->fields_as_hash }; for my $field (@{ $writer_schema->fields }) { - ## TODO: schema resolution - my $field_schema = $field->{type}; + my $name = $field->{name}; + my $w_field_schema = $field->{type}; + my $r_field_schema = delete $extra_fields{$name}; + + ## 1.3.2 if the writer's record contains a field with a name not + ## present in the reader's record, the writer's value for that field + ## is ignored. + if (! 
$r_field_schema) { + $class->skip($w_field_schema, $reader); + next; + } my $data = $class->decode( - writer_schema => $field_schema, - reader_schema => $field_schema, + writer_schema => $w_field_schema, + reader_schema => $r_field_schema->{type}, # fields_as_hash maps names to field entries, so pass the entry's type like the writer side does reader => $reader, ); - $record->{ $field->{name} } = $data; + $record->{ $name } = $data; + } + + for my $name (keys %extra_fields) { + ## 1.3.2. if the reader's record schema has a field with no default + ## value, and writer's schema does not have a field with the same + ## name, an error is signalled. + unless (exists $extra_fields{$name}->{default}) { + throw Avro::Schema::Error::DataMismatch( + "cannot resolve without default" + ); + } + ## 1.3.2 ... else the default value is used + $record->{ $name } = $extra_fields{$name}->{default}; } - ## TODO: default values. (grep) return $record; } +sub skip_enum { &skip_int } + ## 1.3.2 An enum is encoded by a int, representing the zero-based position of ## the symbol in the schema. sub decode_enum { @@ -144,6 +199,30 @@ sub decode_enum { return $w_data; } +sub skip_block { + my $class = __PACKAGE__; # only forwarded to decode_long; skip_array/skip_map call skip_block as a plain function without a class + my ($reader, $block_content) = @_[-2, -1]; # take the last two arguments so both skip_block($reader, $cb) and a class-prefixed call work + my $block_count = decode_long($class, undef, undef, $reader); + while ($block_count) { + if ($block_count < 0) { + my $block_size = decode_long($class, undef, undef, $reader); # a negative count is followed by the block size in bytes + $reader->seek($block_size, 1); # whence 1 (SEEK_CUR): jump over the whole block, then fall through to read the next count + } + else { + for (1..$block_count) { + $block_content->(); + } + } + $block_count = decode_long($class, undef, undef, $reader); + } +} + +sub skip_array { + my $class = shift; + my ($schema, $reader) = @_; + skip_block($reader, sub { $class->skip($schema->items, $reader) }); +} + ## 1.3.2 Arrays are encoded as a series of blocks. Each block consists of a ## long count value, followed by that many array items. A block with count zero ## indicates the end of the array. 
Each item is encoded per the array's item @@ -174,6 +253,14 @@ sub decode_array { return \@array; } +sub skip_map { + my $class = shift; + my ($schema, $reader) = @_; + skip_block($reader, sub { + skip_string($class, $reader); + $class->skip($schema->values, $reader); + }); +} ## 1.3.2 Maps are encoded as a series of blocks. Each block consists of a long ## count value, followed by that many key/value pairs. A block with count zero @@ -210,6 +297,15 @@ sub decode_map { return \%hash; } +sub skip_union { + my $class = shift; + my ($schema, $reader) = @_; + my $idx = decode_long($class, undef, undef, $reader); + my $union_schema = $schema->schemas->[$idx] + or throw Avro::Schema::Error::Parse("union union member"); + $class->skip($union_schema, $reader); +} + ## 1.3.2 A union is encoded by first writing a long value indicating the ## zero-based position within the union of the schema of its value. The value ## is then encoded per the indicated schema within the union. @@ -226,6 +322,12 @@ sub decode_union { ); } +sub skip_fixed { + my $class = shift; + my ($schema, $reader) = @_; + $reader->seek($schema->size, 1); # whence 1 (SEEK_CUR): skip the declared number of bytes from the current position, not to an absolute offset +} + ## 1.3.2 Fixed instances are encoded using the number of bytes declared in the ## schema. 
sub decode_fixed { diff --git a/lib/Avro/BinaryEncoder.pm b/lib/Avro/BinaryEncoder.pm index dc825bf..ac9bf11 100644 --- a/lib/Avro/BinaryEncoder.pm +++ b/lib/Avro/BinaryEncoder.pm @@ -124,8 +124,8 @@ sub encode_record { my ($schema, $data, $cb) = @_; for my $field (@{ $schema->fields }) { $class->encode( - schema => $field->{type}, - data => $data->{ $field->{name} }, + schema => $field->{type}, + data => $data->{ $field->{name} }, emit_cb => $cb, ); } diff --git a/lib/Avro/Schema.pm b/lib/Avro/Schema.pm index 377ec6f..15a6c24 100644 --- a/lib/Avro/Schema.pm +++ b/lib/Avro/Schema.pm @@ -480,6 +480,16 @@ sub fields { return $schema->{fields}; } +sub fields_as_hash { + my $schema = shift; + unless (exists $schema->{_fields_as_hash}) { + $schema->{_fields_as_hash} = { + map { $_->{name} => $_ } @{ $schema->{fields} } + }; + } + return $schema->{_fields_as_hash}; +} + package Avro::Schema::Enum; our @ISA = qw/Avro::Schema::Named/; diff --git a/t/03_bin_decode.t b/t/03_bin_decode.t index 773d204..55d863f 100644 --- a/t/03_bin_decode.t +++ b/t/03_bin_decode.t @@ -4,7 +4,7 @@ use strict; use warnings; use Avro::Schema; use Avro::BinaryEncoder; -use Test::More tests => 12; +use Test::More tests => 16; use Test::Exception; use IO::String; @@ -62,13 +62,13 @@ EOJ $reader = IO::String->new("\x00\x02\x61"); $dec = Avro::BinaryDecoder->decode( writer_schema => $schema, - reader_schema => $schema, + reader_schema => $schema, reader => $reader, ); is $dec, "a", "Binary_Encodings.Complex_Types.Unions-a"; } -## schema resolution +## enum schema resolution { my $w_enum = Avro::Schema->parse(<parse(<parse(< 1, bonus => "i" }; + my $enc = ''; + Avro::BinaryEncoder->encode( + schema => $w_schema, + data => $data, + emit_cb => sub { $enc .= ${ $_[0] } }, + ); + my $dec = Avro::BinaryDecoder->decode( + writer_schema => $w_schema, + reader_schema => $r_schema, + reader => IO::String->new($enc), + ); + is $dec->{a}, 1, "easy"; + ok ! 
exists $dec->{bonus}, "bonus extra field ignored"; + is $dec->{t}, 37.5, "default t from reader used"; + + ## delete the default for t + delete $r_schema->fields->[0]{default}; + throws_ok { + Avro::BinaryDecoder->decode( + writer_schema => $w_schema, + reader_schema => $r_schema, + reader => IO::String->new($enc), + ); + } "Avro::Schema::Error::DataMismatch", "no default value!"; +} + done_testing;