73 changes: 35 additions & 38 deletions config.m4
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,6 @@ PHP_ARG_WITH(jemalloc_dir, dir of jemalloc,
PHP_ARG_WITH(libpq_dir, dir of libpq,
[ --with-libpq-dir[=DIR] Include libpq support (requires libpq >= 9.5)], no, no)

PHP_ARG_ENABLE(hugepage, enable hugepage support,
[ --enable-hugepage Experimental: Use hugepage?], no, no)

PHP_ARG_ENABLE(asan, whether to enable asan,
[ --enable-asan Enable asan], no, no)

Expand Down Expand Up @@ -291,16 +288,12 @@ if test "$PHP_SWOOLE" != "no"; then
AC_DEFINE(SW_SOCKETS, 1, [enable sockets support])

dnl Some systems build and package PHP socket extension separately
dnl and php_config.h doesn't have HAVE_SOCKETS defined.
dnl and php_config.h does not have HAVE_SOCKETS defined.
AC_DEFINE(HAVE_SOCKETS, 1, [whether sockets extension is enabled])

PHP_ADD_EXTENSION_DEP(swoole, sockets, true)
fi

if test "$PHP_HUGEPAGE" = "yes"; then
AC_DEFINE(SW_USE_HUGEPAGE, 1, [enable hugepage support])
fi

if test "$PHP_THREAD" = "yes"; then
AC_DEFINE(SW_USE_THREAD, 1, [enable thread support])
fi
Expand All @@ -323,13 +316,13 @@ if test "$PHP_SWOOLE" != "no"; then
CFLAGS="-Wall -pthread $CFLAGS"
LDFLAGS="$LDFLAGS -lpthread"

if test "$SW_OS" = 'MAC'; then
if test "$SW_OS" = "MAC"; then
AC_CHECK_LIB(c, clock_gettime, AC_DEFINE(HAVE_CLOCK_GETTIME, 1, [have clock_gettime]))
else
AC_CHECK_LIB(rt, clock_gettime, AC_DEFINE(HAVE_CLOCK_GETTIME, 1, [have clock_gettime]))
PHP_ADD_LIBRARY(rt, 1, SWOOLE_SHARED_LIBADD)
fi
if test "$SW_OS" = 'LINUX'; then
if test "$SW_OS" = "LINUX"; then
LDFLAGS="$LDFLAGS -z now"
fi

Expand Down Expand Up @@ -426,8 +419,6 @@ if test "$PHP_SWOOLE" != "no"; then
src/coroutine/hook.cc \
src/coroutine/socket.cc \
src/coroutine/ucontext.cc \
src/factory/base.c \
src/factory/process.c \
src/lock/atomic.c \
src/lock/cond.c \
src/lock/file_lock.c \
Expand All @@ -447,17 +438,10 @@ if test "$PHP_SWOOLE" != "no"; then
src/network/client.c \
src/network/connection.c \
src/network/dns.c \
src/network/manager.c \
src/network/port.c \
src/network/process_pool.c \
src/network/reactor_process.c \
src/network/reactor_thread.c \
src/network/server.c \
src/network/stream.c \
src/network/task_worker.c \
src/network/thread_pool.c \
src/network/timer.c \
src/network/worker.c \
src/os/base.c \
src/os/msg_queue.c \
src/os/sendfile.c \
Expand All @@ -484,13 +468,25 @@ if test "$PHP_SWOOLE" != "no"; then
src/reactor/kqueue.c \
src/reactor/poll.c \
src/reactor/select.c \
src/server/base.c \
src/server/manager.c \
src/server/master.cc \
src/server/port.c \
src/server/process.c \
src/server/reactor_process.cc \
src/server/reactor_thread.c \
src/server/task_worker.c \
src/server/worker.c \
src/wrapper/client.cc \
src/wrapper/server.cc \
src/wrapper/timer.cc \
swoole.c \
swoole_async.cc \
swoole_atomic.c \
swoole_buffer.c \
swoole_channel.c \
swoole_channel_coro.cc \
swoole_client.c \
swoole_client.cc \
swoole_client_coro.cc \
swoole_coroutine.cc \
swoole_coroutine_util.cc \
Expand All @@ -508,7 +504,7 @@ if test "$PHP_SWOOLE" != "no"; then
swoole_mysql_coro.cc \
swoole_postgresql_coro.cc \
swoole_process.cc \
swoole_process_pool.c \
swoole_process_pool.cc \
swoole_redis.c \
swoole_redis_coro.cc \
swoole_redis_server.cc \
Expand Down Expand Up @@ -553,56 +549,56 @@ if test "$PHP_SWOOLE" != "no"; then
]
)

if test "$SW_OS" = 'MAC'; then
if test "$SW_CPU" = 'arm'; then
if test "$SW_OS" = "MAC"; then
if test "$SW_CPU" = "arm"; then
SW_CONTEXT_ASM_FILE="arm_aapcs_macho_gas.S"
elif test "$SW_CPU" = 'arm64'; then
elif test "$SW_CPU" = "arm64"; then
SW_CONTEXT_ASM_FILE="arm64_aapcs_macho_gas.S"
else
SW_CONTEXT_ASM_FILE="combined_sysv_macho_gas.S"
fi
elif test "$SW_CPU" = 'x86_64'; then
if test "$SW_OS" = 'LINUX'; then
elif test "$SW_CPU" = "x86_64"; then
if test "$SW_OS" = "LINUX"; then
SW_CONTEXT_ASM_FILE="x86_64_sysv_elf_gas.S"
else
SW_NO_USE_ASM_CONTEXT="yes"
AC_DEFINE([SW_NO_USE_ASM_CONTEXT], 1, [use boost asm context?])
fi
elif test "$SW_CPU" = 'x86'; then
if test "$SW_OS" = 'LINUX'; then
elif test "$SW_CPU" = "x86"; then
if test "$SW_OS" = "LINUX"; then
SW_CONTEXT_ASM_FILE="i386_sysv_elf_gas.S"
else
SW_NO_USE_ASM_CONTEXT="yes"
AC_DEFINE([SW_NO_USE_ASM_CONTEXT], 1, [use boost asm context?])
fi
elif test "$SW_CPU" = 'arm'; then
if test "$SW_OS" = 'LINUX'; then
elif test "$SW_CPU" = "arm"; then
if test "$SW_OS" = "LINUX"; then
SW_CONTEXT_ASM_FILE="arm_aapcs_elf_gas.S"
else
SW_NO_USE_ASM_CONTEXT="yes"
AC_DEFINE([SW_NO_USE_ASM_CONTEXT], 1, [use boost asm context?])
fi
elif test "$SW_CPU" = 'arm64'; then
if test "$SW_OS" = 'LINUX'; then
elif test "$SW_CPU" = "arm64"; then
if test "$SW_OS" = "LINUX"; then
SW_CONTEXT_ASM_FILE="arm64_aapcs_elf_gas.S"
else
SW_NO_USE_ASM_CONTEXT="yes"
AC_DEFINE([SW_NO_USE_ASM_CONTEXT], 1, [use boost asm context?])
fi
elif test "$SW_CPU" = 'mips32'; then
if test "$SW_OS" = 'LINUX'; then
elif test "$SW_CPU" = "mips32"; then
if test "$SW_OS" = "LINUX"; then
SW_CONTEXT_ASM_FILE="mips32_o32_elf_gas.S"
else
SW_NO_USE_ASM_CONTEXT="yes"
AC_DEFINE([SW_NO_USE_ASM_CONTEXT], 1, [use boost asm context?])
fi
fi

if test "$SW_NO_USE_ASM_CONTEXT" = 'no'; then
if test "$SW_NO_USE_ASM_CONTEXT" = "no"; then
swoole_source_file="$swoole_source_file \
${SW_ASM_DIR}make_${SW_CONTEXT_ASM_FILE} \
${SW_ASM_DIR}jump_${SW_CONTEXT_ASM_FILE} "
elif test "$SW_HAVE_BOOST_CONTEXT" = 'yes'; then
elif test "$SW_HAVE_BOOST_CONTEXT" = "yes"; then
LDFLAGS="$LDFLAGS -lboost_context"
fi

