Skip to content

Commit

Permalink
Enhance Pulumi stack execution and handle node.js environment
Browse files Browse the repository at this point in the history
This commit adds changes related to Pulumi stack execution. Now, the stack, auth details and inner stack source are fetched in the `run_internal` function. The code also fetches and sets a specific namespace and access token dynamically.

Functionality is added to handle stacks with a specific path. A PulumiConfig structure is also implemented to read and use the 'runtime' value from `Pulumi.yaml` file.

For a runtime of 'nodejs', npm install is run before pulumi commands. Pulumi commands are now implemented using `spawn` instead of `output`. This allows better handling of streaming output.

Additionally, changes have been introduced to ensure that serde uses camelCase convention while serialising and deserialising.

A new feature 'boot' has been added for kubernetes jobs and base operators. The default feature set of pulumi-operator github libraries has been set to boot.

Overall, these changes handle a full execution cycle of stack by vnodejs runtime in an organised manner using better practices.
  • Loading branch information
jan-br committed Sep 17, 2023
1 parent 1b8384b commit f79a32c
Show file tree
Hide file tree
Showing 13 changed files with 179 additions and 35 deletions.
3 changes: 2 additions & 1 deletion pulumi-cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,5 @@ edition = "2021"

[dependencies]
derivative = "2.2.0"
tokio = { version = "1.29.1", features = ["full"] }
tokio = { version = "1.29.1", features = ["full"] }
log = "0.4.20"
56 changes: 50 additions & 6 deletions pulumi-cli/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use derivative::Derivative;
use std::fs::File;
use std::path::{Path, PathBuf};
use std::process::Output;
use std::process::{ExitStatus, Output};
use tokio::io::{AsyncBufReadExt, BufReader};
use tokio::process::Command;

pub struct PulumiCLI {
Expand All @@ -15,9 +16,16 @@ impl PulumiCLI {
}
}

