Skip to content

Commit

Permalink
Threads-Hybrid: Extract pipe and pair to communicate between threads
Browse files Browse the repository at this point in the history
1. Pipe can be open by any threads, because it's only os FDs.
2. If pipe is open read/write, it's associated by ST, so we MUST free it by the same thread.
3. If open pipe in one thread, it's ok to free it directly, without close pipe.
4. If open read in a thread, then open write in another thread, user MUST close it correctly.
  • Loading branch information
winlinvip committed Apr 26, 2021
1 parent 40e59ef commit 281350d
Show file tree
Hide file tree
Showing 7 changed files with 252 additions and 33 deletions.
66 changes: 36 additions & 30 deletions trunk/src/app/srs_app_server.cpp
Expand Up @@ -404,39 +404,42 @@ SrsSignalManager* SrsSignalManager::instance = NULL;
SrsSignalManager::SrsSignalManager(SrsServer* s)
{
SrsSignalManager::instance = this;


pipe_ = new SrsThreadPipePair();

server = s;
sig_pipe[0] = sig_pipe[1] = -1;
trd = new SrsSTCoroutine("signal", this, _srs_context->get_id());
signal_read_stfd = NULL;
}

SrsSignalManager::~SrsSignalManager()
{
srs_close_stfd(signal_read_stfd);

if (sig_pipe[0] > 0) {
::close(sig_pipe[0]);
}
if (sig_pipe[1] > 0) {
::close(sig_pipe[1]);
}

srs_freep(trd);

// Note that it's optional, because the read/write pair is in the same thread.
pipe_->close_read();
pipe_->close_write();

// If in the same thread, we could directly free the pipe, which will close all FDs.
srs_freep(pipe_);
}

srs_error_t SrsSignalManager::initialize()
{
/* Create signal pipe */
if (pipe(sig_pipe) < 0) {
return srs_error_new(ERROR_SYSTEM_CREATE_PIPE, "create pipe");
srs_error_t err = srs_success;

if ((err = pipe_->initialize()) != srs_success) {
return srs_error_wrap(err, "init pipe");
}
if ((signal_read_stfd = srs_netfd_open(sig_pipe[0])) == NULL) {
return srs_error_new(ERROR_SYSTEM_CREATE_PIPE, "open pipe");

if ((err = pipe_->open_read()) != srs_success) {
return srs_error_wrap(err, "init read pipe");
}

return srs_success;

if ((err = pipe_->open_write()) != srs_success) {
return srs_error_wrap(err, "init write pipe");
}

return err;
}

srs_error_t SrsSignalManager::start()
Expand Down Expand Up @@ -496,11 +499,12 @@ srs_error_t SrsSignalManager::cycle()
}

int signo;
// Read the next signal from the pipe
if ((err = pipe_->read(&signo, sizeof(int), NULL)) != srs_success) {
srs_freep(err); // Ignore any error.
}

/* Read the next signal from the pipe */
srs_read(signal_read_stfd, &signo, sizeof(int), SRS_UTIME_NO_TIMEOUT);

/* Process signal synchronously */
// Process signal synchronously
server->on_signal(signo);
}

Expand All @@ -511,13 +515,15 @@ void SrsSignalManager::sig_catcher(int signo)
{
int err;

/* Save errno to restore it after the write() */
// Save errno to restore it after the write()
err = errno;

/* write() is reentrant/async-safe */
int fd = SrsSignalManager::instance->sig_pipe[1];
write(fd, &signo, sizeof(int));


// write() is reentrant/async-safe
srs_error_t r0 = SrsSignalManager::instance->pipe_->write(&signo, sizeof(int), NULL);
if (r0 != srs_success) {
srs_freep(r0); // Ignore any error.
}

errno = err;
}

Expand Down
5 changes: 2 additions & 3 deletions trunk/src/app/srs_app_server.hpp
Expand Up @@ -57,7 +57,7 @@ class SrsAppCasterFlv;
class SrsRtspCaster;
class SrsResourceManager;
class SrsGb28181Caster;

class SrsThreadPipePair;

