Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

【libuv 源码学习笔记】信号 #5

Open
xiaoxiaojx opened this issue Jun 8, 2021 · 0 comments
Open

【libuv 源码学习笔记】信号 #5

xiaoxiaojx opened this issue Jun 8, 2021 · 0 comments
Labels
libuv libuv is a multi-platform support library with a focus on asynchronous I/O. It was primarily develop

Comments

@xiaoxiaojx
Copy link
Owner

xiaoxiaojx commented Jun 8, 2021

image

Table of Contents

1. 前言

开始写一些博客主要是在看完代码后再温故总结一遍, 也是为了后面回头也能查阅。本系列会从官网的例子出发, 尽可能以链路追踪的方式记录其中源码核心模块的实现, 本篇例子来源

涉及的知识点

2. 例子 signal/main.c

创建了两个子线程, 而 linux 提供的 sigaction 函数一个 signum 只能有一个监听函数, 那么多进程多线程如何做到只设置一次通知所有监听函数了?

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <uv.h>

uv_loop_t* create_loop()
{
    uv_loop_t *loop = malloc(sizeof(uv_loop_t));
    if (loop) {
      uv_loop_init(loop);
    }
    return loop;
}

void signal_handler(uv_signal_t *handle, int signum)
{
    printf("Signal received: %d\n", signum);
    uv_signal_stop(handle);
}

// two signal handlers in one loop
void thread1_worker(void *userp)
{
    uv_loop_t *loop1 = create_loop();

    uv_signal_t sig1a, sig1b;
    uv_signal_init(loop1, &sig1a);
    uv_signal_start(&sig1a, signal_handler, SIGUSR1);

    uv_signal_init(loop1, &sig1b);
    uv_signal_start(&sig1b, signal_handler, SIGUSR1);

    uv_run(loop1, UV_RUN_DEFAULT);
}

// two signal handlers, each in its own loop
void thread2_worker(void *userp)
{
    uv_loop_t *loop2 = create_loop();
    uv_loop_t *loop3 = create_loop();

    uv_signal_t sig2;
    uv_signal_init(loop2, &sig2);
    uv_signal_start(&sig2, signal_handler, SIGUSR1);

    uv_signal_t sig3;
    uv_signal_init(loop3, &sig3);
    uv_signal_start(&sig3, signal_handler, SIGUSR1);

    while (uv_run(loop2, UV_RUN_NOWAIT) || uv_run(loop3, UV_RUN_NOWAIT)) {
    }
}

int main()
{
    printf("PID %d\n", getpid());

    uv_thread_t thread1, thread2;

    uv_thread_create(&thread1, thread1_worker, 0);
    uv_thread_create(&thread2, thread2_worker, 0);

    uv_thread_join(&thread1);
    uv_thread_join(&thread2);
    return 0;
}

关于该例子中的 SIGUSR1 信号, 为用户自定义信号1

  • 比如向进程 12345 发送信号 10
$ kill -10 12345
  • 那么向进程 12345 发送自定义信号1 则可以通过下面的命令
$ kill -SIGUSR1 12345

2.1. uv_signal_init

thread1_worker > uv_signal_init

对 loop 和 handle 进行一些数据初始化操作, 主要调用了 uv__signal_loop_once_init 函数。

int uv_signal_init(uv_loop_t* loop, uv_signal_t* handle) {
  int err;

  err = uv__signal_loop_once_init(loop);
  if (err)
    return err;

  uv__handle_init(loop, (uv_handle_t*) handle, UV_SIGNAL);
  handle->signum = 0;
  handle->caught_signals = 0;
  handle->dispatched_signals = 0;

  return 0;
}

2.2. uv__signal_loop_once_init

thread1_worker > uv_signal_init > uv__signal_loop_once_init

  1. 如果已经有通信的 fd 则直接返回
  2. uv__make_pipe 函数在 【libuv 源码学习笔记】子进程与ipc 就分析过, 函数里面主要是调用 pipe2 函数

pipe2: 创建一个管道,一个单向的数据通道,可以 用于进程间通信。数组 pipefd 用于 返回两个指向管道末端的文件描述符。 pipefd[0] 指的是管道的读取端。 pipefd[1] 指的是 到管道的写端。写入到写端的数据 管道由内核缓冲,直到从 read 中读取 管道的末端。

  1. 调用 uv__io_init 初始化一个 i/o 观察者, 其观察者的回调函数为 uv__signal_event, 需要观察的 fd 为上面 pipe2 拿到读端的 fd。i/o 相关实现可参考 【libuv 源码学习笔记】线程池与i/o
  2. 调用 uv__io_start 注册刚才初始化完成的 i/o 观察者。
