/
kafka.rs
160 lines (145 loc) · 4.93 KB
/
kafka.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
use super::Direction;
use crate::codec::{CodecBuilder, CodecReadError};
use crate::frame::MessageType;
use crate::message::{Encodable, Message, Messages, ProtocolType};
use anyhow::{anyhow, Result};
use bytes::{Buf, BytesMut};
use kafka_protocol::messages::ApiKey;
use std::sync::mpsc;
use tokio_util::codec::{Decoder, Encoder};
#[derive(Copy, Clone, Debug, PartialEq)]
pub struct RequestHeader {
pub api_key: ApiKey,
pub version: i16,
}
#[derive(Clone)]
pub struct KafkaCodecBuilder {
direction: Direction,
}
// Depending on if the codec is used in a sink or a source requires different processing logic:
// * Sources parse requests which do not require any special handling
// * Sinks parse responses which requires first matching up the version and api_key with its corresponding request
// + To achieve this Sinks use an mpsc channel to send header data from the encoder to the decoder
impl CodecBuilder for KafkaCodecBuilder {
type Decoder = KafkaDecoder;
type Encoder = KafkaEncoder;
fn new(direction: Direction) -> Self {
Self { direction }
}
fn build(&self) -> (KafkaDecoder, KafkaEncoder) {
let (tx, rx) = match self.direction {
Direction::Source => (None, None),
Direction::Sink => {
let (tx, rx) = mpsc::channel();
(Some(tx), Some(rx))
}
};
(
KafkaDecoder::new(rx, self.direction),
KafkaEncoder::new(tx, self.direction),
)
}
}
pub struct KafkaDecoder {
request_header_rx: Option<mpsc::Receiver<RequestHeader>>,
messages: Messages,
direction: Direction,
}
impl KafkaDecoder {
pub fn new(
request_header_rx: Option<mpsc::Receiver<RequestHeader>>,
direction: Direction,
) -> Self {
KafkaDecoder {
request_header_rx,
messages: vec![],
direction,
}
}
}
fn get_length_of_full_message(src: &mut BytesMut) -> Option<usize> {
if src.len() > 4 {
let size = u32::from_be_bytes(src[0..4].try_into().unwrap()) as usize + 4;
if size >= src.len() {
Some(size)
} else {
None
}
} else {
None
}
}
impl Decoder for KafkaDecoder {
type Item = Messages;
type Error = CodecReadError;
fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, CodecReadError> {
loop {
if let Some(size) = get_length_of_full_message(src) {
let bytes = src.split_to(size);
tracing::debug!(
"{}: incoming kafka message:\n{}",
self.direction,
pretty_hex::pretty_hex(&bytes)
);
let request_header = if let Some(rx) = self.request_header_rx.as_ref() {
Some(rx.recv().map_err(|_| {
CodecReadError::Parser(anyhow!("kafka encoder half was lost"))
})?)
} else {
None
};
self.messages.push(Message::from_bytes(
bytes.freeze(),
ProtocolType::Kafka { request_header },
));
} else if self.messages.is_empty() || src.remaining() != 0 {
return Ok(None);
} else {
return Ok(Some(std::mem::take(&mut self.messages)));
}
}
}
}
pub struct KafkaEncoder {
request_header_tx: Option<mpsc::Sender<RequestHeader>>,
direction: Direction,
}
impl KafkaEncoder {
pub fn new(
request_header_tx: Option<mpsc::Sender<RequestHeader>>,
direction: Direction,
) -> Self {
KafkaEncoder {
request_header_tx,
direction,
}
}
}
impl Encoder<Messages> for KafkaEncoder {
type Error = anyhow::Error;
fn encode(&mut self, item: Messages, dst: &mut BytesMut) -> Result<()> {
item.into_iter().try_for_each(|m| {
let start = dst.len();
let result = match m.into_encodable(MessageType::Kafka)? {
Encodable::Bytes(bytes) => {
dst.extend_from_slice(&bytes);
Ok(())
}
Encodable::Frame(frame) => frame.into_kafka().unwrap().encode(dst),
};
if let Some(tx) = self.request_header_tx.as_ref() {
let api_key = i16::from_be_bytes(dst[start + 4..start + 6].try_into().unwrap());
let version = i16::from_be_bytes(dst[start + 6..start + 8].try_into().unwrap());
let api_key =
ApiKey::try_from(api_key).map_err(|_| anyhow!("unknown api key {api_key}"))?;
tx.send(RequestHeader { api_key, version })?;
}
tracing::debug!(
"{}: outgoing kafka message:\n{}",
self.direction,
pretty_hex::pretty_hex(&&dst[start..])
);
result
})
}
}