Skip to content

Commit 047d8a8

Browse files
Add decode_request_header_from_buffer and encode_request_header_to_buffer helper methods (#131)
Co-authored-by: enzolombardi <enzo.lombardi@cleafy.com>
1 parent 6c78350 commit 047d8a8

File tree

4 files changed

+38
-27
lines changed

4 files changed

+38
-27
lines changed

src/lib.rs

Lines changed: 10 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -13,16 +13,17 @@
1313
//! easy serialization and deserialization via [`bytes::Bytes`].
1414
//!
1515
//! ```rust
16-
//! use bytes::{Bytes, BytesMut};
16+
//! use bytes::BytesMut;
17+
//! use kafka_protocol::protocol::{encode_request_header_into_buffer, decode_request_header_from_buffer};
1718
//! use kafka_protocol::messages::RequestHeader;
1819
//! use kafka_protocol::protocol::{StrBytes, Encodable, Decodable};
1920
//!
2021
//! let mut request_header = RequestHeader::default();
2122
//! request_header.correlation_id = 1;
2223
//! request_header.client_id = Some(StrBytes::from_static_str("test-client"));
2324
//! let mut buf = BytesMut::new();
24-
//! request_header.encode(&mut buf, 2);
25-
//! assert_eq!(request_header, RequestHeader::decode(&mut buf, 2).unwrap());
25+
//! encode_request_header_into_buffer(&mut buf, &request_header).unwrap();
26+
//! assert_eq!(request_header, decode_request_header_from_buffer(&mut buf).unwrap());
2627
//! ```
2728
//! Note that every message implementation of [`Encodable::encode`](crate::protocol::Encodable::encode)
2829
//! and [`Decodable::decode`](crate::protocol::Decodable::decode) requires a version to be provided
@@ -37,7 +38,7 @@
3738
//! type to a [`bytes::Bytes`] buffer.
3839
//!
3940
//! ```rust
40-
//! use kafka_protocol::protocol::{StrBytes, Encodable, HeaderVersion};
41+
//! use kafka_protocol::protocol::{StrBytes, Encodable, HeaderVersion, encode_request_header_into_buffer};
4142
//! use bytes::{BytesMut, Bytes};
4243
//! use kafka_protocol::messages::{RequestHeader, ApiKey, ApiVersionsRequest};
4344
//! # use std::error::Error;
@@ -47,7 +48,7 @@
4748
//! req_header.request_api_version = 3;
4849
//! req_header.request_api_key = ApiKey::ApiVersions as i16;
4950
//! req_header.client_id = Some(StrBytes::from_static_str("example"));
50-
//! req_header.encode(&mut buf, ApiVersionsRequest::header_version(req_header.request_api_version)).unwrap();
51+
//! encode_request_header_into_buffer(&mut buf, &req_header).unwrap();
5152
//! let mut api_versions_req = ApiVersionsRequest::default();
5253
//! api_versions_req.client_software_version = StrBytes::from_static_str("1.0");
5354
//! api_versions_req.client_software_name = StrBytes::from_static_str("example-client");
@@ -69,7 +70,7 @@
6970
//! A simple example for decoding an unknown message encoded in `buf`:
7071
//! ```rust
7172
//! use kafka_protocol::messages::{RequestHeader, ApiVersionsRequest, ApiKey, RequestKind};
72-
//! use kafka_protocol::protocol::{Encodable, Decodable, StrBytes, HeaderVersion};
73+
//! use kafka_protocol::protocol::{Encodable, Decodable, StrBytes, HeaderVersion, decode_request_header_from_buffer, encode_request_header_into_buffer};
7374
//! use bytes::{BytesMut, Buf};
7475
//! use std::convert::TryFrom;
7576
//! use kafka_protocol::protocol::buf::ByteBuf;
@@ -78,17 +79,14 @@
7879
//! # req_header.request_api_version = 3;
7980
//! # req_header.request_api_key = ApiKey::ApiVersions as i16;
8081
//! # req_header.client_id = Some(StrBytes::from_static_str("example"));
81-
//! # req_header.encode(&mut buf, ApiVersionsRequest::header_version(req_header.request_api_version)).unwrap();
82+
//! # encode_request_header_into_buffer(&mut buf, &req_header).unwrap();
8283
//! # let mut api_versions_req = ApiVersionsRequest::default();
8384
//! # api_versions_req.client_software_version = StrBytes::from_static_str("1.0");
8485
//! # api_versions_req.client_software_name = StrBytes::from_static_str("example-client");
85-
//! # api_versions_req.encode(&mut buf, 3);
86+
//! # api_versions_req.encode(&mut buf, 3).unwrap();
8687
//!
87-
//! let api_key = buf.peek_bytes(0..2).get_i16();
88-
//! let api_version = buf.peek_bytes(2..4).get_i16();
89-
//! let header_version = ApiKey::try_from(api_key).unwrap().request_header_version(api_version);
9088
//!
91-
//! let header = RequestHeader::decode(&mut buf, header_version).unwrap();
89+
//! let header = decode_request_header_from_buffer(&mut buf).unwrap();
9290
//! let api_key = ApiKey::try_from(header.request_api_version);
9391
//! let req = match api_key {
9492
//! ApiVersionsKey => RequestKind::ApiVersions(ApiVersionsRequest::decode(&mut buf, header.request_api_version).unwrap()),

src/protocol/mod.rs

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,8 @@ mod str_bytes {
121121

122122
pub use str_bytes::StrBytes;
123123

124+
use crate::messages::{ApiKey, RequestHeader};
125+
124126
pub(crate) trait NewType<Inner>: From<Inner> + Into<Inner> + Borrow<Inner> {}
125127

126128
impl<T> NewType<T> for T {}
@@ -209,6 +211,26 @@ pub trait Request: Message + Encodable + Decodable + HeaderVersion {
209211
type Response: Message + Encodable + Decodable + HeaderVersion;
210212
}
211213

214+
/// Decode the request header from the provided buffer.
215+
pub fn decode_request_header_from_buffer<B: ByteBuf>(buf: &mut B) -> Result<RequestHeader> {
216+
let api_key = ApiKey::try_from(bytes::Buf::get_i16(&mut buf.peek_bytes(0..2)))
217+
.map_err(|_| anyhow::Error::msg("Unknown API key"))?;
218+
let api_version = bytes::Buf::get_i16(&mut buf.peek_bytes(2..4));
219+
let header_version = api_key.request_header_version(api_version);
220+
RequestHeader::decode(buf, header_version)
221+
}
222+
223+
/// Encode the request header into the provided buffer.
224+
pub fn encode_request_header_into_buffer<B: ByteBufMut>(
225+
buf: &mut B,
226+
header: &RequestHeader,
227+
) -> Result<()> {
228+
let api_key = ApiKey::try_from(header.request_api_key)
229+
.map_err(|_| anyhow::Error::msg("Unknown API key"))?;
230+
let version = api_key.request_header_version(header.request_api_version);
231+
header.encode(buf, version)
232+
}
233+
212234
pub(crate) fn write_unknown_tagged_fields<B: ByteBufMut, R: RangeBounds<i32>>(
213235
buf: &mut B,
214236
range: R,

tests/all_tests/common.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ use testcontainers::{
1212
runners::SyncRunner,
1313
Container, GenericImage, ImageExt,
1414
};
15+
use kafka_protocol::protocol::encode_request_header_into_buffer;
1516

1617
pub fn start_kafka() -> Container<GenericImage> {
1718
GenericImage::new("bitnami/kafka", "3.6.1-debian-11-r24")
@@ -48,9 +49,7 @@ pub fn send_request<T: Encodable + HeaderVersion>(
4849
) {
4950
let mut bytes = BytesMut::new();
5051

51-
header
52-
.encode(&mut bytes, T::header_version(header.request_api_version))
53-
.unwrap();
52+
encode_request_header_into_buffer(&mut bytes, &header).unwrap();
5453
body.encode(&mut bytes, header.request_api_version).unwrap();
5554

5655
let size = bytes.len() as i32;

tests/all_tests/request_header.rs

Lines changed: 4 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,14 @@
1-
use bytes::{Buf, Bytes};
2-
use kafka_protocol::messages::{ApiKey, RequestHeader};
3-
use kafka_protocol::protocol::buf::ByteBuf;
4-
use kafka_protocol::protocol::Decodable;
5-
use std::convert::TryFrom;
1+
use bytes::Bytes;
2+
use kafka_protocol::protocol::decode_request_header_from_buffer;
63

74
#[test]
85
fn request_header() {
96
let mut bytes = Bytes::from(vec![
107
0x00, 0x12, 0x00, 0x03, 0x00, 0x00, 0x00, 0x00, 0x00, 0x0d, 0x61, 0x64, 0x6d, 0x69, 0x6e,
118
0x63, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x2d, 0x31, 0x00,
129
]);
13-
14-
let api_key = bytes.peek_bytes(0..2).get_i16();
15-
let api_version = bytes.peek_bytes(2..4).get_i16();
16-
let header_version = ApiKey::try_from(api_key)
17-
.unwrap()
18-
.request_header_version(api_version);
19-
let res = RequestHeader::decode(&mut bytes, header_version).unwrap();
10+
11+
let res = decode_request_header_from_buffer(&mut bytes).unwrap();
2012

2113
assert_eq!(res.request_api_key, 18);
2214
assert_eq!(res.request_api_version, 3);

0 commit comments

Comments
 (0)