Skip to content

Commit

Permalink
Persistent connections pool
Browse files Browse the repository at this point in the history
  • Loading branch information
yatsukhnenko committed Feb 20, 2019
1 parent 8cd165d commit a370382
Show file tree
Hide file tree
Showing 3 changed files with 109 additions and 23 deletions.
24 changes: 22 additions & 2 deletions common.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,22 @@ zend_string_realloc(zend_string *s, size_t len, int persistent)
return zstr;
}

#define strpprintf zend_strpprintf

static zend_string *
zend_strpprintf(size_t max_len, const char *format, ...)
{
va_list ap;
zend_string *zstr;

va_start(ap, format);
zstr = ecalloc(1, sizeof(*zstr));
ZSTR_LEN(zstr) = vspprintf(&ZSTR_VAL(zstr), max_len, format, ap);
zstr->gc = 0x11;
va_end(ap);
return zstr;
}

#define zend_string_copy(s) zend_string_init(ZSTR_VAL(s), ZSTR_LEN(s), 0)

#define zend_string_equal_val(s1, s2) !memcmp(ZSTR_VAL(s1), ZSTR_VAL(s2), ZSTR_LEN(s1))
Expand Down Expand Up @@ -140,6 +156,8 @@ zend_hash_str_find(const HashTable *ht, const char *key, size_t len)
return NULL;
}

#define zend_hash_find_ptr(ht, s) zend_hash_str_find_ptr(ht, ZSTR_VAL(s), ZSTR_LEN(s))

static zend_always_inline void *
zend_hash_str_find_ptr(const HashTable *ht, const char *str, size_t len)
{
Expand All @@ -151,10 +169,12 @@ zend_hash_str_find_ptr(const HashTable *ht, const char *str, size_t len)
return NULL;
}

#define zend_hash_str_update_ptr(ht, str, len, pData) zend_hash_str_update_mem(ht, str, len, pData, sizeof(void *))

