Skip to content

Commit

Permalink
Rework exec to handle stdin (#54)
Browse files Browse the repository at this point in the history
* Extract common `stream::attach` method

* Add missing `attach_stdin` field to `ExecCreateOpts`

* Rework exec

* Update tests and examples

* Update CHANGELOG

* Fix clippy
  • Loading branch information
vv9k authored Apr 16, 2023
1 parent 842a850 commit 3b39460
Show file tree
Hide file tree
Showing 10 changed files with 238 additions and 187 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@
- Add `ContainerCreateOptsBuilder::network_config`
- `Docker` initializers like `new`, `tcp`, `unix`, `tls` now create an unversioned connector that will use the server's latest version instead of setting it to `LATEST_API_VERSION`.
This means that by default this crate will be easier to use with older versions of Docker.
- `Exec::start` and `Container::exec` now take `ExecStartOpts` options as additional parameter
- Add missing `attach_stdin` to `ExecCreateOpts`
- `Exec::start` and `Container::exec` signature changed. It is now async and returns a result with `tty::Multiplexer` (same as attach)
so that it can handle writing to STDIN.

# 0.13.0
- Fix Container::attach output when TTY is enabled on container
Expand Down
7 changes: 6 additions & 1 deletion examples/container.rs
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,12 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
.attach_stderr(true)
.build();

while let Some(exec_result) = docker.containers().get(&id).exec(&options).next().await {
let container = docker.containers().get(&id);
let mut stream = container
.exec(&options, &Default::default())
.await
.expect("exec stream");
while let Some(exec_result) = stream.next().await {
match exec_result {
Ok(chunk) => print_chunk(chunk),
Err(e) => eprintln!("Error: {e}"),
Expand Down
19 changes: 16 additions & 3 deletions examples/exec.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
mod common;
use clap::Parser;
use common::new_docker;
use docker_api::Exec;
use docker_api::{conn::TtyChunk, Exec};

#[derive(Parser)]
pub struct Opts {
Expand Down Expand Up @@ -48,9 +48,22 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {

println!("{:#?}", exec.inspect().await?);

let mut stream = exec.start();
let mut stream = exec.start(&Default::default()).await?;

stream.next().await;
while let Some(Ok(chunk)) = stream.next().await {
println!("{chunk:?}");
match chunk {
TtyChunk::StdOut(buf) => {
println!("STDOUT: {}", String::from_utf8_lossy(&buf));
}
TtyChunk::StdErr(buf) => {
println!("STDERR: {}", String::from_utf8_lossy(&buf));
}
TtyChunk::StdIn(buf) => {
println!("STDIN: {}", String::from_utf8_lossy(&buf));
}
}
}

println!("{:#?}", exec.inspect().await?);
}
Expand Down
50 changes: 20 additions & 30 deletions src/api/container.rs
Original file line number Diff line number Diff line change
@@ -1,22 +1,19 @@
//! Create and manage containers.
use crate::models;
use crate::opts::{
ContainerCommitOpts, ContainerCreateOpts, ContainerListOpts, ContainerPruneOpts,
ContainerRemoveOpts, ContainerRestartOpts, ContainerStopOpts,
ContainerRemoveOpts, ContainerRestartOpts, ContainerStopOpts, ExecStartOpts,
};
use crate::{models, stream};

use std::{io, path::Path, str};

use futures_util::{
io::{AsyncRead, AsyncWrite},
Stream, TryStreamExt,
};
use futures_util::{Stream, TryStreamExt};
use hyper::Body;
use serde::Deserialize;

use crate::{
api::Exec,
conn::{tty, Headers, Payload, TtyChunk},
conn::{tty, Headers, Payload},
opts::ExecCreateOpts,
Error, Result,
};
Expand All @@ -43,19 +40,6 @@ impl Container {
self.docker.get_json(&ep).await
}}

/// Attaches a multiplexed TCP stream to the container that can be used to read Stdout, Stderr and write Stdin.
async fn attach_raw(&self) -> Result<impl AsyncRead + AsyncWrite + Send + '_> {
self.docker
.post_upgrade_stream(
format!(
"/containers/{}/attach?stream=1&stdout=1&stderr=1&stdin=1",
self.id
),
Payload::empty(),
)
.await
}

api_doc! { Container => Attach
|
/// Attaches a [`TtyMultiplexer`](TtyMultiplexer) to the container.
Expand All @@ -66,11 +50,16 @@ impl Container {
pub async fn attach(&self) -> Result<tty::Multiplexer<'_>> {
let inspect = self.inspect().await?;
let is_tty = inspect.config.and_then(|c| c.tty).unwrap_or_default();
self.attach_raw().await.map(|s| if is_tty {
tty::Multiplexer::new(s, tty::decode_raw)
} else {
tty::Multiplexer::new(s, tty::decode_chunk)
})
stream::attach(
&self.docker,
format!(
"/containers/{}/attach?stream=1&stdout=1&stderr=1&stdin=1",
self.id
),
Payload::empty(),
is_tty,
)
.await
}}

api_doc! { Container => Changes
Expand Down Expand Up @@ -226,11 +215,12 @@ impl Container {
api_doc! { Exec
|
/// Execute a command in this container.
pub fn exec(
&self,
opts: &ExecCreateOpts,
) -> impl Stream<Item = crate::conn::Result<TtyChunk>> + Unpin + '_ {
Exec::create_and_start(&self.docker, &self.id, opts)
pub async fn exec<'docker>(
&'docker self,
create_opts: &ExecCreateOpts,
start_opts: &ExecStartOpts,
) -> Result<tty::Multiplexer<'docker>> {
Exec::create_and_start(&self.docker, &self.id, create_opts, start_opts).await
}}

api_doc! { Container => Archive
Expand Down
173 changes: 68 additions & 105 deletions src/api/exec.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,12 @@
//! Run new commands inside running containers.

use futures_util::{
stream::{Stream, TryStreamExt},
TryFutureExt,
};
use hyper::Body;

use crate::{
conn::{tty, Headers, Payload},
models,
opts::{ExecCreateOpts, ExecResizeOpts},
Docker, Result,
opts::{ExecCreateOpts, ExecResizeOpts, ExecStartOpts},
stream, Docker, Result,
};

api_doc! { Exec
Expand All @@ -29,21 +25,31 @@ impl Exec {
}
}

impl_api_ep! {exec: Exec, resp
Inspect -> &format!("/exec/{}/json", exec.id), models::ExecInspect200Response
/// Get a reference to a set of operations available to an already created exec instance.
///
/// It's in callers responsibility to ensure that exec instance with specified id actually
/// exists. Use [Exec::create](Exec::create) to ensure that the exec instance is created
/// beforehand.
pub fn get(docker: Docker, id: impl Into<crate::Id>) -> Exec {
Exec::new(docker, id)
}

api_doc! { Exec => Create
api_doc! { Exec => Inspect
|
/// Creates a new exec instance that will be executed in a container with id == container_id.
pub async fn create<C>(
/// Inspect this Exec instance
pub async fn inspect(&self) -> Result<models::ExecInspect200Response> {
Self::inspect_impl(&self.docker, self.id.as_ref()).await
}}

async fn inspect_impl(docker: &Docker, id: &str) -> Result<models::ExecInspect200Response> {
docker.get_json(&format!("/exec/{id}/json")).await
}

async fn create_impl(
docker: Docker,
container_id: C,
container_id: &str,
opts: &ExecCreateOpts,
) -> Result<Exec>
where
C: AsRef<str>,
{
) -> Result<crate::Id> {
#[derive(serde::Deserialize)]
#[serde(rename_all = "PascalCase")]
struct Response {
Expand All @@ -52,111 +58,68 @@ impl Exec {

docker
.post_json(
&format!("/containers/{}/exec", container_id.as_ref()),
&format!("/containers/{}/exec", container_id),
Payload::Json(opts.serialize_vec()?),
Headers::none(),
)
.await
.map(|resp: Response| Exec::new(docker, resp.id))
}}
.map(|resp: Response| resp.id.into())
}

// This exists for Container::exec()
//
// We need to combine `Exec::create` and `Exec::start` into one method because otherwise you
// needlessly tie the Stream to the lifetime of `container_id` and `opts`. This is because
// `Exec::create` is async so it must occur inside of the `async move` block. However, this
// means that `container_id` and `opts` are both expected to be alive in the returned stream
// because we can't do the work of creating an endpoint from `container_id` or serializing
// `opts`. By doing this work outside of the stream, we get owned values that we can then move
// into the stream and have the lifetimes work out as you would expect.
//
// Yes, it is sad that we can't do the easy method and thus have some duplicated code.
pub(crate) fn create_and_start<'docker, C>(
docker: &'docker Docker,
container_id: C,
api_doc! { Exec => Create
|
/// Creates a new exec instance that will be executed in a container with id == container_id.
pub async fn create(
docker: Docker,
container_id: impl AsRef<str>,
opts: &ExecCreateOpts,
) -> impl Stream<Item = crate::conn::Result<tty::TtyChunk>> + Unpin + 'docker
where
C: AsRef<str>,
) -> Result<Exec>
{
#[derive(serde::Deserialize)]
#[serde(rename_all = "PascalCase")]
struct Response {
id: String,
}
Self::create_impl(docker.clone(), container_id.as_ref(), opts)
.await
.map(|id| Exec::new(docker, id))
}}

// To not tie the lifetime of `opts` to the stream, we do the serializing work outside of
// the stream. But for backwards compatability, we have to return the error inside of the
// stream.
let body_result = opts.serialize();

// To not tie the lifetime of `container_id` to the stream, we convert it to an (owned)
// endpoint outside of the stream.
let container_endpoint = format!("/containers/{}/exec", container_id.as_ref());

Box::pin(
async move {
let exec_id = docker
.post_json(
&container_endpoint,
Payload::Json(
body_result.map_err(|e| crate::conn::Error::Any(Box::new(e)))?,
),
Headers::none(),
)
.await
.map(|resp: Response| resp.id)
.map_err(|e| crate::conn::Error::Any(Box::new(e)))?;

let stream = Box::pin(
docker
.post_stream(
format!("/exec/{exec_id}/start"),
Payload::Json("{}"),
Headers::none(),
)
.map_err(|e| crate::conn::Error::Any(Box::new(e))),
);

Ok(tty::decode(stream))
}
.try_flatten_stream(),
async fn start_impl<'docker>(
docker: &'docker Docker,
id: &str,
opts: &ExecStartOpts,
) -> Result<tty::Multiplexer<'docker>> {
let endpoint = format!("/exec/{}/start", id);
let inspect_data = Self::inspect_impl(docker, id).await?;
let is_tty = inspect_data
.process_config
.and_then(|c| c.tty)
.unwrap_or_default();

stream::attach(
docker,
endpoint,
Payload::Json(opts.serialize_vec()?.into()),
is_tty,
)
}

/// Get a reference to a set of operations available to an already created exec instance.
///
/// It's in callers responsibility to ensure that exec instance with specified id actually
/// exists. Use [Exec::create](Exec::create) to ensure that the exec instance is created
/// beforehand.
pub fn get(docker: Docker, id: impl Into<crate::Id>) -> Exec {
Exec::new(docker, id)
.await
}

api_doc! { Exec => Start
|
/// Starts this exec instance returning a multiplexed tty stream.
pub fn start(&self) -> impl Stream<Item = crate::conn::Result<tty::TtyChunk>> + '_ {
// We must take ownership of the docker reference to not needlessly tie the stream to the
// lifetime of `self`.
let docker = &self.docker;
// We convert `self.id` into the (owned) endpoint outside of the stream to not needlessly
// tie the stream to the lifetime of `self`.
let endpoint = format!("/exec/{}/start", &self.id);
Box::pin(
async move {
let stream = Box::pin(
docker
.post_stream(endpoint, Payload::Json("{}"), Headers::none())
.map_err(|e| crate::conn::Error::Any(Box::new(e))),
);

Ok(tty::decode(stream))
}
.try_flatten_stream(),
)
pub async fn start(&self, opts: &ExecStartOpts) -> Result<tty::Multiplexer<'_>> {
Self::start_impl(&self.docker, self.id.as_ref(), opts).await
}}

pub(crate) async fn create_and_start<'docker>(
docker: &'docker Docker,
container_id: impl AsRef<str>,
create_opts: &ExecCreateOpts,
start_opts: &ExecStartOpts,
) -> Result<tty::Multiplexer<'docker>> {
let container_id = container_id.as_ref();
let id = Self::create_impl(docker.clone(), container_id, create_opts).await?;

Self::start_impl(docker, id.as_ref(), start_opts).await
}

api_doc! { Exec => Resize
|
/// Resize the TTY session used by an exec instance. This only works if the exec was created
Expand Down
1 change: 1 addition & 0 deletions src/docker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -330,6 +330,7 @@ impl Docker {
self.client.head(self.make_endpoint(endpoint)).await
}

#[allow(dead_code)]
/// Send a streaming post request.
///
/// Use stream_post_into_values if the endpoint returns JSON values
Expand Down
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ mod builder;

pub mod api;
pub mod models;
mod stream;
pub mod conn {
//! Connection related items
pub(crate) use containers_api::conn::*;
Expand Down
Loading

0 comments on commit 3b39460

Please sign in to comment.