Skip to content

Commit

Permalink
More kafka port rewriting (#1130)
Browse files Browse the repository at this point in the history
  • Loading branch information
rukai committed Apr 17, 2023
1 parent f7ee422 commit 09922c8
Show file tree
Hide file tree
Showing 2 changed files with 80 additions and 18 deletions.
22 changes: 20 additions & 2 deletions shotover/src/frame/kafka.rs
Expand Up @@ -2,8 +2,9 @@ use crate::codec::kafka::RequestHeader as CodecRequestHeader;
use anyhow::{anyhow, Context, Result};
use bytes::{BufMut, Bytes, BytesMut};
use kafka_protocol::messages::{
ApiKey, FindCoordinatorRequest, FindCoordinatorResponse, ProduceRequest, ProduceResponse,
RequestHeader, ResponseHeader,
ApiKey, DescribeClusterResponse, FetchResponse, FindCoordinatorRequest,
FindCoordinatorResponse, LeaderAndIsrRequest, MetadataResponse, ProduceRequest,
ProduceResponse, RequestHeader, ResponseHeader,
};
use kafka_protocol::protocol::{Decodable, Encodable, HeaderVersion};

Expand All @@ -24,13 +25,17 @@ pub enum KafkaFrame {
pub enum RequestBody {
Produce(ProduceRequest),
FindCoordinator(FindCoordinatorRequest),
LeaderAndIsr(LeaderAndIsrRequest),
Unknown { api_key: ApiKey, message: Bytes },
}

#[derive(Debug, PartialEq, Clone)]
pub enum ResponseBody {
Produce(ProduceResponse),
FindCoordinator(FindCoordinatorResponse),
Fetch(FetchResponse),
Metadata(MetadataResponse),
DescribeCluster(DescribeClusterResponse),
Unknown { api_key: ApiKey, message: Bytes },
}

Expand All @@ -39,6 +44,9 @@ impl ResponseBody {
match self {
ResponseBody::Produce(_) => ProduceResponse::header_version(version),
ResponseBody::FindCoordinator(_) => FindCoordinatorResponse::header_version(version),
ResponseBody::Fetch(_) => FetchResponse::header_version(version),
ResponseBody::Metadata(_) => MetadataResponse::header_version(version),
ResponseBody::DescribeCluster(_) => DescribeClusterResponse::header_version(version),
ResponseBody::Unknown { api_key, .. } => api_key.response_header_version(version),
}
}
Expand Down Expand Up @@ -75,6 +83,7 @@ impl KafkaFrame {
ApiKey::FindCoordinatorKey => {
RequestBody::FindCoordinator(decode(&mut bytes, version)?)
}
ApiKey::LeaderAndIsrKey => RequestBody::LeaderAndIsr(decode(&mut bytes, version)?),
api_key => RequestBody::Unknown {
api_key,
message: bytes,
Expand All @@ -99,6 +108,11 @@ impl KafkaFrame {
ApiKey::FindCoordinatorKey => {
ResponseBody::FindCoordinator(decode(&mut bytes, version)?)
}
ApiKey::FetchKey => ResponseBody::Fetch(decode(&mut bytes, version)?),
ApiKey::MetadataKey => ResponseBody::Metadata(decode(&mut bytes, version)?),
ApiKey::DescribeClusterKey => {
ResponseBody::DescribeCluster(decode(&mut bytes, version)?)
}
api_key => ResponseBody::Unknown {
api_key,
message: bytes,
Expand Down Expand Up @@ -130,6 +144,7 @@ impl KafkaFrame {
match body {
RequestBody::Produce(x) => encode(x, bytes, version)?,
RequestBody::FindCoordinator(x) => encode(x, bytes, version)?,
RequestBody::LeaderAndIsr(x) => encode(x, bytes, version)?,
RequestBody::Unknown { message, .. } => bytes.extend_from_slice(&message),
}
}
Expand All @@ -142,6 +157,9 @@ impl KafkaFrame {
match body {
ResponseBody::Produce(x) => encode(x, bytes, version)?,
ResponseBody::FindCoordinator(x) => encode(x, bytes, version)?,
ResponseBody::Fetch(x) => encode(x, bytes, version)?,
ResponseBody::Metadata(x) => encode(x, bytes, version)?,
ResponseBody::DescribeCluster(x) => encode(x, bytes, version)?,
ResponseBody::Unknown { message, .. } => bytes.extend_from_slice(&message),
}
}
Expand Down
76 changes: 60 additions & 16 deletions shotover/src/transforms/kafka/sink_single.rs
@@ -1,5 +1,5 @@
use crate::codec::{kafka::KafkaCodecBuilder, CodecBuilder, Direction};
use crate::frame::kafka::{KafkaFrame, ResponseBody};
use crate::frame::kafka::{KafkaFrame, RequestBody, ResponseBody};
use crate::frame::Frame;
use crate::message::Messages;
use crate::tcp;
Expand Down Expand Up @@ -40,7 +40,9 @@ impl TransformConfig for KafkaSinkSingleConfig {

#[derive(Clone)]
pub struct KafkaSinkSingleBuilder {
// contains address and port
address: String,
address_port: u16,
connect_timeout: Duration,
read_timeout: Option<Duration>,
}
Expand All @@ -53,9 +55,15 @@ impl KafkaSinkSingleBuilder {
timeout: Option<u64>,
) -> KafkaSinkSingleBuilder {
let receive_timeout = timeout.map(Duration::from_secs);
let address_port = address
.rsplit(':')
.next()
.and_then(|str| str.parse().ok())
.unwrap_or(9092);

KafkaSinkSingleBuilder {
address,
address_port,
connect_timeout: Duration::from_millis(connect_timeout_ms),
read_timeout: receive_timeout,
}
Expand All @@ -67,6 +75,7 @@ impl TransformBuilder for KafkaSinkSingleBuilder {
Transforms::KafkaSinkSingle(KafkaSinkSingle {
outbound: None,
address: self.address.clone(),
address_port: self.address_port,
pushed_messages_tx: None,
connect_timeout: self.connect_timeout,
read_timeout: self.read_timeout,
Expand All @@ -84,6 +93,7 @@ impl TransformBuilder for KafkaSinkSingleBuilder {

pub struct KafkaSinkSingle {
address: String,
address_port: u16,
outbound: Option<Connection>,
pushed_messages_tx: Option<mpsc::UnboundedSender<Messages>>,
connect_timeout: Duration,
Expand All @@ -92,14 +102,28 @@ pub struct KafkaSinkSingle {

#[async_trait]
impl Transform for KafkaSinkSingle {
async fn transform<'a>(&'a mut self, message_wrapper: Wrapper<'a>) -> Result<Messages> {
async fn transform<'a>(&'a mut self, mut message_wrapper: Wrapper<'a>) -> Result<Messages> {
if self.outbound.is_none() {
let codec = KafkaCodecBuilder::new(Direction::Sink);
let tcp_stream = tcp::tcp_stream(self.connect_timeout, &self.address).await?;
let (rx, tx) = tcp_stream.into_split();
self.outbound = Some(spawn_read_write_tasks(&codec, rx, tx));
}

// Rewrite requests to use kafkas port instead of shotovers port
for request in &mut message_wrapper.messages {
if let Some(Frame::Kafka(KafkaFrame::Request {
body: RequestBody::LeaderAndIsr(leader_and_isr),
..
})) = request.frame()
{
for leader in &mut leader_and_isr.live_leaders {
leader.port = self.address_port as i32;
}
request.invalidate_cache();
}
}

let outbound = self.outbound.as_mut().unwrap();
let responses: Result<Vec<_>> = message_wrapper
.messages
Expand All @@ -124,23 +148,43 @@ impl Transform for KafkaSinkSingle {
read_responses(responses).await
}?;

// Rewrite FindCoordinator responses messages to use shotovers port instead of kafkas port
// Rewrite responses to use shotovers port instead of kafkas port
for response in &mut responses {
if let Some(Frame::Kafka(KafkaFrame::Response {
body: ResponseBody::FindCoordinator(find_coordinator),
version,
..
})) = response.frame()
{
let port = message_wrapper.local_addr.port() as i32;
if *version <= 3 {
find_coordinator.port = port;
} else {
for coordinator in &mut find_coordinator.coordinators {
coordinator.port = port;
let port = message_wrapper.local_addr.port() as i32;
match response.frame() {
Some(Frame::Kafka(KafkaFrame::Response {
body: ResponseBody::FindCoordinator(find_coordinator),
version,
..
})) => {
if *version <= 3 {
find_coordinator.port = port;
} else {
for coordinator in &mut find_coordinator.coordinators {
coordinator.port = port;
}
}
response.invalidate_cache();
}
Some(Frame::Kafka(KafkaFrame::Response {
body: ResponseBody::Metadata(metadata),
..
})) => {
for broker in &mut metadata.brokers {
broker.1.port = port;
}
response.invalidate_cache();
}
Some(Frame::Kafka(KafkaFrame::Response {
body: ResponseBody::DescribeCluster(describe_cluster),
..
})) => {
for broker in &mut describe_cluster.brokers {
broker.1.port = port;
}
response.invalidate_cache();
}
response.invalidate_cache();
_ => {}
}
}

Expand Down

0 comments on commit 09922c8

Please sign in to comment.