Skip to content

Commit

Permalink
Threads-Log: Refine dual queue for log thread.
Browse files Browse the repository at this point in the history
1. App/User controls the interval to flush coroutine-queue.
2. Use srs_update_system_time to get time for log.
3. Stat the thread sync in us, in SrsThreadPool.
4. Change default interval for thread to 5s.
  • Loading branch information
winlinvip committed Apr 26, 2021
1 parent 37aee44 commit 6718c38
Show file tree
Hide file tree
Showing 7 changed files with 73 additions and 32 deletions.
4 changes: 2 additions & 2 deletions trunk/conf/full.conf
Expand Up @@ -119,8 +119,8 @@ tcmalloc_release_rate 0.8;
# For thread pool.
threads {
# The thread pool manager cycle interval, in seconds.
# Default: 60
interval 60;
# Default: 5
interval 5;
}

#############################################################################################
Expand Down
2 changes: 1 addition & 1 deletion trunk/src/app/srs_app_config.cpp
Expand Up @@ -4107,7 +4107,7 @@ double SrsConfig::tcmalloc_release_rate()

srs_utime_t SrsConfig::get_threads_interval()
{
static srs_utime_t DEFAULT = 60 * SRS_UTIME_SECONDS;
static srs_utime_t DEFAULT = 5 * SRS_UTIME_SECONDS;

SrsConfDirective* conf = root->get("threads");
if (!conf) {
Expand Down
16 changes: 14 additions & 2 deletions trunk/src/app/srs_app_log.cpp
Expand Up @@ -54,6 +54,9 @@ SrsFileLog::SrsFileLog()

log_data = new char[LOG_MAX_SIZE];
writer_ = NULL;

last_flush_time_ = srs_get_system_time();
interval_ = 0;
}

SrsFileLog::~SrsFileLog()
Expand All @@ -71,6 +74,7 @@ srs_error_t SrsFileLog::initialize()
filename_ = _srs_config->get_log_file();
level = srs_get_log_level(_srs_config->get_log_level());
utc = _srs_config->get_utc_time();
interval_ = _srs_config->srs_log_flush_interval();
}

