Skip to content

Commit

Permalink
Merge branch 'master' into benchmarks
Browse files Browse the repository at this point in the history
  • Loading branch information
peburrows committed Sep 30, 2016
2 parents edf9aea + f937903 commit b052212
Show file tree
Hide file tree
Showing 24 changed files with 294 additions and 200 deletions.
25 changes: 14 additions & 11 deletions lib/river/client.ex
@@ -1,21 +1,24 @@
defmodule River.Client do
def get(uri, timeout \\ 5000)
def get(%URI{}=uri, timeout) do
alias River.{Request}

def get(uri, timeout \\ 5_000)
def get(%URI{} = uri, timeout) do
{:ok, conn} = River.Conn.create(uri.host)
IO.puts "we are getting: #{uri.host} :: #{inspect conn}"
River.Conn.get(conn, uri.path, timeout)
River.Conn.request!(conn, %Request{uri: uri}, timeout)
end
def get(url, timeout), do: get(URI.parse(url), timeout)
def get(uri, timeout), do: get(URI.parse(uri), timeout)

def post(%URI{}=uri, data) do
def put(uri, data, timeout \\ 5_000)
def put(%URI{} = uri, data, timeout) do
{:ok, conn} = River.Conn.create(uri.host)
River.Conn.post(conn, uri.path, data)
River.Conn.request!(conn, %Request{uri: uri, data: data, method: :put}, timeout)
end
def post(url, data), do: post(URI.parse(url), data)
def put(uri, data, timeout), do: put(URI.parse(uri), data, timeout)

def put(%URI{}=uri, data) do
def post(uri, data, timeout \\ 5_000)
def post(%URI{} = uri, data, timeout) do
{:ok, conn} = River.Conn.create(uri.host)
River.Conn.put(conn, uri.path, data)
River.Conn.request!(%Request{method: :post, uri: uri, data: data}, timeout)
end
def put(url, data), do: put(URI.parse(url), data)
def post(uri, data, timeout), do: post(URI.parse(uri), data, timeout)
end
219 changes: 140 additions & 79 deletions lib/river/conn.ex
Expand Up @@ -4,15 +4,18 @@ defmodule River.Conn do
use River.FrameTypes
use Bitwise
alias Experimental.DynamicSupervisor
alias River.{Conn, Frame, Frame.Settings, Encoder}
alias River.{Conn, Frame, Frame.Settings, Frame.WindowUpdate, Encoder, Request}

@default_header_table_size 4096
@initial_window_size 65_535

defstruct [
host: nil,
protocol: "h2",
send_ctx: nil,
recv_ctx: nil,
send_window: 0,
recv_window: @initial_window_size,
buffer: "",
socket: nil,
stream_id: -1,
Expand All @@ -39,144 +42,202 @@ defmodule River.Conn do
end
end

def init(%Conn{host: host}=conn) do
def init(%Conn{} = conn) do
{:ok, send_ctx} = HPack.Table.start_link(@default_header_table_size)
{:ok, recv_ctx} = HPack.Table.start_link(@default_header_table_size)

conn = %{conn |
send_ctx: send_ctx,
recv_ctx: recv_ctx,
settings: [
MAX_CONCURRENT_STREAMS: 100,
INITIAL_WINDOW_SIZE: 65535,
HEADER_TABLE_SIZE: @default_header_table_size,
],
}
send_ctx: send_ctx,
recv_ctx: recv_ctx,
settings: [
MAX_CONCURRENT_STREAMS: 250,
INITIAL_WINDOW_SIZE: @initial_window_size,
HEADER_TABLE_SIZE: @default_header_table_size,
# MAX_FRAME_SIZE: @max_frame_size,
],
}

{:connect, :init, conn}
end

def get(pid, path, timeout \\ 5_000) do
Connection.cast(pid, {:get, path, self})
def request!(pid, %Request{}=req, timeout) do
Connection.cast(pid, {req, self})
listen(timeout)
end