// The listener type for server to identify the connection,
// that is, use different type to process the connection.
Expand Down Expand Up @@ -215,8 +215,7 @@ class SrsSignalManager : public ISrsCoroutineHandler
private:
// Per-process pipe which is used as a signal queue.
// Up to PIPE_BUF/sizeof(int) signals can be queued up.
int sig_pipe[2];
srs_netfd_t signal_read_stfd;
SrsThreadPipePair* pipe_;
private:
SrsServer* server;
SrsCoroutine* trd;
Expand Down
137 changes: 137 additions & 0 deletions trunk/src/app/srs_app_threads.cpp
Expand Up @@ -83,6 +83,143 @@ uint64_t srs_covert_cpuset(cpu_set_t v)
#endif
}

SrsPipe::SrsPipe()
{
pipes_[0] = pipes_[1] = -1;
}

SrsPipe::~SrsPipe()
{
// Close the FDs because we might not open it as stfd.
if (pipes_[0] > 0) {
::close(pipes_[0]);
}
if (pipes_[1] > 0) {
::close(pipes_[1]);
}
}

srs_error_t SrsPipe::initialize()
{
srs_error_t err = srs_success;

if (pipe(pipes_) < 0) {
return srs_error_new(ERROR_SYSTEM_CREATE_PIPE, "create pipe");
}

return err;
}

int SrsPipe::read_fd()
{
return pipes_[0];
}

int SrsPipe::write_fd()
{
return pipes_[1];
}

SrsThreadPipe::SrsThreadPipe()
{
stfd_ = NULL;
}

SrsThreadPipe::~SrsThreadPipe()
{
srs_close_stfd(stfd_);
}

srs_error_t SrsThreadPipe::initialize(int fd)
{
srs_error_t err = srs_success;

if ((stfd_ = srs_netfd_open(fd)) == NULL) {
return srs_error_new(ERROR_PIPE_OPEN, "open pipe");
}

return err;
}

srs_error_t SrsThreadPipe::read(void* buf, size_t size, ssize_t* nread)
{
ssize_t nn = srs_read(stfd_, buf, size, SRS_UTIME_NO_TIMEOUT);

if (nread) {
*nread = nn;
}

if (nn < 0) {
return srs_error_new(ERROR_PIPE_READ, "read");
}

return srs_success;
}

srs_error_t SrsThreadPipe::write(void* buf, size_t size, ssize_t* nwrite)
{
ssize_t nn = srs_write(stfd_, buf, size, SRS_UTIME_NO_TIMEOUT);

if (nwrite) {
*nwrite = nn;
}

if (nn < 0) {
return srs_error_new(ERROR_PIPE_WRITE, "write");
}

return srs_success;
}

SrsThreadPipePair::SrsThreadPipePair()
{
pipe_ = new SrsPipe();
rpipe_ = new SrsThreadPipe();
wpipe_ = new SrsThreadPipe();
}

SrsThreadPipePair::~SrsThreadPipePair()
{
close_read();
close_write();
srs_freep(pipe_);
}

srs_error_t SrsThreadPipePair::initialize()
{
return pipe_->initialize();
}

srs_error_t SrsThreadPipePair::open_read()
{
return rpipe_->initialize(pipe_->read_fd());
}

srs_error_t SrsThreadPipePair::open_write()
{
return wpipe_->initialize(pipe_->write_fd());
}

void SrsThreadPipePair::close_read()
{
srs_freep(rpipe_);
}

void SrsThreadPipePair::close_write()
{
srs_freep(wpipe_);
}

srs_error_t SrsThreadPipePair::read(void* buf, size_t size, ssize_t* nread)
{
return rpipe_->read(buf, size, nread);
}

srs_error_t SrsThreadPipePair::write(void* buf, size_t size, ssize_t* nwrite)
{
return wpipe_->write(buf, size, nwrite);
}