if (!log_to_file_tank) {
Expand Down Expand Up @@ -202,11 +206,12 @@ void SrsFileLog::write_log(char *str_log, int size, int level)
// ensure the tail and EOF of string
// LOG_TAIL_SIZE for the TAIL char.
// 1 for the last char(0).
size = srs_min(LOG_MAX_SIZE - 1 - LOG_TAIL_SIZE, size);
size = srs_min(LOG_MAX_SIZE - 2 - LOG_TAIL_SIZE, size);

// add some to the end of char.
str_log[size++] = LOG_TAIL;

str_log[size] = 0;

// if not to file, to console and return.
if (!log_to_file_tank) {
// if is error msg, then print color msg.
Expand All @@ -230,5 +235,12 @@ void SrsFileLog::write_log(char *str_log, int size, int level)
if ((err = writer_->write(str_log, size, NULL)) != srs_success) {
srs_error_reset(err); // Ignore any error for log writing.
}

// Whether flush to thread-queue.
srs_utime_t diff = srs_get_system_time() - last_flush_time_;
if (diff >= interval_) {
last_flush_time_ = srs_get_system_time();
writer_->flush_co_queue();
}
}

5 changes: 5 additions & 0 deletions trunk/src/app/srs_app_log.hpp
Expand Up @@ -52,6 +52,11 @@ class SrsFileLog : public ISrsLog, public ISrsReloadHandler
char* log_data;
// Async file writer.
SrsAsyncFileWriter* writer_;
private:
// The interval to flush from coroutine-queue to thread-queue.
srs_utime_t interval_;
// Last flush coroutine-queue time, to calculate the timeout.
srs_utime_t last_flush_time_;
private:
// Defined in SrsLogLevel.
SrsLogLevel level;
Expand Down
60 changes: 44 additions & 16 deletions trunk/src/app/srs_app_threads.cpp
Expand Up @@ -33,6 +33,13 @@

using namespace std;

#include <srs_protocol_kbps.hpp>

SrsPps* _srs_thread_sync_10us = new SrsPps();
SrsPps* _srs_thread_sync_100us = new SrsPps();
SrsPps* _srs_thread_sync_1000us = new SrsPps();
SrsPps* _srs_thread_sync_plus = new SrsPps();

SrsThreadMutex::SrsThreadMutex()
{
// https://man7.org/linux/man-pages/man3/pthread_mutexattr_init.3.html
Expand Down Expand Up @@ -171,9 +178,19 @@ srs_error_t SrsThreadPool::run()
while (true) {
sleep(interval_ / SRS_UTIME_SECONDS);

static char buf[128];
string async_logs = _srs_async_log->description();
srs_trace("Thread #%d(%s): cycle threads=%d%s", entry_->num, entry_->label.c_str(), (int)threads_.size(),
async_logs.c_str());

string sync_desc;
_srs_thread_sync_10us->update(); _srs_thread_sync_100us->update();
_srs_thread_sync_1000us->update(); _srs_thread_sync_plus->update();
if (_srs_thread_sync_10us->r10s() || _srs_thread_sync_100us->r10s() || _srs_thread_sync_1000us->r10s() || _srs_thread_sync_plus->r10s()) {
snprintf(buf, sizeof(buf), ", sync=%d,%d,%d,%d", _srs_thread_sync_10us->r10s(), _srs_thread_sync_100us->r10s(), _srs_thread_sync_1000us->r10s(), _srs_thread_sync_plus->r10s());
sync_desc = buf;
}

srs_trace("Thread: cycle threads=%d%s%s", (int)threads_.size(),
async_logs.c_str(), sync_desc.c_str());
}

return err;
Expand Down Expand Up @@ -201,14 +218,12 @@ void* SrsThreadPool::start(void* arg)

SrsThreadPool* _srs_thread_pool = new SrsThreadPool();

SrsAsyncFileWriter::SrsAsyncFileWriter(std::string p, srs_utime_t interval)
SrsAsyncFileWriter::SrsAsyncFileWriter(std::string p)
{
filename_ = p;
writer_ = new SrsFileWriter();
queue_ = new SrsThreadQueue<SrsSharedPtrMessage>();
co_queue_ = new SrsCoroutineQueue<SrsSharedPtrMessage>();
interval_ = interval;
last_flush_time_ = srs_get_system_time();
}

// TODO: FIXME: Before free the writer, we must remove it from the manager.
Expand Down Expand Up @@ -251,15 +266,6 @@ srs_error_t SrsAsyncFileWriter::write(void* buf, size_t count, ssize_t* pnwrite)

co_queue_->push_back(msg);

// Whether flush to thread-queue.
if (srs_get_system_time() - last_flush_time_ >= interval_) {
last_flush_time_ = srs_get_system_time();

vector<SrsSharedPtrMessage*> flying;
co_queue_->swap(flying);
queue_->push_back(flying);
}

if (pnwrite) {
*pnwrite = count;
}
Expand Down Expand Up @@ -287,6 +293,28 @@ srs_error_t SrsAsyncFileWriter::writev(const iovec* iov, int iovcnt, ssize_t* pn
return err;
}

void SrsAsyncFileWriter::flush_co_queue()
{
srs_utime_t now = srs_update_system_time();

if (true) {
vector<SrsSharedPtrMessage*> flying;
co_queue_->swap(flying);
queue_->push_back(flying);
}

srs_utime_t elapsed = srs_update_system_time() - now;
if (elapsed <= 10) {
++_srs_thread_sync_10us->sugar;
} else if (elapsed <= 100) {
++_srs_thread_sync_100us->sugar;
} else if (elapsed <= 1000) {
++_srs_thread_sync_1000us->sugar;
} else {
++_srs_thread_sync_plus->sugar;
}
}

srs_error_t SrsAsyncFileWriter::flush()
{
srs_error_t err = srs_success;
Expand Down Expand Up @@ -362,7 +390,7 @@ srs_error_t SrsAsyncLogManager::create_writer(std::string filename, SrsAsyncFile
{
srs_error_t err = srs_success;

SrsAsyncFileWriter* writer = new SrsAsyncFileWriter(filename, interval_);
SrsAsyncFileWriter* writer = new SrsAsyncFileWriter(filename);
writers_.push_back(writer);

if ((err = writer->open()) != srs_success) {
Expand Down Expand Up @@ -404,7 +432,7 @@ std::string SrsAsyncLogManager::description()
}

static char buf[128];
snprintf(buf, sizeof(buf), ", files=%d, queue=%d/%d, coq=%d/%d",
snprintf(buf, sizeof(buf), ", logs=%d/%d/%d/%d/%d",
(int)writers_.size(), nn_logs, max_logs, nn_co_logs, max_co_logs);

return buf;
Expand Down
12 changes: 4 additions & 8 deletions trunk/src/app/srs_app_threads.hpp
Expand Up @@ -201,14 +201,10 @@ class SrsAsyncFileWriter : public ISrsWriter
// The thread-queue, to flush to disk by dedicated thread.
SrsThreadQueue<SrsSharedPtrMessage>* queue_;
private:
// The interval to flush from coroutine-queue to thread-queue.
srs_utime_t interval_;
// Last flush coroutine-queue time, to calculate the timeout.
srs_utime_t last_flush_time_;
// The coroutine-queue, to avoid requires lock for each log.
SrsCoroutineQueue<SrsSharedPtrMessage>* co_queue_;
private:
SrsAsyncFileWriter(std::string p, srs_utime_t interval);
SrsAsyncFileWriter(std::string p);
virtual ~SrsAsyncFileWriter();
public:
// Open file writer, in truncate mode.
Expand All @@ -222,7 +218,9 @@ class SrsAsyncFileWriter : public ISrsWriter
virtual srs_error_t write(void* buf, size_t count, ssize_t* pnwrite);
virtual srs_error_t writev(const iovec* iov, int iovcnt, ssize_t* pnwrite);
public:
// Flush by other thread.
// Flush coroutine-queue to thread-queue, avoid requiring lock for each message.
void flush_co_queue();
// Flush thread-queue to disk, generally by dedicated thread.
srs_error_t flush();
};

Expand All @@ -233,8 +231,6 @@ class SrsAsyncLogManager
private:
// The async flush interval.
srs_utime_t interval_;
// The number of logs to flush from coroutine-queue to thread-queue.
int flush_co_queue_;
private:
// The async reopen event.
bool reopen_;
Expand Down
6 changes: 3 additions & 3 deletions trunk/src/protocol/srs_service_log.cpp
Expand Up @@ -244,9 +244,9 @@ bool srs_log_header(char* buffer, int size, bool utc, bool dangerous, const char
{
// clock time
timeval tv;
if (gettimeofday(&tv, NULL) == -1) {
return false;
}
srs_utime_t now = srs_update_system_time();
tv.tv_sec = now / SRS_UTIME_SECONDS;
tv.tv_usec = now % SRS_UTIME_SECONDS;

// to calendar time
struct tm* tm;
Expand Down

0 comments on commit 6718c38

Please sign in to comment.