Skip to content

Commit

Permalink
Merge: libevent & nitcorn: close connections on error and implement c…
Browse files Browse the repository at this point in the history
…allbacks in Nit

This PR moves the implementation of libevent callbacks from C to Nit to enable specialization and refinements of the callbacks, it should help for #1803. Also the event callback closes the connection on the Nit side on errors so servers do not attempt to write on broken connections, it should fix the current crash on http://xymus.net/ with Tnitter clients.

Also update the only user of `read_callback_native` for the new API. And seize this opportunity to rename the module, replace class refinement with specialization, clean up the doc and optimize a bit how it handle strings.

Pull-Request: #1836
Reviewed-by: Jean Privat <jean@pryen.org>
  • Loading branch information
privat committed Nov 18, 2015
2 parents de696da + af3118d commit 8dbd72d
Show file tree
Hide file tree
Showing 3 changed files with 162 additions and 76 deletions.
174 changes: 131 additions & 43 deletions lib/libevent.nit
Original file line number Diff line number Diff line change
Expand Up @@ -42,44 +42,21 @@ in "C" `{
// Callback forwarded to 'Connection.read_callback_native'
static void c_read_cb(struct bufferevent *bev, Connection ctx)
{
// TODO move to Nit code
struct evbuffer *input = bufferevent_get_input(bev);
size_t len = evbuffer_get_length(input);
char* cstr = malloc(len);
evbuffer_remove(input, cstr, len);
Connection_read_callback_native(ctx, cstr, len);
Connection_read_callback_native(ctx, bev);
}
// Callback forwarded to 'Connection.event_callback'
static void c_event_cb(struct bufferevent *bev, short events, Connection ctx)
{
Connection_event_callback(ctx, events);
// TODO move to Nit code
if (events & BEV_EVENT_ERROR)
perror("Error from bufferevent");
if (events & (BEV_EVENT_EOF | BEV_EVENT_ERROR)) {
bufferevent_free(bev);
Connection_decr_ref(ctx);
}
int release = Connection_event_callback(ctx, events);
if (release) Connection_decr_ref(ctx);
}
// Callback fowarded to 'ConnectionFactory.spawn_connection'
static void accept_conn_cb(struct evconnlistener *listener, evutil_socket_t fd,
// Callback forwarded to 'ConnectionFactory.accept_connection'
static void accept_connection_cb(struct evconnlistener *listener, evutil_socket_t fd,
struct sockaddr *address, int socklen, ConnectionFactory ctx)
{
// TODO move to Nit code
struct event_base *base = evconnlistener_get_base(listener);
struct bufferevent *bev = bufferevent_socket_new(base, fd, BEV_OPT_CLOSE_ON_FREE);
Connection nit_con = ConnectionFactory_spawn_connection(ctx, bev);
Connection_incr_ref(nit_con);
bufferevent_setcb(bev,
(bufferevent_data_cb)c_read_cb,
(bufferevent_data_cb)c_write_cb,
(bufferevent_event_cb)c_event_cb, nit_con);
bufferevent_enable(bev, EV_READ|EV_WRITE);
ConnectionFactory_accept_connection(ctx, listener, fd, address, socklen);
}
`}

Expand Down Expand Up @@ -138,38 +115,65 @@ class Connection
# Callback method on a write event
fun write_callback
do
if close_requested and not closed then close
if close_requested then close
end

private fun read_callback_native(cstr: NativeString, len: Int)
private fun read_callback_native(bev: NativeBufferEvent)
do
read_callback(cstr.to_s_with_length(len))
var evbuffer = bev.input_buffer
var len = evbuffer.length
var buf = new NativeString(len)
evbuffer.remove(buf, len)
var str = buf.to_s_with_length(len)
read_callback str
end

# Callback method when data is available to read
fun read_callback(content: String)
do
if close_requested and not closed then close
if close_requested then close
end

# Callback method on events
fun event_callback(events: Int) do end
# Callback method on events: EOF, user-defined timeout and unrecoverable errors
#
# Returns `true` if the native handles to `self` can be released.
fun event_callback(events: Int): Bool
do
if events & bev_event_error != 0 or events & bev_event_eof != 0 then
if events & bev_event_error != 0 then print_error "Error from bufferevent"
close
return true
end

return false
end

# Write a string to the connection
redef fun write(str)
do
if close_requested then return
native_buffer_event.write(str.to_cstring, str.bytelen)
end