Expand All @@ -626,7 +622,7 @@ if test "$PHP_SWOOLE" != "no"; then
PHP_INSTALL_HEADERS([ext/swoole], [*.h config.h include/*.h])

PHP_REQUIRE_CXX()
PHP_ADD_LIBRARY(stdc++, 1, SWOOLE_SHARED_LIBADD)

CXXFLAGS="$CXXFLAGS -Wall -Wno-unused-function -Wno-deprecated -Wno-deprecated-declarations -std=c++11"

if test "$PHP_PICOHTTPPARSER" = "yes"; then
Expand All @@ -636,14 +632,15 @@ if test "$PHP_SWOOLE" != "no"; then

PHP_ADD_BUILD_DIR($ext_builddir/src/core)
PHP_ADD_BUILD_DIR($ext_builddir/src/memory)
PHP_ADD_BUILD_DIR($ext_builddir/src/factory)
PHP_ADD_BUILD_DIR($ext_builddir/src/reactor)
PHP_ADD_BUILD_DIR($ext_builddir/src/pipe)
PHP_ADD_BUILD_DIR($ext_builddir/src/lock)
PHP_ADD_BUILD_DIR($ext_builddir/src/os)
PHP_ADD_BUILD_DIR($ext_builddir/src/network)
PHP_ADD_BUILD_DIR($ext_builddir/src/server)
PHP_ADD_BUILD_DIR($ext_builddir/src/protocol)
PHP_ADD_BUILD_DIR($ext_builddir/src/coroutine)
PHP_ADD_BUILD_DIR($ext_builddir/src/wrapper)
PHP_ADD_BUILD_DIR($ext_builddir/thirdparty/hiredis)
PHP_ADD_BUILD_DIR($ext_builddir/thirdparty/boost)
PHP_ADD_BUILD_DIR($ext_builddir/thirdparty/boost/asm)
Expand Down
25 changes: 14 additions & 11 deletions config.w32
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,6 @@ if (PHP_SWOOLE != "no") {
+ 'src/coroutine/hook.cc '
+ 'src/coroutine/socket.cc '
+ 'src/coroutine/ucontext.cc '
+ 'src/factory/base.c '
+ 'src/factory/process.c '
+ 'src/lock/atomic.c '
+ 'src/lock/cond.c '
+ 'src/lock/file_lock.c '
Expand All @@ -48,17 +46,10 @@ if (PHP_SWOOLE != "no") {
+ 'src/network/client.c '
+ 'src/network/connection.c '
+ 'src/network/dns.c '
+ 'src/network/manager.c '
+ 'src/network/port.c '
+ 'src/network/process_pool.c '
+ 'src/network/reactor_process.c '
+ 'src/network/reactor_thread.c '
+ 'src/network/server.c '
+ 'src/network/stream.c '
+ 'src/network/task_worker.c '
+ 'src/network/thread_pool.c '
+ 'src/network/timer.c '
+ 'src/network/worker.c '
+ 'src/os/base.c '
+ 'src/os/msg_queue.c '
+ 'src/os/sendfile.c '
Expand All @@ -85,13 +76,25 @@ if (PHP_SWOOLE != "no") {
+ 'src/reactor/kqueue.c '
+ 'src/reactor/poll.c '
+ 'src/reactor/select.c '
+ 'src/server/base.c '
+ 'src/server/manager.c '
+ 'src/server/master.cc '
+ 'src/server/port.c '
+ 'src/server/process.c '
+ 'src/server/reactor_process.cc '
+ 'src/server/reactor_thread.c '
+ 'src/server/task_worker.c '
+ 'src/server/worker.c '
+ 'src/wrapper/client.cc '
+ 'src/wrapper/server.cc '
+ 'src/wrapper/timer.cc '
+ 'swoole.c '
+ 'swoole_async.cc '
+ 'swoole_atomic.c '
+ 'swoole_buffer.c '
+ 'swoole_channel.c '
+ 'swoole_channel_coro.cc '
+ 'swoole_client.c '
+ 'swoole_client.cc '
+ 'swoole_client_coro.cc '
+ 'swoole_coroutine.cc '
+ 'swoole_coroutine_util.cc '
Expand All @@ -109,7 +112,7 @@ if (PHP_SWOOLE != "no") {
+ 'swoole_mysql_coro.cc '
+ 'swoole_postgresql_coro.cc '
+ 'swoole_process.cc '
+ 'swoole_process_pool.c '
+ 'swoole_process_pool.cc '
+ 'swoole_redis.c '
+ 'swoole_redis_coro.cc '
+ 'swoole_redis_server.cc '
Expand Down
75 changes: 63 additions & 12 deletions core-tests/include/tests.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,32 +5,83 @@
#include "server.h"
#include "coroutine.h"
#include "socket.h"

#include <gtest/gtest.h>
#include <initializer_list>
#include <utility>

inline void coro_test_wait()
class coro_test
{
SwooleG.main_reactor->wait(SwooleG.main_reactor, nullptr);
public:
coro_test(coroutine_func_t _fn, void *_arg, int *_complete_num) : fn(_fn), arg(_arg), complete_num(_complete_num)
{

}

void run()
{
fn(arg);
(*complete_num)++;
}

private:
coroutine_func_t fn;
void *arg;
int *complete_num;
};

static void coro_test_fn(void *arg)
{
((coro_test*) arg)->run();
delete (coro_test*) arg;
}

inline void coro_test_create(coroutine_func_t fn, void *arg = nullptr)
static inline void coro_test_wait(int *complete_num, int total_num)
{
long cid = swoole::Coroutine::create(fn, arg);
SwooleG.main_reactor->once = true;

while (*complete_num != total_num)
{
SwooleG.main_reactor->wait(SwooleG.main_reactor, nullptr);
}

SwooleG.main_reactor->once = false;
}

static inline void coro_test_create(coroutine_func_t fn, void *arg, int *complete_num)
{
auto test = new coro_test(fn, arg, complete_num);
long cid = swoole::Coroutine::create(coro_test_fn, test);
ASSERT_GT(cid, 0);
}

inline void coro_test(coroutine_func_t fn, void *arg = nullptr)
static inline void coro_test(std::initializer_list<std::pair<coroutine_func_t, void*>> args)
{
coro_test_create(fn, arg);
coro_test_wait();
int complete_num = 0;

for (const auto &arg : args)
{
coro_test_create(arg.first, arg.second, &complete_num);
}

coro_test_wait(&complete_num, args.size());
}

inline void coro_test(coroutine_func_t *fns, size_t num, void **args = nullptr)
static inline void coro_test(std::initializer_list<coroutine_func_t> args)
{
size_t i;
for (i = 0; i < num; ++i)
int complete_num = 0;

for (const auto &arg : args)
{
coro_test_create(fns[i], args ? args[i] : nullptr);
coro_test_create(arg, nullptr, &complete_num);
}

coro_test_wait();
coro_test_wait(&complete_num, args.size());
}

static inline void coro_test(coroutine_func_t fn, void *arg = nullptr)
{
int complete_num = 0;
coro_test_create(fn, arg, &complete_num);
coro_test_wait(&complete_num, 1);
}
2 changes: 2 additions & 0 deletions core-tests/run.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
#!/bin/bash
cmake -DUSE_CARES=ON . && make -j8 && ./bin/core_tests
40 changes: 19 additions & 21 deletions core-tests/src/cares.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,26 +34,24 @@ TEST(cares, resolve_inet6)

TEST(cares, concurrency)
{
coroutine_func_t fns[3];

fns[0] = [](void *arg)
{
auto result = CAres::resolve("www.swoole.com", AF_INET, 5);
ASSERT_EQ(result, "47.244.108.17");
};

fns[1] = [](void *arg)
{
auto result = CAres::resolve("www.swoole.com", AF_INET, 0.001);
ASSERT_EQ(result, "");
};

fns[2] = [](void *arg)
{
auto result = CAres::resolve("ipv6.sjtu.edu.cn", AF_INET6, 5);
ASSERT_EQ(result, "2001:da8:8000:1::80");
};

coro_test(fns, 3);
coro_test({
[](void *arg)
{
auto result = CAres::resolve("www.swoole.com", AF_INET, 5);
ASSERT_EQ(result, "47.244.108.17");
},

[](void *arg)
{
auto result = CAres::resolve("www.swoole.com", AF_INET, 0.001);
ASSERT_EQ(result, "");
},

[](void *arg)
{
auto result = CAres::resolve("ipv6.sjtu.edu.cn", AF_INET6, 5);
ASSERT_EQ(result, "2001:da8:8000:1::80");
}
});
}
#endif
4 changes: 2 additions & 2 deletions core-tests/src/client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,12 @@
TEST(client, tcp)
{
int ret;
swClient cli, cli2;
swClient cli;
char buf[128];

ret = swClient_create(&cli, SW_SOCK_TCP, SW_SOCK_SYNC);
ASSERT_EQ(ret, 0);
ret = cli.connect(&cli, (char *) "127.0.0.1", 9501, 0.5, 0);
ret = cli.connect(&cli, (char *) "127.0.0.1", 9501, -1, 0);
ASSERT_EQ(ret, 0);
ret = cli.send(&cli, (char *) SW_STRS("echo"), 0);
ASSERT_GT(ret, 0);
Expand Down
185 changes: 0 additions & 185 deletions core-tests/src/coro.cpp

This file was deleted.

44 changes: 44 additions & 0 deletions core-tests/src/coroutine/base.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
#include "tests.h"

using namespace swoole;

TEST(coroutine_base, create)
{
long _cid;
long cid = Coroutine::create([](void *arg)
{
*(long *) arg = Coroutine::get_current_cid();
}, &_cid);

ASSERT_GT(cid, 0);
ASSERT_EQ(cid, _cid);
}

TEST(coroutine_base, get_current)
{
long _cid;
long cid = Coroutine::create([](void *arg)
{
auto co = Coroutine::get_current();
*(long *) arg = co->get_cid();
}, &_cid);

ASSERT_GT(cid, 0);
ASSERT_EQ(cid, _cid);
}

TEST(coroutine_base, yield_resume)
{
long _cid;
long cid = Coroutine::create([](void *arg)
{
long cid = Coroutine::get_current_cid();
Coroutine *co = Coroutine::get_by_cid(cid);
co->yield();
*(long *) arg = Coroutine::get_current_cid();
}, &_cid);

ASSERT_GT(cid, 0);
Coroutine::get_by_cid(cid)->resume();
ASSERT_EQ(cid, _cid);
}
98 changes: 98 additions & 0 deletions core-tests/src/coroutine/channel.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
#include "tests.h"
#include "channel.h"

using namespace swoole;
using namespace std;

TEST(coroutine_channel, push_pop)
{
coro_test([](void *arg)
{
Channel chan(1);
int i = 1;
bool ret;

ret = chan.push(&i);
ASSERT_TRUE(ret);
ASSERT_EQ(*(int *) chan.pop(), i);
});
}

TEST(coroutine_channel, push_yield)
{
Channel chan(1);

coro_test({
make_pair([](void *arg)
{
auto chan = (Channel *) arg;
int i = 1;
bool ret;

ret = chan->push(&i);
ASSERT_TRUE(ret);
ret = chan->push(&i);
ASSERT_TRUE(ret);
}, &chan),

make_pair([](void *arg)
{
auto chan = (Channel *) arg;
ASSERT_EQ(*(int *) chan->pop(), 1);
ASSERT_EQ(*(int *) chan->pop(), 1);
}, &chan)
});
}

TEST(coroutine_channel, pop_yield)
{
Channel chan(1);

coro_test({
make_pair([](void *arg)
{
auto chan = (Channel *) arg;

ASSERT_EQ(*(int *) chan->pop(), 1);
ASSERT_EQ(*(int *) chan->pop(), 1);
}, &chan),

make_pair([](void *arg)
{
auto chan = (Channel *) arg;
int i = 1;
bool ret;

ret = chan->push(&i);
ASSERT_TRUE(ret);
ret = chan->push(&i);
ASSERT_TRUE(ret);
}, &chan)
});
}

TEST(coroutine_channel, push_timeout)
{
coro_test([](void *arg)
{
Channel chan(1);
bool ret;

ret = chan.push(nullptr, 0.001);
ASSERT_TRUE(ret);
ret = chan.push(nullptr, 0.001);
ASSERT_FALSE(ret);
});
}

TEST(coroutine_channel, pop_timeout)
{
coro_test([](void *arg)
{
Channel chan(1);
void *ret;

ret = chan.pop(0.001);
ASSERT_EQ(ret, nullptr);
});
}
74 changes: 74 additions & 0 deletions core-tests/src/coroutine/gethostbyname.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
#include "tests.h"

using namespace swoole;

TEST(coroutine_gethostbyname, resolve_with_cache)
{
coro_test([](void *arg)
{
set_dns_cache_capacity(10);

std::string addr1 = Coroutine::gethostbyname("www.baidu.com", AF_INET);
ASSERT_NE(addr1, "");

int64_t start = swTimer_get_absolute_msec();

for (int i = 0; i < 100; ++i)
{
std::string addr2 = Coroutine::gethostbyname("www.baidu.com", AF_INET);
ASSERT_EQ(addr1, addr2);
}

ASSERT_LT(swTimer_get_absolute_msec() - start, 5);
});
}

TEST(coroutine_gethostbyname, resolve_without_cache)
{
coro_test([](void *arg)
{
set_dns_cache_capacity(0);

std::string addr1 = Coroutine::gethostbyname("www.baidu.com", AF_INET);
ASSERT_NE(addr1, "");

int64_t start = swTimer_get_absolute_msec();

for (int i = 0; i < 5; ++i)
{
std::string addr2 = Coroutine::gethostbyname("www.baidu.com", AF_INET);
ASSERT_NE(addr2, "");
}

ASSERT_GT(swTimer_get_absolute_msec() - start, 5);
});
}

TEST(coroutine_gethostbyname, resolve_cache_inet4_and_inet6)
{
coro_test([](void *arg)
{
set_dns_cache_capacity(10);

std::string addr1 = Coroutine::gethostbyname("ipv6.sjtu.edu.cn", AF_INET);
std::string addr2 = Coroutine::gethostbyname("ipv6.sjtu.edu.cn", AF_INET6);

ASSERT_NE(addr1, "");
ASSERT_NE(addr2, "");
ASSERT_EQ(addr1.find(":"), addr1.npos);
ASSERT_NE(addr2.find(":"), addr2.npos);

int64_t start = swTimer_get_absolute_msec();

for (int i = 0; i < 100; ++i)
{
std::string addr3 = Coroutine::gethostbyname("ipv6.sjtu.edu.cn", AF_INET);
std::string addr4 = Coroutine::gethostbyname("ipv6.sjtu.edu.cn", AF_INET6);

ASSERT_EQ(addr1, addr3);
ASSERT_EQ(addr2, addr4);
}

ASSERT_LT(swTimer_get_absolute_msec() - start, 5);
});
}
114 changes: 114 additions & 0 deletions core-tests/src/coroutine/socket.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
#include "tests.h"

using namespace swoole;

TEST(coroutine_socket, connect_refused)
{
coro_test([](void *arg)
{
Socket sock(SW_SOCK_TCP);
bool retval = sock.connect("127.0.0.1", 9801, 0.5);
ASSERT_EQ(retval, false);
ASSERT_EQ(sock.errCode, ECONNREFUSED);
});
}

TEST(coroutine_socket, connect_timeout)
{
coro_test([](void *arg)
{
Socket sock(SW_SOCK_TCP);
sock.set_timeout(0.5);
bool retval = sock.connect("192.0.0.1", 9801);
ASSERT_EQ(retval, false);
ASSERT_EQ(sock.errCode, ETIMEDOUT);
});
}

TEST(coroutine_socket, connect_with_dns)
{
coro_test([](void *arg)
{
Socket sock(SW_SOCK_TCP);
bool retval = sock.connect("www.baidu.com", 80, 0.5);
ASSERT_EQ(retval, true);
ASSERT_EQ(sock.errCode, 0);
});
}

TEST(coroutine_socket, recv_success)
{
coro_test([](void *arg)
{
Socket sock(SW_SOCK_TCP);
bool retval = sock.connect("127.0.0.1", 9501, -1);
ASSERT_EQ(retval, true);
ASSERT_EQ(sock.errCode, 0);
sock.send("echo", 5);
char buf[128];
int n = sock.recv(buf, sizeof(buf));
ASSERT_EQ(strcmp(buf, "hello world\n"), 0);
});
}

TEST(coroutine_socket, recv_fail)
{
coro_test([](void *arg)
{
Socket sock(SW_SOCK_TCP);
bool retval = sock.connect("127.0.0.1", 9501, -1);
ASSERT_EQ(retval, true);
ASSERT_EQ(sock.errCode, 0);
sock.send("close", 6);
char buf[128];
int n = sock.recv(buf, sizeof(buf));
ASSERT_EQ(n, 0);
});
}

TEST(coroutine_socket, bind_success)
{
Socket sock(SW_SOCK_TCP);
bool retval = sock.bind("127.0.0.1", 9909);
ASSERT_EQ(retval, true);
}

TEST(coroutine_socket, bind_fail)
{
Socket sock(SW_SOCK_TCP);
bool retval = sock.bind("192.111.11.1", 9909);
ASSERT_EQ(retval, false);
ASSERT_EQ(sock.errCode, EADDRNOTAVAIL);
}

TEST(coroutine_socket, listen)
{
Socket sock(SW_SOCK_TCP);
bool retval = sock.bind("127.0.0.1", 9909);
ASSERT_EQ(retval, true);
ASSERT_EQ(sock.listen(128), true);
}

TEST(coroutine_socket, accept)
{
coro_test({
[](void *arg)
{
Socket sock(SW_SOCK_TCP);
bool retval = sock.bind("127.0.0.1", 9909);
ASSERT_EQ(retval, true);
ASSERT_EQ(sock.listen(128), true);

Socket *conn = sock.accept();
ASSERT_NE(conn, nullptr);
},

[](void *arg)
{
Socket sock(SW_SOCK_TCP);
bool retval = sock.connect("127.0.0.1", 9909, -1);
ASSERT_EQ(retval, true);
ASSERT_EQ(sock.errCode, 0);
}
});
}
40 changes: 0 additions & 40 deletions core-tests/src/hashmap.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -86,43 +86,3 @@ TEST(hashmap, integer)
free(lists[i]);
}
}

TEST(stl_hashmap, integer_32)
{
std::unordered_map<int, int> m;
for (int i = 0; i < 1000000; i++)
{
m[i] = i;
}
}

TEST(stl_hashmap, integer_64)
{
std::unordered_map<long, long> m;
double s = swoole_microtime();
for (int i = 0; i < 1000000; i++)
{
m[i] = i;
}
printf("cost %f s\n", swoole_microtime() - s);
}

TEST(stl_map, integer_32)
{
std::map<int, int> m;
double s = swoole_microtime();
for (int i = 0; i < 1000000; i++)
{
m[i] = i;
}
printf("cost %f s\n", swoole_microtime() - s);
}

TEST(stl_map, integer_64)
{
std::map<long, long> m;
for (int i = 0; i < 1000000; i++)
{
m[i] = i;
}
}
1 change: 1 addition & 0 deletions core-tests/src/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ static pid_t create_server()
{
pid_t pid;
swoole_shell_exec("php server/tcp.php", &pid, 1);
sleep(1); // wait 1s
return pid;
}

Expand Down
39 changes: 39 additions & 0 deletions core-tests/src/network/aio_thread.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
#include "tests.h"
#include "async.h"

#include <atomic>

using namespace std;

TEST(network_aio_thread, dispatch)
{
atomic<int> i(0);
swAio_event event;
event.object = &i;
event.canceled = 0;

event.handler = [](swAio_event *event)
{
(*(atomic<int> *) event->object)++;
};

for (int i = 0; i < 1000; ++i)
{
auto ret = swAio_dispatch2(&event);
ASSERT_EQ(ret->object, event.object);
ASSERT_NE(ret->task_id, event.task_id);
}

time_t start = time(nullptr);
while (i != 1000)
{
usleep(100);

if ((time(nullptr) - start) > 3)
{
ASSERT_TRUE(false);
}
}

swAio_free();
}
110 changes: 110 additions & 0 deletions core-tests/src/os/wait.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
#include "tests.h"
#include "coroutine_c_api.h"

using namespace swoole;

TEST(os_wait, waitpid_before_child_exit)
{
coro_test([](void *arg)
{
swoole_coroutine_signal_init();

pid_t pid = fork();
ASSERT_NE(pid, -1);

if (pid == 0)
{
usleep(100000);
exit(0);
}

int status = -1;
pid_t pid2 = swoole_coroutine_waitpid(pid, &status, 0);
ASSERT_EQ(status, 0);
ASSERT_EQ(pid, pid2);
});
}

TEST(os_wait, waitpid_after_child_exit)
{
coro_test([](void *arg)
{
swoole_coroutine_signal_init();

pid_t pid = fork();
ASSERT_NE(pid, -1);

if (pid == 0)
{
exit(0);
}

usleep(100000);
int status = -1;
pid_t pid2 = swoole_coroutine_waitpid(pid, &status, 0);
ASSERT_EQ(status, 0);
ASSERT_EQ(pid, pid2);
});
}

TEST(os_wait, wait_before_child_exit)
{
coro_test([](void *arg)
{
swoole_coroutine_signal_init();

pid_t pid = fork();
ASSERT_NE(pid, -1);

if (pid == 0)
{
usleep(100000);
exit(0);
}

int status = -1;
pid_t pid2 = -1;

for (;;)
{
pid2 = swoole_coroutine_wait(&status);
if (pid2 == pid)
{
break;
}
}

ASSERT_EQ(WEXITSTATUS(status), 0);
});
}

TEST(os_wait, wait_after_child_exit)
{
coro_test([](void *arg)
{
swoole_coroutine_signal_init();

pid_t pid = fork();
ASSERT_NE(pid, -1);

if (pid == 0)
{
exit(0);
}

usleep(100000);
int status = -1;
pid_t pid2 = -1;

for (;;)
{
pid2 = swoole_coroutine_wait(&status);
if (pid2 == pid)
{
break;
}
}

ASSERT_EQ(WEXITSTATUS(status), 0);
});
}
52 changes: 8 additions & 44 deletions core-tests/src/ringbuffer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,7 @@
#include <thread>

#define READ_THREAD_N 4
#define WRITE_N 100000
#define PRINT_SERNUM_N 1000
#define WRITE_N 10000

static swMemoryPool *pool = NULL;

Expand Down Expand Up @@ -61,7 +60,6 @@ static void thread_write(void)
for (i = 0; i < WRITE_N; i++)
{
size = 10000 + rand() % 90000;
//printf("alloc size=%d\n", size);

yield_count = 0;
do
Expand All @@ -79,11 +77,7 @@ static void thread_write(void)
}
} while (yield_count < 100);

if (!ptr)
{
printf("alloc failed. break\n");
break;
}
ASSERT_NE(ptr, nullptr);

send_pkg.ptr = ptr;
send_pkg.size = size;
Expand All @@ -94,23 +88,13 @@ static void thread_write(void)
//在指针末尾保存一个串号
memcpy((char*) ptr + size - 4, &(send_pkg.serial_num), sizeof(send_pkg.serial_num));

#ifdef PRINT_SERNUM_OPEN
if (i % PRINT_SERNUM_N == 0)
{
printf("send. send_count=%d, serial_num=%d\n", i, send_pkg.serial_num);
}
#endif
if (threads[i % READ_THREAD_N].pipe.write(&threads[i % READ_THREAD_N].pipe, &send_pkg, sizeof(send_pkg)) < 0)
{
printf("write() failed. Error: %s\n", strerror(errno));
}
ASSERT_FALSE(threads[i % READ_THREAD_N].pipe.write(&threads[i % READ_THREAD_N].pipe, &send_pkg, sizeof(send_pkg)) < 0);

if (i % 100 == 0)
{
usleep(10);
}
//sleep(1);
}
printf("alloc count = %d, yield_total_count = %d\n", i, yield_total_count);
}

static void thread_read(int i)
Expand All @@ -126,35 +110,15 @@ static void thread_read(int i)
for (j = 0; j < task_n; j++)
{
ret = sock->read(sock, &recv_pkg, sizeof(recv_pkg));
if (ret < 0)
{
printf("read() failed. Error: %s\n", strerror(errno));
break;
}
ASSERT_FALSE(ret < 0);

memcpy(&tmp, recv_pkg.ptr, sizeof(tmp));
if (tmp != recv_pkg.size)
{
printf("Thread#%d: size error, recv_count=%d, length1=%d, length2=%d\n", i, recv_count, recv_pkg.size, tmp);
continue;
}
ASSERT_EQ(tmp, recv_pkg.size);

memcpy(&tmp, (char*) recv_pkg.ptr + recv_pkg.size - 4, sizeof(tmp));
if (tmp != recv_pkg.serial_num)
{
printf("Thread#%d: serial_num error, recv_count=%d, num1=%d, num2=%d\n", i, recv_count, recv_pkg.serial_num,
tmp);
continue;
}
ASSERT_EQ(tmp, recv_pkg.serial_num);

#ifdef PRINT_SERNUM_OPEN
if (j % PRINT_SERNUM_N == 0)
{
printf("recv. recv_count=%d, serial_num=%d\n", recv_count, tmp);
}
#endif
//printf("Thread#%d: ptr=%p,size=%d\n", i, recv_pkg.ptr, recv_pkg.size);
pool->free(pool, recv_pkg.ptr);
recv_count++;
}
printf("worker #%d finish, recv_count=%d\n", i, recv_count);
}
219 changes: 0 additions & 219 deletions core-tests/src/serv_test.cpp

This file was deleted.

91 changes: 40 additions & 51 deletions core-tests/src/server.cpp
Original file line number Diff line number Diff line change
@@ -1,26 +1,20 @@
#include "swoole.h"
#include "server.h"


namespace swoole_test
{
static int my_onPacket(swServer *serv, swEventData *req);

static int my_onReceive(swServer *serv, swEventData *req);

static void my_onStart(swServer *serv);

static void my_onShutdown(swServer *serv);

static void my_onConnect(swServer *serv, swDataHead *info);

static void my_onClose(swServer *serv, swDataHead *info);

static void my_onWorkerStart(swServer *serv, int worker_id);

static void my_onWorkerStop(swServer *serv, int worker_id);

static int g_receive_count = 0;

int server_main(int argc, char **argv)
int server_test()
{
int ret;
swServer serv;
Expand All @@ -42,10 +36,19 @@ int server_main(int argc, char **argv)

#ifdef HAVE_OPENSSL
//serv.ssl_cert_file = "tests/ssl/ssl.crt";
//serv.ssl_key_file = "tests/ssl/ssl.key";
//serv.open_ssl = 1;
//serv.ssl_key_file = "tests/ssl/ssl.key";
//serv.open_ssl = 1;
#endif

serv.onStart = my_onStart;
serv.onShutdown = my_onShutdown;
serv.onConnect = my_onConnect;
serv.onReceive = my_onReceive;
serv.onPacket = my_onPacket;
serv.onClose = my_onClose;
serv.onWorkerStart = my_onWorkerStart;
serv.onWorkerStop = my_onWorkerStop;

// swSignal_add(SIGINT, user_signal);

//create Server
Expand All @@ -56,24 +59,15 @@ int server_main(int argc, char **argv)
exit(0);
}

serv.onStart = my_onStart;
serv.onShutdown = my_onShutdown;
serv.onConnect = my_onConnect;
serv.onReceive = my_onReceive;
serv.onPacket = my_onPacket;
serv.onClose = my_onClose;
serv.onWorkerStart = my_onWorkerStart;
serv.onWorkerStop = my_onWorkerStop;

swListenPort *port = swServer_add_port(&serv, SW_SOCK_TCP, (char *) "127.0.0.1", 9501);
swListenPort *port = swServer_add_port(&serv, SW_SOCK_TCP, "127.0.0.1", 9501);
port->open_eof_check = 0;
//config
port->backlog = 128;
memcpy(port->protocol.package_eof, SW_STRL("\r\n\r\n")); //开启eof检测,启用buffer区

swServer_add_port(&serv, SW_SOCK_UDP, (char *) "0.0.0.0", 9502);
swServer_add_port(&serv, SW_SOCK_TCP6, (char *) "::", 9503);
swServer_add_port(&serv, SW_SOCK_UDP6, (char *) "::", 9504);
swServer_add_port(&serv, SW_SOCK_UDP, "0.0.0.0", 9502);
swServer_add_port(&serv, SW_SOCK_TCP6, "::", 9503);
swServer_add_port(&serv, SW_SOCK_UDP6, "::", 9504);

ret = swServer_start(&serv);
if (ret < 0)
Expand All @@ -97,16 +91,16 @@ void my_onWorkerStop(swServer *serv, int worker_id)
int my_onReceive(swServer *serv, swEventData *req)
{
int ret;
char resp_data[SW_BUFFER_SIZE];
char resp_data[SW_IPC_BUFFER_SIZE];

g_receive_count++;

swConnection *conn = swWorker_get_connection(serv, req->info.fd);
swoole_rtrim(req->data, req->info.len);
printf("onReceive[%d]: ip=%s|port=%d Data=%s|Len=%d\n", g_receive_count, swConnection_get_ip(conn),
swConnection_get_port(conn), req->data, req->info.len);
swConnection_get_port(conn), req->data, req->info.len);

int n = snprintf(resp_data, SW_BUFFER_SIZE, "Server: %*s\n", req->info.len, req->data);
int n = snprintf(resp_data, SW_IPC_BUFFER_SIZE, "Server: %*s\n", req->info.len, req->data);
ret = serv->send(serv, req->info.fd, resp_data, n);
if (ret < 0)
{
Expand All @@ -121,13 +115,11 @@ int my_onReceive(swServer *serv, swEventData *req)

int my_onPacket(swServer *serv, swEventData *req)
{
swDgramPacket *packet;

swString *buffer = swWorker_get_buffer(serv, req->info.from_id);
packet = (swDgramPacket *) buffer->str;

int serv_sock = req->info.from_fd;
char *data;
swWorker_get_data(req, &data);
swDgramPacket *packet = (swDgramPacket *) data;

int length;
char address[256];
int port = 0;
Expand All @@ -136,51 +128,46 @@ int my_onPacket(swServer *serv, swEventData *req)
//udp ipv4
if (req->info.type == SW_EVENT_UDP)
{
struct in_addr sin_addr;
sin_addr.s_addr = packet->addr.v4.s_addr;
char *tmp = inet_ntoa(sin_addr);
memcpy(address, tmp, strlen(tmp));
inet_ntop(AF_INET6, &packet->info.addr.inet_v4.sin_addr, address, sizeof(address));
data = packet->data;
length = packet->length;
port = packet->port;
port = ntohs(packet->info.addr.inet_v4.sin_port);
}
//udp ipv6
//udp ipv6
else if (req->info.type == SW_EVENT_UDP6)
{
inet_ntop(AF_INET6, &packet->addr.v6, address, sizeof(address));
inet_ntop(AF_INET6, &packet->info.addr.inet_v6.sin6_addr, address, sizeof(address));
data = packet->data;
length = packet->length;
port = packet->port;
port = ntohs(packet->info.addr.inet_v6.sin6_port);
}
//unix dgram
//unix dgram
else if (req->info.type == SW_EVENT_UNIX_DGRAM)
{
memcpy(address, packet->data, packet->addr.un.path_length);
data = packet->data + packet->addr.un.path_length;
length = packet->length - packet->addr.un.path_length;
strcpy(address, packet->info.addr.un.sun_path);
data = packet->data;
length = packet->length;
}

printf("Packet[client=%s:%d, %d bytes]: data=%*s\n", address, port, length, length, data);

char resp_data[SW_BUFFER_SIZE];
int n = snprintf(resp_data, SW_BUFFER_SIZE, "Server: %*s", length, data);
char resp_data[SW_IPC_BUFFER_SIZE];
int n = snprintf(resp_data, SW_IPC_BUFFER_SIZE, "Server: %*s", length, data);

//udp ipv4
if (req->info.type == SW_EVENT_UDP)
{
ret = swSocket_udp_sendto(serv_sock, address, port, resp_data, n);
}
//udp ipv6
//udp ipv6
else if (req->info.type == SW_EVENT_UDP6)
{
ret = swSocket_udp_sendto6(serv_sock, address, port, resp_data, n);
}
//unix dgram
//unix dgram
else if (req->info.type == SW_EVENT_UNIX_DGRAM)
{
memcpy(address, packet->data, packet->addr.un.path_length);
data = packet->data + packet->addr.un.path_length;
length = packet->length - packet->addr.un.path_length;
ret = swSocket_unix_sendto(serv_sock, address, resp_data, n);
}

if (ret < 0)
Expand Down Expand Up @@ -214,3 +201,5 @@ void my_onClose(swServer *serv, swDataHead *info)
{
printf("PID=%d\tClose fd=%d|from_id=%d\n", getpid(), info->fd, info->from_id);
}

}
7 changes: 1 addition & 6 deletions core-tests/src/thread_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

static swThreadPool pool;
static int _pipe;
const static int N = 1000000;
const static int N = 10000;

static int thread_onTask(swThreadPool *pool, void *task, int task_len)
{
Expand All @@ -25,14 +25,9 @@ TEST(thread_pool, dispatch)

_pipe = eventfd(0, 0);


for (long i = 0; i < N; i++)
{
ASSERT_EQ(swThreadPool_dispatch(&pool, (void*) &result, sizeof(result)), SW_OK);
if (N % 10000 == 9999)
{
usleep(1);
}
}

long buf;
Expand Down
2 changes: 1 addition & 1 deletion examples/server.c
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ static int receive_count = 0;
int my_onReceive(swServer *serv, swEventData *req)
{
int ret;
char resp_data[SW_BUFFER_SIZE];
char resp_data[SW_IPC_BUFFER_SIZE];
swSendData resp;
receive_count ++;
resp.info.fd = req->info.fd; //fd can be not source fd.
Expand Down
1 change: 1 addition & 0 deletions examples/server/dispatch_stream.php
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
{
var_dump($data);
echo "#{$serv->worker_id}>> received length=" . strlen($data) . "\n";
$serv->send($fd, "Swoole $data\n");
});

$serv->start();
18 changes: 18 additions & 0 deletions examples/server/reload_force.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
<?php
$flag = 0;
$serv = new swoole_server('127.0.0.1', 9501, SWOOLE_BASE);
$serv->set([
"worker_num" => 4,
"max_wait_time" => 1
]);
$serv->on("WorkerStart", function (\swoole_server $server, $worker_id) {
global $flag;
echo "$worker_id [".$server->worker_pid."] start \n";
});
$serv->on('receive', function ($serv, $fd, $tid, $data) {
echo "$tid recv $data\n";
if ($data) {
sleep(100);
}
});
$serv->start();
18 changes: 18 additions & 0 deletions examples/server/reload_force2.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
<?php
$flag = 0;
$serv = new swoole_server('127.0.0.1', 9501);
$serv->set([
"worker_num" => 4,
"max_wait_time" => 1
]);
$serv->on("WorkerStart", function (\swoole_server $server, $worker_id) {
global $flag;
echo "$worker_id [".$server->worker_pid."] start \n";
});
$serv->on('receive', function ($serv, $fd, $tid, $data) {
echo "$tid recv $data\n";
if ($data) {
sleep(100);
}
});
$serv->start();
51 changes: 24 additions & 27 deletions examples/test_server.c
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ void my_onWorkerStop(swServer *serv, int worker_id)
int my_onReceive(swServer *serv, swEventData *req)
{
int ret;
char resp_data[SW_BUFFER_SIZE];
char resp_data[SW_IPC_BUFFER_SIZE];

g_receive_count++;

Expand All @@ -102,7 +102,7 @@ int my_onReceive(swServer *serv, swEventData *req)
printf("onReceive[%d]: ip=%s|port=%d Data=%s|Len=%d\n", g_receive_count, swConnection_get_ip(conn),
swConnection_get_port(conn), req->data, req->info.len);

int n = snprintf(resp_data, SW_BUFFER_SIZE, "Server: %*s\n", req->info.len, req->data);
int n = sw_snprintf(resp_data, SW_IPC_BUFFER_SIZE, "Server: %*s\n", req->info.len, req->data);
ret = serv->send(serv, req->info.fd, resp_data, n);
if (ret < 0)
{
Expand All @@ -117,49 +117,48 @@ int my_onReceive(swServer *serv, swEventData *req)

int my_onPacket(swServer *serv, swEventData *req)
{
swDgramPacket *packet;

swString *buffer = swWorker_get_buffer(serv, req->info.from_id);
packet = (swDgramPacket*) buffer->str;

int serv_sock = req->info.from_fd;
char *data;
int length;
char address[256];
int port = 0;
int ret;

swDgramPacket *packet;

swWorker_get_data(req, &data);
packet = (swDgramPacket*) data;

int serv_sock = req->info.from_fd;

//udp ipv4
if (req->info.type == SW_EVENT_UDP)
{
struct in_addr sin_addr;
sin_addr.s_addr = packet->addr.v4.s_addr;
char *tmp = inet_ntoa(sin_addr);
memcpy(address, tmp, strlen(tmp));
data = packet->data;
length = packet->length;
port = packet->port;
inet_ntop(AF_INET6, &packet->info.addr.inet_v4.sin_addr, address, sizeof(address));
port = ntohs(packet->info.addr.inet_v4.sin_port);
}
//udp ipv6
else if (req->info.type == SW_EVENT_UDP6)
{
inet_ntop(AF_INET6, &packet->addr.v6, address, sizeof(address));
data = packet->data;
length = packet->length;
port = packet->port;
inet_ntop(AF_INET6, &packet->info.addr.inet_v6.sin6_addr, address, sizeof(address));
port = ntohs(packet->info.addr.inet_v6.sin6_port);
}
//unix dgram
else if (req->info.type == SW_EVENT_UNIX_DGRAM)
{
memcpy(address, packet->data, packet->addr.un.path_length);
data = packet->data + packet->addr.un.path_length;
length = packet->length - packet->addr.un.path_length;
strcpy(address, packet->info.addr.un.sun_path);
}
else
{
assert(0);
}

data = packet->data;
length = packet->length;

printf("Packet[client=%s:%d, %d bytes]: data=%*s\n", address, port, length, length, data);

char resp_data[SW_BUFFER_SIZE];
int n = snprintf(resp_data, SW_BUFFER_SIZE, "Server: %*s", length, data);
char resp_data[SW_IPC_BUFFER_SIZE];
int n = sw_snprintf(resp_data, SW_IPC_BUFFER_SIZE, "Server: %*s", length, data);

//udp ipv4
if (req->info.type == SW_EVENT_UDP)
Expand All @@ -174,9 +173,7 @@ int my_onPacket(swServer *serv, swEventData *req)
//unix dgram
else if (req->info.type == SW_EVENT_UNIX_DGRAM)
{
memcpy(address, packet->data, packet->addr.un.path_length);
data = packet->data + packet->addr.un.path_length;
length = packet->length - packet->addr.un.path_length;
ret = swSocket_unix_sendto(serv_sock, address, resp_data, n);
}

if (ret < 0)
Expand Down
18 changes: 18 additions & 0 deletions examples/wrapper/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
cmake_minimum_required(VERSION 2.8)
project(server)

set(SOURCE_FILES server.cpp)
set(CMAKE_BUILD_TYPE Debug)
set(EXECUTABLE_OUTPUT_PATH ${PROJECT_BINARY_DIR})

add_definitions(-DHAVE_CONFIG_H)

include_directories(./include ./ $ENV{SWOOLE_DIR}/ $ENV{SWOOLE_DIR}/include/ BEFORE)


link_directories($ENV{SWOOLE_DIR}/lib)
add_executable(server ${SOURCE_FILES})
target_link_libraries(server swoole)



153 changes: 153 additions & 0 deletions examples/wrapper/server.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
#include "wrapper/server.hpp"
#include "wrapper/timer.hpp"
#include <iostream>

using namespace std;
using namespace swoole;

class MyTimer : Timer
{
public:
MyTimer(long ms, bool interval) :
Timer(ms, interval)
{

}

MyTimer(long ms) :
Timer(ms)
{

}

protected:
virtual void callback(void);
int count = 0;
};

class MyServer : public Server
{
public:
MyServer(string _host, int _port, int _mode = SW_MODE_PROCESS, int _type = SW_SOCK_TCP) :
Server(_host, _port, _mode, _type)
{
serv.worker_num = 4;
serv.task_worker_num = 2;
}

virtual void onStart();
virtual void onShutdown() {};
virtual void onWorkerStart(int worker_id);
virtual void onWorkerStop(int worker_id) {}
virtual void onPipeMessage(int src_worker_id, const DataBuffer &) {}
virtual void onReceive(int fd, const DataBuffer &data);
virtual void onConnect(int fd);
virtual void onClose(int fd);
virtual void onPacket(const DataBuffer &data, ClientInfo &clientInfo);

virtual void onTask(int task_id, int src_worker_id, const DataBuffer &data);
virtual void onFinish(int task_id, const DataBuffer &data);

protected:
MyTimer *timer;
};

void MyServer::onReceive(int fd, const DataBuffer &data)
{
swConnection *conn = swWorker_get_connection(&this->serv, fd);
printf("onReceive: fd=%d, ip=%s|port=%d Data=%s|Len=%ld\n", fd, swConnection_get_ip(conn),
swConnection_get_port(conn), (char *) data.buffer, data.length);

int ret;
char resp_data[SW_BUFFER_SIZE_STD];
int n = snprintf(resp_data, SW_BUFFER_SIZE_STD, (char *) "Server: %*s\n", (int) data.length, (char *) data.buffer);
ret = this->send(fd, resp_data, (uint32_t) n);
if (ret < 0)
{
printf("send to client fail. errno=%d\n", errno);
}
else
{
printf("send %d bytes to client success. data=%s\n", n, resp_data);
}
DataBuffer task_data("hello world\n");
this->task(task_data);
// this->close(fd);
}

void MyServer::onPacket(const DataBuffer &data, ClientInfo &clientInfo)
{
printf("recv, length=%d, str=%s, client=%s:%d\n", data.length, (char *) data.buffer, clientInfo.address, clientInfo.port);
char resp_data[SW_BUFFER_SIZE_STD];
int n = snprintf(resp_data, SW_BUFFER_SIZE_STD, (char *) "Server: %*s\n", (int) data.length, (char *) data.buffer);
auto sent_data = DataBuffer(resp_data, n);
auto ip = string(clientInfo.address);
auto ret = this->sendto(ip, clientInfo.port, sent_data);
if (!ret)
{
printf("send to client failed. errno=%d\n", errno);
}
else
{
printf("send %d bytes to client success. data=%s\n", n, resp_data);
}
}

void MyServer::onConnect(int fd)
{
printf("PID=%d\tConnect fd=%d\n", getpid(), fd);
}

void MyServer::onClose(int fd)
{
printf("PID=%d\tClose fd=%d\n", getpid(), fd);
}

void MyServer::onTask(int task_id, int src_worker_id, const DataBuffer &data)
{
printf("PID=%d\tTaskID=%d\n", getpid(), task_id);
}

void MyServer::onFinish(int task_id, const DataBuffer &data)
{
printf("PID=%d\tClose fd=%d\n", getpid(), task_id);
}

void MyServer::onStart()
{
printf("server start\n");
}

void MyServer::onWorkerStart(int worker_id)
{
//timer = new MyTimer(1000);
}

void MyTimer::callback()
{
printf("#%d\thello world\n", count);
if (count > 9)
{
this->clear();
}
count++;
}

int main(int argc, char **argv)
{
if (argc < 2)
{
MyTimer t(1000);
event_wait();
}
else
{
MyServer server("127.0.0.1", 9501, SW_MODE_BASE);
server.listen("127.0.0.1", 9502, SW_SOCK_UDP);
server.listen("::1", 9503, SW_SOCK_TCP6);
server.listen("::1", 9504, SW_SOCK_UDP6);
server.setEvents(EVENT_onStart|EVENT_onWorkerStart | EVENT_onReceive | EVENT_onPacket| EVENT_onClose | EVENT_onTask | EVENT_onFinish);
server.start();
}
return 0;
}
177 changes: 85 additions & 92 deletions gdbinit
Original file line number Diff line number Diff line change
@@ -1,48 +1,99 @@
define timer_list
if SwooleG.timer.initialized == 1
printf "current timer number: %d, round: %d\n", SwooleG.timer.num,SwooleG.timer->round
set $running = 1
set $i = 1
while $running
if $i < SwooleG.timer->heap->num
set $tmp = SwooleG.timer->heap->nodes[$i]
set $node = (swTimer_node *)$tmp->data
if $node
printf "\t timer[%d] exec_msec:%ld round:%ld\n", $node->id, $node->exec_msec, $node->round
end
else
set $running = 0
end
set $i = $i + 1
end
else
printf "no timer\n"
end
end

define reactor_info
if SwooleG.main_reactor
printf "\t reactor id: %d\n",SwooleG.main_reactor->id
printf "\t running: %d\n", SwooleG.main_reactor->running
printf "\t event_num: %d\n", SwooleG.main_reactor->event_num
printf "\t max_event_num: %d\n", SwooleG.main_reactor->max_event_num
printf "\t check_timer: %d\n", SwooleG.main_reactor->check_timer
printf "\t timeout_msec: %d\n", SwooleG.main_reactor->timeout_msec
end
end

define sw_hash_map_list
set $hmap = $arg0
if $hmap
if $hmap->root->hh.tbl->num_items == 0
echo "no content\n"
else
set $running = 1
set $it = $hmap->iterator
if $it == 0
set $it = $hmap->root
end
while $running

set $tmp = (swHashMap_node *)$it->hh.next
if $tmp
printf "key_int[%d] key_str:%s data:%p\n", $tmp->key_int, $tmp->key_str, $tmp->data
set $it = $tmp
else
set $running = 0
end
end
end
end
end

define co_list
call swoole_coro_iterator_reset()
set $running = 1
while $running
set $co = swoole_coro_iterator_each()

if $co
printf "coroutine %d ", $co->get_cid()
printf "coroutine %ld ", $co->cid
if $co->state == 0
printf "%s\n", "SW_CORO_INIT"
printlnc $GREEN "SW_CORO_INIT"
end
if $co->state == 1
color $RED
printf "%s\n", "SW_CORO_WAITING"
color_reset
end
printlnc $YELLOW "SW_CORO_WAITING"
end
if $co->state == 2
color $GREEN
printf "%s\n", "SW_CORO_RUNNING"
color_reset
printlnc $GREEN "SW_CORO_RUNNING"
end
if $co->state == 3
printf "%s\n", "SW_CORO_END"
printlnc $CYAN "SW_CORO_END"
end
else
set $running = 0
end

set $running = 0
end
end

define co_bt
if swoole_coro_count() == 0
printf "no coroutines running\n"
printf "no coroutine is running\n"
end
____executor_globals
if $argc > 0
set $cid = (int)$arg0
else
set $cid = 'swoole::Coroutine::get_current_cid'()
end

printf "coroutine cid:[%d]\n",$cid
__co_bt $cid
end

document co_bt
dump current coroutine or the cid backtrace.
useage: co_bt [cid]
Expand All @@ -54,9 +105,7 @@ define __co_bt
if $co
set $task = (php_coro_task *)$co->task
if $task
set $backup = $eg.current_execute_data
dump_bt $task->execute_data
set $eg.current_execute_data = $backup
dump_bt $eg.current_execute_data
end
else
printf "coroutines %d not found\n", $cid
Expand Down Expand Up @@ -139,7 +188,6 @@ define dump_bt
if $arg > 0
printf ", "
end

set $zvalue = (zval *) $ex + $callFrameSize + $arg
set $type = $zvalue->u1.v.type
if $type == 1
Expand Down Expand Up @@ -194,84 +242,29 @@ define dump_bt
end
end

# __________________color functions_________________
#
set $USECOLOR = 1
# color codes
set $BLACK = 0
set $RED = 1
set $GREEN = 2
set $YELLOW = 3
set $BLUE = 4
# ======== color ========
set $BLACK = 0
set $RED = 1
set $GREEN = 2
set $YELLOW = 3
set $BLUE = 4
set $MAGENTA = 5
set $CYAN = 6
set $WHITE = 7

set $COLOR_REGNAME = $GREEN
set $COLOR_REGVAL = $BLACK
set $COLOR_REGVAL_MODIFIED = $RED
set $COLOR_SEPARATOR = $BLUE
set $COLOR_CPUFLAGS = $RED
set $CYAN = 6
set $WHITE = 7

# this is ugly but there's no else if available :-(
define color
if $USECOLOR == 1
# BLACK
if $arg0 == 0
echo \033[30m
if $argc == 0
set $arg = 0
else
# RED
if $arg0 == 1
echo \033[31m
else
# GREEN
if $arg0 == 2
echo \033[32m
else
# YELLOW
if $arg0 == 3
echo \033[33m
else
# BLUE
if $arg0 == 4
echo \033[34m
else
# MAGENTA
if $arg0 == 5
echo \033[35m
else
# CYAN
if $arg0 == 6
echo \033[36m
else
# WHITE
if $arg0 == 7
echo \033[37m
end
end
end
end
end
end
end
end
end
end

define color_reset
if $USECOLOR == 1
echo \033[0m
set $arg = $arg0 + 30
end
printf "%c[%dm", 27, $arg
end

define color_bold
if $USECOLOR == 1
echo \033[1m
end
end
# ======== print ========

define color_underline
if $USECOLOR == 1
echo \033[4m
end
define printlnc
color $arg0
printf "%s\n", $arg1
color
end
2 changes: 0 additions & 2 deletions include/client.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ enum swHttp_proxy_state
struct _http_proxy
{
uint8_t state;
uint8_t ssl;
int proxy_port;
char *proxy_host;
char *user;
Expand Down Expand Up @@ -188,7 +187,6 @@ int swDNSResolver_free();
typedef struct _swStream
{
swString *buffer;
uint32_t session_id;
uint8_t cancel;
void (*response)(struct _swStream *stream, char *data, uint32_t length);
swClient client;
Expand Down
83 changes: 38 additions & 45 deletions include/connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -140,42 +140,41 @@ int swSSL_sendfile(swConnection *conn, int fd, off_t *offset, size_t size);
static sw_inline ssize_t swConnection_recv(swConnection *conn, void *__buf, size_t __n, int __flags)
{
ssize_t total_bytes = 0;
_recv:
#ifdef SW_USE_OPENSSL
if (conn->ssl)

do
{
ssize_t retval = 0;
while ((size_t) total_bytes < __n)
#ifdef SW_USE_OPENSSL
if (conn->ssl)
{
retval = swSSL_recv(conn, ((char*)__buf) + total_bytes, __n - total_bytes);
if (retval <= 0)
ssize_t retval = 0;
while ((size_t) total_bytes < __n)
{
if (total_bytes == 0)
retval = swSSL_recv(conn, ((char*)__buf) + total_bytes, __n - total_bytes);
if (retval <= 0)
{
total_bytes = retval;
if (total_bytes == 0)
{
total_bytes = retval;
}
break;
}
break;
}
else
{
total_bytes += retval;
if (!(conn->nonblock || (__flags & MSG_WAITALL)))
else
{
break;
total_bytes += retval;
if (!(conn->nonblock || (__flags & MSG_WAITALL)))
{
break;
}
}
}
}
}
else
else
#endif
{
total_bytes = recv(conn->fd, __buf, __n, __flags);
}

if (total_bytes < 0 && errno == EINTR)
{
goto _recv;
{
total_bytes = recv(conn->fd, __buf, __n, __flags);
}
}
while (total_bytes < 0 && errno == EINTR);

#ifdef SW_DEBUG
if (total_bytes > 0)
Expand All @@ -195,37 +194,31 @@ static sw_inline ssize_t swConnection_recv(swConnection *conn, void *__buf, size
static sw_inline ssize_t swConnection_send(swConnection *conn, void *__buf, size_t __n, int __flags)
{
ssize_t retval;
_send:
#ifdef SW_USE_OPENSSL
if (conn->ssl)
{
retval = swSSL_send(conn, __buf, __n);
}
else
{
retval = send(conn->fd, __buf, __n, __flags);
}
#else
retval = send(conn->fd, __buf, __n, __flags);
#endif

if (retval < 0 && errno == EINTR)
do
{
goto _send;
}
else
{
goto _return;
#ifdef SW_USE_OPENSSL
if (conn->ssl)
{
retval = swSSL_send(conn, __buf, __n);
}
else
#endif
{
retval = send(conn->fd, __buf, __n, __flags);
}
}
while (retval < 0 && errno == EINTR);

_return:
#ifdef SW_DEBUG
if (retval > 0)
{
conn->total_send_bytes += retval;
}
#endif

swTraceLog(SW_TRACE_SOCKET, "send %ld/%ld bytes, errno=%d", retval, __n, errno);

return retval;
}

Expand Down
4 changes: 0 additions & 4 deletions include/context.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,6 @@
#include <sys/mman.h>
#endif

#ifdef USE_VALGRIND
#include <valgrind/valgrind.h>
#endif

typedef void (*coroutine_func_t)(void*);

namespace swoole
Expand Down
6 changes: 2 additions & 4 deletions include/coroutine.h
Original file line number Diff line number Diff line change
Expand Up @@ -112,12 +112,10 @@ class Coroutine
stack_size = SW_MEM_ALIGNED_SIZE_EX(MIN(size, SW_CORO_MAX_STACK_SIZE), SW_CORO_STACK_ALIGNED_SIZE);
}

#ifdef SW_LOG_TRACE_OPEN
static inline long get_cid(Coroutine* co)
{
return co ? co->get_cid() : -1;
}
#endif

static inline long get_last_cid()
{
Expand All @@ -129,7 +127,7 @@ class Coroutine
return coroutines.size();
}

static uint64_t get_peak_num()
static inline uint64_t get_peak_num()
{
return peak_num;
}
Expand All @@ -155,7 +153,7 @@ class Coroutine
cid = ++last_cid;
coroutines[cid] = this;
call_stack[call_stack_size++] = this;
if (count() > peak_num)
if (unlikely(count() > peak_num))
{
peak_num = count();
}
Expand Down
10 changes: 9 additions & 1 deletion include/error.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,11 @@

enum swErrorCode
{
/**
* Prevent repetition with errno [syscall error]
*/
SW_ERROR_START = 500,

/**
* common error
*/
Expand Down Expand Up @@ -110,6 +115,7 @@ enum swErrorCode
SW_ERROR_SERVER_ONLY_START_ONE,
SW_ERROR_SERVER_SEND_IN_MASTER,
SW_ERROR_SERVER_INVALID_REQUEST,
SW_ERROR_SERVER_CONNECT_FAIL,

/**
* Process exit timeout, forced to end.
Expand All @@ -134,9 +140,11 @@ enum swErrorCode
SW_ERROR_CO_PROTECT_STACK_FAILED,
SW_ERROR_CO_STD_THREAD_LINK_ERROR,
SW_ERROR_CO_DISABLED_MULTI_THREAD,

SW_ERROR_END
};

const char* swstrerror(enum swErrorCode code);
const char* swoole_strerror(enum swErrorCode code);
void swoole_throw_error(enum swErrorCode code);

#endif /* SW_ERRNO_H_ */
2 changes: 1 addition & 1 deletion include/heap.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ uint32_t swHeap_size(swHeap *heap);
swHeap_node* swHeap_push(swHeap *heap, uint64_t priority, void *data);
void *swHeap_pop(swHeap *heap);
void swHeap_change_priority(swHeap *heap, uint64_t new_priority, void* ptr);
int swHeap_remove(swHeap *heap, swHeap_node *node);
void swHeap_remove(swHeap *heap, swHeap_node *node);
void *swHeap_peek(swHeap *heap);
void swHeap_print(swHeap *heap);

Expand Down
126 changes: 67 additions & 59 deletions include/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,37 +32,32 @@ extern "C" {
#define SW_REACTOR_NUM SW_CPU_NUM
#define SW_WORKER_NUM (SW_CPU_NUM*2)

#define SW_HEARTBEAT_IDLE 0 //心跳存活最大时间
#define SW_HEARTBEAT_CHECK 0 //心跳定时侦测时间

enum swEventType
enum swServer_event_type
{
//networking socket
SW_EVENT_TCP = 0,
SW_EVENT_UDP = 1,
SW_EVENT_TCP6 = 2,
SW_EVENT_UDP6 = 3,
SW_EVENT_TCP,
SW_EVENT_UDP,
SW_EVENT_TCP6,
SW_EVENT_UDP6,
//tcp event
SW_EVENT_CLOSE = 4,
SW_EVENT_CONNECT = 5,
SW_EVENT_CLOSE,
SW_EVENT_CONNECT,
//timer
SW_EVENT_TIMER = 6,
SW_EVENT_TIMER,
//task
SW_EVENT_TASK = 7,
SW_EVENT_FINISH = 8,
//package
SW_EVENT_PACKAGE_START = 9,
SW_EVENT_PACKAGE_END = 10,
SW_EVENT_PACKAGE = 11,
SW_EVENT_SENDFILE = 12,
SW_EVENT_UNIX_DGRAM = 13,
SW_EVENT_UNIX_STREAM = 14,
SW_EVENT_TASK,
SW_EVENT_FINISH,
//sendfile
SW_EVENT_SENDFILE,
//dgram
SW_EVENT_UNIX_DGRAM,
SW_EVENT_UNIX_STREAM,
//pipe
SW_EVENT_PIPE_MESSAGE = 15,
SW_EVENT_PIPE_MESSAGE,
//proxy
SW_EVENT_PROXY_START = 16,
SW_EVENT_PROXY_END = 17,
SW_EVENT_CONFIRM = 18,
SW_EVENT_PROXY_START,
SW_EVENT_PROXY_END,
SW_EVENT_CONFIRM,
//event operate
SW_EVENT_PAUSE_RECV,
SW_EVENT_RESUME_RECV,
Expand Down Expand Up @@ -114,14 +109,13 @@ enum swTaskType
SW_TASK_WAITALL = 16, //for taskWaitAll
SW_TASK_COROUTINE = 32, //coroutine
SW_TASK_PEEK = 64, //peek
SW_TASK_NOREPLY = 128, //don't reply
};

typedef struct _swReactorThread
{
pthread_t thread_id;
swReactor reactor;
swMemoryPool *buffer_input;
swLock lock;
int notify_pipe;
} swReactorThread;

Expand Down Expand Up @@ -259,9 +253,9 @@ typedef struct
//-----------------------------------Factory--------------------------------------------
typedef struct
{
long target_worker_id;
swEventData data;
} swDispatchData;
swDataHead info;
swString data;
} swPackagePtr;

struct _swFactory
{
Expand All @@ -273,7 +267,7 @@ struct _swFactory

int (*start)(struct _swFactory *);
int (*shutdown)(struct _swFactory *);
int (*dispatch)(struct _swFactory *, swDispatchData *);
int (*dispatch)(struct _swFactory *, swSendData *);
int (*finish)(struct _swFactory *, swSendData *);
int (*notify)(struct _swFactory *, swDataHead *); //send a event notify
int (*end)(struct _swFactory *, int fd);
Expand All @@ -284,12 +278,12 @@ typedef struct _swFactoryProcess
swPipe *pipes;
} swFactoryProcess;

typedef int (*swServer_dispatch_function)(swServer *, swConnection *, swEventData *);
typedef int (*swServer_dispatch_function)(swServer *, swConnection *, swSendData *);

int swFactory_create(swFactory *factory);
int swFactory_start(swFactory *factory);
int swFactory_shutdown(swFactory *factory);
int swFactory_dispatch(swFactory *factory, swDispatchData *req);
int swFactory_dispatch(swFactory *factory, swSendData *req);
int swFactory_finish(swFactory *factory, swSendData *_send);
int swFactory_notify(swFactory *factory, swDataHead *event);
int swFactory_end(swFactory *factory, int fd);
Expand Down Expand Up @@ -393,9 +387,6 @@ struct _swServer
*/
uint32_t max_request;

int signal_fd;
int event_fd;

int udp_socket_ipv4;
int udp_socket_ipv6;

Expand Down Expand Up @@ -472,7 +463,10 @@ struct _swServer
* enable coroutine
*/
uint32_t enable_coroutine :1;

/**
* disable multi-threads
*/
uint32_t single_thread :1;
/**
* heartbeat check time
*/
Expand All @@ -495,7 +489,6 @@ struct _swServer
void *ptr2;
void *private_data_3;

swReactor reactor;
swFactory factory;
swListenPort *listen_list;
pthread_t heartbeat_pidt;
Expand Down Expand Up @@ -536,29 +529,26 @@ struct _swServer
* temporary directory for HTTP uploaded file.
*/
char *upload_tmp_dir;

/**
* http compression level for gzip/br
*/
uint8_t http_compression_level;

/**
* http static file directory
*/
char *document_root;
uint16_t document_root_len;

/**
* master process pid
*/
char *pid_file;

/**
* stream
*/
char *stream_socket;
int stream_fd;
swProtocol stream_protocol;
int last_stream_fd;
swLinkedList *buffer_pool;

#ifdef SW_BUFFER_RECV_TIME
Expand All @@ -571,7 +561,6 @@ struct _swServer
* message queue key
*/
uint64_t message_queue_key;

/**
* slow request log
*/
Expand Down Expand Up @@ -607,12 +596,17 @@ struct _swServer
*/
int (*onTask)(swServer *serv, swEventData *data);
int (*onFinish)(swServer *serv, swEventData *data);

int (*send)(swServer *serv, int fd, void *data, uint32_t length);
int (*sendfile)(swServer *serv, int fd, char *filename, uint32_t filename_length, off_t offset, size_t length);
int (*sendwait)(swServer *serv, int fd, void *data, uint32_t length);
int (*close)(swServer *serv, int fd, int reset);
int (*dispatch_func)(swServer *, swConnection *, swEventData *);
/**
* Server method
*/
int (*send)(swServer *serv, int session_id, void *data, uint32_t length);
int (*sendfile)(swServer *serv, int session_id, char *file, uint32_t l_file, off_t offset, size_t length);
int (*sendwait)(swServer *serv, int session_id, void *data, uint32_t length);
int (*close)(swServer *serv, int session_id, int reset);
int (*notify)(swServer *serv, swConnection *conn, int event);
int (*feedback)(swServer *serv, int session_id, int event);

int (*dispatch_func)(swServer *, swConnection *, swSendData *);
};

typedef struct
Expand Down Expand Up @@ -681,12 +675,6 @@ static sw_inline swListenPort* swServer_get_port(swServer *serv, int fd)
}

