forked from rapidsai/cudf
-
Notifications
You must be signed in to change notification settings - Fork 0
/
kafka.pyx
102 lines (84 loc) · 4.17 KB
/
kafka.pyx
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
# Copyright (c) 2020-2023, NVIDIA CORPORATION.
from libc.stdint cimport int32_t, int64_t
from libcpp cimport bool, nullptr
from libcpp.map cimport map
from libcpp.memory cimport unique_ptr
from libcpp.string cimport string
from libcpp.utility cimport move
from libcpp.vector cimport vector

from cudf._lib.cpp.io.types cimport datasource
from cudf._lib.cpp.libcpp.memory cimport make_unique

from cudf_kafka._lib.kafka cimport kafka_consumer
# To avoid including <python.h> in libcudf_kafka
# we introduce this wrapper in Cython
cdef map[string, string] oauth_callback_wrapper(void *ctx):
    """Invoke the user-supplied Python OAuth callback hidden behind the
    opaque ``ctx`` pointer and repack its result into a C++
    ``map[string, string]`` that libcudf_kafka can consume.

    The callback is expected to return a mapping with a ``"token"``
    string and a ``"token_expiration_in_epoch"`` value (stringified
    here before encoding).
    """
    cdef map[string, string] token_fields
    cb_result = (<object>(ctx))()
    token_fields[str.encode("token")] = str.encode(cb_result["token"])
    token_fields[str.encode("token_expiration_in_epoch")] \
        = str(cb_result["token_expiration_in_epoch"]).encode()
    return token_fields
cdef class KafkaDatasource(Datasource):
    """Datasource backed by a libcudf_kafka ``kafka_consumer``.

    Wraps the C++ consumer in a cudf ``Datasource`` so Kafka messages can
    be read directly by cudf I/O readers. The underlying C++ object is
    owned through ``self.c_datasource`` (a ``unique_ptr[datasource]``
    declared in the matching .pxd — not visible in this file).
    """

    def __cinit__(self,
                  object kafka_configs,
                  string topic=b"",
                  int32_t partition=-1,
                  int64_t start_offset=0,
                  int64_t end_offset=0,
                  int32_t batch_timeout=10000,
                  string delimiter=b"",):
        # kafka_configs: mapping of librdkafka configuration keys to values.
        # All entries are passed through as encoded strings, except
        # 'oauth_cb', which must be a Python callable and is handed to C++
        # as an opaque pointer + C wrapper function.
        # batch_timeout: presumably milliseconds — TODO confirm against
        # libcudf_kafka's kafka_consumer API.
        cdef map[string, string] configs
        cdef void* python_callable = nullptr
        cdef map[string, string] (*python_callable_wrapper)(void *)

        for key in kafka_configs:
            if key == 'oauth_cb':
                if callable(kafka_configs[key]):
                    # The callable itself crosses the C boundary as a void*;
                    # oauth_callback_wrapper casts it back to a Python object
                    # when libcudf_kafka requests a fresh token.
                    python_callable = <void *>kafka_configs[key]
                    python_callable_wrapper = &oauth_callback_wrapper
                else:
                    raise TypeError("'oauth_cb' configuration must \
                        be a Python callable object")
            else:
                configs[key.encode()] = kafka_configs[key].encode()

        if topic != b"" and partition != -1:
            # Full consumer: bound to a topic/partition and an offset window.
            self.c_datasource = <unique_ptr[datasource]> \
                move(make_unique[kafka_consumer](configs,
                                                 python_callable,
                                                 python_callable_wrapper,
                                                 topic,
                                                 partition,
                                                 start_offset,
                                                 end_offset,
                                                 batch_timeout,
                                                 delimiter))
        else:
            # Unassigned consumer: configuration only (e.g. for metadata
            # operations such as list_topics).
            self.c_datasource = <unique_ptr[datasource]> \
                move(make_unique[kafka_consumer](configs,
                                                 python_callable,
                                                 python_callable_wrapper))

    cdef datasource* get_datasource(self) nogil:
        # Expose the raw C++ datasource pointer for libcudf readers;
        # ownership stays with self.c_datasource.
        return <datasource *> self.c_datasource.get()

    cpdef void commit_offset(self,
                             string topic,
                             int32_t partition,
                             int64_t offset):
        """Commit ``offset`` for ``topic``/``partition`` on the consumer."""
        (<kafka_consumer *> self.c_datasource.get()).commit_offset(
            topic, partition, offset)

    cpdef int64_t get_committed_offset(self,
                                       string topic,
                                       int32_t partition):
        """Return the committed offset for ``topic``/``partition``."""
        return (<kafka_consumer *> self.c_datasource.get()). \
            get_committed_offset(topic, partition)

    cpdef map[string, vector[int32_t]] list_topics(self,
                                                   string topic) except *:
        """Return topic metadata as a topic-name -> partition-ids map.

        ``except *`` lets C++ exceptions propagate as Python exceptions.
        """
        return (<kafka_consumer *> self.c_datasource.get()). \
            list_topics(topic)

    cpdef map[string, int64_t] get_watermark_offset(self, string topic,
                                                    int32_t partition,
                                                    int32_t timeout,
                                                    bool cached):
        """Return watermark offsets for ``topic``/``partition``.

        ``cached`` selects the consumer's cached values over a broker
        query — presumably librdkafka get_watermark_offsets vs
        query_watermark_offsets semantics; confirm in libcudf_kafka.
        """
        return (<kafka_consumer *> self.c_datasource.get()). \
            get_watermark_offset(topic, partition, timeout, cached)

    cpdef void unsubscribe(self):
        """Unsubscribe the consumer from its current assignment."""
        (<kafka_consumer *> self.c_datasource.get()).unsubscribe()

    cpdef void close(self, int32_t timeout):
        """Close the underlying consumer, waiting up to ``timeout``
        (presumably milliseconds — TODO confirm)."""
        (<kafka_consumer *> self.c_datasource.get()).close(timeout)