Skip to content

Commit

Permalink
Add server side metrics
Browse files Browse the repository at this point in the history
We could use server-side metrics for Thrift services: standard metrics for
every endpoint, such as QPS, latency, request/response size, and success rate.

To avoid coupling to our internal metrics library, I left the interface
pluggable. This also makes testing easier, because tests can substitute a stub
implementation.
  • Loading branch information
pguillory committed Nov 6, 2020
1 parent b03a91b commit c8ffeb3
Show file tree
Hide file tree
Showing 6 changed files with 316 additions and 107 deletions.
314 changes: 208 additions & 106 deletions lib/thrift/binary/framed/protocol_handler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,21 @@ defmodule Thrift.Binary.Framed.ProtocolHandler do
@type transport_opts :: :ranch_tcp.opts()

alias Thrift.{
Protocol,
Protocol.Binary,
TApplicationException,
Transport.SSL
}

require Logger

defstruct [
:transport,
:socket,
:server_module,
:handler_module,
:recv_timeout
]

@spec start_link(reference, port, transport, {module, module, transport_opts, [SSL.option()]}) ::
GenServer.on_start()
def start_link(
Expand All @@ -43,162 +51,256 @@ defmodule Thrift.Binary.Framed.ProtocolHandler do
@dialyzer {:nowarn_function, init: 7}
@spec init(reference, port, :ranch_tcp, module, module, :ranch_tcp.opts(), [SSL.option()]) ::
:ok | no_return
def init(ref, socket, :ranch_tcp = transport, server_module, handler_module, tcp_opts, ssl_opts) do
def init(ref, socket, :ranch_tcp, server_module, handler_module, tcp_opts, ssl_opts) do
:ok = :ranch.accept_ack(ref)

{recv_timeout, tcp_opts} = Keyword.pop(tcp_opts, :recv_timeout, @default_timeout)
:ok = :inet.setopts(socket, Keyword.put(tcp_opts, :packet, 4))

with {:ok, first_bytes} <- :gen_tcp.recv(socket, 4, recv_timeout),
:ok <- :gen_tcp.unrecv(socket, first_bytes) do
<<first_byte::8-unsigned, _::binary>> = first_bytes
state = %__MODULE__{
transport: :gen_tcp,
socket: socket,
server_module: server_module,
handler_module: handler_module,
recv_timeout: recv_timeout
}

transport_options = Keyword.put(tcp_opts, :packet, 4)
transport.setopts(socket, transport_options)
case SSL.configuration(ssl_opts) do
{:error, %_exception{} = err} ->
Logger.error(fn ->
format_log("SSL configuration error", Exception.format(:error, err, []), state)
end)

maybe_ssl_handshake(
socket,
first_byte,
ssl_opts,
server_module,
handler_module,
recv_timeout
)
else
{:error, closed} when closed in [:closed, :econnreset, :timeout] ->
:ok = transport.close(socket)
close(state)

{:error, reason} ->
# :ssl.format_error handles posix errors as well as ssl errors
Logger.info(fn ->
"#{inspect(handler_module)} (#{inspect(self())}) connection error: #{
:ssl.format_error(reason)
} (#{inspect(reason)})"
end)
nil ->
receive_message(state)

:ok = transport.close(socket)
{optional, ssl_opts} when optional in [:required, :optional] ->
peek_first_byte(optional, ssl_opts, state)
end
end

defp maybe_ssl_handshake(socket, first_byte, ssl_opts, server_module, handler_module, timeout) do
with {optional, ssl_opts} when optional in [:required, :optional] <-
SSL.configuration(ssl_opts),
{:ok, transport, socket} <-
maybe_ssl_accept(socket, first_byte, optional, ssl_opts, timeout) do
do_thrift_call({transport, socket, server_module, handler_module, timeout})
else
nil ->
do_thrift_call({:gen_tcp, socket, server_module, handler_module, timeout})
defp peek_first_byte(
optional,
ssl_opts,
%__MODULE__{transport: :gen_tcp, socket: socket, recv_timeout: recv_timeout} = state
) do
span = start_span(:peek_first_byte, state)

{:error, %_exception{} = err} ->
with :ok <- :inet.setopts(socket, packet: :raw),
{:ok, first_bytes} <- :gen_tcp.recv(socket, 4, recv_timeout),
:ok <- :gen_tcp.unrecv(socket, first_bytes),
:ok <- :inet.setopts(socket, packet: 4) do
<<first_byte::8-unsigned, _::binary>> = first_bytes
{:ok, first_byte}
end
|> case do
{:ok, @ssl_header_byte} ->
finish_span(span, result: "ssl")
ssl_handshake(ssl_opts, state)

{:ok, _non_ssl_header_byte} when optional == :required ->
Logger.error(fn ->
"#{inspect(handler_module)} (#{inspect(self())}) configuration error: " <>
Exception.format(:error, err, [])
format_log("peek_first_byte", "SSL required", state)
end)

{:error, reason} ->
Logger.info(fn ->
"#{inspect(handler_module)} (#{inspect(self())}) handshake error: #{
:ssl.format_error(reason)
} (#{inspect(reason)})"
end)
end
end
finish_span(span, result: "tcp_rejected")
close(state)

defp maybe_ssl_accept(socket, @ssl_header_byte, _optional, ssl_opts, timeout) do
case ssl_handshake(socket, ssl_opts, timeout) do
{:ok, ssl_sock} ->
{:ok, :ssl, ssl_sock}
{:ok, _non_ssl_header_byte} when optional == :optional ->
finish_span(span, result: "tcp")
receive_message(state)

error ->
error
end
end
{:error, closed} when closed in [:closed, :econnreset, :timeout] ->
finish_span(span, result: to_string(closed))
close(state)

defp maybe_ssl_accept(socket, _first_byte, :optional, _ssl_opts, _timeout) do
{:ok, :gen_tcp, socket}
end
{:error, error} ->
Logger.error(fn ->
format_log("peek_first_byte", :ssl.format_error(error), state)
end)

defp maybe_ssl_accept(_socket, _first_byte, :required, _ssl_opts, _timeout) do
{:error, :closed}
finish_span(span, result: "error")
close(state)
end
end

defp ssl_handshake(socket, ssl_opts, timeout) do
defp ssl_handshake(
ssl_opts,
%__MODULE__{transport: :gen_tcp, socket: socket, recv_timeout: recv_timeout} = state
) do
span = start_span(:ssl_handshake, state)

# As of OTP 21.0, `:ssl.ssl_accept/3` is deprecated in favour of `:ssl.handshake/3`.
# This check allows us to support both, depending on which OTP version is being used.
if function_exported?(:ssl, :handshake, 3) do
apply(:ssl, :handshake, [socket, ssl_opts, timeout])
apply(:ssl, :handshake, [socket, ssl_opts, recv_timeout])
else
apply(:ssl, :ssl_accept, [socket, ssl_opts, timeout])
apply(:ssl, :ssl_accept, [socket, ssl_opts, recv_timeout])
end
end
|> case do
{:ok, ssl_socket} ->
finish_span(span, result: "success")
state = %__MODULE__{state | transport: :ssl, socket: ssl_socket}
receive_message(state)

defp do_thrift_call({transport, socket, server_module, handler_module, recv_timeout} = args) do
with {:ok, message} <- transport.recv(socket, 0, recv_timeout),
parsed <- Protocol.Binary.deserialize(:message_begin, message),
{:ok, :reply, data} <- handle_thrift_message(parsed, server_module, handler_module),
:ok <- transport.send(socket, data) do
do_thrift_call(args)
else
{:error, {:server_error, thrift_data}} ->
:ok = transport.send(socket, thrift_data)
exit({:shutdown, :server_error})
{:error, closed} when closed in [:closed, :econnreset, :timeout] ->
finish_span(span, result: to_string(closed))
close(state)

{:error, {:protocol_error, reason}} ->
Logger.warn(fn ->
"#{inspect(handler_module)} (#{inspect(self())}) decode error: #{inspect(reason)}"
{:error, error} ->
Logger.error(fn ->
format_log("ssl_handshake", :ssl.format_error(error), state)
end)

:ok = transport.close(socket)
finish_span(span, result: "error")
close(state)
end
end

# Reads the next message from the connection and passes it on to
# deserialize_message/2.
#
# A `:receive_message` span is recorded for every attempt, tagged with
# `result: "success"`, the close reason ("closed", "econnreset", "timeout"),
# or "error". Any failure ends the connection via close/1; only unexpected
# errors are logged.
defp receive_message(
       %__MODULE__{transport: transport, socket: socket, recv_timeout: recv_timeout} = state
     ) do
  span = start_span(:receive_message, state)

  # Length 0 asks the transport for whatever unit it delivers next.
  case transport.recv(socket, 0, recv_timeout) do
    {:ok, serialized_message} ->
      finish_span(span, result: "success")
      deserialize_message(serialized_message, state)

    {:error, closed} when closed in [:closed, :econnreset, :timeout] ->
      # The socket is closed exactly once, by close/1. Closing it here as
      # well would repeat the call and, with an `:ok =` match, could crash
      # the process before the span is reported.
      finish_span(span, result: to_string(closed))
      close(state)

    {:error, error} ->
      # :ssl.format_error/1 is used for both transports to render the reason.
      Logger.error(fn ->
        format_log("receive_message", :ssl.format_error(error), state)
      end)

      finish_span(span, result: "error")
      close(state)
  end
end

# Decodes the message header of a received payload and dispatches the decoded
# message to handle_message/2.
#
# If the payload cannot be decoded, the reason is logged and the connection is
# closed through close/1. Everything needed for logging and closing comes from
# `state`, so no transport or handler variables are bound here.
defp deserialize_message(serialized_message, state) do
  case Binary.deserialize(:message_begin, serialized_message) do
    {:ok, message} ->
      handle_message(message, state)

    {:error, reason} ->
      Logger.error(fn ->
        format_log("deserialize_message", inspect(reason), state)
      end)

      close(state)
  end
end

defp handle_thrift_message(
{:ok, {:call, sequence_id, name, args_binary}},
server_module,
handler_module
defp handle_message(
{:call, sequence_id, method_name, args_binary},
%__MODULE__{server_module: server_module, handler_module: handler_module} = state
) do
case server_module.handle_thrift(name, args_binary, handler_module) do
{:reply, serialized_reply} ->
message = Protocol.Binary.serialize(:message_begin, {:reply, sequence_id, name})
request_size = IO.iodata_length(args_binary)
gauge(:request_size, request_size, state, method: method_name)

span = start_span(:call, state, method: method_name)

case server_module.handle_thrift(method_name, args_binary, handler_module) do
:noreply -> {:reply, <<0>>}
other -> other
end
|> case do
{:reply, serialized_response} ->
finish_span(span, result: "success")

serialized_message = Binary.serialize(:message_begin, {:reply, sequence_id, method_name})

{:ok, :reply, [message | serialized_reply]}
response_size = IO.iodata_length(serialized_message)
gauge(:response_size, response_size, state, method: method_name, result: "success")

send_reply([serialized_message | serialized_response], :continue, state)

{:server_error, %TApplicationException{} = exc} ->
message = Protocol.Binary.serialize(:message_begin, {:exception, sequence_id, name})
serialized_exception = Protocol.Binary.serialize(:application_exception, exc)
finish_span(span, result: "error")

serialized_message = Binary.serialize(:message_begin, {:exception, sequence_id, method_name})

{:error, {:server_error, [message | serialized_exception]}}
serialized_exception = Binary.serialize(:application_exception, exc)

:noreply ->
message = Protocol.Binary.serialize(:message_begin, {:reply, sequence_id, name})
response_size = IO.iodata_length(serialized_exception)
gauge(:response_size, response_size, state, method: method_name, result: "error")

{:ok, :reply, [message | <<0>>]}
send_reply([serialized_message | serialized_exception], :stop, state)
end
end

defp handle_thrift_message(
{:ok, {:oneway, _seq_id, name, args_binary}},
server_module,
handler_module
# Handles a oneway message: the handler is invoked in a separate process and
# its result is never inspected.
#
# The handler runs in a freshly spawned (unlinked) process so the connection
# loop does not wait for it. A single zero byte is then written back and the
# loop continues with the next message (`:continue`).
defp handle_message(
       {:oneway, _sequence_id, method_name, args_binary},
       %__MODULE__{server_module: server_module, handler_module: handler_module} = state
     ) do
  # Spawn exactly once; the handler must not be run twice for one message.
  spawn(fn ->
    server_module.handle_thrift(method_name, args_binary, handler_module)
  end)

  send_reply(<<0>>, :continue, state)
end

# Writes `data` to the connection and decides what happens next.
#
# `after_reply` is `:continue` (go back to receive_message/1) or `:stop`
# (close the connection after a successful send). A `:send_reply` span is
# recorded with `result` set to "success", the close reason, or "error".
defp send_reply(data, after_reply, %__MODULE__{} = state) do
  %__MODULE__{transport: transport, socket: socket} = state
  span = start_span(:send_reply, state)

  case transport.send(socket, data) do
    :ok when after_reply in [:continue, :stop] ->
      finish_span(span, result: "success")

      # Both branches are in tail position, so the receive/reply loop does not
      # grow the stack.
      if after_reply == :continue do
        receive_message(state)
      else
        close(state)
      end

    {:error, reason} when reason in [:closed, :econnreset, :timeout] ->
      finish_span(span, result: to_string(reason))
      close(state)

    {:error, reason} ->
      Logger.error(fn ->
        format_log("send_reply", :ssl.format_error(reason), state)
      end)

      finish_span(span, result: "error")
      close(state)
  end
end

# Closes the connection's socket using the transport module stored in the
# state, and returns whatever that transport's close/1 returns.
defp close(%__MODULE__{} = state) do
  %__MODULE__{transport: transport_module, socket: sock} = state
  transport_module.close(sock)
end

# Starts a timing span for `metric_name`.
#
# Returns `{:span, event_name, metadata, started_at}` where `event_name` is
# `[Thrift, handler_module, metric_name]`, `metadata` is `tags` converted to a
# map, and `started_at` is a monotonic timestamp. Pass the tuple to
# finish_span/2 to emit the measurement.
defp start_span(metric_name, %__MODULE__{handler_module: handler}, tags \\ %{}) do
  started_at = System.monotonic_time()
  {:span, [Thrift, handler, metric_name], Map.new(tags), started_at}
end

# Completes a span created by start_span/3 and emits it as a telemetry event.
#
# The measurement is `%{duration: elapsed}`, the difference between two
# System.monotonic_time/0 readings. `tags` are merged over the metadata
# captured when the span was started.
defp finish_span({:span, event_name, metadata, started_at}, tags) do
  measurements = %{duration: System.monotonic_time() - started_at}
  :telemetry.execute(event_name, measurements, add_tags(metadata, tags))
end

# Emits a single numeric reading as a telemetry event.
#
# The event name is `[Thrift, handler_module, metric_name]`, the measurement is
# `%{value: value}`, and `tags` (keyword list or map) become the event metadata.
defp gauge(metric_name, value, %__MODULE__{} = state, tags) when is_number(value) do
  measurements = %{value: value}
  :telemetry.execute([Thrift, state.handler_module, metric_name], measurements, Map.new(tags))
end

# Merges `tags` (a keyword list or map of key/value pairs) into the `metadata`
# map. When a key is present in both, the value from `tags` wins.
defp add_tags(metadata, tags) do
  Enum.into(tags, metadata)
end

defp handle_thrift_message({:error, msg}, _, _) do
{:error, {:protocol_error, msg}}
# Builds a log line of the form
# "<handler module> (<current pid>) <context>: <message>".
defp format_log(context, message, %__MODULE__{} = state) do
  handler = inspect(state.handler_module)
  pid = inspect(self())
  "#{handler} (#{pid}) #{context}: #{message}"
end
end
3 changes: 2 additions & 1 deletion mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,8 @@ defmodule Thrift.Mixfile do

# Runtime
{:connection, "~> 1.0"},
{:ranch, "~> 1.6"}
{:ranch, "~> 1.6"},
{:telemetry, "~> 0.3"}
]
end

Expand Down

0 comments on commit c8ffeb3

Please sign in to comment.