Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

PION-1298: merging zettar patches for incremental payload processing

Basically I replaced the "payload handler" and "headers parsed callback" with
a new type of "incremental handler." You can set an incremental handler for
either request_reader or response_reader objects.  When it's defined, every
time it finishes processing a block of data from async_read_some() it will
call the incremental handler.  The incremental handler is passed a function
object "continue handler" as one of its parameters, which basically just
triggers the next async_read_some().  It also is passed a flag that indicates
whether header parsing has finished as well as the current http::message
object being built. Whenever a block of payload content is read/parsed, and
"incremental parsing" is enabled (by defining an incremental handler), it sets
the http::message's content buffer to most recent block of data read.
  • Loading branch information...
commit a83cee1f2361f6ce848fe5fae75d203869504e4c 1 parent 878df1b
@mikedickey mikedickey authored
View
1  include/pion/http/message.hpp
@@ -337,7 +337,6 @@ class PION_API message
inline void clear_content(void) {
set_content_length(0);
create_content_buffer();
- delete_value(m_headers, HEADER_CONTENT_TYPE);
}
/// sets the content type for the message payload
View
40 include/pion/http/parser.hpp
@@ -12,7 +12,6 @@
#include <string>
#include <boost/noncopyable.hpp>
-#include <boost/function/function2.hpp>
#include <boost/logic/tribool.hpp>
#include <boost/system/error_code.hpp>
#include <boost/thread/once.hpp>
@@ -41,9 +40,6 @@ class PION_API parser :
/// maximum length for HTTP payload content
static const std::size_t DEFAULT_CONTENT_MAX;
- /// callback type used to consume payload content
- typedef boost::function2<void, const char *, std::size_t> payload_handler_t;
-
/// class-specific error code values
enum error_value_t {
ERROR_METHOD_CHAR = 1,
@@ -130,7 +126,8 @@ class PION_API parser :
m_bytes_content_remaining(0), m_bytes_content_read(0),
m_bytes_last_read(0), m_bytes_total_read(0),
m_max_content_length(max_content_length),
- m_parse_headers_only(false), m_save_raw_headers(false)
+ m_parse_headers_only(false), m_save_raw_headers(false),
+ m_incremental_parsing(false), m_finished_parsing_headers(false)
{}
/// default destructor
@@ -239,6 +236,7 @@ class PION_API parser :
m_query_string.erase();
m_raw_headers.erase();
m_bytes_content_read = m_bytes_last_read = m_bytes_total_read = 0;
+ m_finished_parsing_headers = false;
}
/// returns true if there are no more bytes available in the read buffer
@@ -265,15 +263,18 @@ class PION_API parser :
/// returns true if the parser is saving raw HTTP header contents
inline bool get_save_raw_headers(void) const { return m_save_raw_headers; }
+ /// returns true if incremental content parsing is enabled
+ inline bool get_incremental_parsing(void) const { return m_incremental_parsing; }
+
/// returns true if the parser is being used to parse an HTTP request
inline bool is_parsing_request(void) const { return m_is_request; }
/// returns true if the parser is being used to parse an HTTP response
inline bool is_parsing_response(void) const { return ! m_is_request; }
- /// defines a callback function to be used for consuming payload content
- inline void set_payload_handler(payload_handler_t& h) { m_payload_handler = h; }
-
+ /// returns true if we have finished parsing the HTTP headers
+ inline bool get_finished_parsing_headers(void) const { return m_finished_parsing_headers; }
+
/// sets the maximum length for HTTP payload content
inline void set_max_content_length(std::size_t n) { m_max_content_length = n; }
@@ -283,6 +284,9 @@ class PION_API parser :
/// sets parameter for saving raw HTTP header content
inline void set_save_raw_headers(bool b) { m_save_raw_headers = b; }
+ /// sets parameter for incrementally parsing HTTP payload content
+ inline void set_incremental_parsing(bool b) { m_incremental_parsing = b; }
+
/// sets the logger to be used
inline void set_logger(logger log_ptr) { m_logger = log_ptr; }
@@ -400,9 +404,6 @@ class PION_API parser :
protected:
- /// Called after we have finished parsing the HTTP message headers
- virtual void finished_parsing_headers(const boost::system::error_code& ec) {}
-
/**
* parses an HTTP message up to the end of the headers using bytes
* available in the read buffer
@@ -427,7 +428,7 @@ class PION_API parser :
/**
* parses a chunked HTTP message-body using bytes available in the read buffer
*
- * @param chunk_buffers buffers to be populated from parsing chunked content
+ * @param http_msg the HTTP message object to consume content for
* @param ec error_code contains additional information for parsing errors
*
* @return boost::tribool result of parsing:
@@ -435,7 +436,7 @@ class PION_API parser :
* true = finished parsing message,
* indeterminate = message is not yet finished
*/
- boost::tribool parse_chunks(http::message::chunk_cache_t& chunk_buffers,
+ boost::tribool parse_chunks(http::message& http_msg,
boost::system::error_code& ec);
/**
@@ -456,10 +457,10 @@ class PION_API parser :
* consume the bytes available in the read buffer, converting them into
* the next chunk for the HTTP message
*
- * @param chunk_buffers buffers to be populated from parsing chunked content
+ * @param http_msg the HTTP message object to consume content for
* @return std::size_t number of content bytes consumed, if any
*/
- std::size_t consume_content_as_next_chunk(http::message::chunk_cache_t& chunk_buffers);
+ std::size_t consume_content_as_next_chunk(http::message& http_msg);
/**
* compute and sets a HTTP Message data integrity status
@@ -579,9 +580,6 @@ class PION_API parser :
/// the current state of parsing chunked content
chunk_parse_state_t m_chunked_content_parse_state;
- /// if defined, this function is used to consume payload content
- payload_handler_t m_payload_handler;
-
/// Used for parsing the HTTP response status code
boost::uint16_t m_status_code;
@@ -636,6 +634,12 @@ class PION_API parser :
/// if true, the raw contents of HTTP headers are stored into m_raw_headers
bool m_save_raw_headers;
+ /// if true, incremental content parsing behavior is enabled
+ bool m_incremental_parsing;
+
+ /// true if we have finished parsing the HTTP headers
+ bool m_finished_parsing_headers;
+
/// points to a single and unique instance of the parser error_category_t
static error_category_t * m_error_category_ptr;
View
14 include/pion/http/reader.hpp
@@ -69,6 +69,9 @@ class PION_API reader :
/// Consumes bytes that have been read using an HTTP parser
void consume_bytes(void);
+ /// starts tcp::timer object to timeout reads (if timeout is not zero)
+ void begin_timeout(void);
+
/// Reads more bytes from the TCP connection
virtual void read_bytes(void) = 0;
@@ -81,9 +84,6 @@ class PION_API reader :
private:
- /// reads more bytes for parsing, with timeout support
- void read_bytes_with_timeout(void);
-
/**
* Handles errors that occur during read operations
*
@@ -93,17 +93,17 @@ class PION_API reader :
/// default maximum number of seconds for read operations
- static const boost::uint32_t DEFAULT_READ_TIMEOUT;
+ static const boost::uint32_t DEFAULT_READ_TIMEOUT;
/// The HTTP connection that has a new HTTP message to parse
- tcp::connection_ptr m_tcp_conn;
+ tcp::connection_ptr m_tcp_conn;
/// pointer to a tcp::timer object if read timeouts are enabled
- tcp::timer_ptr m_timer_ptr;
+ tcp::timer_ptr m_timer_ptr;
/// maximum number of seconds for read operations
- boost::uint32_t m_read_timeout;
+ boost::uint32_t m_read_timeout;
};
View
69 include/pion/http/request_reader.hpp
@@ -13,7 +13,8 @@
#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/function.hpp>
-#include <boost/function/function2.hpp>
+#include <boost/function/function3.hpp>
+#include <boost/function/function4.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/enable_shared_from_this.hpp>
#include <pion/config.hpp>
@@ -35,10 +36,31 @@ class request_reader :
public:
- /// function called after the HTTP message has been parsed
+ /// function type called by incremental handler when it's finished
+ typedef boost::function0<void> continue_handler_t;
+
+ /**
+ * function type called during incremental processing when not finished.
+ * parameters are:
+ *
+ * shared pointer to HTTP object being parsed,
+ * shared pointer to tcp connection being used,
+ * boolean true if finished parsing headers
+ * function object to be called when ready to process more
+ */
+ typedef boost::function4<void, http::request_ptr, tcp::connection_ptr,
+ bool, continue_handler_t> incremental_handler_t;
+
+ /**
+ * function type called after the HTTP message has been parsed.
+ * parameters are:
+ *
+ * shared pointer to HTTP object being parsed,
+ * shared pointer to tcp connection being used,
+ * error code if a problem occured during parsing
+ */
typedef boost::function3<void, http::request_ptr, tcp::connection_ptr,
const boost::system::error_code&> finished_handler_t;
-
// default destructor
virtual ~request_reader() {}
@@ -56,8 +78,8 @@ class request_reader :
(new request_reader(tcp_conn, handler));
}
- /// sets a function to be called after HTTP headers have been parsed
- inline void set_headers_parsed_callback(finished_handler_t& h) { m_parsed_headers = h; }
+ /// defines a callback function to be used for incremental processing
+ inline void set_incremental_handler(incremental_handler_t& h) { m_incremental_handler = h; set_incremental_parsing(true); }
protected:
@@ -75,21 +97,28 @@ class request_reader :
m_http_msg->set_remote_ip(tcp_conn->get_remote_ip());
set_logger(PION_GET_LOGGER("pion.http.request_reader"));
}
-
+
+ /// read and process more bytes from tcp connection
+ void async_read_some(void) {
+ begin_timeout();
+ get_connection()->async_read_some(boost::bind(&request_reader::consume_bytes,
+ shared_from_this(),
+ boost::asio::placeholders::error,
+ boost::asio::placeholders::bytes_transferred));
+ }
+
/// Reads more bytes from the TCP connection
virtual void read_bytes(void) {
- get_connection()->async_read_some(boost::bind(&request_reader::consume_bytes,
- shared_from_this(),
- boost::asio::placeholders::error,
- boost::asio::placeholders::bytes_transferred));
+ if (m_incremental_handler) {
+ // call the finished handler allowing it to incrementally process
+ // and return control when finished by calling http::reader::receive()
+ m_incremental_handler(m_http_msg, get_connection(), get_finished_parsing_headers(),
+ boost::bind(&request_reader::async_read_some, shared_from_this()));
+ } else {
+ async_read_some();
+ }
}
- /// Called after we have finished parsing the HTTP message headers
- virtual void finished_parsing_headers(const boost::system::error_code& ec) {
- // call the finished headers handler with the HTTP message
- if (m_parsed_headers) m_parsed_headers(m_http_msg, get_connection(), ec);
- }
-
/// Called after we have finished reading/parsing the HTTP message
virtual void finished_reading(const boost::system::error_code& ec) {
// call the finished handler with the finished HTTP message
@@ -100,13 +129,13 @@ class request_reader :
virtual http::message& get_message(void) { return *m_http_msg; }
/// The new HTTP message container being created
- http::request_ptr m_http_msg;
+ http::request_ptr m_http_msg;
/// function called after the HTTP message has been parsed
- finished_handler_t m_finished;
+ finished_handler_t m_finished;
- /// function called after the HTTP message headers have been parsed
- finished_handler_t m_parsed_headers;
+ /// true if we should process the HTTP message incrementally
+ incremental_handler_t m_incremental_handler;
};
View
67 include/pion/http/response_reader.hpp
@@ -13,7 +13,8 @@
#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/function.hpp>
-#include <boost/function/function2.hpp>
+#include <boost/function/function3.hpp>
+#include <boost/function/function4.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/enable_shared_from_this.hpp>
#include <pion/config.hpp>
@@ -35,10 +36,31 @@ class response_reader :
public:
- /// function called after the HTTP message has been parsed
+ /// function type called by incremental handler when it's finished
+ typedef boost::function0<void> continue_handler_t;
+
+ /**
+ * function type called during incremental processing when not finished.
+ * parameters are:
+ *
+ * shared pointer to HTTP object being parsed,
+ * shared pointer to tcp connection being used,
+ * boolean true if finished parsing headers
+ * function object to be called when ready to process more
+ */
+ typedef boost::function4<void, http::response_ptr, tcp::connection_ptr,
+ bool, continue_handler_t> incremental_handler_t;
+
+ /**
+ * function type called after the HTTP message has been parsed.
+ * parameters are:
+ *
+ * shared pointer to HTTP object being parsed,
+ * shared pointer to tcp connection being used,
+ * error code if a problem occured during parsing
+ */
typedef boost::function3<void, http::response_ptr, tcp::connection_ptr,
const boost::system::error_code&> finished_handler_t;
-
// default destructor
virtual ~response_reader() {}
@@ -58,8 +80,8 @@ class response_reader :
(new response_reader(tcp_conn, http_request, handler));
}
- /// sets a function to be called after HTTP headers have been parsed
- inline void set_headers_parsed_callback(finished_handler_t& h) { m_parsed_headers = h; }
+ /// defines a callback function to be used for incremental processing
+ inline void set_incremental_handler(incremental_handler_t& h) { m_incremental_handler = h; set_incremental_parsing(true); }
protected:
@@ -80,20 +102,27 @@ class response_reader :
set_logger(PION_GET_LOGGER("pion.http.response_reader"));
}
- /// Reads more bytes from the TCP connection
- virtual void read_bytes(void) {
+ /// read and process more bytes from tcp connection
+ void async_read_some(void) {
+ begin_timeout();
get_connection()->async_read_some(boost::bind(&response_reader::consume_bytes,
- shared_from_this(),
- boost::asio::placeholders::error,
- boost::asio::placeholders::bytes_transferred));
+ shared_from_this(),
+ boost::asio::placeholders::error,
+ boost::asio::placeholders::bytes_transferred));
}
- /// Called after we have finished parsing the HTTP message headers
- virtual void finished_parsing_headers(const boost::system::error_code& ec) {
- // call the finished headers handler with the HTTP message
- if (m_parsed_headers) m_parsed_headers(m_http_msg, get_connection(), ec);
+ /// Reads more bytes from the TCP connection
+ virtual void read_bytes(void) {
+ if (m_incremental_handler) {
+ // call the finished handler allowing it to incrementally process
+ // and return control when finished by calling http::reader::receive()
+ m_incremental_handler(m_http_msg, get_connection(), get_finished_parsing_headers(),
+ boost::bind(&response_reader::async_read_some, shared_from_this()));
+ } else {
+ async_read_some();
+ }
}
-
+
/// Called after we have finished reading/parsing the HTTP message
virtual void finished_reading(const boost::system::error_code& ec) {
// call the finished handler with the finished HTTP message
@@ -105,13 +134,13 @@ class response_reader :
/// The new HTTP message container being created
- http::response_ptr m_http_msg;
+ http::response_ptr m_http_msg;
/// function called after the HTTP message has been parsed
- finished_handler_t m_finished;
+ finished_handler_t m_finished;
- /// function called after the HTTP message headers have been parsed
- finished_handler_t m_parsed_headers;
+ /// true if we should process the HTTP message incrementally
+ incremental_handler_t m_incremental_handler;
};
View
63 src/http_parser.cpp
@@ -47,8 +47,12 @@ boost::tribool parser::parse(http::message& http_msg,
boost::tribool rc = boost::indeterminate;
std::size_t total_bytes_parsed = 0;
+
+ if (m_incremental_parsing) {
+ http_msg.clear_content();
+ }
- if(http_msg.has_missing_packets()) {
+ if (http_msg.has_missing_packets()) {
http_msg.set_data_after_missing_packet(true);
}
@@ -72,10 +76,10 @@ boost::tribool parser::parse(http::message& http_msg,
// parsing chunked payload content
case PARSE_CHUNKS:
- rc = parse_chunks(http_msg.get_chunk_cache(), ec);
+ rc = parse_chunks(http_msg, ec);
total_bytes_parsed += m_bytes_last_read;
// check if we have finished parsing all chunks
- if (rc == true && !m_payload_handler) {
+ if (rc == true && !m_incremental_parsing) {
http_msg.concatenate_chunks();
}
break;
@@ -88,7 +92,7 @@ boost::tribool parser::parse(http::message& http_msg,
// parsing payload content with no length (until EOF)
case PARSE_CONTENT_NO_LENGTH:
- consume_content_as_next_chunk(http_msg.get_chunk_cache());
+ consume_content_as_next_chunk(http_msg);
total_bytes_parsed += m_bytes_last_read;
break;
@@ -138,9 +142,11 @@ boost::tribool parser::parse_missing_data(http::message& http_msg,
&& (m_size_of_current_chunk - m_bytes_read_in_current_chunk) >= len)
{
// use dummy content for missing data
- if (m_payload_handler) {
+ if (m_incremental_parsing) {
+ http_msg.set_content_length(len);
+ char *ptr = http_msg.create_content_buffer();
for (std::size_t n = 0; n < len; ++n)
- m_payload_handler(&MISSING_DATA_CHAR, 1);
+ ptr[n] = MISSING_DATA_CHAR;
} else {
for (std::size_t n = 0; n < len && http_msg.get_chunk_cache().size() < m_max_content_length; ++n)
http_msg.get_chunk_cache().push_back(MISSING_DATA_CHAR);
@@ -174,9 +180,12 @@ boost::tribool parser::parse_missing_data(http::message& http_msg,
} else {
// make sure content buffer is not already full
- if (m_payload_handler) {
+ if (m_incremental_parsing) {
+ http_msg.set_content_length(len);
+ char *ptr = http_msg.create_content_buffer();
for (std::size_t n = 0; n < len; ++n)
- m_payload_handler(&MISSING_DATA_CHAR, 1);
+ ptr[n] = MISSING_DATA_CHAR;
+ m_bytes_content_read += len;
} else if ( (m_bytes_content_read+len) <= m_max_content_length) {
// use dummy content for missing data
for (std::size_t n = 0; n < len; ++n)
@@ -197,9 +206,11 @@ boost::tribool parser::parse_missing_data(http::message& http_msg,
// parsing payload content with no length (until EOF)
case PARSE_CONTENT_NO_LENGTH:
// use dummy content for missing data
- if (m_payload_handler) {
+ if (m_incremental_parsing) {
+ http_msg.set_content_length(len);
+ char *ptr = http_msg.create_content_buffer();
for (std::size_t n = 0; n < len; ++n)
- m_payload_handler(&MISSING_DATA_CHAR, 1);
+ ptr[n] = MISSING_DATA_CHAR;
} else {
for (std::size_t n = 0; n < len && http_msg.get_chunk_cache().size() < m_max_content_length; ++n)
http_msg.get_chunk_cache().push_back(MISSING_DATA_CHAR);
@@ -801,7 +812,7 @@ boost::tribool parser::finish_header_parsing(http::message& http_msg,
}
}
- finished_parsing_headers(ec);
+ m_finished_parsing_headers = true;
return rc;
}
@@ -1056,8 +1067,8 @@ bool parser::parse_cookie_header(ihash_multimap& dict,
return true;
}
-boost::tribool parser::parse_chunks(http::message::chunk_cache_t& chunks,
- boost::system::error_code& ec)
+boost::tribool parser::parse_chunks(http::message& http_msg,
+ boost::system::error_code& ec)
{
//
// note that boost::tribool may have one of THREE states:
@@ -1066,6 +1077,7 @@ boost::tribool parser::parse_chunks(http::message::chunk_cache_t& chunks,
// true: finished successfully parsing the message
// indeterminate: parsed bytes, but the message is not yet finished
//
+ http::message::chunk_cache_t& chunks = http_msg.get_chunk_cache();
const char *read_start_ptr = m_read_ptr;
m_bytes_last_read = 0;
while (m_read_ptr < m_read_end_ptr) {
@@ -1134,11 +1146,12 @@ boost::tribool parser::parse_chunks(http::message::chunk_cache_t& chunks,
case PARSE_CHUNK:
if (m_bytes_read_in_current_chunk < m_size_of_current_chunk) {
- if (m_payload_handler) {
+ if (m_incremental_parsing) {
const std::size_t bytes_avail = bytes_available();
const std::size_t bytes_in_chunk = m_size_of_current_chunk - m_bytes_read_in_current_chunk;
const std::size_t len = (bytes_in_chunk > bytes_avail) ? bytes_avail : bytes_in_chunk;
- m_payload_handler(m_read_ptr, len);
+ http_msg.set_content_length(len);
+ memcpy(http_msg.create_content_buffer(), m_read_ptr, len);
m_bytes_read_in_current_chunk += len;
if (len > 1) m_read_ptr += (len - 1);
} else if (chunks.size() < m_max_content_length) {
@@ -1228,8 +1241,9 @@ boost::tribool parser::consume_content(http::message& http_msg,
}
// make sure content buffer is not already full
- if (m_payload_handler) {
- m_payload_handler(m_read_ptr, content_bytes_to_read);
+ if (m_incremental_parsing) {
+ http_msg.set_content_length(content_bytes_to_read);
+ memcpy(http_msg.create_content_buffer(), m_read_ptr, content_bytes_to_read);
} else if (m_bytes_content_read < m_max_content_length) {
if (m_bytes_content_read + content_bytes_to_read > m_max_content_length) {
// read would exceed maximum size for content buffer
@@ -1250,16 +1264,17 @@ boost::tribool parser::consume_content(http::message& http_msg,
return rc;
}
-std::size_t parser::consume_content_as_next_chunk(http::message::chunk_cache_t& chunks)
+std::size_t parser::consume_content_as_next_chunk(http::message& http_msg)
{
if (bytes_available() == 0) {
m_bytes_last_read = 0;
} else {
m_bytes_last_read = (m_read_end_ptr - m_read_ptr);
- if (m_payload_handler) {
- if (m_bytes_last_read)
- m_payload_handler(m_read_ptr, m_bytes_last_read);
+ if (m_incremental_parsing) {
+ http_msg.set_content_length(m_bytes_last_read);
+ memcpy(http_msg.create_content_buffer(), m_read_ptr, m_bytes_last_read);
} else {
+ http::message::chunk_cache_t& chunks = http_msg.get_chunk_cache();
while (m_read_ptr < m_read_end_ptr) {
if (chunks.size() < m_max_content_length)
chunks.push_back(*m_read_ptr);
@@ -1296,19 +1311,19 @@ void parser::finish(http::message& http_msg) const
break;
case PARSE_CHUNKS:
http_msg.set_is_valid(m_chunked_content_parse_state==PARSE_CHUNK_SIZE_START);
- if (!m_payload_handler)
+ if (!m_incremental_parsing)
http_msg.concatenate_chunks();
break;
case PARSE_CONTENT_NO_LENGTH:
http_msg.set_is_valid(true);
- if (!m_payload_handler)
+ if (!m_incremental_parsing)
http_msg.concatenate_chunks();
break;
}
compute_msg_status(http_msg, http_msg.is_valid());
- if (is_parsing_request() && !m_payload_handler) {
+ if (is_parsing_request() && !m_incremental_parsing) {
// Parse query pairs from post content if content type is x-www-form-urlencoded.
// Type could be followed by parameters (as defined in section 3.6 of RFC 2616)
// e.g. Content-Type: application/x-www-form-urlencoded; charset=UTF-8
View
7 src/http_reader.cpp
@@ -34,7 +34,7 @@ void reader::receive(void)
} else {
// no pipelined messages available in the read buffer -> read bytes from the socket
m_tcp_conn->set_lifecycle(tcp::connection::LIFECYCLE_CLOSE); // default to close the connection
- read_bytes_with_timeout();
+ read_bytes();
}
}
@@ -116,11 +116,11 @@ void reader::consume_bytes(void)
finished_reading(ec);
} else {
// not yet finished parsing the message -> read more data
- read_bytes_with_timeout();
+ read_bytes();
}
}
-void reader::read_bytes_with_timeout(void)
+void reader::begin_timeout(void)
{
if (m_read_timeout > 0) {
m_timer_ptr.reset(new tcp::timer(m_tcp_conn));
@@ -128,7 +128,6 @@ void reader::read_bytes_with_timeout(void)
} else if (m_timer_ptr) {
m_timer_ptr.reset();
}
- read_bytes();
}
void reader::handle_read_error(const boost::system::error_code& read_error)
Please sign in to comment.
Something went wrong with that request. Please try again.