Skip to content

Commit

Permalink
Added connection callback
Browse files Browse the repository at this point in the history
  • Loading branch information
Mikko Koppanen committed Sep 19, 2010
1 parent f7ab1d4 commit 9586232
Show file tree
Hide file tree
Showing 8 changed files with 259 additions and 12 deletions.
2 changes: 1 addition & 1 deletion tests/006-sockopt.phpt
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ var_dump(strlen($test->getSockOpt(ZMQ::SOCKOPT_IDENTITY)));
try {
$test->setSockOpt(ZMQ::SOCKOPT_IDENTITY, str_repeat("a", 300));
var_dump(strlen($test->getSockOpt(ZMQ::SOCKOPT_IDENTITY)));
} catch (ZMQException $e) {
} catch (ZMQSocketException $e) {
echo "too long";
}

Expand Down
38 changes: 38 additions & 0 deletions tests/015-callback.phpt
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
--TEST--
Test basic callback usage
--SKIPIF--
<?php require_once(dirname(__FILE__) . '/skipif.inc'); ?>
--FILE--
<?php

include dirname(__FILE__) . '/zeromq_test_helper.inc';

$socket = new ZMQSocket(new ZMQContext(), ZMQ::SOCKET_REQ, 'my persistent 1', 'bind_callback');
var_dump($socket->getEndpoints());

$ctx = new ZMQContext();
$socket = $ctx->getSocket(ZMQ::SOCKET_REQ, 'my persistent 2', 'bind_callback');
var_dump($socket->getEndpoints());


--EXPECT--
array(2) {
["connect"]=>
array(0) {
}
["bind"]=>
array(1) {
[0]=>
string(20) "tcp://127.0.0.1:5566"
}
}
array(2) {
["connect"]=>
array(0) {
}
["bind"]=>
array(1) {
[0]=>
string(20) "tcp://127.0.0.1:5567"
}
}
25 changes: 25 additions & 0 deletions tests/016-callbackinvalidargs.phpt
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
--TEST--
Test invalid args for callback
--SKIPIF--
<?php require_once(dirname(__FILE__) . '/skipif.inc'); ?>
--FILE--
<?php

try {
$socket = new ZMQSocket(new ZMQContext(), ZMQ::SOCKET_REQ, 'my persistent 1', 'this_function_does_not_exist');
echo "Fail\n";
} catch (ZMQSocketException $e) {
echo "OK\n";
}

try {
$ctx = new ZMQContext();
$socket = $ctx->getSocket(ZMQ::SOCKET_REQ, 'my persistent 2', 'this_function_does_not_exist');
echo "Fail\n";
} catch (ZMQSocketException $e) {
echo "OK\n";
}

--EXPECT--
OK
OK
23 changes: 23 additions & 0 deletions tests/017-callbackonlyonnewsocket.phpt
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
--TEST--
Test that callback is only called on new socket
--SKIPIF--
<?php require_once(dirname(__FILE__) . '/skipif.inc'); ?>
--FILE--
<?php

include dirname(__FILE__) . '/zeromq_test_helper.inc';

$socket = new ZMQSocket(new ZMQContext(), ZMQ::SOCKET_REQ, 'persistent_socket', 'bind_callback');
$socket = new ZMQSocket(new ZMQContext(), ZMQ::SOCKET_REQ, 'persistent_socket', 'bind_callback');
$socket = new ZMQSocket(new ZMQContext(), ZMQ::SOCKET_REQ, 'persistent_socket', 'bind_callback');
$socket = new ZMQSocket(new ZMQContext(), ZMQ::SOCKET_REQ, 'persistent_socket', 'bind_callback');
$socket = new ZMQSocket(new ZMQContext(), ZMQ::SOCKET_REQ, 'persistent_socket', 'bind_callback');

$array = $socket->getEndpoints();

echo count($array['bind']) . "\n";
echo "OK";

--EXPECT--
1
OK
25 changes: 25 additions & 0 deletions tests/018-callbackpersistent.phpt
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
--TEST--
Test callback arguments on persistent and non-persistent socket
--SKIPIF--
<?php require_once(dirname(__FILE__) . '/skipif.inc'); ?>
--FILE--
<?php

function dump_args(ZMQSocket $s, $p = null)
{
var_dump($s, $p);
}

$socket = new ZMQSocket(new ZMQContext(), ZMQ::SOCKET_REQ, 'persistent_socket', 'dump_args');
$socket = new ZMQSocket(new ZMQContext(), ZMQ::SOCKET_REQ, null, 'dump_args');

echo "OK";

--EXPECTF--
object(ZMQSocket)#%d (0) {
}
string(17) "persistent_socket"
object(ZMQSocket)#%d (0) {
}
NULL
OK
36 changes: 36 additions & 0 deletions tests/019-callbackinvalidsignature.phpt
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
--TEST--
Test callback edge-cases
--SKIPIF--
--XFAIL--
<?php require_once(dirname(__FILE__) . '/skipif.inc'); ?>
--FILE--
<?php