int swServer_udp_send(swServer *serv, swSendData *resp);
int swServer_tcp_send(swServer *serv, int fd, void *data, uint32_t length);
int swServer_tcp_sendwait(swServer *serv, int fd, void *data, uint32_t length);
int swServer_tcp_close(swServer *serv, int fd, int reset);
int swServer_tcp_sendfile(swServer *serv, int session_id, char *filename, uint32_t filename_length, off_t offset, size_t length);
int swServer_tcp_notify(swServer *serv, swConnection *conn, int event);
int swServer_tcp_feedback(swServer *serv, int fd, int event);

#define SW_MAX_SESSION_ID 0x1000000

Expand All @@ -710,9 +698,6 @@ static sw_inline int swEventData_is_stream(uint8_t type)
case SW_EVENT_TCP:
case SW_EVENT_TCP6:
case SW_EVENT_UNIX_STREAM:
case SW_EVENT_PACKAGE_START:
case SW_EVENT_PACKAGE:
case SW_EVENT_PACKAGE_END:
case SW_EVENT_CONNECT:
case SW_EVENT_CLOSE:
case SW_EVENT_PAUSE_RECV:
Expand All @@ -729,11 +714,11 @@ swPipe * swServer_get_pipe_object(swServer *serv, int pipe_fd);
void swServer_store_pipe_fd(swServer *serv, swPipe *p);
void swServer_store_listen_socket(swServer *serv);