defp listen(timeout) do
receive do
{:ok, response} ->
{:ok, response}
{:data} ->
listen(timeout)
other ->
other
after timeout -> # eventually, we need to customize the timeout
{:error, :timeout}
after timeout ->
{:error, :timeout}
end
end

def connect(info, %Conn{host: host}=conn) do

def connect(_info, %Conn{host: host} = conn) do
host = String.to_charlist(host)

case :ssl.connect(host, 443, ssl_options(host)) do
{:ok, socket} ->
River.Frame.http2_header
:ssl.send(socket, River.Frame.http2_header)

# frame = River.Frame.Settings.encode(conn.settings, 0)
frame = Encoder.encode(%Frame{
type: @settings,
payload: %Settings{
settings: conn.settings
}})
frame = %Frame{
type: @settings,
payload: %Settings{settings: conn.settings}
}
encoded_frame = Encoder.encode(frame)

:ssl.send(socket, frame)
:ssl.send(socket, encoded_frame)
{:ok, %{conn | socket: socket}}
{:error, _} = error ->
{:error, _} ->
{:backoff, 1000, conn}
other ->
_other ->
{:backoff, 1000, conn}
end
end

def disconnect(info, %Conn{socket: socket}=conn) do
def disconnect(_info, %Conn{socket: socket} = conn) do
# we need to disconnect from the ssl socket
:ssl.close(socket)
{:stop, :exit, conn}
end

def handle_cast({:get, path}, conn), do: handle_cast({:get, path, nil}, conn)
def handle_cast({:get, path, parent}, conn) do
# the problem here might be that this call will block until it fires
# off the request, which is less than ideal. What we should do here is
# spin up a RequestHandler of some sort to trigger it and handle things
%{
socket: socket,
host: host,
stream_id: stream_id,
send_ctx: ctx,
streams: streams
} = conn

stream_id = stream_id + 2
def handle_cast({%Request{}=req, parent}, conn) do
make_request(req, parent, conn)
end

{:ok, _} = DynamicSupervisor.start_child(River.StreamSupervisor, [[name: :"stream-#{host}-#{stream_id}"], parent])
defp make_request(%Request{method: method, uri: %{path: path}}=req, parent,
%{host: host, stream_id: stream_id, socket: socket, send_ctx: ctx, streams: streams}=conn) do

:ssl.setopts(socket, [active: true])

conn =
conn
|> add_stream(parent)
|> send_headers(req)
|> send_data(req)

{:noreply, conn}
end

defp add_stream(%{stream_id: id, streams: count, host: host}=conn, parent) do
id = id + 2
{:ok, _} = DynamicSupervisor.start_child(River.StreamSupervisor, [[name: :"stream-#{host}-#{id}"], parent])

%{conn | stream_id: id, streams: count + 1}
end

defp send_headers(%{send_ctx: ctx, socket: socket, host: host, stream_id: id} = conn,
%{headers: headers, method: method, uri: %{path: path}} = req) do
headers = [
{":method", "GET"},
{":scheme", "https"},
{":path", path},
{":method", (method |> Atom.to_string |> String.upcase)},
{":scheme", "https"},
{":path", path},
# this should probably be req.authority instead
{":authority", host},
{"accept", "*/*"},
{"user-agent", "River/0.0.1"}
]
{"accept", "*/*"},
{"user-agent", "River/0.0.1"},
] ++ headers

frame = %Frame{
type: @headers,
stream_id: id,
flags: header_flags(req),
payload: %Frame.Headers{headers: headers}
} |> Encoder.encode(ctx)

:ssl.send(socket, frame)
conn
end

f = Encoder.encode(%Frame{
type: @headers,
stream_id: stream_id,
flags: %{end_headers: true, end_stream: true},
payload: %Frame.Headers{
headers: headers
}}, ctx)
defp header_flags(%{method: :get}), do: %{end_headers: true, end_stream: true}
defp header_flags(_), do: %{end_headers: true}

