Skip to content

Commit

Permalink
Add ability to re-attach consuming callback
Browse files Browse the repository at this point in the history
  • Loading branch information
pinepain committed Apr 24, 2015
1 parent 86db892 commit 7b5ef84
Show file tree
Hide file tree
Showing 18 changed files with 298 additions and 100 deletions.
6 changes: 6 additions & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,12 @@ install:
- sudo apt-get update
- sudo apt-get -qq --fix-missing install valgrind

- wget -qO - http://www.rabbitmq.com/rabbitmq-signing-key-public.asc | sudo apt-key add -
- sudo add-apt-repository 'deb http://www.rabbitmq.com/debian/ testing main'
- sudo apt-get update
- sudo apt-get install --only-upgrade -y rabbitmq-server
- sudo service rabbitmq-server restart

env:
global:
# The next declaration is the encrypted COVERITY_SCAN_TOKEN, created
Expand Down
8 changes: 7 additions & 1 deletion Vagrantfile
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,17 @@ Vagrant.configure(VAGRANTFILE_API_VERSION) do |config|
# please see the online documentation at vagrantup.com.

# Every Vagrant virtual environment requires a box to build off of.

#config.vm.box = "trusty64"
config.vm.box = "utopic64"
#config.vm.box_url = "https://cloud-images.ubuntu.com/vagrant/trusty/current/trusty-server-cloudimg-amd64-vagrant-disk1.box"

config.vm.box = "utopic64"
config.vm.box_url = "https://cloud-images.ubuntu.com/vagrant/utopic/current/utopic-server-cloudimg-amd64-vagrant-disk1.box"

#config.vm.box = "vivid64"
#config.vm.box_url = "https://cloud-images.ubuntu.com/vagrant/vivid/current/vivid-server-cloudimg-amd64-juju-vagrant-disk1.box"


# Disable automatic box update checking. If you disable this, then
# boxes will only be checked for updates when the user runs
# `vagrant box outdated`. This is not recommended.
Expand Down
8 changes: 7 additions & 1 deletion amqp.c
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,9 @@ ZEND_END_ARG_INFO()
ZEND_BEGIN_ARG_INFO_EX(arginfo_amqp_queue_class_getConnection, ZEND_SEND_BY_VAL, ZEND_RETURN_VALUE, 0)
ZEND_END_ARG_INFO()

ZEND_BEGIN_ARG_INFO_EX(arginfo_amqp_queue_class_getConsumerTag, ZEND_SEND_BY_VAL, ZEND_RETURN_VALUE, 0)
ZEND_END_ARG_INFO()

