diff --git a/README.md b/README.md index 1ac7efbdd7..b521f73d52 100755 --- a/README.md +++ b/README.md @@ -242,6 +242,7 @@ Supported operating systems and hardware: * 2013-10-17, Created.
## History +* v2.0, 2014-11-08, fix [#194](https://github.com/winlinvip/simple-rtmp-server/issues/194), writev multiple msgs, support 6k+ 250kbps clients. 2.0.15. * v2.0, 2014-11-08, fix [#194](https://github.com/winlinvip/simple-rtmp-server/issues/194), optmized st for timeout recv. pulse to 500ms. 2.0.14. * v2.0, 2014-11-08, fix [#195](https://github.com/winlinvip/simple-rtmp-server/issues/195), remove the confuse code st_usleep(0). 2.0.13. * v2.0, 2014-11-08, fix [#191](https://github.com/winlinvip/simple-rtmp-server/issues/191), configure --export-librtmp-project and --export-librtmp-single. 2.0.11. @@ -449,6 +450,7 @@ Performance benchmark history, on virtual box: * 2014-11-11, SRS 1.0.5, 2700clients, 85%CPU, 66MB. (1.0 equals 2.0.12) * 2014-11-12, SRS 2.0.14, 2700clients, 69%CPU, 59MB. * 2014-11-12, SRS 2.0.14, 3500clients, 95%CPU, 78MB. +* 2014-11-13, SRS 2.0.15, 6000clients, 82%CPU, 203MB. Latest benchmark(2014-07-12): diff --git a/trunk/src/app/srs_app_rtmp_conn.cpp b/trunk/src/app/srs_app_rtmp_conn.cpp index e9bcf5f94d..acd302059e 100644 --- a/trunk/src/app/srs_app_rtmp_conn.cpp +++ b/trunk/src/app/srs_app_rtmp_conn.cpp @@ -525,10 +525,7 @@ int SrsRtmpConn::playing(SrsSource* source) int64_t starttime = -1; while (true) { - // collect elapse for pithy print. - pithy_print.elapse(); - - // to use isolate thread to recv, can improve about 5% performance. + // TODO: to use isolate thread to recv, can improve about 5% performance. // @see: https://github.com/winlinvip/simple-rtmp-server/issues/196 // read from client. if (true) { @@ -539,6 +536,7 @@ int SrsRtmpConn::playing(SrsSource* source) if (ret == ERROR_SOCKET_TIMEOUT) { // it's ok, do nothing. ret = ERROR_SUCCESS; + srs_verbose("recv timeout, ignore. ret=%d", ret); } else if (ret != ERROR_SUCCESS) { if (!srs_is_client_gracefully_close(ret)) { srs_error("recv client control message failed. 
ret=%d", ret); @@ -554,6 +552,9 @@ int SrsRtmpConn::playing(SrsSource* source) } } + // collect elapse for pithy print. + pithy_print.elapse(); + // get messages from consumer. int count = 0; if ((ret = consumer->dump_packets(msgs.size, msgs.msgs, count)) != ERROR_SUCCESS) { @@ -568,22 +569,16 @@ int SrsRtmpConn::playing(SrsSource* source) " time=%"PRId64", msgs=%d, okbps=%d,%d,%d, ikbps=%d,%d,%d", pithy_print.age(), count, kbps->get_send_kbps(), kbps->get_send_kbps_30s(), kbps->get_send_kbps_5m(), - kbps->get_recv_kbps(), kbps->get_recv_kbps_30s(), kbps->get_recv_kbps_5m()); + kbps->get_recv_kbps(), kbps->get_recv_kbps_30s(), kbps->get_recv_kbps_5m() + ); } - // sendout messages - // @remark, becareful, all msgs must be free explicitly, - // free by send_and_free_message or srs_freep. - for (int i = 0; i < count; i++) { - SrsSharedPtrMessage* msg = msgs.msgs[i]; - - // the send_message will free the msg, - // so set the msgs[i] to NULL. - msgs.msgs[i] = NULL; - - // only when user specifies the duration, - // we start to collect the durations for each message. - if (user_specified_duration_to_stop) { + // only when user specifies the duration, + // we start to collect the durations for each message. + if (user_specified_duration_to_stop) { + for (int i = 0; i < count; i++) { + SrsSharedPtrMessage* msg = msgs.msgs[i]; + // foreach msg, collect the duration. // @remark: never use msg when sent it, for the protocol sdk will free it. if (starttime < 0 || starttime > msg->header.timestamp) { @@ -592,12 +587,23 @@ int SrsRtmpConn::playing(SrsSource* source) duration += msg->header.timestamp - starttime; starttime = msg->header.timestamp; } - + } + + // sendout messages + // @remark, becareful, all msgs must be free explicitly, + // free by send_and_free_message or srs_freep. + if (count > 0) { // no need to assert msg, for the rtmp will assert it. 
- if ((ret = rtmp->send_and_free_message(msg, res->stream_id)) != ERROR_SUCCESS) { - srs_error("send message to client failed. ret=%d", ret); - return ret; - } + ret = rtmp->send_and_free_messages(msgs.msgs, count, res->stream_id); + } + for (int i = 0; i < count; i++) { + // the send_message will free the msg, + // so set the msgs[i] to NULL. + msgs.msgs[i] = NULL; + } + if (ret != ERROR_SUCCESS) { + srs_error("send messages to client failed. ret=%d", ret); + return ret; } // if duration specified, and exceed it, stop play live. diff --git a/trunk/src/core/srs_core.hpp b/trunk/src/core/srs_core.hpp index 9d6dbc9e12..78625cbf74 100644 --- a/trunk/src/core/srs_core.hpp +++ b/trunk/src/core/srs_core.hpp @@ -31,7 +31,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. // current release version #define VERSION_MAJOR 2 #define VERSION_MINOR 0 -#define VERSION_REVISION 14 +#define VERSION_REVISION 15 // server info. #define RTMP_SIG_SRS_KEY "SRS" #define RTMP_SIG_SRS_ROLE "origin/edge server" diff --git a/trunk/src/kernel/srs_kernel_consts.hpp b/trunk/src/kernel/srs_kernel_consts.hpp index defc2d556c..4bb5513116 100644 --- a/trunk/src/kernel/srs_kernel_consts.hpp +++ b/trunk/src/kernel/srs_kernel_consts.hpp @@ -97,6 +97,19 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. // always use fmt0 as cache. //#define SRS_CONSTS_RTMP_MAX_FMT3_HEADER_SIZE 5 +/** +* for performance issue, +* the iovs cache, @see https://github.com/winlinvip/simple-rtmp-server/issues/194 +* iovs cache for multiple messages for each connections. +*/ +#define SRS_CONSTS_IOVS_MAX 1024 +/** +* for performance issue, +* the c0c3 cache, @see https://github.com/winlinvip/simple-rtmp-server/issues/194 +* c0c3 cache for multiple messages for each connections. 
+*/ +#define SRS_CONSTS_C0C3_HEADERS_MAX 4096 + /////////////////////////////////////////////////////////// /////////////////////////////////////////////////////////// /////////////////////////////////////////////////////////// diff --git a/trunk/src/rtmp/srs_protocol_rtmp.cpp b/trunk/src/rtmp/srs_protocol_rtmp.cpp index 304b59a788..76a0b2cb18 100644 --- a/trunk/src/rtmp/srs_protocol_rtmp.cpp +++ b/trunk/src/rtmp/srs_protocol_rtmp.cpp @@ -771,6 +771,11 @@ int SrsRtmpServer::send_and_free_message(SrsMessage* msg, int stream_id) return protocol->send_and_free_message(msg, stream_id); } +int SrsRtmpServer::send_and_free_messages(SrsSharedPtrMessage** msgs, int nb_msgs, int stream_id) +{ + return protocol->send_and_free_messages(msgs, nb_msgs, stream_id); +} + int SrsRtmpServer::send_and_free_packet(SrsPacket* packet, int stream_id) { return protocol->send_and_free_packet(packet, stream_id); diff --git a/trunk/src/rtmp/srs_protocol_rtmp.hpp b/trunk/src/rtmp/srs_protocol_rtmp.hpp index fcf224f306..e6dcb0b513 100644 --- a/trunk/src/rtmp/srs_protocol_rtmp.hpp +++ b/trunk/src/rtmp/srs_protocol_rtmp.hpp @@ -368,6 +368,15 @@ class SrsRtmpServer */ virtual int send_and_free_message(SrsMessage* msg, int stream_id); /** + * send the RTMP message and always free it. + * user must never free or use the msg after this method, + * for it will always free the msg. + * @param msgs, the msgs to send out, never be NULL. + * @param nb_msgs, the size of msgs to send out. + * @param stream_id, the stream id of packet to send over, 0 for control message. + */ + virtual int send_and_free_messages(SrsSharedPtrMessage** msgs, int nb_msgs, int stream_id); + /** * send the RTMP packet and always free it. * user must never free or use the packet after this method, * for it will always free the packet. 
diff --git a/trunk/src/rtmp/srs_protocol_stack.cpp b/trunk/src/rtmp/srs_protocol_stack.cpp index 47255cd86e..8f2bb11a2e 100644 --- a/trunk/src/rtmp/srs_protocol_stack.cpp +++ b/trunk/src/rtmp/srs_protocol_stack.cpp @@ -29,6 +29,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. #include #include +#include using namespace std; // when got a messae header, there must be some data, @@ -404,7 +405,15 @@ SrsProtocol::SrsProtocol(ISrsProtocolReaderWriter* io) in_buffer = new SrsBuffer(); skt = io; - in_chunk_size = out_chunk_size = SRS_CONSTS_RTMP_PROTOCOL_CHUNK_SIZE; + in_chunk_size = SRS_CONSTS_RTMP_PROTOCOL_CHUNK_SIZE; + out_chunk_size = SRS_CONSTS_RTMP_PROTOCOL_CHUNK_SIZE; + + nb_out_iovs = SRS_CONSTS_IOVS_MAX; + out_iovs = (iovec*)malloc(sizeof(iovec) * nb_out_iovs); + // each chunk consumes at least 2 iovs + srs_assert(nb_out_iovs >= 2); + + warned_c0c3_caches = false; } SrsProtocol::~SrsProtocol() @@ -421,6 +430,12 @@ SrsProtocol::~SrsProtocol() } srs_freep(in_buffer); + + // alloc by malloc, use free directly. + if (out_iovs) { + free(out_iovs); + out_iovs = NULL; + } } void SrsProtocol::set_recv_timeout(int64_t timeout_us) @@ -560,7 +575,7 @@ int SrsProtocol::do_send_message(SrsMessage* msg) // always has header int nbh = 0; char* header = NULL; - generate_chunk_header(&msg->header, p == msg->payload, &nbh, &header); + generate_chunk_header(out_c0c3_cache, &msg->header, p == msg->payload, &nbh, &header); srs_assert(nbh > 0); // header iov @@ -590,10 +605,130 @@ int SrsProtocol::do_send_message(SrsMessage* msg) return ret; } -void SrsProtocol::generate_chunk_header(SrsMessageHeader* mh, bool c0, int* pnbh, char** ph) +int SrsProtocol::do_send_messages(SrsSharedPtrMessage** msgs, int nb_msgs) { - char* cache = out_c0c3_cache; + int ret = ERROR_SUCCESS; + + // TODO: FIXME: use cache system instead. 
+ int iov_index = 0; + iovec* iov = out_iovs + iov_index; + + int c0c3_cache_index = 0; + char* c0c3_cache = out_c0c3_caches + c0c3_cache_index; + + // try to send using the c0c3 header cache, + // if cache is consumed, try another loop. + for (int i = 0; i < nb_msgs; i++) { + SrsMessage* msg = msgs[i]; + + // ignore empty message. + if (!msg->payload || msg->size <= 0) { + srs_info("ignore empty message."); + continue; + } + + // we do not use the complex basic header, + // ensure the basic header is 1 byte. + if (msg->header.perfer_cid < 2) { + srs_warn("change the chunk_id=%d to default=%d", + msg->header.perfer_cid, RTMP_CID_ProtocolControl); + msg->header.perfer_cid = RTMP_CID_ProtocolControl; + } + + // p set to current write position, + // it's ok when payload is NULL and size is 0. + char* p = msg->payload; + char* pend = msg->payload + msg->size; + + // write a chunk header before each chunk of the payload. + while (p < pend) { + // always has header + int nbh = 0; + char* header = NULL; + generate_chunk_header(c0c3_cache, &msg->header, p == msg->payload, &nbh, &header); + srs_assert(nbh > 0); + + // header iov + iov[0].iov_base = header; + iov[0].iov_len = nbh; + + // payload iov + int payload_size = pend - p; + if (payload_size > out_chunk_size) { + payload_size = out_chunk_size; + } + iov[1].iov_base = p; + iov[1].iov_len = payload_size; + + // consume sendout bytes. + p += payload_size; + + // realloc the iovs if exceeded, + // for we do not know how many messages will be sent in total, + // we just alloc the iovs, it's ok. 
+ if (iov_index >= nb_out_iovs - 2) { + nb_out_iovs += SRS_CONSTS_IOVS_MAX; + int realloc_size = sizeof(iovec) * nb_out_iovs; + out_iovs = (iovec*)realloc(out_iovs, realloc_size); + } + + // to next pair of iovs + iov_index += 2; + iov = out_iovs + iov_index; + + // to next c0c3 header cache + c0c3_cache_index += nbh; + c0c3_cache = out_c0c3_caches + c0c3_cache_index; + + // the cache header should never be realloc again, + // for the ptr is set to iovs, so we just warn user to set larger + // and use another loop to send again. + int c0c3_left = SRS_CONSTS_C0C3_HEADERS_MAX - c0c3_cache_index; + if (c0c3_left < SRS_CONSTS_RTMP_MAX_FMT0_HEADER_SIZE) { + // only warn once for a connection. + if (!warned_c0c3_caches) { + srs_warn("c0c3 cache header too small, recoment to %d", + SRS_CONSTS_C0C3_HEADERS_MAX + SRS_CONSTS_RTMP_MAX_FMT0_HEADER_SIZE); + warned_c0c3_caches = true; + } + + // when c0c3 cache dry, + // sendout all messages and reset the cache, then send again. + if ((ret = skt->writev(out_iovs, iov_index, NULL)) != ERROR_SUCCESS) { + srs_error("send with writev failed. ret=%d", ret); + return ret; + } + // reset caches, while these cache ensure + // atleast we can sendout a chunk. + iov_index = 0; + iov = out_iovs + iov_index; + + c0c3_cache_index = 0; + c0c3_cache = out_c0c3_caches + c0c3_cache_index; + } + } + } + + // maybe the iovs already sendout when c0c3 cache dry, + // so just ignore when no iovs to send. + if (iov_index <= 0) { + return ret; + } + + // send by writev + // sendout header and payload by writev. + // decrease the sys invoke count to get higher performance. + if ((ret = skt->writev(out_iovs, iov_index, NULL)) != ERROR_SUCCESS) { + srs_error("send with writev failed. ret=%d", ret); + return ret; + } + + return ret; +} + +void SrsProtocol::generate_chunk_header(char* cache, SrsMessageHeader* mh, bool c0, int* pnbh, char** ph) +{ // to directly set the field. 
char* pp = NULL; @@ -856,6 +991,34 @@ int SrsProtocol::send_and_free_message(SrsMessage* msg, int stream_id) return ret; } +int SrsProtocol::send_and_free_messages(SrsSharedPtrMessage** msgs, int nb_msgs, int stream_id) +{ + // msgs must never be NULL or empty. + srs_assert(msgs); + srs_assert(nb_msgs > 0); + + // update the stream id in header. + for (int i = 0; i < nb_msgs; i++) { + SrsMessage* msg = msgs[i]; + // we assume that the stream_id in a group must be the same. + if (msg->header.stream_id == stream_id) { + break; + } + msg->header.stream_id = stream_id; + } + + // do not use the auto free to free the msgs, + // for performance reasons. + int ret = do_send_messages(msgs, nb_msgs); + + for (int i = 0; i < nb_msgs; i++) { + SrsMessage* msg = msgs[i]; + srs_freep(msg); + } + + return ret; +} + int SrsProtocol::send_and_free_packet(SrsPacket* packet, int stream_id) { int ret = ERROR_SUCCESS; diff --git a/trunk/src/rtmp/srs_protocol_stack.hpp b/trunk/src/rtmp/srs_protocol_stack.hpp index 071f5168f7..2f4914d9cc 100644 --- a/trunk/src/rtmp/srs_protocol_stack.hpp +++ b/trunk/src/rtmp/srs_protocol_stack.hpp @@ -47,6 +47,7 @@ class SrsAmf0Any; class SrsMessageHeader; class SrsMessage; class SrsChunkStream; +class SrsSharedPtrMessage; /** * 4.1. Message Header @@ -221,6 +222,15 @@ class SrsProtocol */ iovec out_iov[2]; /** + * cache for sending multiple messages + */ + iovec* out_iovs; + int nb_out_iovs; + // the c0c3 cache cannot be reallocated. + char out_c0c3_caches[SRS_CONSTS_C0C3_HEADERS_MAX]; + // whether warned user to increase the c0c3 header cache. + bool warned_c0c3_caches; + /** * output chunk size, default to 128, set by config. */ int32_t out_chunk_size; @@ -276,6 +286,15 @@ class SrsProtocol */ virtual int send_and_free_message(SrsMessage* msg, int stream_id); /** + * send the RTMP message and always free it. + * user must never free or use the msg after this method, + * for it will always free the msg. + * @param msgs, the msgs to send out, never be NULL. 
+ * @param nb_msgs, the size of msgs to send out. + * @param stream_id, the stream id of packet to send over, 0 for control message. + */ + virtual int send_and_free_messages(SrsSharedPtrMessage** msgs, int nb_msgs, int stream_id); + /** * send the RTMP packet and always free it. * user must never free or use the packet after this method, * for it will always free the packet. @@ -349,6 +368,11 @@ class SrsProtocol */ virtual int do_send_message(SrsMessage* msg); /** + * send out the messages, do not free them, + * the caller must free the param msgs. + */ + virtual int do_send_messages(SrsSharedPtrMessage** msgs, int nb_msgs); + /** * generate the chunk header for msg. * @param mh, the header of msg to send. * @param c0, whether the first chunk, the c0 chunk. @@ -356,7 +380,7 @@ class SrsProtocol * @param ph, output the header cache. * user should never free it, it's cached header. */ - virtual void generate_chunk_header(SrsMessageHeader* mh, bool c0, int* pnbh, char** ph); + virtual void generate_chunk_header(char* cache, SrsMessageHeader* mh, bool c0, int* pnbh, char** ph); /** * imp for decode_message */