Skip to content

Commit

Permalink
Expose offsets_for_times consumer method. closes confluentinc#224
Browse files Browse the repository at this point in the history
Expose offsets_for_times consumer method. closes confluentinc#224
  • Loading branch information
johnistan committed Oct 25, 2017
1 parent 7ebc807 commit fe3cd06
Show file tree
Hide file tree
Showing 2 changed files with 72 additions and 0 deletions.
59 changes: 59 additions & 0 deletions confluent_kafka/src/Consumer.c
Original file line number Diff line number Diff line change
Expand Up @@ -704,6 +704,47 @@ static PyObject *Consumer_get_watermark_offsets (Handle *self, PyObject *args,
}


static PyObject *Consumer_offsets_for_times (Handle *self, PyObject *args,
PyObject *kwargs) {

PyObject *plist;
double tmout = -1.0f;
rd_kafka_topic_partition_list_t *c_parts;
rd_kafka_resp_err_t err;
static char *kws[] = { "partitions", "timeout", NULL };

if (!self->rk) {
PyErr_SetString(PyExc_RuntimeError,
"Consumer closed");
return NULL;
}

if (!PyArg_ParseTupleAndKeywords(args, kwargs, "O|d", kws,
&plist, &tmout))
return NULL;

if (!(c_parts = py_to_c_parts(plist)))
return NULL;

err = rd_kafka_offsets_for_times(self->rk,
c_parts,
tmout >= 0 ? (int)(tmout * 1000.0f) : -1);

if (err) {
rd_kafka_topic_partition_list_destroy(c_parts);
cfl_PyErr_Format(err,
"Failed to get offsets: %s",
rd_kafka_err2str(err));
return NULL;
}

plist = c_parts_to_py(c_parts);
rd_kafka_topic_partition_list_destroy(c_parts);

return plist;

}


static PyObject *Consumer_poll (Handle *self, PyObject *args,
PyObject *kwargs) {
Expand Down Expand Up @@ -936,6 +977,24 @@ static PyMethodDef Consumer_methods[] = {
" :raises: RuntimeError if called on a closed consumer\n"
"\n"
},
{ "offsets_for_times", (PyCFunction)Consumer_offsets_for_times,
METH_VARARGS|METH_KEYWORDS,
".. py:function:: offsets_for_times(partitions, [timeout=None])\n"
"\n"
" offsets_for_times looks up offsets by timestamp for the given partitions.\n"
"\n"
" The returned offsets for each partition is the earliest offset whose\n"
" timestamp is greater than or equal to the given timestamp in the\n"
" corresponding partition.\n"
"\n"
" :param list(TopicPartition) partitions: Topic+partition to return offsets for."
" :param float timeout: Request timeout (when cached=False).\n"
" :returns: list of topic+partition with offset field set and possibly error set\n"
" :rtype: list(TopicPartition)\n"
" :raises: KafkaException\n"
" :raises: RuntimeError if called on a closed consumer\n"
"\n"
},
{ "close", (PyCFunction)Consumer_close, METH_NOARGS,
"\n"
" Close down and terminate the Kafka Consumer.\n"
Expand Down
13 changes: 13 additions & 0 deletions tests/test_Consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,15 @@ def dummy_assign_revoke(consumer, partitions):
assert e.args[0].code() in (KafkaError._TIMED_OUT, KafkaError._WAIT_COORD, KafkaError.LEADER_NOT_AVAILABLE),\
str(e.args([0]))

# Query broker for timestamps for partition
try:
test_topic_partition = TopicPartition("test", 0, 100)
offsets = kc.offsets_for_times([test_topic_partition], timeout=0.1)
except KafkaException as e:
import ipdb; ipdb.set_trace()
assert e.args[0].code() in (KafkaError._TIMED_OUT, KafkaError._WAIT_COORD, KafkaError.LEADER_NOT_AVAILABLE),\
str(e.args([0]))

kc.unassign()

kc.commit(async=True)
Expand Down Expand Up @@ -232,6 +241,10 @@ def test_any_method_after_close_throws_exception():
lo, hi = c.get_watermark_offsets(TopicPartition("test", 0))
assert 'Consumer closed' == str(ex.value)

with pytest.raises(RuntimeError) as ex:
c.offsets_for_times([TopicPartition("test", 0)])
assert 'Consumer closed' == str(ex.value)


@pytest.mark.skipif(libversion()[1] < 0x000b0000,
reason="requires librdkafka >=0.11.0")
Expand Down

0 comments on commit fe3cd06

Please sign in to comment.