Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 0 additions & 61 deletions configuration.c
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,6 @@ void kafka_conf_callbacks_dtor(kafka_conf_callbacks *cbs) /* {{{ */
cbs->dr_msg = NULL;
kafka_conf_callback_dtor(cbs->stats);
cbs->stats = NULL;
kafka_conf_callback_dtor(cbs->consume);
cbs->consume = NULL;
kafka_conf_callback_dtor(cbs->offset_commit);
cbs->offset_commit = NULL;
kafka_conf_callback_dtor(cbs->log);
Expand All @@ -54,7 +52,6 @@ void kafka_conf_callbacks_copy(kafka_conf_callbacks *to, kafka_conf_callbacks *f
kafka_conf_callback_copy(&to->rebalance, from->rebalance);
kafka_conf_callback_copy(&to->dr_msg, from->dr_msg);
kafka_conf_callback_copy(&to->stats, from->stats);
kafka_conf_callback_copy(&to->consume, from->consume);
kafka_conf_callback_copy(&to->offset_commit, from->offset_commit);
kafka_conf_callback_copy(&to->log, from->log);
} /* }}} */
Expand Down Expand Up @@ -218,32 +215,6 @@ static void kafka_conf_rebalance_cb(rd_kafka_t *rk, rd_kafka_resp_err_t err, rd_
zval_ptr_dtor(&args[2]);
}

static void kafka_conf_consume_cb(rd_kafka_message_t *msg, void *opaque)
{
kafka_conf_callbacks *cbs = (kafka_conf_callbacks*) opaque;
zval args[2];

if (!opaque) {
return;
}

if (!cbs->consume) {
return;
}

ZVAL_NULL(&args[0]);
ZVAL_NULL(&args[1]);

kafka_message_new(&args[0], msg);
ZVAL_ZVAL(&args[1], &cbs->zrk, 1, 0);


kafka_call_function(&cbs->consume->fci, &cbs->consume->fcc, NULL, 2, args);

zval_ptr_dtor(&args[0]);
zval_ptr_dtor(&args[1]);
}

static void kafka_conf_offset_commit_cb(rd_kafka_t *rk, rd_kafka_resp_err_t err, rd_kafka_topic_partition_list_t *partitions, void *opaque)
{
kafka_conf_callbacks *cbs = (kafka_conf_callbacks*) opaque;
Expand Down Expand Up @@ -509,38 +480,6 @@ ZEND_METHOD(Kafka_Configuration, setRebalanceCb)
}
/* }}} */

/* {{{ proto void Kafka\Configuration::setConsumeCb(callable $callback)
Set consume callback to use with poll */
ZEND_METHOD(Kafka_Configuration, setConsumeCb)
{
zend_fcall_info fci;
zend_fcall_info_cache fcc;
kafka_conf_object *intern;

ZEND_PARSE_PARAMETERS_START_EX(ZEND_PARSE_PARAMS_THROW, 1, 1)
Z_PARAM_FUNC(fci, fcc)
ZEND_PARSE_PARAMETERS_END();

intern = get_kafka_conf_object(getThis());
if (!intern) {
return;
}

Z_ADDREF_P(&fci.function_name);

if (intern->cbs.consume) {
zval_ptr_dtor(&intern->cbs.consume->fci.function_name);
} else {
intern->cbs.consume = ecalloc(1, sizeof(*intern->cbs.consume));
}

intern->cbs.consume->fci = fci;
intern->cbs.consume->fcc = fcc;

rd_kafka_conf_set_consume_cb(intern->conf, kafka_conf_consume_cb);
}
/* }}} */

/* {{{ proto void Kafka\Configuration::setOffsetCommitCb(callback $callback)
Set offset commit callback for use with consumer groups */
ZEND_METHOD(Kafka_Configuration, setOffsetCommitCb)
Expand Down
2 changes: 0 additions & 2 deletions configuration.stub.php
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@ public function setStatsCb(callable $callback): void {}

public function setRebalanceCb(callable $callback): void {}

public function setConsumeCb(callable $callback): void {}

public function setOffsetCommitCb(callable $callback): void {}

public function setLogCb(callable $callback): void {}
Expand Down
6 changes: 1 addition & 5 deletions configuration_arginfo.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/* This is a generated file, edit the .stub.php file instead.
* Stub hash: 8077a6ec354f5333de7ed2e808e970a93f1a5c89 */
* Stub hash: 2a9fd8de9f13ef85f4c02373cc6f2dc87114edd0 */

ZEND_BEGIN_ARG_INFO_EX(arginfo_class_Kafka_Configuration___construct, 0, 0, 0)
ZEND_END_ARG_INFO()
Expand All @@ -22,8 +22,6 @@ ZEND_END_ARG_INFO()

#define arginfo_class_Kafka_Configuration_setRebalanceCb arginfo_class_Kafka_Configuration_setErrorCb

#define arginfo_class_Kafka_Configuration_setConsumeCb arginfo_class_Kafka_Configuration_setErrorCb

#define arginfo_class_Kafka_Configuration_setOffsetCommitCb arginfo_class_Kafka_Configuration_setErrorCb

#define arginfo_class_Kafka_Configuration_setLogCb arginfo_class_Kafka_Configuration_setErrorCb
Expand All @@ -36,7 +34,6 @@ ZEND_METHOD(Kafka_Configuration, setErrorCb);
ZEND_METHOD(Kafka_Configuration, setDrMsgCb);
ZEND_METHOD(Kafka_Configuration, setStatsCb);
ZEND_METHOD(Kafka_Configuration, setRebalanceCb);
ZEND_METHOD(Kafka_Configuration, setConsumeCb);
ZEND_METHOD(Kafka_Configuration, setOffsetCommitCb);
ZEND_METHOD(Kafka_Configuration, setLogCb);

Expand All @@ -49,7 +46,6 @@ static const zend_function_entry class_Kafka_Configuration_methods[] = {
ZEND_ME(Kafka_Configuration, setDrMsgCb, arginfo_class_Kafka_Configuration_setDrMsgCb, ZEND_ACC_PUBLIC)
ZEND_ME(Kafka_Configuration, setStatsCb, arginfo_class_Kafka_Configuration_setStatsCb, ZEND_ACC_PUBLIC)
ZEND_ME(Kafka_Configuration, setRebalanceCb, arginfo_class_Kafka_Configuration_setRebalanceCb, ZEND_ACC_PUBLIC)
ZEND_ME(Kafka_Configuration, setConsumeCb, arginfo_class_Kafka_Configuration_setConsumeCb, ZEND_ACC_PUBLIC)
ZEND_ME(Kafka_Configuration, setOffsetCommitCb, arginfo_class_Kafka_Configuration_setOffsetCommitCb, ZEND_ACC_PUBLIC)
ZEND_ME(Kafka_Configuration, setLogCb, arginfo_class_Kafka_Configuration_setLogCb, ZEND_ACC_PUBLIC)
ZEND_FE_END
Expand Down
7 changes: 0 additions & 7 deletions tests/conf_callbacks.phpt
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,6 @@ Kafka\Configuration

$conf = new Kafka\Configuration();

echo "Setting consume callback\n";
$conf->setConsumeCb(function () { });
$dump = $conf->dump();
var_dump(isset($dump["consume_cb"]));

echo "Setting offset_commit callback\n";
$conf->setOffsetCommitCb(function () { });
$dump = $conf->dump();
Expand All @@ -23,8 +18,6 @@ var_dump(isset($dump["rebalance_cb"]));


--EXPECT--
Setting consume callback
bool(true)
Setting offset_commit callback
bool(true)
Setting rebalance callback
Expand Down