Skip to content

Commit

Permalink
Adds decode_record test and skip() for most types
Browse files Browse the repository at this point in the history
Still working on schema resolution
  • Loading branch information
yannk committed Jun 6, 2010
1 parent 1ecd337 commit 3ad17bf
Show file tree
Hide file tree
Showing 4 changed files with 166 additions and 11 deletions.
114 changes: 108 additions & 6 deletions lib/Avro/BinaryDecoder.pm
Expand Up @@ -55,43 +55,64 @@ sub decode {
return $class->$meth($writer_schema, $reader_schema, $reader);
}

sub skip {
my $class = shift;
my ($schema, $reader) = @_;
my $type = ref $schema ? $schema->type : $schema;
my $meth = "skip_$type";
return $class->$meth($schema, $reader);
}

sub resolve_schema {
}

sub decode_null { undef }

sub skip_boolean { &decode_boolean }
sub decode_boolean {
my $class = shift;
my $reader = pop;
$reader->read(my $bool, 1);
return $bool ? 1 : 0;
}

sub skip_int { &decode_int }
sub decode_int {
my $class = shift;
my $reader = pop;
return zigzag(unsigned_varint($reader));
}

sub skip_long { &decode_long };
sub decode_long {
my $class = shift;
return decode_int($class, @_);
}

sub skip_float { &decode_float }
sub decode_float {
my $class = shift;
my $reader = pop;
$reader->read(my $buf, 4);
return unpack "f<", $buf;
}

sub skip_double { &decode_double }
sub decode_double {
my $class = shift;
my $reader = pop;
$reader->read(my $buf, 8);
return pack "d<", $buf,
}

sub skip_bytes {
my $class = shift;
my $reader = pop;
my $size = decode_long($class, undef, undef, $reader);
$reader->seek($size, 0);
return;
}

