Update `stdin` source to use a thread instead of using the tokio version #932

Closed
LucioFranco opened this issue Sep 25, 2019 · 5 comments · Fixed by #960

Comments

@LucioFranco
Member

commented Sep 25, 2019

Related to #546 and a bug in tokio where blocking annotations cannot be shut down by dropping the task. This causes the stdin source to hang when a user attempts to shut it down.

Specification

Update the stdin source to use a strategy similar to the file source's: spawn a thread and read from stdin directly, then use the out channel passed into the SourceConfig::build fn to send events back into the topology. This will stop the stdin source from blocking shutdown.

To send events down the out channel, use https://docs.rs/futures/0.1.29/futures/sync/mpsc/struct.Sender.html#method.try_send.

To fetch lines off of stdin, use https://doc.rust-lang.org/std/io/struct.Stdin.html#method.read_line. Once there is a String, it should be converted into an Event as seen here.
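As a rough illustration of the thread-plus-channel strategy described above, here is a minimal, standard-library-only sketch. It is not Vector's implementation: `spawn_line_reader` is a hypothetical helper, `std::sync::mpsc::sync_channel` stands in for the futures 0.1 channel, and the lines are forwarded as plain `String`s rather than converted into an `Event`. Falling back to a blocking `send` when the channel is full is also an assumption made for the sketch.

```rust
use std::io::{BufRead, Cursor};
use std::sync::mpsc::{sync_channel, Receiver, TrySendError};
use std::thread;

/// Hypothetical helper: reads lines from `input` on a dedicated thread and
/// forwards them through a bounded channel. The thread exits on EOF, on a
/// read error, or once the receiving side has been dropped.
fn spawn_line_reader<R>(mut input: R, capacity: usize) -> Receiver<String>
where
    R: BufRead + Send + 'static,
{
    let (tx, rx) = sync_channel(capacity);

    thread::spawn(move || loop {
        let mut buf = String::new();
        match input.read_line(&mut buf) {
            // Zero bytes read means end of input.
            Ok(0) => break,
            Ok(_) => {
                // Strip the trailing line terminator before forwarding.
                let line = buf
                    .trim_end_matches(|c: char| c == '\n' || c == '\r')
                    .to_string();
                match tx.try_send(line) {
                    Ok(()) => {}
                    // Channel full: wait for space instead of dropping the line.
                    Err(TrySendError::Full(line)) => {
                        if tx.send(line).is_err() {
                            break;
                        }
                    }
                    // Receiver dropped: nothing left to do.
                    Err(TrySendError::Disconnected(_)) => break,
                }
            }
            // Stop on a read error; a real source would log it.
            Err(_) => break,
        }
    });

    rx
}

fn main() {
    // An in-memory reader keeps the example deterministic.
    let rx = spawn_line_reader(Cursor::new("first\nsecond\n"), 16);
    let lines: Vec<String> = rx.iter().collect();
    println!("{:?}", lines);
}
```

To read the real standard input, something like `std::io::BufReader::new(std::io::stdin())` could be passed in place of the `Cursor`. Because the reader thread is detached, the receiving side can be dropped without waiting for a pending read to return.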

@AlyHKafoury

Contributor

commented Sep 28, 2019

@LucioFranco I am willing to contribute to this issue if possible

@LucioFranco

Member Author

commented Sep 29, 2019

@AlyHKafoury sure! Let me know if you need more info or help!

@AlyHKafoury

Contributor

commented Sep 29, 2019

@LucioFranco A little info about where to start on the stdin source module would be great.

@LucioFranco

Member Author

commented Sep 29, 2019

@AlyHKafoury sure!

So here is the main file https://github.com/timberio/vector/blob/master/src/sources/stdin.rs

This is where we create the stdin stream from tokio https://github.com/timberio/vector/blob/master/src/sources/stdin.rs#L43.

Ideally, what we probably want to do here https://github.com/timberio/vector/blob/master/src/sources/stdin.rs#L55 is change this to something like:

    Box::new(future::lazy(move || {
        info!("Capturing STDIN");

        let host_key = config.host_key.clone().unwrap_or(event::HOST.to_string());
        let hostname = hostname::get_hostname();

        // Bounded channel between the reader thread and the source future.
        // `try_send` takes `&mut self`, so the sender must be mutable.
        let (mut tx, rx) = futures::sync::mpsc::channel(1024);

        // Read stdin on a dedicated thread so a blocked read cannot hold up shutdown.
        thread::spawn(move || {
            let stdin = std::io::stdin();

            loop {
                let mut buf = String::new();
                if let Err(e) = stdin.read_line(&mut buf) { ... }
                tx.try_send(buf).unwrap();
            }
        });

        let source = rx
            .map(move |line| create_event(line, &host_key, &hostname))
            .map_err(|e| error!("error reading line: {:?}", e))
            .forward(out.sink_map_err(|e| error!("Error sending in sink {}", e)))
            .map(|_| info!("finished sending"));

        source
    }))

I believe something like this should work. I didn't check any of the types, but this should give you a basic framework to work off of.

Let me know if this makes sense!

@AlyHKafoury

Contributor

commented Sep 29, 2019

@LucioFranco It makes perfect sense, thanks a lot. I will try to apply it in a couple of days and get back to you.