SrsThreadMutex::SrsThreadMutex()
{
// https://man7.org/linux/man-pages/man3/pthread_mutexattr_init.3.html
Expand Down
68 changes: 68 additions & 0 deletions trunk/src/app/srs_app_threads.hpp
Expand Up @@ -44,6 +44,74 @@ class SrsAsyncSRTPPacket;
class SrsSecurityTransport;
class SrsProcSelfStat;

// The pipe wraps the os pipes(fds).
class SrsPipe
{
private:
// The max buffer size of pipe is PIPE_BUF, so if we used to transmit signals(int),
// up to PIPE_BUF/sizeof(int) signals can be queued up.
// @see https://man7.org/linux/man-pages/man2/pipe.2.html
int pipes_[2];
public:
SrsPipe();
virtual ~SrsPipe();
public:
srs_error_t initialize();
public:
int read_fd();
int write_fd();
};

// The pipe to communicate between thread-local ST of threads.
class SrsThreadPipe
{
private:
srs_netfd_t stfd_;
public:
SrsThreadPipe();
virtual ~SrsThreadPipe();
public:
// Open fd by ST, should be free by the same thread.
srs_error_t initialize(int fd);
public:
// Note that the pipe is unidirectional data channel, so only one of
// read/write is available.
srs_error_t read(void* buf, size_t size, ssize_t* nread);
srs_error_t write(void* buf, size_t size, ssize_t* nwrite);
};

// A thread pipe pair, to communicate between threads.
// @remark If thread A open read, then it MUST close the read.
class SrsThreadPipePair
{
private:
// Per-process pipe which is used as a signal queue.
// Up to PIPE_BUF/sizeof(int) signals can be queued up.
SrsPipe* pipe_;
SrsThreadPipe* rpipe_;
SrsThreadPipe* wpipe_;
public:
SrsThreadPipePair();
virtual ~SrsThreadPipePair();
public:
// It's ok to initialize pipe in another threads.
srs_error_t initialize();
public:
// It's ok to open read/write in one or two threads.
srs_error_t open_read();
srs_error_t open_write();
public:
// For multiple-threading, if a thread open the pipe, it MUST close it, never close it by
// another thread which has not open it.
// If pair(read/write) alive in one thread, user can directly free the pair, without closing
// the read/write, because it's in the same thread.
void close_read();
void close_write();
public:
srs_error_t read(void* buf, size_t size, ssize_t* nread);
srs_error_t write(void* buf, size_t size, ssize_t* nwrite);
};

// The thread mutex wrapper, without error.
class SrsThreadMutex
{
Expand Down
3 changes: 3 additions & 0 deletions trunk/src/kernel/srs_kernel_error.hpp
Expand Up @@ -120,6 +120,9 @@
#define ERROR_SOCKET_ACCEPT 1081
#define ERROR_THREAD_CREATE 1082
#define ERROR_SYSTEM_LOGFILE 1083
#define ERROR_PIPE_OPEN 1084
#define ERROR_PIPE_READ 1085
#define ERROR_PIPE_WRITE 1086

///////////////////////////////////////////////////////
// RTMP protocol error.
Expand Down
5 changes: 5 additions & 0 deletions trunk/src/protocol/srs_service_st.cpp
Expand Up @@ -457,6 +457,11 @@ ssize_t srs_read(srs_netfd_t stfd, void *buf, size_t nbyte, srs_utime_t timeout)
return st_read((st_netfd_t)stfd, buf, nbyte, (st_utime_t)timeout);
}

ssize_t srs_write(srs_netfd_t stfd, const void *buf, size_t nbyte, srs_utime_t timeout)
{
return st_write((st_netfd_t)stfd, buf, nbyte, (st_utime_t)timeout);
}

bool srs_is_never_timeout(srs_utime_t tm)
{
return tm == SRS_UTIME_NO_TIMEOUT;
Expand Down
1 change: 1 addition & 0 deletions trunk/src/protocol/srs_service_st.hpp
Expand Up @@ -102,6 +102,7 @@ extern int srs_sendmsg(srs_netfd_t stfd, const struct msghdr *msg, int flags, sr
extern srs_netfd_t srs_accept(srs_netfd_t stfd, struct sockaddr *addr, int *addrlen, srs_utime_t timeout);

extern ssize_t srs_read(srs_netfd_t stfd, void *buf, size_t nbyte, srs_utime_t timeout);
extern ssize_t srs_write(srs_netfd_t stfd, const void *buf, size_t nbyte, srs_utime_t timeout);

extern bool srs_is_never_timeout(srs_utime_t tm);

Expand Down

0 comments on commit 281350d

Please sign in to comment.