Skip to content

Commit

Permalink
review feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
conorbros committed Aug 15, 2022
1 parent cc58265 commit 5d2ab8d
Show file tree
Hide file tree
Showing 4 changed files with 8 additions and 11 deletions.
5 changes: 2 additions & 3 deletions shotover-proxy/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use crate::tls::TlsAcceptor;
use crate::transforms::chain::TransformChain;
use crate::transforms::Wrapper;
use anyhow::{anyhow, Context, Result};
use futures::SinkExt;
use futures::{SinkExt, StreamExt};
use metrics::{register_gauge, Gauge};
use std::sync::Arc;
use tokio::io::{AsyncRead, AsyncWrite};
Expand All @@ -13,7 +13,6 @@ use tokio::sync::{mpsc, watch, Semaphore};
use tokio::time;
use tokio::time::timeout;
use tokio::time::Duration;
use tokio_stream::StreamExt as TokioStreamExt;
use tokio_util::codec::{Decoder, Encoder};
use tokio_util::codec::{FramedRead, FramedWrite};
use tracing::Instrument;
Expand Down Expand Up @@ -396,7 +395,7 @@ fn spawn_read_write_tasks<
async move {
loop {
tokio::select! {
result = TokioStreamExt::next(&mut reader) => {
result = reader.next() => {
if let Some(message) = result {
match message {
Ok(message) => {
Expand Down
7 changes: 3 additions & 4 deletions shotover-proxy/src/transforms/cassandra/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ use tokio::net::{TcpStream, ToSocketAddrs};
use tokio::sync::{mpsc, oneshot};
use tokio::time::timeout;
use tokio_stream::wrappers::UnboundedReceiverStream;
use tokio_stream::StreamExt as TokioStreamExt;
use tokio_util::codec::{FramedRead, FramedWrite};
use tracing::{info, Instrument};

Expand Down Expand Up @@ -90,7 +89,7 @@ async fn tx_process<C: CodecWriteHalf, T: AsyncWrite>(
codec: C,
) -> Result<()> {
let in_w = FramedWrite::new(write, codec);
let rx_stream = TokioStreamExt::map(UnboundedReceiverStream::new(out_rx), |x| {
let rx_stream = UnboundedReceiverStream::new(out_rx).map(|x| {
let ret = Ok(vec![x.message.clone()]);
return_tx.send(x)?;
ret
Expand All @@ -112,7 +111,7 @@ async fn rx_process<C: CodecReadHalf, T: AsyncRead>(

loop {
tokio::select! {
Some(maybe_req) = TokioStreamExt::next(&mut in_r) => {
Some(maybe_req) = in_r.next() => {
match maybe_req {
Ok(req) => {
for m in req {
Expand Down Expand Up @@ -196,7 +195,7 @@ pub async fn receive_message(
failed_requests: &metrics::Counter,
results: &mut FuturesOrdered<oneshot::Receiver<Response>>,
) -> Result<Message> {
match tokio_stream::StreamExt::next(results).await {
match results.next().await {
Some(result) => match result? {
Response {
response: Ok(message),
Expand Down
5 changes: 2 additions & 3 deletions shotover-proxy/src/transforms/util/cluster_connection_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
use tokio::sync::Mutex;
use tokio::time::timeout;
use tokio_stream::wrappers::UnboundedReceiverStream;
use tokio_stream::StreamExt as TokioStreamExt;
use tokio_util::codec::{FramedRead, FramedWrite};
use tracing::{debug, trace, warn, Instrument};

Expand Down Expand Up @@ -246,7 +245,7 @@ async fn tx_process<C: CodecWriteHalf, W: AsyncWrite + Unpin + Send + 'static>(
codec: C,
) -> Result<()> {
let in_w = FramedWrite::new(write, codec);
let rx_stream = TokioStreamExt::map(UnboundedReceiverStream::new(out_rx), |x| {
let rx_stream = UnboundedReceiverStream::new(out_rx).map(|x| {
let ret = Ok(vec![x.message.clone()]);
return_tx.send(x)?;
ret
Expand All @@ -261,7 +260,7 @@ async fn rx_process<C: CodecReadHalf, R: AsyncRead + Unpin + Send + 'static>(
) -> Result<()> {
let mut in_r = FramedRead::new(read, codec);

while let Some(maybe_req) = TokioStreamExt::next(&mut in_r).await {
while let Some(maybe_req) = in_r.next().await {
match maybe_req {
Ok(req) => {
for m in req {
Expand Down
2 changes: 1 addition & 1 deletion shotover-proxy/tests/codec/cassandra.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use bytes::Bytes;
use futures::sink::SinkExt;
use futures::SinkExt;
use serial_test::serial;
use shotover_proxy::codec::cassandra::CassandraCodec;
use tokio::io::BufWriter;
Expand Down

0 comments on commit 5d2ab8d

Please sign in to comment.