Improve streaming parsing (#23)
This PR changes some internal behaviors of the streaming parser.

* Simplify the buffering context function: the parser no longer buffers input itself; instead it halts and returns the current parsing context to the caller.

* Remove "Saxy.Parser", which was an unnecessary abstraction.

* Enhance test suite coverage around streaming parsing.

* Improve the Saxy.Buffering module.

* Support Enumerable in streaming parsing.
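
To illustrate the new Enumerable support, here is a minimal usage sketch of parse_stream/4. The CountingHandler module, file path, and chunk list are hypothetical, and the callback shapes assume the Saxy.Handler behaviour as it stands in this version.

defmodule CountingHandler do
  # Hypothetical handler that counts start-element events.
  @behaviour Saxy.Handler

  def handle_event(:start_element, {_name, _attributes}, count), do: {:ok, count + 1}
  def handle_event(_event, _data, count), do: {:ok, count}
end

# A file stream still works as before:
{:ok, count} = Saxy.parse_stream(File.stream!("doc.xml", [], 2048), CountingHandler, 0)

# ...and so does any other Enumerable of binary chunks, for example a plain list,
# even when a chunk boundary falls in the middle of a tag:
chunks = ["<?xml version=\"1.0\"?>", "<root><chi", "ld/></root>"]
{:ok, 2} = Saxy.parse_stream(chunks, CountingHandler, 0)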
qcam authored Aug 5, 2018
1 parent 0fd3225 commit e8f9347
Showing 10 changed files with 302 additions and 359 deletions.
44 changes: 38 additions & 6 deletions lib/saxy.ex
@@ -153,7 +153,8 @@ defmodule Saxy do
           initial_state :: term(),
           options :: Keyword.t()
         ) :: {:ok, state :: term()} | {:error, exception :: Saxy.ParseError.t()}
-  def parse_string(data, handler, initial_state, options \\ []) when is_binary(data) and is_atom(handler) do
+  def parse_string(data, handler, initial_state, options \\ [])
+      when is_binary(data) and is_atom(handler) do
     expand_entity = Keyword.get(options, :expand_entity, :keep)

     state = %State{
@@ -163,7 +164,13 @@ defmodule Saxy do
       expand_entity: expand_entity
     }

-    Parser.parse_document(data, :done, state)
+    case Parser.Prolog.parse_prolog(data, :done, data, 0, state) do
+      {:ok, state} ->
+        {:ok, state.user_state}
+
+      {:error, _reason} = error ->
+        error
+    end
   end

   @doc ~S"""
@@ -223,13 +230,13 @@ defmodule Saxy do
   """

   @spec parse_stream(
-          stream :: File.Stream.t() | Stream.t(),
+          stream :: Enumerable.t(),
           handler :: module() | function(),
           initial_state :: term(),
           options :: Keyword.t()
         ) :: {:ok, state :: term()} | {:error, exception :: Saxy.ParseError.t()}

-  def parse_stream(%module{} = stream, handler, initial_state, options \\ []) when module in [File.Stream, Stream] do
+  def parse_stream(stream, handler, initial_state, options \\ []) do
     expand_entity = Keyword.get(options, :expand_entity, :keep)

     state = %State{
@@ -239,7 +246,31 @@ defmodule Saxy do
       expand_entity: expand_entity
     }

-    Parser.parse_document(<<>>, stream, state)
+    init = Parser.Prolog.parse_prolog(<<>>, :buffering, <<>>, 0, state)
+
+    stream
+    |> Enum.reduce_while(init, &stream_reducer/2)
+    |> case do
+      {:halted, rest, original, context_fun} ->
+        case context_fun.(rest, :done, original) do
+          {:ok, state} -> {:ok, state.user_state}
+          {:error, reason} -> {:error, reason}
+        end
+
+      other ->
+        other
+    end
+  end
+
+  defp stream_reducer(next_bytes, {:halted, rest, original, context_fun}) do
+    rest = rest <> next_bytes
+    original = original <> next_bytes
+
+    {:cont, context_fun.(rest, :buffering, original)}
+  end
+
+  defp stream_reducer(_next_bytes, {:error, _reason} = error) do
+    {:halt, error}
   end

   @doc """
@@ -285,7 +316,8 @@ defmodule Saxy do
     ]
   """
-  @spec encode_to_iodata!(root :: Saxy.XML.element(), prolog :: Saxy.Prolog.t() | Keyword.t()) :: iodata()
+  @spec encode_to_iodata!(root :: Saxy.XML.element(), prolog :: Saxy.Prolog.t() | Keyword.t()) ::
+          iodata()

   def encode_to_iodata!(root, prolog \\ []) do
     Encoder.encode_to_iodata(root, prolog)
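The new parse_stream/4 above drives a small continuation protocol: whenever the parser runs out of bytes while more input may still arrive (the :buffering mode), it returns {:halted, rest, original, context_fun}; the caller appends the next chunk to rest and original and re-invokes context_fun, finally calling it with :done once the Enumerable is exhausted. The toy module below imitates that shape to make the flow concrete; it is an illustrative sketch, not code from Saxy.

defmodule HaltDemo do
  # Toy "parser" that needs at least n bytes before it can succeed.
  def parse(buffer, :buffering, original, n) when byte_size(buffer) < n do
    # Not enough input yet: halt and hand back a continuation.
    {:halted, buffer, original, &parse(&1, &2, &3, n)}
  end

  def parse(buffer, _cont, _original, n) when byte_size(buffer) >= n do
    {:ok, buffer}
  end

  def parse(_buffer, :done, _original, _n) do
    # The input ended before enough bytes arrived.
    {:error, :unexpected_end_of_input}
  end
end

# Feeding chunks the way stream_reducer/2 does:
{:halted, rest, original, context_fun} = HaltDemo.parse(<<>>, :buffering, <<>>, 5)
{:halted, rest, original, context_fun} = context_fun.(rest <> "<ro", :buffering, original <> "<ro")
{:ok, "<root"} = context_fun.(rest <> "ot", :buffering, original <> "ot")

# Finishing the way parse_stream/4 does when the stream ends too early:
{:halted, rest, original, context_fun} = HaltDemo.parse(<<>>, :buffering, <<>>, 5)
{:error, :unexpected_end_of_input} = context_fun.(rest, :done, original)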
111 changes: 30 additions & 81 deletions lib/saxy/buffering.ex
@@ -1,95 +1,44 @@
 defmodule Saxy.Buffering do
   @moduledoc false

-  defmacro buffering_parse_fun(fun_name, arity, token \\ "")
-  defmacro buffering_parse_fun(fun_name, arity, token) do
-    quoted_params =
+  defmacro defhalt(fun_name, arity, token) do
+    params_splice =
       case arity do
-        5 -> quote(do: [cont, original, pos, state])
-        6 -> quote(do: [cont, original, pos, state, acc1])
-        7 -> quote(do: [cont, original, pos, state, acc1, acc2])
-        8 -> quote(do: [cont, original, pos, state, acc1, acc2, acc3])
-        9 -> quote(do: [cont, original, pos, state, acc1, acc2, acc3, acc4])
-        10 -> quote(do: [cont, original, pos, state, acc1, acc2, acc3, acc4, acc5])
+        5 -> quote(do: [original, pos, state])
+        6 -> quote(do: [original, pos, state, acc1])
+        7 -> quote(do: [original, pos, state, acc1, acc2])
+        8 -> quote(do: [original, pos, state, acc1, acc2, acc3])
+        9 -> quote(do: [original, pos, state, acc1, acc2, acc3, acc4])
+        10 -> quote(do: [original, pos, state, acc1, acc2, acc3, acc4, acc5])
       end
-    quoted_fun =
+
+    context_fun =
       case arity do
-        5 -> quote(do: &(unquote(fun_name)(&1, &2, &3, &4, &5)))
-        6 -> quote(do: &(unquote(fun_name)(&1, &2, &3, &4, &5, acc1)))
-        7 -> quote(do: &(unquote(fun_name)(&1, &2, &3, &4, &5, acc1, acc2)))
-        8 -> quote(do: &(unquote(fun_name)(&1, &2, &3, &4, &5, acc1, acc2, acc3)))
-        9 -> quote(do: &(unquote(fun_name)(&1, &2, &3, &4, &5, acc1, acc2, acc3, acc4)))
-        10 -> quote(do: &(unquote(fun_name)(&1, &2, &3, &4, &5, acc1, acc2, acc3, acc4, acc5)))
+        5 -> quote(do: &unquote(fun_name)(&1, &2, &3, pos, state))
+        6 -> quote(do: &unquote(fun_name)(&1, &2, &3, pos, state, acc1))
+        7 -> quote(do: &unquote(fun_name)(&1, &2, &3, pos, state, acc1, acc2))
+        8 -> quote(do: &unquote(fun_name)(&1, &2, &3, pos, state, acc1, acc2, acc3))
+        9 -> quote(do: &unquote(fun_name)(&1, &2, &3, pos, state, acc1, acc2, acc3, acc4))
+        10 -> quote(do: &unquote(fun_name)(&1, &2, &3, pos, state, acc1, acc2, acc3, acc4, acc5))
       end

-    if token == :utf8 do
-      quote do
-        # 2-byte/3-byte/4-byte unicode
-        def unquote(fun_name)(<<1::size(1), rest::size(7)>>, unquote_splicing(quoted_params))
-            when cont != :done do
-          Saxy.Buffering.maybe_buffer(<<1::size(1), rest::size(7)>>, cont, original, pos, state, unquote(quoted_fun))
-        end
-
-        # 3-byte/4-byte unicode
-        def unquote(fun_name)(<<1::size(1), 1::size(1), rest::6-bits, next_char::1-bytes>>, unquote_splicing(quoted_params))
-            when cont != :done do
-          Saxy.Buffering.maybe_buffer(<<1::size(1), 1::size(1), rest::6-bits, next_char::binary>>, cont, original, pos, state, unquote(quoted_fun))
-        end
-
-        # # 4-byte unicode
-        def unquote(fun_name)(<<1::size(1), 1::size(1), 1::size(1), rest::5-bits, next_char::2-bytes>>, unquote_splicing(quoted_params))
-            when cont != :done do
-          Saxy.Buffering.maybe_buffer(<<1::size(1), 1::size(1), 1::size(1), rest::5-bits, next_char::binary>>, cont, original, pos, state, unquote(quoted_fun))
-        end
+    quote do
+      def unquote(fun_name)(unquote(token), :buffering, unquote_splicing(params_splice)) do
+        {
+          :halted,
+          unquote(token),
+          original,
+          unquote(context_fun)
+        }
       end
-    else
-      quote do
-        def unquote(fun_name)(unquote(token), unquote_splicing(quoted_params))
-            when cont != :done do
-          Saxy.Buffering.maybe_buffer(unquote(token), cont, original, pos, state, unquote(quoted_fun))
-        end
-      end
     end
   end

-  @compile {:inline, [maybe_buffer: 6]}
-
-  def maybe_buffer(<<buffer::bits>>, cont, original, pos, state, fun) do
-    case do_buffer(cont) do
-      :done ->
-        fun.(buffer, :done, original, pos, state)
-
-      {:ok, {cont_bytes, next_cont}} ->
-        buffer = [buffer | cont_bytes] |> IO.iodata_to_binary()
-        original = [original | cont_bytes] |> IO.iodata_to_binary()
-        fun.(buffer, next_cont, original, pos, state)
-    end
-  end
-
-  @compile {:inline, [maybe_commit: 2]}
-
-  def maybe_commit(buffer, pos) do
-    buffer_size = byte_size(buffer)
-
-    binary_part(buffer, pos, buffer_size - pos)
-  end
-
-  defp do_buffer(cont) do
-    case next_cont(cont) do
-      {:suspended, next_bytes, reducer} ->
-        next_cont = fn _, _ -> reducer.({:cont, :first}) end
-        {:ok, {next_bytes, next_cont}}
-
-      {:done, _} -> :done
-
-      {:halted, _} -> :done
-    end
-  end
-
-  defp next_cont(cont) do
-    Enumerable.reduce(cont, {:cont, :first}, fn
-      next_bytes, :first -> {:suspend, next_bytes}
-      next_bytes, _ -> {:cont, next_bytes}
-    end)
+  def utf8_binaries() do
+    [
+      quote(do: <<1::size(1), rest::size(7)>>),
+      quote(do: <<1::size(1), 1::size(1), rest::size(6), next_char::1-bytes>>),
+      quote(do: <<1::size(1), 1::size(1), 1::size(1), rest::size(5), next_chars::2-bytes>>)
+    ]
   end
 end
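
To make the new macro concrete, here is roughly how defhalt/3 would be used and what it generates. The MyParser module, the function name, and the "<" token are hypothetical examples, not clauses taken from Saxy's actual parser.

defmodule MyParser do
  import Saxy.Buffering, only: [defhalt: 3]

  # Based on the macro above, this expands to roughly:
  #
  #   def parse_open_tag("<", :buffering, original, pos, state) do
  #     {:halted, "<", original, &parse_open_tag(&1, &2, &3, pos, state)}
  #   end
  #
  # i.e. when the buffer holds only "<" and more input may still arrive,
  # halt and return a context function that resumes parsing later.
  defhalt(:parse_open_tag, 5, "<")

  # Ordinary clause used once enough bytes are available (toy behaviour).
  def parse_open_tag("<" <> rest, _cont, _original, pos, state) do
    {:ok, {rest, pos, state}}
  end
end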
15 changes: 0 additions & 15 deletions lib/saxy/parser.ex

This file was deleted.

(Diffs for the remaining changed files are not shown here.)