static int uv__signal_loop_once_init(uv_loop_t* loop) {
  int err;

  /* Return if already initialized. */
  if (loop->signal_pipefd[0] != -1)
    return 0;

  err = uv__make_pipe(loop->signal_pipefd, UV_NONBLOCK_PIPE);
  if (err)
    return err;

  uv__io_init(&loop->signal_io_watcher,
              uv__signal_event,
              loop->signal_pipefd[0]);
  uv__io_start(loop, &loop->signal_io_watcher, POLLIN);

  return 0;
}

2.3. uv_signal_start

thread1_worker > uv_signal_start

uv_signal_start 函数里面主要是调用了 uv_signal_start 方法, libuv 中有大量相似度极高的函数名 ...

  1. 如果发现该 handle 的 signum 已经注册则直接返回
  2. 调用 uv__signal_block_and_lock 就行类似互斥锁的锁定
  3. 调用 uv__signal_first_handle 函数, 如果该 signum 已经设置了监听函数则不再调用 uv__signal_register_handler 函数
  4. 调用 uv__signal_register_handler 函数给 signum 注册监听函数
  5. 通过 RB_INSERT 把该 handle 加入到树中。
  6. 调用 uv__signal_unlock_and_unblock 进行解锁, 即会 write 一次数据, 使其他等待的线程能够从 uv__signal_block_and_lock 函数往下运行。
static int uv__signal_start(uv_signal_t* handle,
                            uv_signal_cb signal_cb,
                            int signum,
                            int oneshot) {
 ...

  if (signum == handle->signum) {
    handle->signal_cb = signal_cb;
    return 0;
  }

  /* If the signal handler was already active, stop it first. */
  if (handle->signum != 0) {
    uv__signal_stop(handle);
  }

  uv__signal_block_and_lock(&saved_sigmask);

  first_handle = uv__signal_first_handle(signum);
  if (first_handle == NULL ||
      (!oneshot && (first_handle->flags & UV_SIGNAL_ONE_SHOT))) {
    err = uv__signal_register_handler(signum, oneshot);
    if (err) {
      /* Registering the signal handler failed. Must be an invalid signal. */
      uv__signal_unlock_and_unblock(&saved_sigmask);
      return err;
    }
  }

  handle->signum = signum;
  if (oneshot)
    handle->flags |= UV_SIGNAL_ONE_SHOT;

  RB_INSERT(uv__signal_tree_s, &uv__signal_tree, handle);

  uv__signal_unlock_and_unblock(&saved_sigmask);

  handle->signal_cb = signal_cb;
  uv__handle_start(handle);

  return 0;
}

2.4. uv__signal_block_and_lock

thread1_worker > uv_signal_start > uv__signal_block_and_lock

  • sigfillset: 该函数的作用是将信号集初始化为空。
  • pthread_sigmask: 在多线程的程序里,希望只在主线程中处理信号,可以使用该函数。每个线程均有自己的信号屏蔽集(信号掩码),可以使用pthread_sigmask函数来屏蔽某个线程对某些信号的响应处理,仅留下需要处理该信号的线程来处理指定的信号。

通过 pthread_sigmask 的例子可以看见主要是对信号集进行了初始化的操作, 然后调用了 uv__signal_lock 函数。

//pthread_sigmask 的例子

sigemptyset(&set);
sigaddset(&set, SIGQUIT);
sigaddset(&set, SIGUSR1);
s = pthread_sigmask(SIG_BLOCK, &set, NULL);
static void uv__signal_block_and_lock(sigset_t* saved_sigmask) {
  sigset_t new_mask;

  if (sigfillset(&new_mask))
    abort();

  /* to shut up valgrind */
  sigemptyset(saved_sigmask);
  if (pthread_sigmask(SIG_SETMASK, &new_mask, saved_sigmask))
    abort();

  if (uv__signal_lock())
    abort();
}

2.5. uv__signal_lock

thread1_worker > uv_signal_start > uv__signal_block_and_lock > uv__signal_lock

通过 read 读取 uv__signal_lock_pipefd[0], 当出现 EINTR 出错时, 就会尝试轮询重试。EINTR 错误一般出现在当正在进行系统调用时, 此时发送了一个 signal。

如果在系统调用正在进行时发生信号,许多系统调用将报告 EINTR 错误代码。实际上没有发生错误,只是因为系统无法自动恢复系统调用而以这种方式报告。这种编码模式只是在发生这种情况时重试系统调用,以忽略中断。

static int uv__signal_lock(void) {
  int r;
  char data;

  do {
    r = read(uv__signal_lock_pipefd[0], &data, sizeof data);
  } while (r < 0 && errno == EINTR);

  return (r < 0) ? -1 : 0;
}

