Skip to content

Commit

Permalink
Add optional decoder options to the OCF decoding
Browse files Browse the repository at this point in the history
* Add new public function avro_ocf:decode_binary/2
* Add new public function avro_ocf:decode_file/2
* Extend function interface for avro_binary_decoder:decode_stream/4
  • Loading branch information
Strech committed Nov 10, 2020
1 parent 0f890bc commit a716c41
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 20 deletions.
10 changes: 6 additions & 4 deletions src/avro_binary_decoder.erl
Expand Up @@ -77,11 +77,13 @@ decode_stream(IoData, Type, StoreOrLkupFun) ->
%% bytes in a tuple.
%% @end
-spec decode_stream(iodata(), type_or_name(),
schema_store() | lkup_fun(), hook()) ->
schema_store() | lkup_fun(), hook() | decoder_options()) ->
{avro:out(), binary()}.
decode_stream(IoData, Type, StoreOrLkupFun, Hook) ->
do_decode(IoData, Type, avro_util:ensure_lkup_fun(StoreOrLkupFun),
avro:make_decoder_options([{hook, Hook}])).
decode_stream(IoData, Type, StoreOrLkupFun, Hook) when is_function(Hook) ->
decode_stream(IoData, Type, StoreOrLkupFun,
avro:make_decoder_options([{hook, Hook}]));
decode_stream(IoData, Type, StoreOrLkupFun, Options) when is_map(Options) ->
do_decode(IoData, Type, avro_util:ensure_lkup_fun(StoreOrLkupFun), Options).

%%%_* Internal functions =======================================================

