Skip to content

Commit

Permalink
Replace stream! invocations with appropriate stream wrappers
Browse files Browse the repository at this point in the history
Signed-off-by: Pablo Sichert <mail@pablosichert.com>
  • Loading branch information
pablosichert committed Mar 26, 2021
1 parent ddfbbe7 commit fbb5a32
Show file tree
Hide file tree
Showing 7 changed files with 39 additions and 54 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ futures = { version = "0.3", default-features = false, features = ["compat", "io
futures01 = { package = "futures", version = "0.1.25" }
tokio = { version = "1.3.0", features = ["full"] }
tokio-openssl = "0.6.1"
tokio-stream = { version = "0.1.2", features = ["net"] }
tokio-stream = { version = "0.1.3", features = ["net", "sync"] }
tokio-util = { version = "0.6.2", features = ["codec", "time"] }

# Tracing
Expand Down
1 change: 1 addition & 0 deletions lib/vector-api-client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ async-stream = "0.3.0"
async-trait = "0.1"
futures = { version = "0.3", default-features = false, features = ["compat", "io-compat"] }
tokio = { version = "1.3.0", features = ["full"] }
tokio-stream = { version = "0.1.3", features = ["sync"] }

# GraphQL
graphql_client = "0.9.0"
Expand Down
20 changes: 7 additions & 13 deletions lib/vector-api-client/src/subscription.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,4 @@
use async_stream::stream;
use futures::{
stream::{Stream, StreamExt},
SinkExt,
};
use futures::SinkExt;
use graphql_client::GraphQLQuery;
use serde::{Deserialize, Serialize};
use serde_json::json;
Expand All @@ -11,6 +7,7 @@ use std::{
sync::{Arc, Mutex, Weak},
};
use tokio::sync::{broadcast, mpsc, oneshot};
use tokio_stream::{wrappers::BroadcastStream, Stream, StreamExt};
use tokio_tungstenite::{connect_async, tungstenite::Message};
use url::Url;
use uuid::Uuid;
Expand Down Expand Up @@ -133,14 +130,11 @@ where
{
/// Returns a stream of `Payload` responses, received from the GraphQL server
fn stream(&self) -> StreamResponse<T> {
let mut rx = self.tx.subscribe();
Box::pin(stream! {
loop {
if let Ok(p) = rx.recv().await {
yield p.response::<T>()
}
}
})
Box::pin(
BroadcastStream::new(self.tx.subscribe())
.filter(Result::is_ok)
.map(|p| p.unwrap().response::<T>()),
)
}
}

Expand Down
29 changes: 9 additions & 20 deletions src/api/schema/components/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,12 @@ use crate::{
filter_check,
};
use async_graphql::{Enum, InputObject, Interface, Object, Subscription};
use async_stream::stream;
use lazy_static::lazy_static;
use std::{
cmp,
collections::{HashMap, HashSet},
};
use tokio_stream::Stream;
use tokio_stream::{wrappers::BroadcastStream, Stream, StreamExt};

#[derive(Debug, Clone, Interface)]
#[graphql(
Expand Down Expand Up @@ -230,28 +229,18 @@ pub struct ComponentsSubscription;
impl ComponentsSubscription {
/// Subscribes to all newly added components
async fn component_added(&self) -> impl Stream<Item = Component> {
let mut rx = COMPONENT_CHANGED.subscribe();
stream! {
loop {
match rx.recv().await {
Ok(ComponentChanged::Added(c)) => yield c,
_ => {},
}
}
}
BroadcastStream::new(COMPONENT_CHANGED.subscribe()).filter_map(|c| match c {
Ok(ComponentChanged::Added(c)) => Some(c),
_ => None,
})
}

/// Subscribes to all removed components
async fn component_removed(&self) -> impl Stream<Item = Component> {
let mut rx = COMPONENT_CHANGED.subscribe();
stream! {
loop {
match rx.recv().await {
Ok(ComponentChanged::Removed(c)) => yield c,
_ => {},
}
}
}
BroadcastStream::new(COMPONENT_CHANGED.subscribe()).filter_map(|c| match c {
Ok(ComponentChanged::Removed(c)) => Some(c),
_ => None,
})
}
}

Expand Down
9 changes: 2 additions & 7 deletions src/sources/util/unix_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,12 @@ use crate::{
sources::Source,
Pipeline,
};
use async_stream::stream;
use bytes::Bytes;
use futures::{FutureExt, SinkExt, StreamExt};
use std::{future::ready, path::PathBuf};
use tokio::io::AsyncWriteExt;
use tokio::net::{UnixListener, UnixStream};
use tokio_stream::wrappers::UnixListenerStream;
use tokio_util::codec::{Decoder, FramedRead};
use tracing::field;
use tracing_futures::Instrument;
Expand Down Expand Up @@ -40,12 +40,7 @@ where
info!(message = "Listening.", path = ?listen_path, r#type = "unix");

let connection_open = OpenGauge::new();
let stream = stream! {
loop {
yield listener.accept().await.map(|(stream, _addr)| stream)
}
}
.take_until(shutdown.clone());
let stream = UnixListenerStream::new(listener).take_until(shutdown.clone());
tokio::pin!(stream);
while let Some(socket) = stream.next().await {
let socket = match socket {
Expand Down
30 changes: 17 additions & 13 deletions src/test_util/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ use crate::{
topology::{self, RunningTopology},
trace, Event,
};
use async_stream::stream;
use flate2::read::GzDecoder;
use futures::{
ready, stream, task::noop_waker_ref, FutureExt, SinkExt, Stream, StreamExt, TryStreamExt,
Expand Down Expand Up @@ -36,6 +35,9 @@ use tokio::{
task::JoinHandle,
time::{sleep, Duration, Instant},
};
use tokio_stream::wrappers::TcpListenerStream;
#[cfg(unix)]
use tokio_stream::wrappers::UnixListenerStream;
use tokio_util::codec::{Encoder, FramedRead, FramedWrite, LinesCodec};

const WAIT_FOR_SECS: u64 = 5; // The default time to wait in `wait_for`
Expand Down Expand Up @@ -445,12 +447,13 @@ impl CountReceiver<String> {
pub fn receive_lines(addr: SocketAddr) -> CountReceiver<String> {
CountReceiver::new(|count, tripwire, connected| async move {
let listener = TcpListener::bind(addr).await.unwrap();
let stream = stream! {
loop {
yield listener.accept().await.map(|(stream, _addr)| stream)
}
};
CountReceiver::receive_lines_stream(stream, count, tripwire, Some(connected)).await
CountReceiver::receive_lines_stream(
TcpListenerStream::new(listener),
count,
tripwire,
Some(connected),
)
.await
})
}

Expand All @@ -461,12 +464,13 @@ impl CountReceiver<String> {
{
CountReceiver::new(|count, tripwire, connected| async move {
let listener = tokio::net::UnixListener::bind(path).unwrap();
let stream = stream! {
loop {
yield listener.accept().await.map(|(stream, _addr)| stream)
}
};
CountReceiver::receive_lines_stream(stream, count, tripwire, Some(connected)).await
CountReceiver::receive_lines_stream(
UnixListenerStream::new(listener),
count,
tripwire,
Some(connected),
)
.await
})
}

Expand Down

1 comment on commit fbb5a32

@leebenson
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just a heads up that #6610 introduces another one to catch. Apologies for the moving target! 🎯

Please sign in to comment.