Skip to content

Commit

Permalink
Fix RedisArray::_rehash to support closures
Browse files Browse the repository at this point in the history
* Add "f" parameter in _rehash()
* Call object with new method
* Add rehash test with closure
  • Loading branch information
nicolasff committed Sep 9, 2012
1 parent 5855cfc commit 6dc3ad6
Show file tree
Hide file tree
Showing 5 changed files with 47 additions and 20 deletions.
2 changes: 1 addition & 1 deletion arrays.markdown
Expand Up @@ -73,7 +73,7 @@ For instance, the keys “{user:1}:name” and “{user:1}:email” will be stor

## Migrating keys

When a node is added or removed from a ring, RedisArray instances must be instanciated with a “previous” list of nodes. A single call to `$ra->_rehash()` causes all the keys to be redistributed according to the new list of nodes. Passing a callback function to `_rehash()` makes it possible to track the progress of that operation: the function is called with a node name and a number of keys that will be examined.
When a node is added or removed from a ring, RedisArray instances must be instanciated with a “previous” list of nodes. A single call to `$ra->_rehash()` causes all the keys to be redistributed according to the new list of nodes. Passing a callback function to `_rehash()` makes it possible to track the progress of that operation: the function is called with a node name and a number of keys that will be examined, e.g. `_rehash(function ($host, $count){ ... });`.

It is possible to automate this process, by setting `'autorehash' => TRUE` in the constructor options. This will cause keys to be migrated when they need to be read from the previous array.

Expand Down
14 changes: 10 additions & 4 deletions redis_array.c
Expand Up @@ -499,19 +499,25 @@ PHP_METHOD(RedisArray, _distributor)

PHP_METHOD(RedisArray, _rehash)
{
zval *object, *z_cb = NULL;
zval *object;
RedisArray *ra;
zend_fcall_info z_cb;
zend_fcall_info_cache z_cb_cache;

if (zend_parse_method_parameters(ZEND_NUM_ARGS() TSRMLS_CC, getThis(), "O|z",
&object, redis_array_ce, &z_cb) == FAILURE) {
if (zend_parse_method_parameters(ZEND_NUM_ARGS() TSRMLS_CC, getThis(), "O|f",
&object, redis_array_ce, &z_cb, &z_cb_cache) == FAILURE) {
RETURN_FALSE;
}

if (redis_array_get(object, &ra TSRMLS_CC) < 0) {
RETURN_FALSE;
}

ra_rehash(ra, z_cb TSRMLS_CC);
if (ZEND_NUM_ARGS() == 0) {
ra_rehash(ra, NULL, NULL TSRMLS_CC);
} else {
ra_rehash(ra, &z_cb, &z_cb_cache TSRMLS_CC);
}
}

static void multihost_distribute(INTERNAL_FUNCTION_PARAMETERS, const char *method_name)
Expand Down
41 changes: 27 additions & 14 deletions redis_array_impl.c
Expand Up @@ -1105,24 +1105,37 @@ ra_move_key(const char *key, int key_len, zval *z_from, zval *z_to TSRMLS_DC) {
}

/* callback with the current progress, with hostname and count */
static void zval_rehash_callback(zval *z_cb, const char *hostname, long count TSRMLS_DC) {
static void zval_rehash_callback(zend_fcall_info *z_cb, zend_fcall_info_cache *z_cb_cache,
const char *hostname, long count TSRMLS_DC) {

zval z_ret, *z_args[2];
zval *z_ret = NULL, **z_args[2];
zval *z_host, *z_count;

z_cb->retval_ptr_ptr = &z_ret;
z_cb->params = &z_args;
z_cb->param_count = 2;
z_cb->no_separation = 0;

/* run cb(hostname, count) */
MAKE_STD_ZVAL(z_args[0]);
ZVAL_STRING(z_args[0], hostname, 0);
MAKE_STD_ZVAL(z_args[1]);
ZVAL_LONG(z_args[1], count);
call_user_function(EG(function_table), NULL, z_cb, &z_ret, 2, z_args TSRMLS_CC);
MAKE_STD_ZVAL(z_host);
ZVAL_STRING(z_host, hostname, 0);
z_args[0] = &z_host;
MAKE_STD_ZVAL(z_count);
ZVAL_LONG(z_count, count);
z_args[1] = &z_count;

zend_call_function(z_cb, z_cb_cache TSRMLS_CC);

/* cleanup */
efree(z_args[0]);
efree(z_args[1]);
efree(z_host);
efree(z_count);
if(z_ret)
efree(z_ret);
}

static void
ra_rehash_server(RedisArray *ra, zval *z_redis, const char *hostname, zend_bool b_index, zval *z_cb TSRMLS_DC) {
ra_rehash_server(RedisArray *ra, zval *z_redis, const char *hostname, zend_bool b_index,
zend_fcall_info *z_cb, zend_fcall_info_cache *z_cb_cache TSRMLS_DC) {

char **keys;
int *key_lens;
Expand All @@ -1138,8 +1151,8 @@ ra_rehash_server(RedisArray *ra, zval *z_redis, const char *hostname, zend_bool
}

/* callback */
if(z_cb) {
zval_rehash_callback(z_cb, hostname, count TSRMLS_CC);
if(z_cb && z_cb_cache) {
zval_rehash_callback(z_cb, z_cb_cache, hostname, count TSRMLS_CC);
}

/* for each key, redistribute */
Expand All @@ -1163,7 +1176,7 @@ ra_rehash_server(RedisArray *ra, zval *z_redis, const char *hostname, zend_bool
}

void
ra_rehash(RedisArray *ra, zval *z_cb TSRMLS_DC) {
ra_rehash(RedisArray *ra, zend_fcall_info *z_cb, zend_fcall_info_cache *z_cb_cache TSRMLS_DC) {

int i;

Expand All @@ -1172,7 +1185,7 @@ ra_rehash(RedisArray *ra, zval *z_cb TSRMLS_DC) {
return; /* TODO: compare the two rings for equality */

for(i = 0; i < ra->prev->count; ++i) {
ra_rehash_server(ra, ra->prev->redis[i], ra->prev->hosts[i], ra->index, z_cb TSRMLS_CC);
ra_rehash_server(ra, ra->prev->redis[i], ra->prev->hosts[i], ra->index, z_cb, z_cb_cache TSRMLS_CC);
}
}

2 changes: 1 addition & 1 deletion redis_array_impl.h
Expand Up @@ -24,6 +24,6 @@ void ra_index_discard(zval *z_redis, zval *return_value TSRMLS_DC);
void ra_index_unwatch(zval *z_redis, zval *return_value TSRMLS_DC);
zend_bool ra_is_write_cmd(RedisArray *ra, const char *cmd, int cmd_len);

void ra_rehash(RedisArray *ra, zval *z_cb TSRMLS_DC);
void ra_rehash(RedisArray *ra, zend_fcall_info *z_cb, zend_fcall_info_cache *z_cb_cache TSRMLS_DC);

#endif
8 changes: 8 additions & 0 deletions tests/array-tests.php
Expand Up @@ -289,6 +289,14 @@ public function testRehash() {
$this->ra->_rehash(); // this will redistribute the keys
}

public function testRehashWithCallback() {
$total = 0;
$this->ra->_rehash(function ($host, $count) use (&$total) {
$total += $count;
});
$this->assertTrue($total > 0);
}

public function testReadRedistributedKeys() {
$this->readAllvalues(); // we shouldn't have any missed reads now.
}
Expand Down

0 comments on commit 6dc3ad6

Please sign in to comment.