Expand Down
53 changes: 37 additions & 16 deletions src/avro_ocf.erl
Expand Up @@ -26,7 +26,9 @@
-export([ append_file/3
, append_file/5
, decode_binary/1
, decode_binary/2
, decode_file/1
, decode_file/2
, make_header/1
, make_header/2
, make_ocf/2
Expand Down Expand Up @@ -58,15 +60,27 @@

%%%_* APIs =====================================================================

%% @doc Decode ocf into unwrapped values.
%% @doc decode_file/2 equivalent with default decoder options.
-spec decode_file(filename()) -> {header(), avro_type(), [avro:out()]}.
decode_file(Filename) ->
decode_file(Filename, avro:make_decoder_options([])).

%% @doc Decode ocf into unwrapped values.
-spec decode_file(filename(), decoder_options()) ->
{header(), avro_type(), [avro:out()]}.
decode_file(Filename, Options) ->
{ok, Bin} = file:read_file(Filename),
decode_binary(Bin).
decode_binary(Bin, Options).

%% @doc Decode ocf binary into unwrapped values.
%% @doc decode_binary/2 equivalent with default decoder options.
-spec decode_binary(binary()) -> {header(), avro_type(), [avro:out()]}.
decode_binary(Bin) ->
decode_binary(Bin, avro:make_decoder_options([])).

%% @doc Decode ocf binary into unwrapped values.
-spec decode_binary(binary(), decoder_options()) ->
{header(), avro_type(), [avro:out()]}.
decode_binary(Bin, Options) ->
{[ {<<"magic">>, Magic}
, {<<"meta">>, Meta}
, {<<"sync">>, Sync}
Expand All @@ -80,7 +94,7 @@ decode_binary(Bin) ->
, meta = Meta
, sync = Sync
},
{Header, Schema, decode_blocks(Lkup, Schema, Codec, Sync, Tail, [])}.
{Header, Schema, decode_blocks(Lkup, Schema, Codec, Sync, Tail, [], Options)}.

%% @doc Write objects in a single block to the given file name.
-spec write_file(filename(), lkup(), type_or_name(), [avro:in()]) -> ok.
Expand Down Expand Up @@ -214,27 +228,34 @@ decode_stream(Type, Bin) when is_binary(Bin) ->
decode_stream(Lkup, Type, Bin) when is_binary(Bin) ->
avro_binary_decoder:decode_stream(Bin, Type, Lkup).

-spec decode_stream(lkup(), avro_type(), binary(), decoder_options()) ->
{avro:out(), binary()} | no_return().
decode_stream(Lkup, Type, Bin, Options) when is_binary(Bin) ->
avro_binary_decoder:decode_stream(Bin, Type, Lkup, Options).

-spec decode_blocks(lkup(), avro_type(), avro_codec(),
binary(), binary(), [avro:out()]) -> [avro:out()].
decode_blocks(_Lkup, _Type, _Codec, _Sync, <<>>, Acc) ->
binary(), binary(), [avro:out()], decoder_options()) ->
[avro:out()].
decode_blocks(_Lkup, _Type, _Codec, _Sync, <<>>, Acc, _Options) ->
lists:reverse(Acc);
decode_blocks(Lkup, Type, Codec, Sync, Bin0, Acc) ->
decode_blocks(Lkup, Type, Codec, Sync, Bin0, Acc, Options) ->
LongType = avro_primitive:long_type(),
{Count, Bin1} = decode_stream(Lkup, LongType, Bin0),
{Size, Bin} = decode_stream(Lkup, LongType, Bin1),
<<Block:Size/binary, Sync:16/binary, Tail/binary>> = Bin,
NewAcc = decode_block(Lkup, Type, Codec, Block, Count, Acc),
decode_blocks(Lkup, Type, Codec, Sync, Tail, NewAcc).
NewAcc = decode_block(Lkup, Type, Codec, Block, Count, Acc, Options),
decode_blocks(Lkup, Type, Codec, Sync, Tail, NewAcc, Options).

-spec decode_block(lkup(), avro_type(), avro_codec(),
binary(), integer(), [avro:out()]) -> [avro:out()].
decode_block(_Lkup, _Type, _Codec, <<>>, 0, Acc) -> Acc;
decode_block(Lkup, Type, deflate, Bin, Count, Acc) ->
binary(), integer(), [avro:out()],
decoder_options()) -> [avro:out()].
decode_block(_Lkup, _Type, _Codec, <<>>, 0, Acc, _Options) -> Acc;
decode_block(Lkup, Type, deflate, Bin, Count, Acc, Options) ->
Decompressed = zlib:unzip(Bin),
decode_block(Lkup, Type, null, Decompressed, Count, Acc);
decode_block(Lkup, Type, null, Bin, Count, Acc) ->
{Obj, Tail} = decode_stream(Lkup, Type, Bin),
decode_block(Lkup, Type, null, Tail, Count - 1, [Obj | Acc]).
decode_block(Lkup, Type, null, Decompressed, Count, Acc, Options);
decode_block(Lkup, Type, null, Bin, Count, Acc, Options) ->
{Obj, Tail} = decode_stream(Lkup, Type, Bin, Options),
decode_block(Lkup, Type, null, Tail, Count - 1, [Obj | Acc], Options).

%% Hand coded schema.
%% {"type": "record", "name": "org.apache.avro.file.Header",
Expand Down
20 changes: 20 additions & 0 deletions test/avro_ocf_tests.erl
Expand Up @@ -126,6 +126,26 @@ make_ocf_test() ->
{_, _, DecodedL} = avro_ocf:decode_binary(Bin),
?assertEqual(L, DecodedL).

decoder_hook_test() ->
Fields = [ avro_record:define_field("f1", int, [])
, avro_record:define_field("f2", null, [])
],
Type = avro_record:type("rec", Fields, [{namespace, "my.ocf.test"}]),
Header = avro_ocf:make_header(Type),
Encoder = avro:make_simple_encoder(Type, []),
Object = [{"f1", 1}, {"f2", null}],
Objects = [Encoder(Object)],
Bin = iolist_to_binary(avro_ocf:make_ocf(Header, Objects)),
Hook = fun(Typ, _, Data, DecodeFun) ->
case avro:get_type_name(Typ) of
<<"null">> -> {<<"modifiedNull">>, Data};
_ -> DecodeFun(Data)
end
end,
Options = avro:make_decoder_options([{hook, Hook}]),
{_, _, Objs} = avro_ocf:decode_binary(Bin, Options),
?assertEqual([[{<<"f1">>, 1}, {<<"f2">>, <<"modifiedNull">>}]], Objs).

test_data(FileName) ->
filename:join([code:lib_dir(erlavro, test), "data", FileName]).

Expand Down

0 comments on commit a716c41

Please sign in to comment.