# IO.puts "#{IO.ANSI.green_background}#{Base.encode16(f, case: :lower)}#{IO.ANSI.reset}"
defp send_data(conn, %{method: :get}), do: conn
defp send_data(%{stream_id: stream_id, socket: socket} = conn, %{data: data}) do
frame = %Frame{
type: @data,
stream_id: stream_id,
flags: %{end_stream: true},
payload: %Frame.Data{data: data}
} |> Encoder.encode

:ssl.send(socket, f)
{:noreply, %{conn | stream_id: stream_id, streams: streams+1 } }
:ssl.send(socket, frame)
conn
end

def handle_info({:ssl, _, payload} = msg, conn) do
%{
recv_ctx: ctx,
socket: socket,
buffer: prev,
host: host,
} = conn

def handle_info({:ssl, _what, payload}, %{recv_ctx: ctx, buffer: buffer} = conn) do
conn = decode_frames(conn, buffer <> payload, ctx, [])
{:noreply, conn}
end

{new_conn, frames} = decode_frames(conn, prev <> payload, ctx, [])
def handle_info(_message, conn) do
{:noreply, conn}
end

for f <- frames do
{:ok, pid} = DynamicSupervisor.start_child(River.StreamSupervisor, [[name: :"stream-#{host}-#{f.stream_id}"]])
defp handle_frame(conn, %{type: @settings, flags: %{ack: false}} = frame) do
f = Encoder.encode(%Frame{
type: @settings,
stream_id: 0,
flags: %{ack: true},
payload: %Settings{
settings: []
}})
:ssl.send(conn.socket, f)
%{conn | settings: conn.settings ++ frame.payload.settings}
end

River.StreamHandler.add_frame(pid, f)
defp handle_frame(%{recv_window: window} = conn, %{type: @data, length: len, stream_id: stream}) do
window = window - len
IO.puts "the window: #{inspect window}"

if window <= 0 do
frame1 = %Frame{
type: @window_update,
stream_id: stream,
payload: %WindowUpdate{
# increment: @initial_window_size + 200_000
increment: 2_000_000
}}

# IO.puts "sending window update frame #{inspect frame1} :: #{inspect Encoder.encode(frame1)}"
:ssl.send(conn.socket, Encoder.encode(frame1))
:ssl.send(conn.socket, Encoder.encode(%{frame1 | stream_id: 0}))
%{conn | recv_window: 2_000_000 }
else
IO.puts "we still have room on the window: #{inspect window}"
%{conn | recv_window: window}
end

{:noreply, new_conn}
end

defp decode_frames(conn, <<>>, _ctx, stack),
do: {conn, Enum.reverse(stack)}
defp handle_frame(conn, %{flags: %{end_stream: true}}) do
%{conn | streams: conn.streams - 1}
end

defp handle_frame(conn, _frame), do: conn

defp decode_frames(conn, <<>>, _ctx, _stack),
do: %{conn | buffer: <<>>}

defp decode_frames(conn, payload, ctx, stack) do
case Frame.decode(payload, ctx) do
{:ok, frame, more} ->
# IO.puts "frame! :: #{inspect frame.length} :: #{inspect frame.flags}"
{:ok, pid} = DynamicSupervisor.start_child(River.StreamSupervisor, [[name: :"stream-#{conn.host}-#{frame.stream_id}"]])
River.StreamHandler.add_frame(pid, frame)
conn = handle_frame(conn, frame)
decode_frames(conn, more, ctx, [frame | stack])
{:error, :invalid_frame} ->
{ %{conn | buffer: payload}, Enum.reverse(stack) }
{:error, :invalid_frame, buffer} ->
%{conn | buffer: buffer}
end
end

def handle_info(msg, conn) do
IO.puts "unhandled message: #{inspect msg}"
{:noreply, conn}
end

defp ssl_options(host) do
verify_fun = {(&:ssl_verify_hostname.verify_fun/3), [{:check_hostname, host}]}

Expand Down

0 comments on commit b052212

Please sign in to comment.