function try_to_force_ref(&$a, $b)
{
echo "CALLED\n";
}

function throw_exception($socket, $pid = null)
{
/* This exception should bubble up */
throw new Exception("hello world");
}

try {
$socket = new ZMQSocket(new ZMQContext(), ZMQ::SOCKET_REQ, 'persistent_socket', 'try_to_force_ref');
echo "Fail\n";
} catch (ZMQSocketException $e) {
echo $e->getMessage() . "\n";
}

try {
$socket = new ZMQSocket(new ZMQContext(), ZMQ::SOCKET_REQ, 'persistent_socket', 'throw_exception');
echo "Fail\n";
} catch (ZMQSocketException $e) {
echo $e->getMessage() . "\n";
}

--EXPECTF--
Parameter 1 to try_to_force_ref() expected to be a reference, value given
hello world
8 changes: 8 additions & 0 deletions tests/zeromq_test_helper.inc
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,12 @@ function create_client($persistent_id = null)
->connect(ZEROMQ_TEST_DSN);

return $client;
}

function bind_callback(ZMQSocket $socket, $persistent_id = null)
{
static $port = 5566;

$socket->bind("tcp://127.0.0.1:{$port}");
$port++;
}
114 changes: 103 additions & 11 deletions zmq.c
Original file line number Diff line number Diff line change
Expand Up @@ -173,8 +173,6 @@ PHP_METHOD(zmqcontext, __construct)
if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "|lb", &io_threads, &is_persistent) == FAILURE) {
return;
}


intern = PHP_ZMQ_CONTEXT_OBJECT;
intern->context = php_zmq_context_get(io_threads, is_persistent TSRMLS_CC);

Expand Down Expand Up @@ -210,10 +208,10 @@ static php_zmq_socket *php_zmq_socket_new(php_zmq_context *context, int type, ze
}
/* }}} */

/* {{{ static php_zmq_socket *php_zmq_socket_get(php_zmq_context *context, int type, const char *persistent_id, int persistent_id_len TSRMLS_DC)
/* {{{ static php_zmq_socket *php_zmq_socket_get(php_zmq_context *context, int type, const char *persistent_id, zend_bool *is_new TSRMLS_DC)
Tries to get context from plist and allocates a new context if context does not exist
*/
static php_zmq_socket *php_zmq_socket_get(php_zmq_context *context, int type, const char *persistent_id TSRMLS_DC)
static php_zmq_socket *php_zmq_socket_get(php_zmq_context *context, int type, const char *persistent_id, zend_bool *is_new TSRMLS_DC)
{
php_zmq_socket *zmq_sock_p;
zend_bool is_persistent;
Expand All @@ -222,6 +220,7 @@ static php_zmq_socket *php_zmq_socket_get(php_zmq_context *context, int type, co
int plist_key_len = 0;

is_persistent = (context->is_persistent && persistent_id) ? 1 : 0;
*is_new = 0;

if (is_persistent) {
zend_rsrc_list_entry *le = NULL;
Expand Down Expand Up @@ -256,11 +255,58 @@ static php_zmq_socket *php_zmq_socket_get(php_zmq_context *context, int type, co
}
efree(plist_key);
}
*is_new = 1;
return zmq_sock_p;
}
/* }}} */

/* {{{ proto ZMQContext ZMQContext::getSocket(integer $type[, string $persistent_id = null])
static void php_zmq_connect_callback(zval *socket, zend_fcall_info *fci, zend_fcall_info_cache *fci_cache, char *persistent_id, int persistent_id_len)
{
zval *retval_ptr, *pid_z;
zval **params[2];
#if ZEND_MODULE_API_NO > 20060613
zend_error_handling error_handling;
zend_replace_error_handling(EH_THROW, php_zmq_socket_exception_sc_entry, &error_handling TSRMLS_CC);
#else
php_set_error_handling(EH_THROW, php_zmq_socket_exception_sc_entry TSRMLS_CC);
#endif

ALLOC_INIT_ZVAL(pid_z);

if (persistent_id) {
ZVAL_STRING(pid_z, persistent_id, 1);
} else {
ZVAL_NULL(pid_z);
}

/* Call the cb */
params[0] = &socket;
params[1] = &pid_z;

fci->params = params;
fci->param_count = 2;
fci->retval_ptr_ptr = &retval_ptr;
fci->no_separation = 1;

if (zend_call_function(fci, fci_cache TSRMLS_CC) == FAILURE || EG(exception)) {
if (!EG(exception)) {
zend_throw_exception_ex(php_zmq_socket_exception_sc_entry, 0 TSRMLS_CC, "Failed to invoke callback %s()", Z_STRVAL_P(fci->function_name));
}
}
zval_ptr_dtor(&pid_z);

if (retval_ptr) {
zval_ptr_dtor(&retval_ptr);
}

#if ZEND_MODULE_API_NO > 20060613
zend_restore_error_handling(&error_handling TSRMLS_CC);
#else
php_set_error_handling(EH_NORMAL, NULL TSRMLS_CC);
#endif
}

