Skip to content

Commit

Permalink
add worker_dispatch_count
Browse files Browse the repository at this point in the history
  • Loading branch information
matyhtf committed Feb 9, 2019
1 parent 186c660 commit a353808
Show file tree
Hide file tree
Showing 5 changed files with 21 additions and 8 deletions.
2 changes: 1 addition & 1 deletion include/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -1029,7 +1029,7 @@ void swReactorThread_set_protocol(swServer *serv, swReactor *reactor);
void swReactorThread_free(swServer *serv);
int swReactorThread_close(swReactor *reactor, int fd);
int swReactorThread_dispatch(swConnection *conn, char *data, uint32_t length);
int swReactorThread_send2worker(swServer *serv, void *data, int len, uint16_t target_worker_id);
int swReactorThread_send2worker(swServer *serv, swWorker *worker, void *data, int len);

int swReactorProcess_create(swServer *serv);
int swReactorProcess_start(swServer *serv);
Expand Down
1 change: 1 addition & 0 deletions include/swoole.h
Original file line number Diff line number Diff line change
Expand Up @@ -1604,6 +1604,7 @@ struct _swWorker
time_t start_time;
time_t request_time;

long dispatch_count;
long request_count;

/**
Expand Down
20 changes: 17 additions & 3 deletions src/server/process.c
Original file line number Diff line number Diff line change
Expand Up @@ -164,11 +164,25 @@ static int swFactoryProcess_dispatch(swFactory *factory, swSendData *task)
task->info.from_fd = conn->from_fd;
}

swWorker *worker = swServer_get_worker(serv, target_worker_id);

//without data
if (task->data == NULL)
{
task->info.flags = 0;
return swReactorThread_send2worker(serv, &task->info, sizeof(task->info), target_worker_id);
return swReactorThread_send2worker(serv, worker, &task->info, sizeof(task->info));
}

switch (task->info.type)
{
case SW_EVENT_TCP6:
case SW_EVENT_TCP:
case SW_EVENT_UNIX_STREAM:
case SW_EVENT_UDP:
case SW_EVENT_UDP6:
case SW_EVENT_UNIX_DGRAM:
worker->dispatch_count++;
break;
}

uint32_t send_n = task->length;
Expand All @@ -183,7 +197,7 @@ static int swFactoryProcess_dispatch(swFactory *factory, swSendData *task)
buf.info.flags = 0;
buf.info.len = send_n;
memcpy(buf.data, data, buf.info.len);
return swReactorThread_send2worker(serv, &buf, sizeof(buf.info) + buf.info.len, target_worker_id);
return swReactorThread_send2worker(serv, worker, &buf, sizeof(buf.info) + buf.info.len);
}

buf.info.flags = SW_EVENT_DATA_CHUNK;
Expand All @@ -207,7 +221,7 @@ static int swFactoryProcess_dispatch(swFactory *factory, swSendData *task)

swTrace("dispatch, type=%d|len=%d", buf.info.type, buf.info.len);

if (swReactorThread_send2worker(serv, &buf, sizeof(buf.info) + buf.info.len, target_worker_id) < 0)
if (swReactorThread_send2worker(serv, worker, &buf, sizeof(buf.info) + buf.info.len) < 0)
{
return SW_ERR;
}
Expand Down
5 changes: 1 addition & 4 deletions src/server/reactor_thread.c
Original file line number Diff line number Diff line change
Expand Up @@ -423,12 +423,9 @@ static int swReactorThread_onPipeReceive(swReactor *reactor, swEvent *ev)
return SW_OK;
}

int swReactorThread_send2worker(swServer *serv, void *data, int len, uint16_t target_worker_id)
int swReactorThread_send2worker(swServer *serv, swWorker *worker, void *data, int len)
{
assert(target_worker_id < serv->worker_num);

int ret = -1;
swWorker *worker = &(serv->workers[target_worker_id]);

//reactor thread
if (SwooleTG.type == SW_THREAD_REACTOR)
Expand Down
1 change: 1 addition & 0 deletions swoole_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3277,6 +3277,7 @@ static PHP_METHOD(swoole_server, stats)
if (SwooleWG.worker)
{
add_assoc_long_ex(return_value, ZEND_STRL("worker_request_count"), SwooleWG.worker->request_count);
add_assoc_long_ex(return_value, ZEND_STRL("worker_dispatch_count"), SwooleWG.worker->dispatch_count);
}

if (serv->task_ipc_mode > SW_TASK_IPC_UNIXSOCK && serv->gs->task_workers.queue)
Expand Down

0 comments on commit a353808

Please sign in to comment.