From f35ec2155b1408d528a9f37da7904c9625186bcf Mon Sep 17 00:00:00 2001 From: winlin Date: Wed, 3 Dec 2014 19:27:27 +0800 Subject: [PATCH] for bug #241, support merged read. 2.0.48 --- trunk/src/app/srs_app_recv_thread.cpp | 57 ++++++++++++++++++++------ trunk/src/app/srs_app_recv_thread.hpp | 10 +++-- trunk/src/core/srs_core.hpp | 2 +- trunk/src/rtmp/srs_protocol_buffer.cpp | 33 +++++++++++---- trunk/src/rtmp/srs_protocol_buffer.hpp | 42 +++++++++++++++++++ trunk/src/rtmp/srs_protocol_rtmp.cpp | 5 +++ trunk/src/rtmp/srs_protocol_rtmp.hpp | 10 +++++ trunk/src/rtmp/srs_protocol_stack.cpp | 5 +++ trunk/src/rtmp/srs_protocol_stack.hpp | 11 +++++ 9 files changed, 152 insertions(+), 23 deletions(-) diff --git a/trunk/src/app/srs_app_recv_thread.cpp b/trunk/src/app/srs_app_recv_thread.cpp index 8ab29ebc35..e4bfaa8971 100644 --- a/trunk/src/app/srs_app_recv_thread.cpp +++ b/trunk/src/app/srs_app_recv_thread.cpp @@ -26,6 +26,14 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. #include #include #include +#include + +// when we read from socket less than this value, +// sleep a while to merge read. +// @see https://github.com/winlinvip/simple-rtmp-server/issues/241 +#define SRS_MERGED_READ_SIZE (SOCKET_READ_SIZE / 10) +// the time to sleep to merge read, to read more bytes. +#define SRS_MERGED_READ_US (300 * 1000) ISrsMessageHandler::ISrsMessageHandler() { @@ -271,6 +279,30 @@ void SrsPublishRecvThread::stop() trd.stop(); } +void SrsPublishRecvThread::on_thread_start() +{ + // we donot set the auto response to false, + // for the main thread never send message. + + // enable the merge read + // @see https://github.com/winlinvip/simple-rtmp-server/issues/241 + rtmp->set_merge_read(true, this); +} + +void SrsPublishRecvThread::on_thread_stop() +{ + // we donot set the auto response to true, + // for we donot set to false yet. + + // when thread stop, signal the conn thread which wait. + // @see https://github.com/winlinvip/simple-rtmp-server/issues/244 + st_cond_signal(error); + + // disable the merge read + // @see https://github.com/winlinvip/simple-rtmp-server/issues/241 + rtmp->set_merge_read(false, NULL); +} + bool SrsPublishRecvThread::can_handle() { // publish thread always can handle message. @@ -302,18 +334,19 @@ void SrsPublishRecvThread::on_recv_error(int ret) st_cond_signal(error); } -void SrsPublishRecvThread::on_thread_start() -{ - // we donot set the auto response to false, - // for the main thread never send message. -} - -void SrsPublishRecvThread::on_thread_stop() +void SrsPublishRecvThread::on_read(ssize_t nread) { - // we donot set the auto response to true, - // for we donot set to false yet. + if (nread < 0) { + return; + } - // when thread stop, signal the conn thread which wait. - // @see https://github.com/winlinvip/simple-rtmp-server/issues/244 - st_cond_signal(error); + /** + * to improve read performance, merge some packets then read, + * when it on and read small bytes, we sleep to wait more data., + * that is, we merge some data to read together. + * @see https://github.com/winlinvip/simple-rtmp-server/issues/241 + */ + if (nread < SRS_MERGED_READ_SIZE) { + st_usleep(SRS_MERGED_READ_US); + } } diff --git a/trunk/src/app/srs_app_recv_thread.hpp b/trunk/src/app/srs_app_recv_thread.hpp index 5a579b6dcf..f723374fd5 100644 --- a/trunk/src/app/srs_app_recv_thread.hpp +++ b/trunk/src/app/srs_app_recv_thread.hpp @@ -33,6 +33,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. #include #include +#include class SrsRtmpServer; class SrsMessage; @@ -132,7 +133,7 @@ class SrsQueueRecvThread : public ISrsMessageHandler * the publish recv thread got message and callback the source method to process message. * @see: https://github.com/winlinvip/simple-rtmp-server/issues/237 */ -class SrsPublishRecvThread : public ISrsMessageHandler +class SrsPublishRecvThread : virtual public ISrsMessageHandler, virtual public IMergeReadHandler { private: SrsRecvThread trd; @@ -163,13 +164,16 @@ class SrsPublishRecvThread : public ISrsMessageHandler public: virtual int start(); virtual void stop(); + virtual void on_thread_start(); + virtual void on_thread_stop(); +// interface ISrsMessageHandler public: virtual bool can_handle(); virtual int handle(SrsMessage* msg); virtual void on_recv_error(int ret); +// interface IMergeReadHandler public: - virtual void on_thread_start(); - virtual void on_thread_stop(); + virtual void on_read(ssize_t nread); }; #endif diff --git a/trunk/src/core/srs_core.hpp b/trunk/src/core/srs_core.hpp index 1bea57bce1..df7b42d152 100644 --- a/trunk/src/core/srs_core.hpp +++ b/trunk/src/core/srs_core.hpp @@ -31,7 +31,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. // current release version #define VERSION_MAJOR 2 #define VERSION_MINOR 0 -#define VERSION_REVISION 47 +#define VERSION_REVISION 48 // server info. #define RTMP_SIG_SRS_KEY "SRS" #define RTMP_SIG_SRS_ROLE "origin/edge server" diff --git a/trunk/src/rtmp/srs_protocol_buffer.cpp b/trunk/src/rtmp/srs_protocol_buffer.cpp index b3e15bf3bc..1778b4efd0 100644 --- a/trunk/src/rtmp/srs_protocol_buffer.cpp +++ b/trunk/src/rtmp/srs_protocol_buffer.cpp @@ -26,16 +26,19 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. #include #include -// 4KB=4096 -// 8KB=8192 -// 16KB=16384 -// 32KB=32768 -// 64KB=65536 -// @see https://github.com/winlinvip/simple-rtmp-server/issues/241 -#define SOCKET_READ_SIZE 4096 +IMergeReadHandler::IMergeReadHandler() +{ +} + +IMergeReadHandler::~IMergeReadHandler() +{ +} SrsBuffer::SrsBuffer() { + merged_read = false; + _handler = NULL; + buffer = new char[SOCKET_READ_SIZE]; } @@ -93,6 +96,16 @@ int SrsBuffer::grow(ISrsBufferReader* reader, int required_size) return ret; } + /** + * to improve read performance, merge some packets then read, + * when it on and read small bytes, we sleep to wait more data., + * that is, we merge some data to read together. + * @see https://github.com/winlinvip/simple-rtmp-server/issues/241 + */ + if (merged_read && _handler) { + _handler->on_read(nread); + } + srs_assert((int)nread > 0); append(buffer, (int)nread); } @@ -100,4 +113,10 @@ int SrsBuffer::grow(ISrsBufferReader* reader, int required_size) return ret; } +void SrsBuffer::set_merge_read(bool v, IMergeReadHandler* handler) +{ + merged_read = v; + _handler = handler; +} + diff --git a/trunk/src/rtmp/srs_protocol_buffer.hpp b/trunk/src/rtmp/srs_protocol_buffer.hpp index 13622d96e3..b03f8ec4b1 100644 --- a/trunk/src/rtmp/srs_protocol_buffer.hpp +++ b/trunk/src/rtmp/srs_protocol_buffer.hpp @@ -34,6 +34,34 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. #include +// 4KB=4096 +// 8KB=8192 +// 16KB=16384 +// 32KB=32768 +// 64KB=65536 +// @see https://github.com/winlinvip/simple-rtmp-server/issues/241 +#define SOCKET_READ_SIZE 4096 + +/** +* to improve read performance, merge some packets then read, +* when it on and read small bytes, we sleep to wait more data., +* that is, we merge some data to read together. +* @see https://github.com/winlinvip/simple-rtmp-server/issues/241 +*/ +class IMergeReadHandler +{ +public: + IMergeReadHandler(); + virtual ~IMergeReadHandler(); +public: + /** + * when read from channel, notice the merge handler to sleep for + * some small bytes. + * @remark, it only for server-side, client srs-librtmp just ignore. + */ + virtual void on_read(ssize_t nread) = 0; +}; + /** * the buffer provices bytes cache for protocol. generally, * protocol recv data from socket, put into buffer, decode to RTMP message. @@ -41,6 +69,10 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. class SrsBuffer { private: + // the merged handler + bool merged_read; + IMergeReadHandler* _handler; + // data and socket buffer std::vector data; char* buffer; public: @@ -79,6 +111,16 @@ class SrsBuffer * @remark, we actually maybe read more than required_size, maybe 4k for example. */ virtual int grow(ISrsBufferReader* reader, int required_size); +public: + /** + * to improve read performance, merge some packets then read, + * when it on and read small bytes, we sleep to wait more data., + * that is, we merge some data to read together. + * @param v true to ename merged read. + * @param handler the handler when merge read is enabled. + * @see https://github.com/winlinvip/simple-rtmp-server/issues/241 + */ + virtual void set_merge_read(bool v, IMergeReadHandler* handler); }; #endif diff --git a/trunk/src/rtmp/srs_protocol_rtmp.cpp b/trunk/src/rtmp/srs_protocol_rtmp.cpp index 39ccb21319..80183dbbdc 100644 --- a/trunk/src/rtmp/srs_protocol_rtmp.cpp +++ b/trunk/src/rtmp/srs_protocol_rtmp.cpp @@ -745,6 +745,11 @@ void SrsRtmpServer::set_auto_response(bool v) protocol->set_auto_response(v); } +void SrsRtmpServer::set_merge_read(bool v, IMergeReadHandler* handler) +{ + protocol->set_merge_read(v, handler); +} + void SrsRtmpServer::set_recv_timeout(int64_t timeout_us) { protocol->set_recv_timeout(timeout_us); diff --git a/trunk/src/rtmp/srs_protocol_rtmp.hpp b/trunk/src/rtmp/srs_protocol_rtmp.hpp index 9846d76191..46eb92275d 100644 --- a/trunk/src/rtmp/srs_protocol_rtmp.hpp +++ b/trunk/src/rtmp/srs_protocol_rtmp.hpp @@ -46,6 +46,7 @@ class SrsPlayPacket; class SrsMessage; class SrsPacket; class SrsAmf0Object; +class IMergeReadHandler; /** * the original request from client. @@ -343,6 +344,15 @@ class SrsRtmpServer */ virtual void set_auto_response(bool v); /** + * to improve read performance, merge some packets then read, + * when it on and read small bytes, we sleep to wait more data., + * that is, we merge some data to read together. + * @param v true to ename merged read. + * @param handler the handler when merge read is enabled. + * @see https://github.com/winlinvip/simple-rtmp-server/issues/241 + */ + virtual void set_merge_read(bool v, IMergeReadHandler* handler); + /** * set/get the recv timeout in us. * if timeout, recv/send message return ERROR_SOCKET_TIMEOUT. */ diff --git a/trunk/src/rtmp/srs_protocol_stack.cpp b/trunk/src/rtmp/srs_protocol_stack.cpp index 94d75ced78..eb030f0073 100644 --- a/trunk/src/rtmp/srs_protocol_stack.cpp +++ b/trunk/src/rtmp/srs_protocol_stack.cpp @@ -479,6 +479,11 @@ int SrsProtocol::manual_response_flush() return ret; } +void SrsProtocol::set_merge_read(bool v, IMergeReadHandler* handler) +{ + in_buffer->set_merge_read(v, handler); +} + void SrsProtocol::set_recv_timeout(int64_t timeout_us) { return skt->set_recv_timeout(timeout_us); diff --git a/trunk/src/rtmp/srs_protocol_stack.hpp b/trunk/src/rtmp/srs_protocol_stack.hpp index d9ba0ee847..0f7dd69d0c 100644 --- a/trunk/src/rtmp/srs_protocol_stack.hpp +++ b/trunk/src/rtmp/srs_protocol_stack.hpp @@ -53,6 +53,7 @@ class SrsMessageHeader; class SrsMessage; class SrsChunkStream; class SrsSharedPtrMessage; +class IMergeReadHandler; /** * 4.1. Message Header @@ -269,6 +270,16 @@ class SrsProtocol * @see the auto_response_when_recv and manual_response_queue. */ virtual int manual_response_flush(); +public: + /** + * to improve read performance, merge some packets then read, + * when it on and read small bytes, we sleep to wait more data., + * that is, we merge some data to read together. + * @param v true to ename merged read. + * @param handler the handler when merge read is enabled. + * @see https://github.com/winlinvip/simple-rtmp-server/issues/241 + */ + virtual void set_merge_read(bool v, IMergeReadHandler* handler); public: /** * set/get the recv timeout in us.