int swServer_get_manager_pid(swServer *serv);
int swServer_get_socket(swServer *serv, int port);
int swServer_worker_create(swServer *serv, swWorker *worker);
int swServer_worker_init(swServer *serv, swWorker *worker);
void swServer_worker_start(swServer *serv, swWorker *worker);

swString** swServer_create_worker_buffer(swServer *serv);
int swServer_create_task_worker(swServer *serv);
void swServer_enable_accept(swReactor *reactor);
Expand Down Expand Up @@ -842,7 +827,7 @@ static sw_inline swWorker* swServer_get_worker(swServer *serv, uint16_t worker_i
return NULL;
}

static sw_inline int swServer_worker_schedule(swServer *serv, int fd, swEventData *data)
static sw_inline int swServer_worker_schedule(swServer *serv, int fd, swSendData *data)
{
uint32_t key;

Expand Down Expand Up @@ -948,6 +933,29 @@ static sw_inline swString *swWorker_get_buffer(swServer *serv, int reactor_id)
}
}

static sw_inline size_t swWorker_get_data(swEventData *req, char **data_ptr)
{
size_t length;
if (req->info.flags & SW_EVENT_DATA_PTR)
{
swPackagePtr *task = (swPackagePtr *) req;
*data_ptr = task->data.str;
length = task->data.length;
}
else if (req->info.flags & SW_EVENT_DATA_END)
{
swString *worker_buffer = swWorker_get_buffer(SwooleG.serv, req->info.from_id);
*data_ptr = worker_buffer->str;
length = worker_buffer->length;
}
else
{
*data_ptr = req->data;
length = req->info.len;
}
return length;
}

static sw_inline swConnection *swServer_connection_verify_no_ssl(swServer *serv, uint32_t session_id)
{
swSession *session = swServer_get_session(serv, session_id);
Expand Down
Loading