Skip to content

Commit

Permalink
feat: rewrite keepalive_ready feature based on reader_state table
Browse files Browse the repository at this point in the history
  • Loading branch information
GuyLewin committed Apr 20, 2022
1 parent 91935c4 commit dece7fa
Showing 1 changed file with 27 additions and 38 deletions.
65 changes: 27 additions & 38 deletions lib/resty/http.lua
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ function _M.new(_)
return nil, err
end
return setmetatable({
sock = sock, keepalive_supported = true, keepalive_ready = false
sock = sock, keepalive_supported = true, reader_state = { keepalive_ready = false, mark_keepalive_ready_on_body_read = true }
}, mt)
end

Expand Down Expand Up @@ -211,7 +211,7 @@ function _M.set_keepalive(self, ...)
end

if self.keepalive_supported == true then
if not self.keepalive_ready then
if not self.reader_state.keepalive_ready then
return nil, "response not fully read"
end

Expand Down Expand Up @@ -435,18 +435,7 @@ end
_M.transfer_encoding_is_chunked = transfer_encoding_is_chunked


local function _reader_keepalive_ready_mark(http_client)
return function()
http_client.keepalive_ready = true
end
end

local function _reader_keepalive_ready_no_op()
return function() end
end


local function _chunked_body_reader(keepalive_ready_callback, sock, default_chunk_size)
local function _chunked_body_reader(reader_state, sock, default_chunk_size)
return co_wrap(function(max_chunk_size)
local remaining = 0
local length
Expand Down Expand Up @@ -505,12 +494,14 @@ local function _chunked_body_reader(keepalive_ready_callback, sock, default_chun

until length == 0

keepalive_ready_callback()
if reader_state.mark_keepalive_ready_on_body_read then
reader_state.keepalive_ready = true
end
end)
end


local function _body_reader(keepalive_ready_callback, sock, content_length, default_chunk_size)
local function _body_reader(reader_state, sock, content_length, default_chunk_size)
return co_wrap(function(max_chunk_size)
max_chunk_size = max_chunk_size or default_chunk_size

Expand Down Expand Up @@ -540,8 +531,9 @@ local function _body_reader(keepalive_ready_callback, sock, content_length, defa
elseif not max_chunk_size then
-- We have a length and potentially keep-alive, but want everything.
co_yield(sock:receive(content_length))
keepalive_ready_callback()

if reader_state.mark_keepalive_ready_on_body_read then
reader_state.keepalive_ready = true
end
else
-- We have a length and potentially a keep-alive, and wish to stream
-- the response.
Expand Down Expand Up @@ -569,7 +561,9 @@ local function _body_reader(keepalive_ready_callback, sock, content_length, defa
end

until length == 0
keepalive_ready_callback()
if reader_state.mark_keepalive_ready_on_body_read then
reader_state.keepalive_ready = true
end
end
end)
end
Expand Down Expand Up @@ -608,10 +602,11 @@ local function _read_body(res)
end


local function _trailer_reader(keepalive_ready_callback, sock)
local function _trailer_reader(reader_state, sock)
return co_wrap(function()
co_yield(_receive_headers(sock))
keepalive_ready_callback()
-- We can always pool after reading trailers
reader_state.keepalive_ready = true
end)
end

Expand Down Expand Up @@ -677,7 +672,7 @@ function _M.send_request(self, params)
setmetatable(params, { __index = DEFAULT_PARAMS })

-- Sending a new request makes keepalive disabled until its response is fully read
self.keepalive_ready = false
self.reader_state.keepalive_ready = false

local sock = self.sock
local body = params.body
Expand Down Expand Up @@ -830,23 +825,16 @@ function _M.read_response(self, params)
local trailer_reader
local has_body = false
local has_trailer = false
local body_reader_keepalive_ready_callback

if res_headers["Trailer"] then
has_trailer = true
-- If there are trailers - fully reading response body doesn't mean socket is ready to be pooled
body_reader_keepalive_ready_callback = _reader_keepalive_ready_no_op()
else
-- If there are no trailers - fully reading response body means socket is ready to be pooled
body_reader_keepalive_ready_callback = _reader_keepalive_ready_mark(self)
end
has_trailer = (res_headers["Trailer"] ~= nil)
self.reader_state.mark_keepalive_ready_on_body_read = not has_trailer

-- Receive the body_reader
if _should_receive_body(params.method, status) then
has_body = true

if version == 1.1 and transfer_encoding_is_chunked(res_headers) then
body_reader, err = _chunked_body_reader(body_reader_keepalive_ready_callback, sock)
body_reader, err = _chunked_body_reader(self.reader_state, sock)
else
local length
ok, length = pcall(tonumber, res_headers["Content-Length"])
Expand All @@ -855,17 +843,17 @@ function _M.read_response(self, params)
length = nil
end

body_reader, err = _body_reader(body_reader_keepalive_ready_callback, sock, length)
body_reader, err = _body_reader(self.reader_state, sock, length)
end
else
if not has_trailer then
-- If there's no body and no trailer - it's ready for keep-alive
self.keepalive_ready = true
self.reader_state.keepalive_ready = true
end
end

if has_trailer then
trailer_reader, err = _trailer_reader(_reader_keepalive_ready_mark(self), sock)
trailer_reader, err = _trailer_reader(self.reader_state, sock)
end

if err then
Expand Down Expand Up @@ -1024,14 +1012,15 @@ function _M.get_client_body_reader(_, chunksize, sock)
end
end

local reader_keep_alive_ready_callback = _reader_keepalive_ready_no_op()
-- Reading the request body has nothing to do with pooling the upstream server socket
local request_body_reader_state = { mark_keepalive_ready_on_body_read = false }
local headers = ngx_req_get_headers()
local length = headers.content_length
if length then
return _body_reader(reader_keep_alive_ready_callback, sock, tonumber(length), chunksize)
return _body_reader(request_body_reader_state, sock, tonumber(length), chunksize)
elseif transfer_encoding_is_chunked(headers) then
-- Not yet supported by ngx_lua but should just work...
return _chunked_body_reader(reader_keep_alive_ready_callback, sock, chunksize)
return _chunked_body_reader(request_body_reader_state, sock, chunksize)
else
return nil
end
Expand Down

0 comments on commit dece7fa

Please sign in to comment.