redef fun write_byte(byte) do native_buffer_event.write_byte(byte)
redef fun write_byte(byte)
do
if close_requested then return
native_buffer_event.write_byte(byte)
end

redef fun write_bytes(bytes) do native_buffer_event.write(bytes.items, bytes.length)
redef fun write_bytes(bytes)
do
if close_requested then return
native_buffer_event.write(bytes.items, bytes.length)
end

# Write a file to the connection
#
# If `not path.file_exists`, the method returns.
fun write_file(path: String)
do
if close_requested then return

var file = new FileReader.open(path)
if file.last_error != null then
var error = new IOError("Failed to open file at '{path}'")
Expand All @@ -194,8 +198,76 @@ class Connection
end
end

# ---
# Error code for event callbacks

# error encountered while reading
fun bev_event_reading: Int `{ return BEV_EVENT_READING; `}

# error encountered while writing
fun bev_event_writing: Int `{ return BEV_EVENT_WRITING; `}

# eof file reached
fun bev_event_eof: Int `{ return BEV_EVENT_EOF; `}

# unrecoverable error encountered
fun bev_event_error: Int `{ return BEV_EVENT_ERROR; `}

# user-specified timeout reached
fun bev_event_timeout: Int `{ return BEV_EVENT_TIMEOUT; `}

# connect operation finished.
fun bev_event_connected: Int `{ return BEV_EVENT_CONNECTED; `}

# ---
# Options that can be specified when creating a `NativeBufferEvent`

# Close the underlying file descriptor/bufferevent/whatever when this bufferevent is freed.
fun bev_opt_close_on_free: Int `{ return BEV_OPT_CLOSE_ON_FREE; `}

# If threading is enabled, protect the operations on this bufferevent with a lock.
fun bev_opt_threadsafe: Int `{ return BEV_OPT_THREADSAFE; `}

# Run callbacks deferred in the event loop.
fun bev_opt_defer_callbacks: Int `{ return BEV_OPT_DEFER_CALLBACKS; `}

# If set, callbacks are executed without locks being held on the bufferevent.
fun bev_opt_unlock_callbacks: Int `{ return BEV_OPT_UNLOCK_CALLBACKS; `}

# ---
# Options for `NativeBufferEvent::enable`

# Read operation
fun ev_read: Int `{ return EV_READ; `}

# Write operation
fun ev_write: Int `{ return EV_WRITE; `}

# ---

# A buffer event structure, strongly associated to a connection, an input buffer and an output_buffer
extern class NativeBufferEvent `{ struct bufferevent * `}

# Socket-based `NativeBufferEvent` that reads and writes data onto a network
new socket(base: NativeEventBase, fd, options: Int) `{
return bufferevent_socket_new(base, fd, options);
`}

# Enable a bufferevent.
fun enable(operation: Int) `{
bufferevent_enable(self, operation);
`}

# Set callbacks to `read_callback_native`, `write_callback` and `event_callback` of `conn`
fun setcb(conn: Connection) import Connection.read_callback_native,
Connection.write_callback, Connection.event_callback, NativeString `{
Connection_incr_ref(conn);
bufferevent_setcb(self,
(bufferevent_data_cb)c_read_cb,
(bufferevent_data_cb)c_write_cb,
(bufferevent_event_cb)c_event_cb, conn);
`}