那么何时 read 到数据让程序继续往下运行了 ?

此时感觉头绪有点断了, 那么从一开始在理一下, 是不是忽略了什么细节。最后在 create_loop > uv_loop_init > uv__signal_global_once_init > uv__signal_global_init 函数中找到了 write 数据的地方。

uv__signal_global_init 分析

函数里面调用了如果 uv__signal_lock_pipefd 未设置, 则调用 pthread_atfork 函数

pthread_atfork 前两个参数是调用 fork 函数产生子进程时的before, after 父进程里面会运行的勾子函数, 第三个参数是子进程会运行的勾子函数。

原来是创建子进程时会调用 uv__signal_global_reinit 一次, 本例子是创建了线程故不会进入这个场景, 最后只运行了一次 uv__signal_global_reinit 函数。

static void uv__signal_global_init(void) {
  if (uv__signal_lock_pipefd[0] == -1)
    // https://man7.org/linux/man-pages/man3/pthread_atfork.3.html
    if (pthread_atfork(NULL, NULL, &uv__signal_global_reinit))
      abort();

  uv__signal_global_reinit();
}

2.6. uv__signal_global_reinit

create_loop > uv_loop_init > uv__signal_global_once_init > uv__signal_global_init > uv__signal_global_reinit

原来是一个主线程里面会调用一次 uv__signal_global_reinit 函数, 去通过 uv__make_pipe 创建一个通信的管道, 并且最后会调用 uv__signal_unlock 去 write 一次数据。 这样在上面说到的当有一个线程进入 uv__signal_lock 逻辑时就会 read 到数据, 程序继续往下运行, 其他线程则会继续陷入等待, 达到互斥锁的目的。有点没想明白不直接使用互斥锁的原因 ...

当pthread_mutex_lock()返回时,该互斥锁已被锁定。线程调用该函数让互斥锁上锁,如果该互斥锁已被另一个线程锁定和拥有,则调用该线程将阻塞,直到该互斥锁变为可用为止。

static void uv__signal_global_reinit(void) {
  uv__signal_cleanup();

  if (uv__make_pipe(uv__signal_lock_pipefd, 0))
    abort();

  if (uv__signal_unlock())
    abort();
}

static int uv__signal_unlock(void) {
  int r;
  char data = 42;

  do {
    r = write(uv__signal_lock_pipefd[1], &data, sizeof data);
  } while (r < 0 && errno == EINTR);

  return (r < 0) ? -1 : 0;
}

2.7. uv__signal_first_handle

thread1_worker > uv_signal_start > uv__signal_first_handle

回到主线, 通过 RB_NFIND 查找该 signum 是否已经设置监听函数, 主要是确保一个 signum 只有一个监听函数。其主要原因是上面说的 sigaction 只能给一个 signum 绑定一个监听函数。

static uv_signal_t* uv__signal_first_handle(int signum) {
  /* This function must be called with the signal lock held. */
  uv_signal_t lookup;
  uv_signal_t* handle;

  lookup.signum = signum;
  lookup.flags = 0;
  lookup.loop = NULL;

  handle = RB_NFIND(uv__signal_tree_s, &uv__signal_tree, &lookup);

  if (handle != NULL && handle->signum == signum)
    return handle;

  return NULL;
}

2.8. RB_NFIND

thread1_worker > uv_signal_start > uv__signal_first_handle > RB_NFIND

和 QUEUE 一样都是通过一组宏定义实现的, 代码在 deps/uv/include/uv/tree.h 文件中。

在这里 signum 都是数字形式, 通过红黑树结构能够高效的查找于遍历。

2.9. uv__signal_register_handler

thread1_worker > uv_signal_start > uv__signal_register_handler

设置该 signum 的信号处理函数为 uv__signal_handler

sa_flags:用来设置信号处理的其他相关操作,下列的数值可用。可用OR 运算(|)组合

  • A_NOCLDSTOP:如果参数signum为SIGCHLD,则当子进程暂停时并不会通知父进程
  • SA_ONESHOT/SA_RESETHAND:当调用新的信号处理函数前,将此信号处理方式改为系统预设的方式
  • SA_RESTART:被信号中断的系统调用会自行重启
  • SA_NOMASK/SA_NODEFER:在处理此信号未结束前不理会此信号的再次到来
static int uv__signal_register_handler(int signum, int oneshot) {
  /* When this function is called, the signal lock must be held. */
  struct sigaction sa;

  /* XXX use a separate signal stack? */
  memset(&sa, 0, sizeof(sa));
  if (sigfillset(&sa.sa_mask))
    abort();
  sa.sa_handler = uv__signal_handler;
  sa.sa_flags = SA_RESTART;
  if (oneshot)
    sa.sa_flags |= SA_RESETHAND;

  /* XXX save old action so we can restore it later on? */
  if (sigaction(signum, &sa, NULL))
    return UV__ERR(errno);

  return 0;
}