pub async fn up(&self, options: UpOptions) -> Output {
pub async fn login(&self, options: LoginOptions) -> ExitStatus {
let mut command = Command::new("pulumi");
command.current_dir(&self.workdir).arg("up");
command.arg("login").arg(options.url);

self.spawn(command).await
}

pub async fn up(&self, options: UpOptions) -> ExitStatus {
let mut command = Command::new("pulumi");
command.arg("up");

if let Some(config) = &options.config {
command.arg("--config").arg(config);
Expand Down Expand Up @@ -68,10 +76,42 @@ impl PulumiCLI {
command.arg("--show-sames");
}

command.output().await.unwrap()
self.spawn(command).await
}

pub async fn destroy(&self, options: DestroyOptions) -> Output {
pub async fn spawn(&self, mut command: Command) -> ExitStatus {
command.stdout(std::process::Stdio::piped());
command.stderr(std::process::Stdio::piped());
command.current_dir(&self.workdir);

let mut child = command.spawn().unwrap();

let stdout = child.stdout.take().expect("Failed to open stdout");
let stderr = child.stderr.take().expect("Failed to open stderr");

let stdout_reader = BufReader::new(stdout);
let stderr_reader = BufReader::new(stderr);

let stdout_handle = tokio::spawn(async move {
let mut lines = stdout_reader.lines();
while let Some(line) = lines.next_line().await.unwrap() {
log::info!("{}", line);
}
});

let stderr_handle = tokio::spawn(async move {
let mut lines = stderr_reader.lines();
while let Some(line) = lines.next_line().await.unwrap() {
log::error!("{}", line);
}
});

tokio::try_join!(stdout_handle, stderr_handle).unwrap();

child.wait().await.unwrap()
}

pub async fn destroy(&self, options: DestroyOptions) -> ExitStatus {
let mut command = Command::new("pulumi");
command.arg("destroy");

Expand All @@ -85,7 +125,7 @@ impl PulumiCLI {
command.arg("--skip-preview");
}

command.output().await.unwrap()
self.spawn(command).await
}
}

Expand Down Expand Up @@ -116,3 +156,7 @@ pub struct DestroyOptions {
pub yes: bool,
pub skip_preview: bool,
}

pub struct LoginOptions {
pub url: String,
}
4 changes: 4 additions & 0 deletions pulumi-operator-base/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,7 @@ springtime-di = "1.0.0"
async-trait = "0.1.71"
thiserror = "1.0.43"
tokio = { version = "1.29.1", features = ["full"] }

[features]
boot = []
default = ["boot"]
3 changes: 2 additions & 1 deletion pulumi-operator-kubernetes-job/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,14 @@ tokio = { version = "1.29.1", features = ["full"] }
pulumi-cli = { path = "../pulumi-cli" }
thiserror = "1.0.43"
serde = "1.0.171"
serde_yaml = "0.9.25"
schemars = "0.8.12"
serde_json = "1.0.100"
log = "0.4.19"
futures = "0.3.28"
tracing = "0.1.37"
pulumi-operator-kubernetes = { path = "../pulumi-operator-kubernetes", default-features = false}
pulumi-operator-base = { path = "../pulumi-operator-base" }
pulumi-operator-base = { path = "../pulumi-operator-base", default-features = false }
k8s-openapi = { version = "0.19.0", features = ["v1_26"] }
kube = { version = "0.85.0", features = ["runtime", "derive", "admission"] }
git2 = "0.17.2"
45 changes: 29 additions & 16 deletions pulumi-operator-kubernetes-job/src/git/service.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
use std::{collections::BTreeMap, path::{PathBuf, Path}};
use std::{
collections::BTreeMap,
path::{Path, PathBuf},
};

use git2::{Cred, RemoteCallbacks, FetchOptions, build::RepoBuilder};
use git2::{build::RepoBuilder, Cred, FetchOptions, RemoteCallbacks};
use k8s_openapi::{api::core::v1::Secret, ByteString};
use kube::core::ObjectMeta;
use pulumi_operator_base::Inst;
Expand Down Expand Up @@ -73,7 +76,7 @@ impl GitService {
let local = LocalSet::new();

local.spawn_local(async move {
let res = async move {
let res = async move {
let mut callback = RemoteCallbacks::new();

if let Some(auth) = &spec.auth {
Expand All @@ -86,12 +89,14 @@ impl GitService {

let mut builder = RepoBuilder::new();
builder.fetch_options(fo);

builder.clone("", Path::new(""));

Ok::<&str, GitError>("/")
}.await;
tx.send(res).expect("Failed to communicate between threads.");
builder.clone(spec.repository.as_str(), Path::new("./source"))?;

Ok::<&str, GitError>("./source")
}
.await;
tx.send(res)
.expect("Failed to communicate between threads.");
});

rt.block_on(local);
Expand Down Expand Up @@ -131,26 +136,34 @@ impl GitController {
None => "git".into(),
};

Some(match auth.kind {
match auth.kind {
GitAuthType::Ssh => {
let publickey = match data.get("publickey") {
Some(publickey) => Some(String::from_utf8(publickey.clone().0)?),
None => None,
};
// let publickey = match data.get("identity.pub") {
// Some(publickey) => Some(String::from_utf8(publickey.clone().0)?),
// None => None,
// };

//TODO: Somehow github cloning fails with explicit public key?
let publickey: Option<String> = None;

let privatekey = String::from_utf8(
data
.get("privatekey")
.get("identity")
.ok_or_else(|| GitError::DataEmpty)?
.clone()
.0,
)?;

let passphrase = match data.get("passphrase") {
let passphrase = match data.get("identity.pass") {
Some(passphrase) => Some(String::from_utf8(passphrase.clone().0)?),
None => None,
};

//todo: handle known hosts
let known_hosts = data
.get("known_hosts")
.map(|known_hosts| String::from_utf8(known_hosts.clone().0));

callback.credentials(move |_url, username_from_url, _allowed_types| {
Cred::ssh_key_from_memory(
username_from_url.unwrap_or(&fallback_username),
Expand Down Expand Up @@ -178,7 +191,7 @@ impl GitController {
},
);
}
});
}
Ok(())
}
}
2 changes: 0 additions & 2 deletions pulumi-operator-kubernetes-job/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@ use springtime::application;

#[tokio::main(flavor = "current_thread")]
async fn main() {
#[cfg(feature = "kubernetes")]
pulumi_operator_kubernetes::bind();
application::create_default()
.unwrap()
.run()
Expand Down
82 changes: 81 additions & 1 deletion pulumi-operator-kubernetes-job/src/pulumi_execution.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
use futures::task::Spawn;
use k8s_openapi::api::core::v1::Secret;
use pulumi_cli::{LoginOptions, PulumiCLI, UpOptions};
use pulumi_operator_base::Inst;
use pulumi_operator_kubernetes::kubernetes::service::KubernetesService;
use pulumi_operator_kubernetes::stack::auth::inner::InnerStackAuthSpec;
Expand All @@ -8,13 +11,16 @@ use pulumi_operator_kubernetes::stack::crd::{
use pulumi_operator_kubernetes::stack::source::git::repository::GitStackSourceRepository;
use pulumi_operator_kubernetes::stack::source::oci::repository::OciStackSourceRepository;
use pulumi_operator_kubernetes::stack::source::Source;
use serde::Deserialize;
use springtime::runner::ApplicationRunner;
use springtime_di::future::{BoxFuture, FutureExt};
use springtime_di::instance_provider::ErrorPtr;
use springtime_di::{component_alias, Component};
use std::env::VarError;
use std::fs::read_to_string;
use std::sync::Arc;
use thiserror::Error;
use tokio::process::Command;

use crate::fetch_service::{FetchError, FetchService};

Expand All @@ -39,18 +45,92 @@ pub enum PulumiExecutionError {
StackSourceFetchFailed(#[from] FetchError),
}

#[derive(Deserialize)]
pub struct PulumiConfig {
pub runtime: String,
}

impl PulumiExecution {
pub async fn run_internal(&self) -> Result<(), PulumiExecutionError> {
let pulumi_stack = self.get_stack().await?;
let inner_stack_source = self.get_inner_stack_source(&pulumi_stack).await?;
let inner_stack_auth = self.get_inner_stack_auth(&pulumi_stack).await?;

let namespace = std::env::var("WATCH_NAMESPACE")
.map_err(PulumiExecutionError::CurrentNamespaceNotDefined)?;
let access_token = match &inner_stack_auth.access_token_secret {
None => None,
Some(secret_name) => Some(
String::from_utf8(
self
.kubernetes_service
.get_in_namespace::<Secret>(namespace, secret_name)
.await
.unwrap()
.data
.unwrap()
.get("token")
.unwrap()
.0
.clone(),
)
.unwrap(),
),
};

if let Some(access_token) = access_token {
dbg!(&access_token);
std::env::set_var("PULUMI_CONFIG_PASSPHRASE", access_token);
}

let working_dir = self
.fetch_servcice
.fetch(&inner_stack_source, &pulumi_stack.metadata)
.await?;

dbg!("Found inner stack {}", inner_stack_source);
let working_dir = match &pulumi_stack.spec.path {
None => working_dir,
Some(path) => working_dir.join(path),
};

let pulumi_config: PulumiConfig = serde_yaml::from_str(
read_to_string(working_dir.join("Pulumi.yaml"))
.unwrap()
.as_str(),
)
.unwrap();

let pulumi = PulumiCLI::new(working_dir.clone());

match pulumi_config.runtime.as_str() {
"nodejs" => {
pulumi
.spawn({
let mut command = Command::new("npm");
command.arg("install");
command
})
.await;
}
_ => {
unimplemented!()
}
}

pulumi
.login(LoginOptions {
url: "gs://stromee-pulumi".to_string(),
})
.await;

let exit = pulumi
.up(UpOptions {
stack: pulumi_stack.spec.stack_name.clone(),
..Default::default()
})
.await;

dbg!(exit);

Ok(())
}
Expand Down
4 changes: 2 additions & 2 deletions pulumi-operator-kubernetes/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ version = "0.1.0"
edition = "2021"

[dependencies]
pulumi-operator-base = { path = "../pulumi-operator-base" }
pulumi-operator-base = { path = "../pulumi-operator-base", default-features = false }
springtime-di = "1.0.0"
springtime = { version = "1.0.0", features = ["tokio", "threadsafe"] }
async-trait = "0.1.71"
Expand All @@ -22,5 +22,5 @@ warp = "0.3.5"

[features]
install-crds = []
boot = []
boot = ["pulumi-operator-base/boot"]
default = ["boot"]
1 change: 1 addition & 0 deletions pulumi-operator-kubernetes/src/kubernetes/crd_installer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ impl PulumiStackCrdInstaller {
}
}

#[cfg(feature = "boot")]
#[component_alias]
impl ApplicationRunner for PulumiStackCrdInstaller {
fn run(&self) -> BoxFuture<'_, Result<(), ErrorPtr>> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,6 @@ impl KubernetesPulumiStackControllerStrategy {
#[async_trait]
impl PulumiStackControllerStrategy for KubernetesPulumiStackControllerStrategy {
async fn initialize(&self) -> Result<(), PulumiStackControllerStrategyError> {
#[cfg(feature = "boot")]
self.start_controller().await?;
Ok(())
}
Expand Down
4 changes: 2 additions & 2 deletions pulumi-operator-kubernetes/src/stack/crd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,15 @@ use super::status::StackStatus;
version = "v1",
kind = "PulumiStack",
plural = "pulumistacks",
status = "StackStatus",
status = "StackStatus"
)]
#[kube(namespaced)]
#[serde(rename_all = "camelCase")]
pub struct StackSpec {
pub stack_name: Option<String>,
pub source: StackSourceRef,
pub auth: StackAuthRef,
pub path: Option<String>,
}

#[derive(Serialize, Deserialize, Clone, Debug, JsonSchema)]
Expand Down Expand Up @@ -57,4 +58,3 @@ pub enum StackSourceRefType {
#[serde(rename = "ClusterOciStackSource")]
ClusterOci,
}

Loading

0 comments on commit f79a32c

Please sign in to comment.