From 6a8c92328f8ba65f4d4b9449395d9af0a768188b Mon Sep 17 00:00:00 2001 From: "Adam H. Leventhal" Date: Fri, 15 Jul 2022 12:49:23 -0700 Subject: [PATCH 1/3] update sled-agent with streaming progenitor interface --- Cargo.lock | 10 ++-- nexus/src/internal_api/http_entrypoints.rs | 5 +- openapi/nexus-internal.json | 8 ++- sled-agent/src/mocks/mod.rs | 5 +- sled-agent/src/updates.rs | 64 ++++++++++++---------- 5 files changed, 52 insertions(+), 40 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 5b906183f20..501619db3d9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4068,7 +4068,7 @@ dependencies = [ [[package]] name = "progenitor" version = "0.1.2-dev" -source = "git+https://github.com/oxidecomputer/progenitor#6ca5670819c88267b657be2f44a15e4f5f22b727" +source = "git+https://github.com/oxidecomputer/progenitor#5a72e1b8e7c1ac729519447fe424d0179fe42c07" dependencies = [ "anyhow", "clap 3.2.12", @@ -4083,7 +4083,7 @@ dependencies = [ [[package]] name = "progenitor-client" version = "0.1.2-dev" -source = "git+https://github.com/oxidecomputer/progenitor#6ca5670819c88267b657be2f44a15e4f5f22b727" +source = "git+https://github.com/oxidecomputer/progenitor#5a72e1b8e7c1ac729519447fe424d0179fe42c07" dependencies = [ "bytes", "futures-core", @@ -4097,7 +4097,7 @@ dependencies = [ [[package]] name = "progenitor-impl" version = "0.1.2-dev" -source = "git+https://github.com/oxidecomputer/progenitor#6ca5670819c88267b657be2f44a15e4f5f22b727" +source = "git+https://github.com/oxidecomputer/progenitor#5a72e1b8e7c1ac729519447fe424d0179fe42c07" dependencies = [ "getopts", "heck 0.4.0", @@ -4119,7 +4119,7 @@ dependencies = [ [[package]] name = "progenitor-macro" version = "0.1.2-dev" -source = "git+https://github.com/oxidecomputer/progenitor#6ca5670819c88267b657be2f44a15e4f5f22b727" +source = "git+https://github.com/oxidecomputer/progenitor#5a72e1b8e7c1ac729519447fe424d0179fe42c07" dependencies = [ "openapiv3", "proc-macro2", @@ -4134,7 +4134,7 @@ dependencies = [ [[package]] name = "propolis-client" version = "0.1.0" -source = "git+https://github.com/oxidecomputer/propolis?rev=a9399a8007f9876e31ce152848e6ecc2a9e14283#a9399a8007f9876e31ce152848e6ecc2a9e14283" +source = "git+https://github.com/oxidecomputer/propolis?rev=a9399a8007f9876e31ce152848e6ecc2a9e14283" dependencies = [ "crucible", "reqwest", diff --git a/nexus/src/internal_api/http_entrypoints.rs b/nexus/src/internal_api/http_entrypoints.rs index ef3f7c125c0..d8fac41ed08 100644 --- a/nexus/src/internal_api/http_entrypoints.rs +++ b/nexus/src/internal_api/http_entrypoints.rs @@ -12,6 +12,7 @@ use super::params::{ }; use dropshot::endpoint; use dropshot::ApiDescription; +use dropshot::FreeformBody; use dropshot::HttpError; use dropshot::HttpResponseOk; use dropshot::HttpResponseUpdatedNoContent; @@ -299,7 +300,7 @@ async fn cpapi_metrics_collect( async fn cpapi_artifact_download( request_context: Arc>>, path_params: Path, -) -> Result, HttpError> { +) -> Result, HttpError> { let context = request_context.context(); let nexus = &context.nexus; let opctx = OpContext::for_internal_api(&request_context).await; @@ -307,5 +308,5 @@ async fn cpapi_artifact_download( let body = nexus.download_artifact(&opctx, path_params.into_inner()).await?; - Ok(Response::builder().status(StatusCode::OK).body(body.into())?) + Ok(HttpResponseOk(Body::from(body).into())) } diff --git a/openapi/nexus-internal.json b/openapi/nexus-internal.json index 046b86764fc..47183aaa3fa 100644 --- a/openapi/nexus-internal.json +++ b/openapi/nexus-internal.json @@ -45,13 +45,19 @@ } ], "responses": { - "default": { + "200": { "description": "", "content": { "*/*": { "schema": {} } } + }, + "4XX": { + "$ref": "#/components/responses/Error" + }, + "5XX": { + "$ref": "#/components/responses/Error" } } } diff --git a/sled-agent/src/mocks/mod.rs b/sled-agent/src/mocks/mod.rs index 4c877e0864c..bf097c91fcc 100644 --- a/sled-agent/src/mocks/mod.rs +++ b/sled-agent/src/mocks/mod.rs @@ -10,12 +10,11 @@ use nexus_client::types::{ InstanceRuntimeState, SledAgentStartupInfo, UpdateArtifactKind, ZpoolPutRequest, ZpoolPutResponse, }; -use reqwest::Response; use slog::Logger; use uuid::Uuid; type Result = std::result::Result< - T, + progenitor::progenitor_client::ResponseValue, progenitor::progenitor_client::Error, >; @@ -44,7 +43,7 @@ mock! { kind: UpdateArtifactKind, name: &str, version: i64, - ) -> Result; + ) -> Result; pub async fn zpool_put( &self, sled_id: &Uuid, diff --git a/sled-agent/src/updates.rs b/sled-agent/src/updates.rs index 721861ca074..33b4dcc6edf 100644 --- a/sled-agent/src/updates.rs +++ b/sled-agent/src/updates.rs @@ -5,6 +5,7 @@ //! Management of per-sled updates use crate::nexus::NexusClient; +use futures::TryStreamExt; use omicron_common::api::internal::nexus::{ UpdateArtifact, UpdateArtifactKind, }; @@ -19,11 +20,8 @@ pub enum Error { err: std::io::Error, }, - #[error("Failed to contact nexus: {0}")] - Nexus(anyhow::Error), - - #[error("Failed to read response from Nexus: {0}")] - Response(reqwest::Error), + #[error("Failed request to Nexus: {0}")] + Response(nexus_client::Error), } pub async fn download_artifact( @@ -48,24 +46,27 @@ pub async fn download_artifact( // Fetch the artifact and write to the file in its entirety, // replacing it if it exists. - // TODO: Would love to stream this instead. - // ALSO TODO: This is, for the moment, using the endpoint directly - // instead of using the client method to work around issues in - // dropshot/progenitor for getting raw response bodies. + let response = nexus - .client() - .get(format!( - "{}/artifacts/{}/{}/{}", - nexus.baseurl(), - artifact.kind, - artifact.name, - artifact.version - )) - .send() + .cpapi_artifact_download( + nexus_client::types::UpdateArtifactKind::Zone, + &artifact.name, + artifact.version, + ) .await .map_err(Error::Response)?; - let contents = - response.bytes().await.map_err(|e| Error::Response(e))?; + + // TODO it would be better to stream this into the file rather than + // accumulating it in memory. + let contents = response + .into_inner_stream() + .try_fold(Vec::new(), |mut acc, x| async move { + acc.extend(x); + Ok(acc) + }) + .await + .map_err(|e| Error::Response(e.into()))?; + tokio::fs::write(&tmp_path, contents).await.map_err(|err| { Error::Io { message: format!( @@ -94,13 +95,13 @@ pub async fn download_artifact( mod test { use super::*; use crate::mocks::MockNexusClient; - use http::{Response, StatusCode}; + use bytes::Bytes; + use http::StatusCode; + use progenitor::progenitor_client::{ByteStream, ResponseValue}; + use reqwest::{header::HeaderMap, Result}; #[tokio::test] #[serial_test::serial] - // TODO this is hard to mock out when not using the generated client - // methods :( but the logic is covered in the updates integration test - #[ignore] async fn test_write_artifact_to_filesystem() { // The (completely fabricated) artifact we'd like to download. let expected_name = "test_artifact"; @@ -122,11 +123,16 @@ mod test { assert_eq!(name, "test_artifact"); assert_eq!(version, 3); assert_eq!(kind.to_string(), "zone"); - let response = Response::builder() - .status(StatusCode::OK) - .body(expected_contents) - .unwrap(); - Ok(response.into()) + let response = ByteStream::new(Box::pin( + futures::stream::once(futures::future::ready(Result::Ok( + Bytes::from(expected_contents), + ))), + )); + Ok(ResponseValue::new( + response, + StatusCode::OK, + HeaderMap::default(), + )) }, ); From e5484711c2784c6c6020b252a696efa4cd54361c Mon Sep 17 00:00:00 2001 From: "Adam H. Leventhal" Date: Fri, 15 Jul 2022 13:46:48 -0700 Subject: [PATCH 2/3] fix check --- nexus/src/internal_api/http_entrypoints.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/nexus/src/internal_api/http_entrypoints.rs b/nexus/src/internal_api/http_entrypoints.rs index d8fac41ed08..4bf280c7b70 100644 --- a/nexus/src/internal_api/http_entrypoints.rs +++ b/nexus/src/internal_api/http_entrypoints.rs @@ -19,7 +19,6 @@ use dropshot::HttpResponseUpdatedNoContent; use dropshot::Path; use dropshot::RequestContext; use dropshot::TypedBody; -use http::{Response, StatusCode}; use hyper::Body; use omicron_common::api::internal::nexus::DiskRuntimeState; use omicron_common::api::internal::nexus::InstanceRuntimeState; From e81d556673d6a08ba14014ad9d2daa1c02361ea3 Mon Sep 17 00:00:00 2001 From: "Adam H. Leventhal" Date: Fri, 15 Jul 2022 16:35:03 -0700 Subject: [PATCH 3/3] write the file a chunk at a time --- sled-agent/src/updates.rs | 47 ++++++++++++++++++++++----------------- 1 file changed, 26 insertions(+), 21 deletions(-) diff --git a/sled-agent/src/updates.rs b/sled-agent/src/updates.rs index 33b4dcc6edf..abb0e06c9fd 100644 --- a/sled-agent/src/updates.rs +++ b/sled-agent/src/updates.rs @@ -5,11 +5,12 @@ //! Management of per-sled updates use crate::nexus::NexusClient; -use futures::TryStreamExt; +use futures::{TryFutureExt, TryStreamExt}; use omicron_common::api::internal::nexus::{ UpdateArtifact, UpdateArtifactKind, }; use std::path::PathBuf; +use tokio::io::AsyncWriteExt; #[derive(thiserror::Error, Debug)] pub enum Error { @@ -56,27 +57,31 @@ pub async fn download_artifact( .await .map_err(Error::Response)?; - // TODO it would be better to stream this into the file rather than - // accumulating it in memory. - let contents = response - .into_inner_stream() - .try_fold(Vec::new(), |mut acc, x| async move { - acc.extend(x); - Ok(acc) - }) + let mut file = + tokio::fs::File::create(&tmp_path).await.map_err(|err| { + Error::Io { + message: format!( + "create {}", + tmp_path.to_string_lossy() + ), + err, + } + })?; + let mut stream = response.into_inner_stream(); + while let Some(chunk) = stream + .try_next() .await - .map_err(|e| Error::Response(e.into()))?; - - tokio::fs::write(&tmp_path, contents).await.map_err(|err| { - Error::Io { - message: format!( - "Downloading artifact to temporary path: {tmp_path:?}" - ), - err, - } - })?; - - // Write the file to its final path. + .map_err(|e| Error::Response(e.into()))? + { + file.write_all(&chunk) + .map_err(|err| Error::Io { + message: "write_all".to_string(), + err, + }) + .await?; + } + + // Move the file to its final path. let destination = directory.join(artifact.name); tokio::fs::rename(&tmp_path, &destination).await.map_err( |err| Error::Io {