2.10. uv__signal_handler

thread1_worker > uv_signal_start > uv__signal_register_handler > uv__signal_handler

作为唯一的信号处理函数, 让我们来看看 uv__signal_handler 的实现

  1. 通过 RB_NEXT 遍历拿出之前插入属性值 signum 等于当前接受到的信号 signum 的 handle。
  2. 在该 handle 的通信的 fd 写端写入数据。
  3. 剩下的就该知道发生啥事了, 在事件循环阶段五 Poll for I/O 阶段, epoll 等待写入事件成功后, 通知到上面通过 uv__io_init 设置的 i/o 观察者, 调用 i/o 观察者的回调函数, 即该例子的 uv__signal_event 函数。
static void uv__signal_handler(int signum) {
  ...

  for (handle = uv__signal_first_handle(signum);
       handle != NULL && handle->signum == signum;
       handle = RB_NEXT(uv__signal_tree_s, &uv__signal_tree, handle)) {
    int r;

    msg.signum = signum;
    msg.handle = handle;

    /* write() should be atomic for small data chunks, so the entire message
     * should be written at once. In theory the pipe could become full, in
     * which case the user is out of luck.
     */
    do {
      r = write(handle->loop->signal_pipefd[1], &msg, sizeof msg);
    } while (r == -1 && errno == EINTR);

    assert(r == sizeof msg ||
           (r == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)));

    if (r != -1)
      handle->caught_signals++;
  }

  uv__signal_unlock();
  errno = saved_errno;
}

2.11. uv__signal_event

thread1_worker > uv_signal_init > uv__signal_loop_once_init > uv__signal_event

信号 i/o 设置的回调函数。

  1. 循环读取所有写入的消息, 可能有多条消息。
  2. 如果该消息的 signum 是需要监听的, 则调用 handle->signal_cb 回调函数。
static void uv__signal_event(uv_loop_t* loop,
                             uv__io_t* w,
                             unsigned int events) {
  uv__signal_msg_t* msg;
  uv_signal_t* handle;
  char buf[sizeof(uv__signal_msg_t) * 32];
  size_t bytes, end, i;
  int r;

  bytes = 0;
  end = 0;

  do {
    r = read(loop->signal_pipefd[0], buf + bytes, sizeof(buf) - bytes);

    if (r == -1 && errno == EINTR)
      continue;

    if (r == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
      /* If there are bytes in the buffer already (which really is extremely
       * unlikely if possible at all) we can't exit the function here. We'll
       * spin until more bytes are read instead.
       */
      if (bytes > 0)
        continue;

      /* Otherwise, there was nothing there. */
      return;
    }

    /* Other errors really should never happen. */
    if (r == -1)
      abort();

    bytes += r;

    /* `end` is rounded down to a multiple of sizeof(uv__signal_msg_t). */
    end = (bytes / sizeof(uv__signal_msg_t)) * sizeof(uv__signal_msg_t);

    for (i = 0; i < end; i += sizeof(uv__signal_msg_t)) {
      msg = (uv__signal_msg_t*) (buf + i);
      handle = msg->handle;

      if (msg->signum == handle->signum) {
        assert(!(handle->flags & UV_HANDLE_CLOSING));
        handle->signal_cb(handle, handle->signum);
      }

      handle->dispatched_signals++;

      if (handle->flags & UV_SIGNAL_ONE_SHOT)
        uv__signal_stop(handle);
    }

    bytes -= end;

    /* If there are any "partial" messages left, move them to the start of the
     * the buffer, and spin. This should not happen.
     */
    if (bytes) {
      memmove(buf, buf + end, bytes);
      continue;
    }
  } while (end == sizeof buf);
}

3. 小结

只需在第一个调用 uv__signal_start 函数的时候注册一个信号处理函数, 当收到信号时, 该函数会遍历红黑树中所有关注该 signum 的 handle, 然后向该 handle 通过 pipe2 申请的通信 fd 的写端写入数据, 事件循环阶段被 epoll 捕获通知到该 handle 的 i/o 观察者, 最后调用观察者的回调, 达到通知所有监听函数的目的。

@xiaoxiaojx xiaoxiaojx added the libuv libuv is a multi-platform support library with a focus on asynchronous I/O. It was primarily develop label Jun 8, 2021
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
libuv libuv is a multi-platform support library with a focus on asynchronous I/O. It was primarily develop
Projects
None yet
Development

No branches or pull requests

1 participant