/* amqp_exchange ARG_INFO definition */
ZEND_BEGIN_ARG_INFO_EX(arginfo_amqp_exchange_class__construct, ZEND_SEND_BY_VAL, ZEND_RETURN_VALUE, 1)
ZEND_ARG_INFO(0, amqp_channel)
Expand Down Expand Up @@ -541,6 +544,7 @@ zend_function_entry amqp_queue_class_functions[] = {

PHP_ME(amqp_queue_class, getChannel, arginfo_amqp_queue_class_getChannel, ZEND_ACC_PUBLIC)
PHP_ME(amqp_queue_class, getConnection, arginfo_amqp_queue_class_getConnection, ZEND_ACC_PUBLIC)
PHP_ME(amqp_queue_class, getConsumerTag, arginfo_amqp_queue_class_getConsumerTag, ZEND_ACC_PUBLIC)

PHP_MALIAS(amqp_queue_class, declare, declareQueue, arginfo_amqp_queue_class_declareQueue, ZEND_ACC_PUBLIC | ZEND_ACC_DEPRECATED)

Expand Down Expand Up @@ -947,6 +951,7 @@ PHP_MINIT_FUNCTION(amqp)
REGISTER_INI_ENTRIES();

REGISTER_LONG_CONSTANT("AMQP_NOPARAM", AMQP_NOPARAM, CONST_CS | CONST_PERSISTENT);
REGISTER_LONG_CONSTANT("AMQP_JUST_CONSUME", AMQP_JUST_CONSUME, CONST_CS | CONST_PERSISTENT);
REGISTER_LONG_CONSTANT("AMQP_DURABLE", AMQP_DURABLE, CONST_CS | CONST_PERSISTENT);
REGISTER_LONG_CONSTANT("AMQP_PASSIVE", AMQP_PASSIVE, CONST_CS | CONST_PERSISTENT);
REGISTER_LONG_CONSTANT("AMQP_EXCLUSIVE", AMQP_EXCLUSIVE, CONST_CS | CONST_PERSISTENT);
Expand Down Expand Up @@ -994,7 +999,8 @@ PHP_MINFO_FUNCTION(amqp)
php_info_print_table_header(2, "Revision", PHP_AMQP_REVISION);
php_info_print_table_header(2, "Compiled", __DATE__ " @ " __TIME__);
php_info_print_table_header(2, "AMQP protocol version", "0-9-1");
php_info_print_table_header(2, "librabbitmq version", amqp_version());
php_info_print_table_header(2, "Compiled librabbitmq version", AMQP_VERSION_STRING);
php_info_print_table_header(2, "Linked librabbitmq version", amqp_version());
php_info_print_table_header(2, "Max channels per connection", PHP_AMQP_STRINGIFY(PHP_AMQP_MAX_CHANNELS));
DISPLAY_INI_ENTRIES();
}
Expand Down
98 changes: 68 additions & 30 deletions amqp_queue.c
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,12 @@ HashTable *amqp_queue_object_get_debug_info(zval *object, int *is_temp TSRMLS_DC
zend_hash_add(debug_info, "queue_name", sizeof("queue_name"), &value, sizeof(zval *), NULL);

MAKE_STD_ZVAL(value);
ZVAL_STRINGL(value, queue->consumer_tag, strlen(queue->consumer_tag), 1);
if (queue->consumer_tag_len > 0) {
ZVAL_STRINGL(value, queue->consumer_tag, strlen(queue->consumer_tag), 1);
} else {
ZVAL_NULL(value);
}

zend_hash_add(debug_info, "consumer_tag", sizeof("consumer_tag"), &value, sizeof(zval *), NULL);

MAKE_STD_ZVAL(value);
Expand Down Expand Up @@ -821,59 +826,62 @@ PHP_METHOD(amqp_queue_class, consume)
amqp_table_t *arguments;

char *consumer_tag;
int consumer_tag_len = 0;
int consumer_tag_len = 0;

long flags = INI_INT("amqp.auto_ack") ? AMQP_AUTOACK : AMQP_NOPARAM;

int call_result;

if (zend_parse_method_parameters(ZEND_NUM_ARGS() TSRMLS_CC, getThis(), "O|f!ls", &id, amqp_queue_class_entry, &fci, &fci_cache, &flags, &consumer_tag, &consumer_tag_len) == FAILURE) {

if (zend_parse_method_parameters(ZEND_NUM_ARGS() TSRMLS_CC, getThis(), "O|f!ls", &id, amqp_queue_class_entry, &fci,
&fci_cache, &flags, &consumer_tag, &consumer_tag_len) == FAILURE) {
return;
}

/* Pull the queue out */
queue = (amqp_queue_object *)zend_object_store_get_object(id TSRMLS_CC);
queue = (amqp_queue_object *) zend_object_store_get_object(id TSRMLS_CC);

channel = AMQP_GET_CHANNEL(queue);
AMQP_VERIFY_CHANNEL(channel, "Could not get channel.");

connection = AMQP_GET_CONNECTION(channel);
AMQP_VERIFY_CONNECTION(connection, "Could not get connection.");

/* Setup the consume */
arguments = convert_zval_to_amqp_table(queue->arguments TSRMLS_CC);
if (!(AMQP_JUST_CONSUME & flags)) {
/* Setup the consume */
arguments = convert_zval_to_amqp_table(queue->arguments TSRMLS_CC);

amqp_basic_consume_ok_t *r = amqp_basic_consume(
connection->connection_resource->connection_state,
channel->channel_id,
amqp_cstring_bytes(queue->name),
(consumer_tag_len > 0 ? amqp_cstring_bytes(consumer_tag) : amqp_empty_bytes), /* Consumer tag */
(AMQP_NOLOCAL & flags) ? 1 : 0, /* No local */
(AMQP_AUTOACK & flags) ? 1 : 0, /* no_ack, aka AUTOACK */
IS_EXCLUSIVE(queue->flags),
*arguments
);

amqp_basic_consume_ok_t * r = amqp_basic_consume(
connection->connection_resource->connection_state,
channel->channel_id,
amqp_cstring_bytes(queue->name),
(consumer_tag_len > 0 ? amqp_cstring_bytes(consumer_tag) : amqp_empty_bytes), /* Consumer tag */
(AMQP_NOLOCAL & flags) ? 1 : 0, /* No local */
(AMQP_AUTOACK & flags) ? 1 : 0, /* no_ack, aka AUTOACK */
IS_EXCLUSIVE(queue->flags),
*arguments
);
php_amqp_free_amqp_table(arguments);

php_amqp_free_amqp_table(arguments);
if (!r) {
amqp_rpc_reply_t res = amqp_get_rpc_reply(connection->connection_resource->connection_state);

if (!r) {
amqp_rpc_reply_t res = amqp_get_rpc_reply(connection->connection_resource->connection_state);
PHP_AMQP_INIT_ERROR_MESSAGE();

PHP_AMQP_INIT_ERROR_MESSAGE();
php_amqp_error(res, PHP_AMQP_ERROR_MESSAGE_PTR, connection, channel TSRMLS_CC);

php_amqp_error(res, PHP_AMQP_ERROR_MESSAGE_PTR, connection, channel TSRMLS_CC);
zend_throw_exception(amqp_queue_exception_class_entry, PHP_AMQP_ERROR_MESSAGE, 0 TSRMLS_CC);
php_amqp_maybe_release_buffers_on_channel(connection, channel);

zend_throw_exception(amqp_queue_exception_class_entry, PHP_AMQP_ERROR_MESSAGE, 0 TSRMLS_CC);
php_amqp_maybe_release_buffers_on_channel(connection, channel);
PHP_AMQP_DESTROY_ERROR_MESSAGE();
return;
}

PHP_AMQP_DESTROY_ERROR_MESSAGE();
return;
/* Set the consumer tag name, in case it is an autogenerated consumer tag name */
AMQP_SET_STR_PROPERTY(queue->consumer_tag, r->consumer_tag.bytes, r->consumer_tag.len);
queue->consumer_tag_len = r->consumer_tag.len;
}

/* Set the consumer tag name, in case it is an autogenerated consumer tag name */
AMQP_SET_STR_PROPERTY(queue->consumer_tag, r->consumer_tag.bytes, r->consumer_tag.len);
queue->consumer_tag_len = r->consumer_tag.len;

if (!ZEND_FCI_INITIALIZED(fci)) {
/* Callback not set, we have nothing to do - real consuming may happens later */
return;
Expand Down Expand Up @@ -1216,6 +1224,10 @@ PHP_METHOD(amqp_queue_class, cancel)
connection = AMQP_GET_CONNECTION(channel);
AMQP_VERIFY_CONNECTION(connection, "Could not cancel queue.");

if (!consumer_tag_len && !queue->consumer_tag_len) {
return;
}

amqp_basic_cancel_ok_t *r = amqp_basic_cancel(
connection->connection_resource->connection_state,
channel->channel_id,
Expand All @@ -1236,6 +1248,11 @@ PHP_METHOD(amqp_queue_class, cancel)
return;
}

if (!consumer_tag_len || strcmp(consumer_tag, queue->consumer_tag) != 0) {
memset(queue->consumer_tag, 0, sizeof(queue->consumer_tag));
queue->consumer_tag_len = 0;
}

php_amqp_maybe_release_buffers_on_channel(connection, channel);

RETURN_TRUE;
Expand Down Expand Up @@ -1402,6 +1419,27 @@ PHP_METHOD(amqp_queue_class, getConnection)
}
/* }}} */

/* {{{ proto string AMQPChannel::getConsumerTag()
Get latest consumer tag*/
PHP_METHOD(amqp_queue_class, getConsumerTag)
{
amqp_queue_object *queue;

if (zend_parse_parameters_none() == FAILURE) {
return;
}

queue = (amqp_queue_object *)zend_object_store_get_object(getThis() TSRMLS_CC);

if (queue->consumer_tag_len > 0) {
RETURN_STRINGL(queue->consumer_tag, strlen(queue->consumer_tag), 1);
}

RETURN_NULL();
}
/* }}} */


/*
*Local variables:
*tab-width: 4
Expand Down
1 change: 1 addition & 0 deletions amqp_queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ PHP_METHOD(amqp_queue_class, delete);

PHP_METHOD(amqp_queue_class, getChannel);
PHP_METHOD(amqp_queue_class, getConnection);
PHP_METHOD(amqp_queue_class, getConsumerTag);

/*
*Local variables:
Expand Down
2 changes: 1 addition & 1 deletion composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,6 @@
}
],
"require": {
"php": ">=5.2.0"
"php": ">=5.3.0"
}
}
4 changes: 2 additions & 2 deletions config.m4
Original file line number Diff line number Diff line change
Expand Up @@ -64,10 +64,10 @@ if test "$PHP_AMQP" != "no"; then
type git &>/dev/null

if test $? -eq 0 ; then
git describe --tags &>/dev/null
git describe --abbrev=0 --tags &>/dev/null

if test $? -eq 0 ; then
AC_DEFINE_UNQUOTED([PHP_AMQP_VERSION], ["`git describe --abbr=0`-`git rev-parse --abbrev-ref HEAD`-dev"], [git version])
AC_DEFINE_UNQUOTED([PHP_AMQP_VERSION], ["`git describe --abbrev=0 --tags`-`git rev-parse --abbrev-ref HEAD`-dev"], [git version])
fi

git rev-parse --short HEAD &>/dev/null
Expand Down
4 changes: 4 additions & 0 deletions package.xml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,10 @@ http://pear.php.net/dtd/package-2.0.xsd">
</stability>
<license uri="http://www.php.net/license">PHP License</license>
<notes>
X.Y.Z Release:
* Add ability to re-attach consuming callback (Bogdan Padalko)
* Add AMQPQueue::getConsumerTag() method and fix consumer tag handling in AMQPQueue class methods (Bogdan Padalko)

1.6.0beta3 Release:
* Add basic.recover AMQP method support (see AMQPChannel::basicRecover() method) (Bogdan Padalko)
* Fix building on OS X (Bogdan Padalko)
Expand Down
1 change: 1 addition & 0 deletions php_amqp.h
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ extern zend_module_entry amqp_module_entry;

#define AMQP_NOPARAM 0
/* Where is 1?*/
#define AMQP_JUST_CONSUME 1
#define AMQP_DURABLE 2
#define AMQP_PASSIVE 4
#define AMQP_EXCLUSIVE 8
Expand Down
54 changes: 4 additions & 50 deletions provision/nginx/default
Original file line number Diff line number Diff line change
Expand Up @@ -52,61 +52,15 @@ server {
# pass the PHP scripts to FastCGI server listening on 127.0.0.1:9000
#
location ~ \.php$ {
fastcgi_split_path_info ^(.+\.php)(/.+)$;
include fastcgi_params;
#fastcgi_split_path_info ^(.+\.php)(/.+)$;
# NOTE: You should have "cgi.fix_pathinfo = 0;" in php.ini

# With php5-cgi alone:
fastcgi_pass 127.0.0.1:9000;
# With php5-fpm:
#fastcgi_pass unix:/var/run/php5-fpm.sock;
fastcgi_index index.php;
include fastcgi_params;
fastcgi_param SCRIPT_FILENAME $document_root/$fastcgi_script_name;
}

# deny access to .htaccess files, if Apache's document root
# concurs with nginx's one
#
#location ~ /\.ht {
# deny all;
#}
}


# another virtual host using mix of IP-, name-, and port-based configuration
#
#server {
# listen 8000;
# listen somename:8080;
# server_name somename alias another.alias;
# root html;
# index index.html index.htm;
#
# location / {
# try_files $uri $uri/ =404;
# }
#}


# HTTPS server
#
#server {
# listen 443;
# server_name localhost;
#
# root html;
# index index.html index.htm;
#
# ssl on;
# ssl_certificate cert.pem;
# ssl_certificate_key cert.key;
#
# ssl_session_timeout 5m;
#
# ssl_protocols SSLv3 TLSv1 TLSv1.1 TLSv1.2;
# ssl_ciphers "HIGH:!aNULL:!MD5 or HIGH:!aNULL:!MD5:!3DES";
# ssl_prefer_server_ciphers on;
#
# location / {
# try_files $uri $uri/ =404;
# }
#}
}
Loading

0 comments on commit 7b5ef84

Please sign in to comment.