/* {{{ proto ZMQContext ZMQContext::getSocket(integer $type[, string $persistent_id = null, callback $on_new_socket = null])
Build a new ZMQContext object
*/
PHP_METHOD(zmqcontext, getsocket)
Expand All @@ -270,14 +316,33 @@ PHP_METHOD(zmqcontext, getsocket)
php_zmq_context_object *intern;
long type;
char *persistent_id = NULL;
int persistent_id_len;
int rc, persistent_id_len;
zend_bool is_new;

if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "l|s!", &type, &persistent_id, &persistent_id_len) == FAILURE) {
zend_fcall_info fci;
zend_fcall_info_cache fci_cache;

#if ZEND_MODULE_API_NO > 20060613
zend_error_handling error_handling;
zend_replace_error_handling(EH_THROW, php_zmq_socket_exception_sc_entry, &error_handling TSRMLS_CC);
#else
php_set_error_handling(EH_THROW, php_zmq_socket_exception_sc_entry TSRMLS_CC);
#endif

rc = zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "l|s!f", &type, &persistent_id, &persistent_id_len, &fci, &fci_cache);

#if ZEND_MODULE_API_NO > 20060613
zend_restore_error_handling(&error_handling TSRMLS_CC);
#else
php_set_error_handling(EH_NORMAL, NULL TSRMLS_CC);
#endif

if (rc == FAILURE) {
return;
}

intern = PHP_ZMQ_CONTEXT_OBJECT;
socket = php_zmq_socket_get(intern->context, type, persistent_id TSRMLS_CC);
socket = php_zmq_socket_get(intern->context, type, persistent_id, &is_new TSRMLS_CC);

if (!socket) {
zend_throw_exception_ex(php_zmq_socket_exception_sc_entry, errno TSRMLS_CC, "Error creating socket: %s", zmq_strerror(errno));
Expand All @@ -297,6 +362,10 @@ PHP_METHOD(zmqcontext, getsocket)
zend_objects_store_add_ref(getThis() TSRMLS_CC);
interns->context_obj = getThis();
}

if (is_new && ZEND_NUM_ARGS() > 2) {
php_zmq_connect_callback(return_value, &fci, &fci_cache, persistent_id, persistent_id_len);
}
return;
}
/* }}} */
Expand Down Expand Up @@ -331,15 +400,34 @@ PHP_METHOD(zmqsocket, __construct)
php_zmq_context_object *internc;
long type;
char *persistent_id = NULL;
int persistent_id_len;
int rc, persistent_id_len;
zval *obj;
zend_bool is_new;

if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "Ol|s!", &obj, php_zmq_context_sc_entry, &type, &persistent_id, &persistent_id_len) == FAILURE) {
zend_fcall_info fci;
zend_fcall_info_cache fci_cache;

#if ZEND_MODULE_API_NO > 20060613
zend_error_handling error_handling;
zend_replace_error_handling(EH_THROW, php_zmq_socket_exception_sc_entry, &error_handling TSRMLS_CC);
#else
php_set_error_handling(EH_THROW, php_zmq_socket_exception_sc_entry TSRMLS_CC);
#endif

rc = zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "Ol|s!f", &obj, php_zmq_context_sc_entry, &type, &persistent_id, &persistent_id_len, &fci, &fci_cache);

#if ZEND_MODULE_API_NO > 20060613
zend_restore_error_handling(&error_handling TSRMLS_CC);
#else
php_set_error_handling(EH_NORMAL, NULL TSRMLS_CC);
#endif

if (rc == FAILURE) {
return;
}

internc = (php_zmq_context_object *) zend_object_store_get_object(obj TSRMLS_CC);
socket = php_zmq_socket_get(internc->context, type, persistent_id TSRMLS_CC);
socket = php_zmq_socket_get(internc->context, type, persistent_id, &is_new TSRMLS_CC);

if (!socket) {
zend_throw_exception_ex(php_zmq_socket_exception_sc_entry, errno TSRMLS_CC, "Error creating socket: %s", zmq_strerror(errno));
Expand All @@ -358,6 +446,10 @@ PHP_METHOD(zmqsocket, __construct)
zend_objects_store_add_ref(obj TSRMLS_CC);
intern->context_obj = obj;
}

if (is_new && ZEND_NUM_ARGS() > 3) {
php_zmq_connect_callback(getThis(), &fci, &fci_cache, persistent_id, persistent_id_len);
}
return;
}
/* }}} */
Expand Down

0 comments on commit 9586232

Please sign in to comment.