# Write `length` bytes of `line`
fun write(line: NativeString, length: Int): Int `{
return bufferevent_write(self, line, length);
Expand Down Expand Up @@ -237,6 +309,11 @@ end
extern class NativeEvBuffer `{ struct evbuffer * `}
# Length of data in this buffer
fun length: Int `{ return evbuffer_get_length(self); `}

# Read data from an evbuffer and drain the bytes read
fun remove(buffer: NativeString, len: Int) `{
evbuffer_remove(self, buffer, len);
`}
end

# An input buffer
Expand All @@ -261,8 +338,7 @@ end
extern class ConnectionListener `{ struct evconnlistener * `}

private new bind_to(base: NativeEventBase, address: NativeString, port: Int, factory: ConnectionFactory)
import ConnectionFactory.spawn_connection, error_callback, Connection.read_callback_native,
Connection.write_callback, Connection.event_callback `{
import ConnectionFactory.accept_connection, error_callback `{
struct sockaddr_in sin;
struct evconnlistener *listener;
Expand All @@ -276,7 +352,7 @@ extern class ConnectionListener `{ struct evconnlistener * `}
memcpy( &(sin.sin_addr.s_addr), (const void*)hostent->h_addr, hostent->h_length );
listener = evconnlistener_new_bind(base,
(evconnlistener_cb)accept_conn_cb, factory,
(evconnlistener_cb)accept_connection_cb, factory,
LEV_OPT_CLOSE_ON_FREE | LEV_OPT_REUSEABLE, -1,
(struct sockaddr*)&sin, sizeof(sin));
Expand Down Expand Up @@ -309,10 +385,22 @@ class ConnectionFactory
# The `NativeEventBase` for the dispatch loop of this factory
var event_base: NativeEventBase

# On new connection, create the handler `Connection` object
fun spawn_connection(nat_buf_ev: NativeBufferEvent): Connection
# Accept a connection on `listener`
#
# By default, it creates a new NativeBufferEvent and calls `spawn_connection`.
fun accept_connection(listener: ConnectionListener, fd: Int, address: Pointer, socklen: Int)
do
var base = listener.base
var bev = new NativeBufferEvent.socket(base, fd, bev_opt_close_on_free)
var conn = spawn_connection(bev)
bev.enable ev_read|ev_write
bev.setcb conn
end

# Create a new `Connection` object for `buffer_event`
fun spawn_connection(buffer_event: NativeBufferEvent): Connection
do
return new Connection(nat_buf_ev)
return new Connection(buffer_event)
end

# Listen on `address`:`port` for new connection, which will callback `spawn_connection`
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,49 +12,52 @@
# See the License for the specific language governing permissions and
# limitations under the License.

# Http request parsing for bufferized inputs.
module http_request_parser
# Http request parsing for buffered inputs.
module http_request_buffer

intrude import libevent

redef class Connection
# Connection rebuilding HTTP requests
#
# Subclass should refine `read_full_request` and avoid `read_callback`.
class HTTPConnection
super Connection

private var in_request = false
private var in_header = false
private var in_body = false
private var current_header: FlatBuffer
private var current_body: FlatBuffer
private var current_header = new Array[Writable]
private var current_body = new Array[Writable]
private var content_length = 0
private var current_length = 0

# FIXME will not work if the header/body delimiter fall between two watermarks windows.
redef fun read_callback_native(cstr, len)
# FIXME will not work if the header/body delimiter falls between two watermarks windows.
redef fun read_callback(str)
do
# is this the start of a request?
if not in_request then
parse_start
end
var str = cstr.to_s_with_length(len)
if not in_request then parse_start

var body: String
# parsing header
if in_header then
body = parse_header(str)
else
body = str
end

# parsing body
if in_body then
parse_body(body)
end
if in_body then parse_body(body)
end

# Callback when a full HTTP request is received
fun read_http_request(str: String) do end

# We have a new request entering
# Prepare for a new request
private fun parse_start do
in_request = true
# reset values
current_header = new FlatBuffer
current_body = new FlatBuffer
current_header.clear
current_body.clear
current_length = 0
content_length = 0
# next step is to find the header part
Expand All @@ -64,17 +67,17 @@ redef class Connection

# We are receiving the header of a request
#
# Return parsed body foud in header window
# Return parsed body found in header window
private fun parse_header(str: String): String do
# split in CRLF
var parts = str.split("\r\n\r\n")
# first part go in the header
current_header.append parts.shift
current_header.add parts.shift

# if there is more part we are done with headers
if not parts.is_empty then
# get content-length
parse_content_length(current_header.write_to_string)
parse_content_length current_header.join
# next step if to parse body
in_header = false
in_body = true
Expand All @@ -98,19 +101,19 @@ redef class Connection
# We are receiving body parts.
private fun parse_body(str: String) do
current_length += str.length
current_body.append str
current_body.add str
if current_length >= content_length then
parse_end
end
end

# We have reached the end of the body
private fun parse_end do
var res = new FlatBuffer
res.append current_header
var res = new FlatBuffer.with_capacity(content_length)
for ch in current_header do res.append ch.write_to_string
res.append "\r\n\r\n"
res.append current_body
read_callback(res.write_to_string)
for cb in current_body do res.append cb.write_to_string
read_http_request res.to_s
in_request = false
end
end
Loading

0 comments on commit 8dbd72d

Please sign in to comment.