static zend_always_inline void *
zend_hash_str_update_ptr(HashTable *ht, const char *str, size_t len, void *pData)
zend_hash_str_update_mem(HashTable *ht, const char *str, size_t len, void *pData, size_t size)
{
if (zend_hash_update(ht, str, len + 1, (void *)&pData, sizeof(void *), NULL) == SUCCESS) {
if (zend_hash_update(ht, str, len + 1, (void *)&pData, size, NULL) == SUCCESS) {
return pData;
}
return NULL;
Expand Down
80 changes: 59 additions & 21 deletions library.c
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@
extern zend_class_entry *redis_ce;
extern zend_class_entry *redis_exception_ce;

extern int le_redis_pconnect;

/* Helper to reselect the proper DB number when we reconnect */
static int reselect_db(RedisSock *redis_sock TSRMLS_DC) {
char *cmd, *response;
Expand Down Expand Up @@ -1743,10 +1745,10 @@ redis_sock_create(char *host, int host_len, unsigned short port,
PHP_REDIS_API int redis_sock_connect(RedisSock *redis_sock TSRMLS_DC)
{
struct timeval tv, read_tv, *tv_ptr = NULL;
char host[1024], *persistent_id = NULL;
zend_string *persistent_id = NULL;
char host[1024];
const char *fmtstr = "%s:%d";
int host_len, usocket = 0, err = 0;
php_netstream_data_t *sock;
int tcp_flag = 1;
#if (PHP_MAJOR_VERSION < 7)
char *estr = NULL;
Expand All @@ -1758,15 +1760,6 @@ PHP_REDIS_API int redis_sock_connect(RedisSock *redis_sock TSRMLS_DC)
redis_sock_disconnect(redis_sock, 0 TSRMLS_CC);
}

tv.tv_sec = (time_t)redis_sock->timeout;
tv.tv_usec = (int)((redis_sock->timeout - tv.tv_sec) * 1000000);
if(tv.tv_sec != 0 || tv.tv_usec != 0) {
tv_ptr = &tv;
}

read_tv.tv_sec = (time_t)redis_sock->read_timeout;
read_tv.tv_usec = (int)((redis_sock->read_timeout-read_tv.tv_sec)*1000000);

if (ZSTR_VAL(redis_sock->host)[0] == '/' && redis_sock->port < 1) {
host_len = snprintf(host, sizeof(host), "unix://%s", ZSTR_VAL(redis_sock->host));
usocket = 1;
Expand All @@ -1785,21 +1778,46 @@ PHP_REDIS_API int redis_sock_connect(RedisSock *redis_sock TSRMLS_DC)
}

if (redis_sock->persistent) {
if (redis_sock->persistent_id) {
spprintf(&persistent_id, 0, "phpredis:%s:%s", host,
ZSTR_VAL(redis_sock->persistent_id));
if (INI_INT("redis.pconnect.pooling_enabled")) {
persistent_id = strpprintf(0, "phpredis_%s:%d", ZSTR_VAL(redis_sock->host), redis_sock->port);
zend_resource *le = zend_hash_find_ptr(&EG(persistent_list), persistent_id);
if (le && zend_llist_count(le->ptr) > 0) {
redis_sock->stream = *(php_stream **)zend_llist_get_last(le->ptr);
zend_llist_remove_tail(le->ptr);
/* Check socket liveness using 0 second timeout */
if (php_stream_set_option(redis_sock->stream, PHP_STREAM_OPTION_CHECK_LIVENESS, 0, NULL) == PHP_STREAM_OPTION_RETURN_OK) {
redis_sock->status = REDIS_SOCK_STATUS_CONNECTED;
zend_string_release(persistent_id);
return SUCCESS;
}
php_stream_pclose(redis_sock->stream);
}
zend_string_release(persistent_id);

gettimeofday(&tv, NULL);
persistent_id = strpprintf(0, "phpredis_%d%d", tv.tv_sec, tv.tv_usec);
} else {
spprintf(&persistent_id, 0, "phpredis:%s:%f", host,
redis_sock->timeout);
if (redis_sock->persistent_id) {
persistent_id = strpprintf(0, "phpredis:%s:%s", host, ZSTR_VAL(redis_sock->persistent_id));
} else {
persistent_id = strpprintf(0, "phpredis:%s:%f", host, redis_sock->timeout);
}
}
}

tv.tv_sec = (time_t)redis_sock->timeout;
tv.tv_usec = (int)((redis_sock->timeout - tv.tv_sec) * 1000000);
if (tv.tv_sec != 0 || tv.tv_usec != 0) {
tv_ptr = &tv;
}

redis_sock->stream = php_stream_xport_create(host, host_len,
0, STREAM_XPORT_CLIENT | STREAM_XPORT_CONNECT,
persistent_id, tv_ptr, NULL, &estr, &err);
persistent_id ? ZSTR_VAL(persistent_id) : NULL,
tv_ptr, NULL, &estr, &err);

if (persistent_id) {
efree(persistent_id);
zend_string_release(persistent_id);
}

if (!redis_sock->stream) {
Expand All @@ -1812,12 +1830,12 @@ PHP_REDIS_API int redis_sock_connect(RedisSock *redis_sock TSRMLS_DC)
zend_string_release(estr);
#endif
}
return -1;
return FAILURE;
}

/* Attempt to set TCP_NODELAY/TCP_KEEPALIVE if we're not using a unix socket. */
sock = (php_netstream_data_t*)redis_sock->stream->abstract;
if (!usocket) {
php_netstream_data_t *sock = (php_netstream_data_t*)redis_sock->stream->abstract;
err = setsockopt(sock->socket, IPPROTO_TCP, TCP_NODELAY, (char*) &tcp_flag, sizeof(tcp_flag));
PHPREDIS_NOTUSED(err);
err = setsockopt(sock->socket, SOL_SOCKET, SO_KEEPALIVE, (char*) &redis_sock->tcp_keepalive, sizeof(redis_sock->tcp_keepalive));
Expand All @@ -1826,6 +1844,9 @@ PHP_REDIS_API int redis_sock_connect(RedisSock *redis_sock TSRMLS_DC)

php_stream_auto_cleanup(redis_sock->stream);

read_tv.tv_sec = (time_t)redis_sock->read_timeout;
read_tv.tv_usec = (int)((redis_sock->read_timeout - read_tv.tv_sec) * 1000000);

if (read_tv.tv_sec != 0 || read_tv.tv_usec != 0) {
php_stream_set_option(redis_sock->stream,PHP_STREAM_OPTION_READ_TIMEOUT,
0, &read_tv);
Expand All @@ -1835,7 +1856,7 @@ PHP_REDIS_API int redis_sock_connect(RedisSock *redis_sock TSRMLS_DC)

redis_sock->status = REDIS_SOCK_STATUS_CONNECTED;

return 0;
return SUCCESS;
}

/**
Expand Down Expand Up @@ -1869,6 +1890,23 @@ redis_sock_disconnect(RedisSock *redis_sock, int force TSRMLS_DC)
if (redis_sock->persistent) {
if (force) {
php_stream_pclose(redis_sock->stream);
} else if (INI_INT("redis.pconnect.pooling_enabled")) {
zend_string *persistent_id = strpprintf(0, "phpredis_%s:%d", ZSTR_VAL(redis_sock->host), redis_sock->port);
zend_resource *le = zend_hash_find_ptr(&EG(persistent_list), persistent_id);
if (!le) {
#if (PHP_MAJOR_VERSION < 7)
le = ecalloc(1, sizeof(*le));
#else
zend_resource res;
le = &res;
#endif
le->type = le_redis_pconnect;
le->ptr = pecalloc(1, sizeof(zend_llist), 1);
zend_llist_init(le->ptr, sizeof(void *), NULL, 1);
zend_hash_str_update_mem(&EG(persistent_list), ZSTR_VAL(persistent_id), ZSTR_LEN(persistent_id), le, sizeof(*le));
}
zend_llist_prepend_element(le->ptr, &redis_sock->stream);
zend_string_release(persistent_id);
}
} else {
php_stream_close(redis_sock->stream);
Expand Down
28 changes: 28 additions & 0 deletions redis.c
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ zend_class_entry *redis_exception_ce;
extern zend_function_entry redis_array_functions[];
extern zend_function_entry redis_cluster_functions[];

int le_redis_pconnect;

PHP_INI_BEGIN()
/* redis arrays */
PHP_INI_ENTRY("redis.arrays.algorithm", "", PHP_INI_ALL, NULL)
Expand All @@ -77,6 +79,9 @@ PHP_INI_BEGIN()
PHP_INI_ENTRY("redis.clusters.seeds", "", PHP_INI_ALL, NULL)
PHP_INI_ENTRY("redis.clusters.timeout", "0", PHP_INI_ALL, NULL)

/* redis pconnect */
PHP_INI_ENTRY("redis.pconnect.pooling_enabled", "0", PHP_INI_ALL, NULL)

/* redis session */
PHP_INI_ENTRY("redis.session.locking_enabled", "0", PHP_INI_ALL, NULL)
PHP_INI_ENTRY("redis.session.lock_expire", "0", PHP_INI_ALL, NULL)
Expand Down Expand Up @@ -742,6 +747,25 @@ static void add_class_constants(zend_class_entry *ce, int is_cluster TSRMLS_DC)
zend_declare_class_constant_stringl(ce, "BEFORE", 6, "before", 6 TSRMLS_CC);
}

static void
redis_pconnect_dtor(void *ptr TSRMLS_DC)
{
php_stream_pclose(*(php_stream **)ptr);
}

static ZEND_RSRC_DTOR_FUNC(redis_connections_pool_dtor)
{
#if (PHP_MAJOR_VERSION < 7)
zend_resource *res = rsrc;
#endif

if (res->ptr) {
zend_llist_apply(res->ptr, redis_pconnect_dtor TSRMLS_CC);
zend_llist_destroy(res->ptr);
pefree(res->ptr, 1);
}
}

/**
* PHP_MINIT_FUNCTION
*/
Expand Down Expand Up @@ -823,6 +847,10 @@ PHP_MINIT_FUNCTION(redis)
php_session_register_module(&ps_mod_redis_cluster);
#endif

/* Register resource destructors */
le_redis_pconnect = zend_register_list_destructors_ex(NULL, redis_connections_pool_dtor,
"phpredis persistent connections pool", module_number);

return SUCCESS;
}

Expand Down

0 comments on commit a370382

Please sign in to comment.