forked from dpkp/kafka-python
-
Notifications
You must be signed in to change notification settings - Fork 6
/
Copy pathlegacy.py
474 lines (394 loc) · 15.6 KB
/
legacy.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
from __future__ import absolute_import
import logging
import struct
from kafka.vendor import six # pylint: disable=import-error
import kafka.protocol.commit
import kafka.protocol.fetch
import kafka.protocol.message
import kafka.protocol.metadata
import kafka.protocol.offset
import kafka.protocol.produce
import kafka.structs
from kafka.codec import gzip_encode, snappy_encode
from kafka.errors import ProtocolError, UnsupportedCodecError
from kafka.util import (
crc32, read_short_string, relative_unpack,
write_int_string, group_by_topic_and_partition)
from kafka.protocol.message import MessageSet
log = logging.getLogger(__name__)
ATTRIBUTE_CODEC_MASK = 0x03
CODEC_NONE = 0x00
CODEC_GZIP = 0x01
CODEC_SNAPPY = 0x02
ALL_CODECS = (CODEC_NONE, CODEC_GZIP, CODEC_SNAPPY)
class KafkaProtocol(object):
"""
Class to encapsulate all of the protocol encoding/decoding.
This class does not have any state associated with it, it is purely
for organization.
"""
PRODUCE_KEY = 0
FETCH_KEY = 1
OFFSET_KEY = 2
METADATA_KEY = 3
OFFSET_COMMIT_KEY = 8
OFFSET_FETCH_KEY = 9
CONSUMER_METADATA_KEY = 10
###################
# Private API #
###################
@classmethod
def _encode_message_header(cls, client_id, correlation_id, request_key,
version=0):
"""
Encode the common request envelope
"""
return struct.pack('>hhih%ds' % len(client_id),
request_key, # ApiKey
version, # ApiVersion
correlation_id, # CorrelationId
len(client_id), # ClientId size
client_id) # ClientId
@classmethod
def _encode_message_set(cls, messages):
"""
Encode a MessageSet. Unlike other arrays in the protocol,
MessageSets are not length-prefixed
Format
======
MessageSet => [Offset MessageSize Message]
Offset => int64
MessageSize => int32
"""
message_set = []
for message in messages:
encoded_message = KafkaProtocol._encode_message(message)
message_set.append(struct.pack('>qi%ds' % len(encoded_message), 0,
len(encoded_message),
encoded_message))
return b''.join(message_set)
@classmethod
def _encode_message(cls, message):
"""
Encode a single message.
The magic number of a message is a format version number.
The only supported magic number right now is zero
Format
======
Message => Crc MagicByte Attributes Key Value
Crc => int32
MagicByte => int8
Attributes => int8
Key => bytes
Value => bytes
"""
if message.magic == 0:
msg = b''.join([
struct.pack('>BB', message.magic, message.attributes),
write_int_string(message.key),
write_int_string(message.value)
])
crc = crc32(msg)
msg = struct.pack('>i%ds' % len(msg), crc, msg)
else:
raise ProtocolError("Unexpected magic number: %d" % message.magic)
return msg
##################
# Public API #
##################
@classmethod
def encode_produce_request(cls, payloads=(), acks=1, timeout=1000):
"""
Encode a ProduceRequest struct
Arguments:
payloads: list of ProduceRequestPayload
acks: How "acky" you want the request to be
1: written to disk by the leader
0: immediate response
-1: waits for all replicas to be in sync
timeout: Maximum time (in ms) the server will wait for replica acks.
This is _not_ a socket timeout
Returns: ProduceRequest
"""
if acks not in (1, 0, -1):
raise ValueError('ProduceRequest acks (%s) must be 1, 0, -1' % acks)
topics = []
for topic, topic_payloads in group_by_topic_and_partition(payloads).items():
topic_msgs = []
for partition, payload in topic_payloads.items():
partition_msgs = []
for msg in payload.messages:
m = kafka.protocol.message.Message(
msg.value, key=msg.key,
magic=msg.magic, attributes=msg.attributes
)
partition_msgs.append((0, m.encode()))
topic_msgs.append((partition, MessageSet.encode(partition_msgs, prepend_size=False)))
topics.append((topic, topic_msgs))
return kafka.protocol.produce.ProduceRequest[0](
required_acks=acks,
timeout=timeout,
topics=topics
)
@classmethod
def decode_produce_response(cls, response):
"""
Decode ProduceResponse to ProduceResponsePayload
Arguments:
response: ProduceResponse
Return: list of ProduceResponsePayload
"""
return [
kafka.structs.ProduceResponsePayload(topic, partition, error, offset)
for topic, partitions in response.topics
for partition, error, offset in partitions
]
@classmethod
def encode_fetch_request(cls, payloads=(), max_wait_time=100, min_bytes=4096):
"""
Encodes a FetchRequest struct
Arguments:
payloads: list of FetchRequestPayload
max_wait_time (int, optional): ms to block waiting for min_bytes
data. Defaults to 100.
min_bytes (int, optional): minimum bytes required to return before
max_wait_time. Defaults to 4096.
Return: FetchRequest
"""
return kafka.protocol.fetch.FetchRequest[0](
replica_id=-1,
max_wait_time=max_wait_time,
min_bytes=min_bytes,
topics=[(
topic,
[(
partition,
payload.offset,
payload.max_bytes)
for partition, payload in topic_payloads.items()])
for topic, topic_payloads in group_by_topic_and_partition(payloads).items()])
@classmethod
def decode_fetch_response(cls, response):
"""
Decode FetchResponse struct to FetchResponsePayloads
Arguments:
response: FetchResponse
"""
return [
kafka.structs.FetchResponsePayload(
topic, partition, error, highwater_offset, [
offset_and_msg
for offset_and_msg in cls.decode_message_set(messages)])
for topic, partitions in response.topics
for partition, error, highwater_offset, messages in partitions
]
@classmethod
def decode_message_set(cls, raw_data):
messages = MessageSet.decode(raw_data, bytes_to_read=len(raw_data))
for offset, _, message in messages:
if isinstance(message, kafka.protocol.message.Message) and message.is_compressed():
inner_messages = message.decompress()
for (inner_offset, _msg_size, inner_msg) in inner_messages:
yield kafka.structs.OffsetAndMessage(inner_offset, inner_msg)
else:
yield kafka.structs.OffsetAndMessage(offset, message)
@classmethod
def encode_offset_request(cls, payloads=()):
return kafka.protocol.offset.OffsetRequest[0](
replica_id=-1,
topics=[(
topic,
[(
partition,
payload.time,
payload.max_offsets)
for partition, payload in six.iteritems(topic_payloads)])
for topic, topic_payloads in six.iteritems(group_by_topic_and_partition(payloads))])
@classmethod
def decode_offset_response(cls, response):
"""
Decode OffsetResponse into OffsetResponsePayloads
Arguments:
response: OffsetResponse
Returns: list of OffsetResponsePayloads
"""
return [
kafka.structs.OffsetResponsePayload(topic, partition, error, tuple(offsets))
for topic, partitions in response.topics
for partition, error, offsets in partitions
]
@classmethod
def encode_list_offset_request(cls, payloads=()):
return kafka.protocol.offset.OffsetRequest[1](
replica_id=-1,
topics=[(
topic,
[(
partition,
payload.time)
for partition, payload in six.iteritems(topic_payloads)])
for topic, topic_payloads in six.iteritems(group_by_topic_and_partition(payloads))])
@classmethod
def decode_list_offset_response(cls, response):
"""
Decode OffsetResponse_v2 into ListOffsetResponsePayloads
Arguments:
response: OffsetResponse_v2
Returns: list of ListOffsetResponsePayloads
"""
return [
kafka.structs.ListOffsetResponsePayload(topic, partition, error, timestamp, offset)
for topic, partitions in response.topics
for partition, error, timestamp, offset in partitions
]
@classmethod
def encode_metadata_request(cls, topics=(), payloads=None):
"""
Encode a MetadataRequest
Arguments:
topics: list of strings
"""
if payloads is not None:
topics = payloads
return kafka.protocol.metadata.MetadataRequest[0](topics)
@classmethod
def decode_metadata_response(cls, response):
return response
@classmethod
def encode_consumer_metadata_request(cls, client_id, correlation_id, payloads):
"""
Encode a ConsumerMetadataRequest
Arguments:
client_id: string
correlation_id: int
payloads: string (consumer group)
"""
message = []
message.append(cls._encode_message_header(client_id, correlation_id,
KafkaProtocol.CONSUMER_METADATA_KEY))
message.append(struct.pack('>h%ds' % len(payloads), len(payloads), payloads))
msg = b''.join(message)
return write_int_string(msg)
@classmethod
def decode_consumer_metadata_response(cls, data):
"""
Decode bytes to a kafka.structs.ConsumerMetadataResponse
Arguments:
data: bytes to decode
"""
((correlation_id, error, nodeId), cur) = relative_unpack('>ihi', data, 0)
(host, cur) = read_short_string(data, cur)
((port,), cur) = relative_unpack('>i', data, cur)
return kafka.structs.ConsumerMetadataResponse(error, nodeId, host, port)
@classmethod
def encode_offset_commit_request(cls, group, payloads):
"""
Encode an OffsetCommitRequest struct
Arguments:
group: string, the consumer group you are committing offsets for
payloads: list of OffsetCommitRequestPayload
"""
return kafka.protocol.commit.OffsetCommitRequest[0](
consumer_group=group,
topics=[(
topic,
[(
partition,
payload.offset,
payload.metadata)
for partition, payload in six.iteritems(topic_payloads)])
for topic, topic_payloads in six.iteritems(group_by_topic_and_partition(payloads))])
@classmethod
def decode_offset_commit_response(cls, response):
"""
Decode OffsetCommitResponse to an OffsetCommitResponsePayload
Arguments:
response: OffsetCommitResponse
"""
return [
kafka.structs.OffsetCommitResponsePayload(topic, partition, error)
for topic, partitions in response.topics
for partition, error in partitions
]
@classmethod
def encode_offset_fetch_request(cls, group, payloads, from_kafka=False):
"""
Encode an OffsetFetchRequest struct. The request is encoded using
version 0 if from_kafka is false, indicating a request for Zookeeper
offsets. It is encoded using version 1 otherwise, indicating a request
for Kafka offsets.
Arguments:
group: string, the consumer group you are fetching offsets for
payloads: list of OffsetFetchRequestPayload
from_kafka: bool, default False, set True for Kafka-committed offsets
"""
version = 1 if from_kafka else 0
return kafka.protocol.commit.OffsetFetchRequest[version](
consumer_group=group,
topics=[(
topic,
list(topic_payloads.keys()))
for topic, topic_payloads in six.iteritems(group_by_topic_and_partition(payloads))])
@classmethod
def decode_offset_fetch_response(cls, response):
"""
Decode OffsetFetchResponse to OffsetFetchResponsePayloads
Arguments:
response: OffsetFetchResponse
"""
return [
kafka.structs.OffsetFetchResponsePayload(
topic, partition, offset, metadata, error
)
for topic, partitions in response.topics
for partition, offset, metadata, error in partitions
]
def create_message(payload, key=None):
"""
Construct a Message
Arguments:
payload: bytes, the payload to send to Kafka
key: bytes, a key used for partition routing (optional)
"""
return kafka.structs.Message(0, 0, key, payload)
def create_gzip_message(payloads, key=None, compresslevel=None):
"""
Construct a Gzipped Message containing multiple Messages
The given payloads will be encoded, compressed, and sent as a single atomic
message to Kafka.
Arguments:
payloads: list(bytes), a list of payload to send be sent to Kafka
key: bytes, a key used for partition routing (optional)
"""
message_set = KafkaProtocol._encode_message_set(
[create_message(payload, pl_key) for payload, pl_key in payloads])
gzipped = gzip_encode(message_set, compresslevel=compresslevel)
codec = ATTRIBUTE_CODEC_MASK & CODEC_GZIP
return kafka.structs.Message(0, 0x00 | codec, key, gzipped)
def create_snappy_message(payloads, key=None):
"""
Construct a Snappy Message containing multiple Messages
The given payloads will be encoded, compressed, and sent as a single atomic
message to Kafka.
Arguments:
payloads: list(bytes), a list of payload to send be sent to Kafka
key: bytes, a key used for partition routing (optional)
"""
message_set = KafkaProtocol._encode_message_set(
[create_message(payload, pl_key) for payload, pl_key in payloads])
snapped = snappy_encode(message_set)
codec = ATTRIBUTE_CODEC_MASK & CODEC_SNAPPY
return kafka.structs.Message(0, 0x00 | codec, key, snapped)
def create_message_set(messages, codec=CODEC_NONE, key=None, compresslevel=None):
"""Create a message set using the given codec.
If codec is CODEC_NONE, return a list of raw Kafka messages. Otherwise,
return a list containing a single codec-encoded message.
"""
if codec == CODEC_NONE:
return [create_message(m, k) for m, k in messages]
elif codec == CODEC_GZIP:
return [create_gzip_message(messages, key, compresslevel)]
elif codec == CODEC_SNAPPY:
return [create_snappy_message(messages, key)]
else:
raise UnsupportedCodecError("Codec 0x%02x unsupported" % codec)