sub decode_bytes {
my $class = shift;
my $reader = pop;
Expand All @@ -100,13 +121,22 @@ sub decode_bytes {
return $buf;
}

sub skip_string { &skip_bytes }
sub decode_string {
my $class = shift;
my $reader = pop;
my $bytes = decode_bytes($class, undef, undef, $reader);
return Encode::decode_utf8($bytes);
}

sub skip_record {
my $class = shift;
my ($schema, $reader) = @_;
for my $field (@{ $schema->fields }){
skip($class, $field->{type}, $reader);
}
}

## 1.3.2 A record is encoded by encoding the values of its fields in the order
## that they are declared. In other words, a record is encoded as just the
## concatenation of the encodings of its fields. Field values are encoded per
Expand All @@ -115,20 +145,45 @@ sub decode_record {
my $class = shift;
my ($writer_schema, $reader_schema, $reader) = @_;
my $record;

my %extra_fields = %{ $reader_schema->fields_as_hash };
for my $field (@{ $writer_schema->fields }) {
## TODO: schema resolution
my $field_schema = $field->{type};
my $name = $field->{name};
my $w_field_schema = $field->{type};
my $r_field_schema = delete $extra_fields{$name};

## 1.3.2 if the writer's record contains a field with a name not
## present in the reader's record, the writer's value for that field
## is ignored.
if (! $r_field_schema) {
$class->skip($w_field_schema, $reader);
next;
}
my $data = $class->decode(
writer_schema => $field_schema,
reader_schema => $field_schema,
writer_schema => $w_field_schema,
reader_schema => $r_field_schema,
reader => $reader,
);
$record->{ $field->{name} } = $data;
$record->{ $name } = $data;
}

for my $name (keys %extra_fields) {
## 1.3.2. if the reader's record schema has a field with no default
## value, and writer's schema does not have a field with the same
## name, an error is signalled.
unless (exists $extra_fields{$name}->{default}) {
throw Avro::Schema::Error::DataMismatch(
"cannot resolve without default"
);
}
## 1.3.2 ... else the default value is used
$record->{ $name } = $extra_fields{$name}->{default};
}
## TODO: default values. (grep)
return $record;
}

sub skip_enum { &skip_int }

## 1.3.2 An enum is encoded by a int, representing the zero-based position of
## the symbol in the schema.
sub decode_enum {
Expand All @@ -144,6 +199,30 @@ sub decode_enum {
return $w_data;
}

sub skip_block {
my $class = shift;
my ($reader, $block_content) = @_;
my $block_count = decode_long($class, undef, undef, $reader);
while ($block_count) {
if ($block_count < 0) {
$reader->seek($block_count, 0);
next;
}
else {
for (1..$block_count) {
$block_content->();
}
}
$block_count = decode_long($class, undef, undef, $reader);
}
}

sub skip_array {
my $class = shift;
my ($schema, $reader) = @_;
skip_block($reader, sub { $class->skip($schema->items, $reader) });
}

## 1.3.2 Arrays are encoded as a series of blocks. Each block consists of a
## long count value, followed by that many array items. A block with count zero
## indicates the end of the array. Each item is encoded per the array's item
Expand Down Expand Up @@ -174,6 +253,14 @@ sub decode_array {
return \@array;
}

sub skip_map {
my $class = shift;
my ($schema, $reader) = @_;
skip_block($reader, sub {
skip_string($class, $reader);
$class->skip($schema->values, $reader);
});
}

## 1.3.2 Maps are encoded as a series of blocks. Each block consists of a long
## count value, followed by that many key/value pairs. A block with count zero
Expand Down Expand Up @@ -210,6 +297,15 @@ sub decode_map {
return \%hash;
}

sub skip_union {
my $class = shift;
my ($schema, $reader) = @_;
my $idx = decode_long($class, undef, undef, $reader);
my $union_schema = $schema->schemas->[$idx]
or throw Avro::Schema::Error::Parse("union union member");
$class->skip($union_schema, $reader);
}

## 1.3.2 A union is encoded by first writing a long value indicating the
## zero-based position within the union of the schema of its value. The value
## is then encoded per the indicated schema within the union.
Expand All @@ -226,6 +322,12 @@ sub decode_union {
);
}

sub skip_fixed {
my $class = shift;
my ($schema, $reader) = @_;
$reader->seek($schema->size, 0);
}

## 1.3.2 Fixed instances are encoded using the number of bytes declared in the
## schema.
sub decode_fixed {
Expand Down
4 changes: 2 additions & 2 deletions lib/Avro/BinaryEncoder.pm
Expand Up @@ -124,8 +124,8 @@ sub encode_record {
my ($schema, $data, $cb) = @_;
for my $field (@{ $schema->fields }) {
$class->encode(
schema => $field->{type},
data => $data->{ $field->{name} },
schema => $field->{type},
data => $data->{ $field->{name} },
emit_cb => $cb,
);
}
Expand Down
10 changes: 10 additions & 0 deletions lib/Avro/Schema.pm
Expand Up @@ -480,6 +480,16 @@ sub fields {
return $schema->{fields};
}

sub fields_as_hash {
my $schema = shift;
unless (exists $schema->{_fields_as_hash}) {
$schema->{_fields_as_hash} = {
map { $_->{name} => $_ } @{ $schema->{fields} }
};
}
return $schema->{_fields_as_hash};
}

package Avro::Schema::Enum;
our @ISA = qw/Avro::Schema::Named/;

Expand Down
49 changes: 46 additions & 3 deletions t/03_bin_decode.t
Expand Up @@ -4,7 +4,7 @@ use strict;
use warnings;
use Avro::Schema;
use Avro::BinaryEncoder;
use Test::More tests => 12;
use Test::More tests => 16;
use Test::Exception;
use IO::String;

Expand Down Expand Up @@ -62,13 +62,13 @@ EOJ
$reader = IO::String->new("\x00\x02\x61");
$dec = Avro::BinaryDecoder->decode(
writer_schema => $schema,
reader_schema => $schema,
reader_schema => $schema,
reader => $reader,
);
is $dec, "a", "Binary_Encodings.Complex_Types.Unions-a";
}

## schema resolution
## enum schema resolution
{

my $w_enum = Avro::Schema->parse(<<EOP);
Expand Down Expand Up @@ -107,4 +107,47 @@ EOP
}
}

## record resolution
{
my $w_schema = Avro::Schema->parse(<<EOJ);
{ "type": "record", "name": "test",
"fields" : [
{"name": "a", "type": "long"},
{"name": "bonus", "type": "string"} ]}
EOJ

my $r_schema = Avro::Schema->parse(<<EOJ);
{ "type": "record", "name": "test",
"fields" : [
{"name": "t", "type": "float", "default": 37.5 },
{"name": "a", "type": "long"} ]}
EOJ

my $data = { a => 1, bonus => "i" };
my $enc = '';
Avro::BinaryEncoder->encode(
schema => $w_schema,
data => $data,
emit_cb => sub { $enc .= ${ $_[0] } },
);
my $dec = Avro::BinaryDecoder->decode(
writer_schema => $w_schema,
reader_schema => $r_schema,
reader => IO::String->new($enc),
);
is $dec->{a}, 1, "easy";
ok ! exists $dec->{bonus}, "bonus extra field ignored";
is $dec->{t}, 37.5, "default t from reader used";

## delete the default for t
delete $r_schema->fields->[0]{default};
throws_ok {
Avro::BinaryDecoder->decode(
writer_schema => $w_schema,
reader_schema => $r_schema,
reader => IO::String->new($enc),
);
} "Avro::Schema::Error::DataMismatch", "no default value!";
}

done_testing;

0 comments on commit 3ad17bf

Please sign in to comment.