Skip to content

Commit

Permalink
add Process\Pool::detach()
Browse files Browse the repository at this point in the history
  • Loading branch information
matyhtf committed May 18, 2021
1 parent 8ce5041 commit 7423d14
Show file tree
Hide file tree
Showing 6 changed files with 107 additions and 5 deletions.
30 changes: 30 additions & 0 deletions examples/process_pool/detach.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
<?php
use Swoole\Process;
use Swoole\Atomic;

$pool = new Process\Pool(2, SWOOLE_IPC_SOCKET);

$pool->on('WorkerStart', function (Process\Pool $pool, $workerId) {
echo("[Worker #{$workerId}] WorkerStart\n");
if ($workerId == 1) {

}
});

$pool->on('WorkerStop', function (\Swoole\Process\Pool $pool, $workerId) {
echo("[Worker #{$workerId}] WorkerStop\n");
});

$pool->on('Message', function ($pool, $msg) {
var_dump($msg);
$pool->detach();

while(1) {
sleep(1);
echo "pid=".posix_getpid()."\n";
};
});

$pool->listen('127.0.0.1', 8089);

$pool->start();
5 changes: 5 additions & 0 deletions examples/process_pool/send.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
<?php
$fp = stream_socket_client("tcp://127.0.0.1:8089", $errno, $errstr) or die("error: $errstr\n");
$msg = json_encode(['data' => 'hello', 'uid' => 1991]);
fwrite($fp, pack('N', strlen($msg)) . $msg);
sleep(1);
13 changes: 13 additions & 0 deletions ext-src/swoole_process_pool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ static PHP_METHOD(swoole_process_pool, set);
static PHP_METHOD(swoole_process_pool, on);
static PHP_METHOD(swoole_process_pool, listen);
static PHP_METHOD(swoole_process_pool, write);
static PHP_METHOD(swoole_process_pool, detach);
static PHP_METHOD(swoole_process_pool, getProcess);
static PHP_METHOD(swoole_process_pool, start);
static PHP_METHOD(swoole_process_pool, shutdown);
Expand Down Expand Up @@ -174,6 +175,7 @@ static const zend_function_entry swoole_process_pool_methods[] =
PHP_ME(swoole_process_pool, getProcess, arginfo_swoole_process_pool_getProcess, ZEND_ACC_PUBLIC)
PHP_ME(swoole_process_pool, listen, arginfo_swoole_process_pool_listen, ZEND_ACC_PUBLIC)
PHP_ME(swoole_process_pool, write, arginfo_swoole_process_pool_write, ZEND_ACC_PUBLIC)
PHP_ME(swoole_process_pool, detach, arginfo_swoole_process_pool_void, ZEND_ACC_PUBLIC)
PHP_ME(swoole_process_pool, start, arginfo_swoole_process_pool_void, ZEND_ACC_PUBLIC)
PHP_ME(swoole_process_pool, shutdown, arginfo_swoole_process_pool_void, ZEND_ACC_PUBLIC)
PHP_FE_END
Expand Down Expand Up @@ -266,6 +268,9 @@ static void pool_signal_handler(int sig) {
current_pool->reloading = true;
current_pool->reload_init = false;
break;
case SIGIO:
current_pool->read_message = true;
break;
default:
break;
}
Expand Down Expand Up @@ -497,6 +502,7 @@ static PHP_METHOD(swoole_process_pool, start) {
ori_handlers[SIGTERM] = swSignal_set(SIGTERM, pool_signal_handler);
ori_handlers[SIGUSR1] = swSignal_set(SIGUSR1, pool_signal_handler);
ori_handlers[SIGUSR2] = swSignal_set(SIGUSR2, pool_signal_handler);
ori_handlers[SIGIO] = swSignal_set(SIGIO, pool_signal_handler);

if (pool->ipc_mode == SW_IPC_NONE || pp->enable_coroutine) {
if (pp->onWorkerStart == nullptr) {
Expand Down Expand Up @@ -542,6 +548,13 @@ static PHP_METHOD(swoole_process_pool, start) {

extern void php_swoole_process_set_worker(zval *zobject, Worker *worker);

static PHP_METHOD(swoole_process_pool, detach) {
if (current_pool == nullptr) {
RETURN_FALSE;
}
RETURN_BOOL(current_pool->detach());
}

static PHP_METHOD(swoole_process_pool, getProcess) {
long worker_id = -1;

Expand Down
12 changes: 12 additions & 0 deletions include/swoole_process_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

#include "swoole_lock.h"
#include "swoole_pipe.h"
#include "swoole_channel.h"
#include "swoole_msg_queue.h"

enum swWorker_status {
Expand Down Expand Up @@ -173,6 +174,7 @@ struct ProcessPool {
bool reloading;
bool running;
bool reload_init;
bool read_message;
bool started;
uint8_t dispatch_mode;
uint8_t ipc_mode;
Expand Down Expand Up @@ -234,6 +236,7 @@ struct ProcessPool {
Reactor *reactor;
MsgQueue *queue;
StreamInfo *stream_info_;
Channel *message_box = nullptr;

void *ptr;

Expand All @@ -257,9 +260,18 @@ struct ProcessPool {
return &(workers[worker_id - start_id]);
}

Worker *get_worker_by_pid(pid_t pid) {
auto iter = map_->find(pid);
if (iter == map_->end()) {
return nullptr;
}
return iter->second;
}

void set_max_request(uint32_t _max_request, uint32_t _max_request_grace);
int get_max_request();
int set_protocol(int task_protocol, uint32_t max_packet_size);
bool detach();
int wait();
int start();
void shutdown();
Expand Down
48 changes: 45 additions & 3 deletions src/os/process_pool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,11 @@ int ProcessPool::create(uint32_t _worker_num, key_t _msgqueue_key, swIPC_type _i
return SW_ERR;
}

message_box = Channel::make(65536, sizeof(WorkerStopMessage), SW_CHAN_LOCK | SW_CHAN_SHM);
if (message_box == nullptr) {
return SW_ERR;
}

if (_ipc_mode == SW_IPC_MSGQUEUE) {
use_msgqueue = 1;
msgqueue_key = _msgqueue_key;
Expand Down Expand Up @@ -605,6 +610,21 @@ int ProcessPool_add_worker(ProcessPool *pool, Worker *worker) {
return SW_OK;
}

bool ProcessPool::detach() {
WorkerStopMessage msg;
msg.pid = getpid();
msg.worker_id = SwooleG.process_id;

if (message_box && message_box->push(&msg, sizeof(msg)) < 0) {
return false;
}
if (swoole_kill(master_pid, SIGIO) < 0) {
return false;
}
running = false;
return true;
}

int ProcessPool::wait() {
pid_t new_pid, reload_worker_pid = 0;
int ret;
Expand All @@ -622,6 +642,25 @@ int ProcessPool::wait() {
SwooleG.signal_alarm = false;
SwooleTG.timer->select();
}
if (read_message) {
WorkerStopMessage msg;
while (message_box->pop(&msg, sizeof(msg)) > 0) {
if (!running) {
continue;
}
Worker *exit_worker = get_worker_by_pid(msg.pid);
if (exit_worker == nullptr) {
continue;
}
pid_t new_pid = spawn(exit_worker);
if (new_pid < 0) {
swSysWarn("Fork worker process failed");
return SW_ERR;
}
map_->erase(msg.pid);
}
read_message = false;
}
if (exit_status.get_pid() < 0) {
if (!running) {
break;
Expand All @@ -645,8 +684,8 @@ int ProcessPool::wait() {
}

if (running) {
auto iter = map_->find(exit_status.get_pid());
if (iter == map_->end()) {
Worker *exit_worker = get_worker_by_pid(exit_status.get_pid());
if (exit_worker == nullptr) {
if (onWorkerNotFound) {
onWorkerNotFound(this, exit_status);
} else {
Expand All @@ -655,7 +694,6 @@ int ProcessPool::wait() {
continue;
}

Worker *exit_worker = iter->second;
if (!exit_status.is_normal_exit()) {
swWarn("worker#%d abnormal exit, status=%d, signal=%d"
"%s",
Expand Down Expand Up @@ -732,6 +770,10 @@ void ProcessPool::destroy() {
delete map_;
}

if (message_box) {
message_box->destroy();
}

sw_mem_pool()->free(workers);
}

Expand Down
4 changes: 2 additions & 2 deletions src/reactor/base.cc
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,7 @@ int Reactor::_write(Reactor *reactor, Socket *socket, const void *buf, size_t n)
send_bytes = socket->send(buf, n, 0);
return send_bytes;
};
auto append_fn = [&send_bytes, socket, buf, n](Buffer *buffer) {
auto append_fn = [&send_bytes, buf, n](Buffer *buffer) {
ssize_t offset = send_bytes > 0 ? send_bytes : 0;
buffer->append((const char *) buf + offset, n - offset);
};
Expand All @@ -325,7 +325,7 @@ int Reactor::_writev(Reactor *reactor, network::Socket *socket, const iovec *iov
send_bytes = socket->writev(iov, iovcnt);
return send_bytes;
};
auto append_fn = [&send_bytes, socket, iov, iovcnt](Buffer *buffer) {
auto append_fn = [&send_bytes, iov, iovcnt](Buffer *buffer) {
ssize_t offset = send_bytes > 0 ? send_bytes : 0;
buffer->append(iov, iovcnt, offset);
};
Expand Down

0 comments on commit 7423d14

Please sign in to comment.