Skip to content

Commit

Permalink
fix: add an extra flag to decide frame format
Browse files Browse the repository at this point in the history
  • Loading branch information
zhongchen committed Jul 24, 2023
1 parent 0ef5d5f commit 6899035
Showing 1 changed file with 10 additions and 1 deletion.
11 changes: 10 additions & 1 deletion src/sinks/websocket/sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -196,20 +196,26 @@ pub struct WebSocketSink {
connector: WebSocketConnector,
ping_interval: Option<NonZeroU64>,
ping_timeout: Option<NonZeroU64>,
encode_as_binary: bool,
}

impl WebSocketSink {
pub fn new(config: &WebSocketSinkConfig, connector: WebSocketConnector) -> crate::Result<Self> {
let transformer = config.encoding.transformer();
let serializer = config.encoding.build()?;
let encoder = Encoder::<()>::new(serializer);
let encode_as_binary = match config.encoding.config() {
codecs::encoding::SerializerConfig::RawMessage => true,
_ => true,
};

Ok(Self {
transformer,
encoder,
connector,
ping_interval: config.ping_interval,
ping_timeout: config.ping_timeout,
encode_as_binary: encode_as_binary,
})
}

Expand Down Expand Up @@ -301,7 +307,10 @@ impl WebSocketSink {
Ok(()) => {
finalizers.update_status(EventStatus::Delivered);

let message = Message::binary(bytes);
let message = match self.encode_as_binary {
true => Message::binary(bytes),
false => Message::text(String::from_utf8_lossy(&bytes)),
};
let message_len = message.len();

ws_sink.send(message).await.map(|_| {
Expand Down

0 comments on commit 6899035

Please sign in to comment.