Skip to content

Commit

Permalink
Merge branch 'main' into go/kubeconfig-default
Browse files Browse the repository at this point in the history
  • Loading branch information
clux committed Dec 6, 2022
2 parents 1f51e4e + a4bcf97 commit 8ecbd8f
Show file tree
Hide file tree
Showing 33 changed files with 129 additions and 79 deletions.
2 changes: 1 addition & 1 deletion clippy.toml
@@ -1 +1 @@
blacklisted-names = []
disallowed-names = []
20 changes: 20 additions & 0 deletions deny.toml
Expand Up @@ -83,6 +83,26 @@ multiple-versions = "deny"
name = "idna"
version = "0.2"

# waiting on hyper-rustls and below to bump its chain
[[bans.skip]]
name = "windows_i686_msvc"
version = "0.36"
[[bans.skip]]
name = "windows_aarch64_msvc"
version = "0.36"
[[bans.skip]]
name = "windows-sys"
version = "0.36"
[[bans.skip]]
name = "windows_i686_gnu"
version = "0.36"
[[bans.skip]]
name = "windows_x86_64_msvc"
version = "0.36"
[[bans.skip]]
name = "windows_x86_64_gnu"
version = "0.36"

[[bans.skip]]
# waiting for ahash/getrandom to bump wasi as we have two branches:
# ahash -> getrandom -> wasi old
Expand Down
2 changes: 1 addition & 1 deletion examples/crd_derive.rs
Expand Up @@ -51,7 +51,7 @@ fn main() {
});
println!("Spec: {:?}", foo.spec);
let crd = serde_json::to_string_pretty(&FooCrd::crd()).unwrap();
println!("Foo CRD: \n{}", crd);
println!("Foo CRD: \n{crd}");

println!("Spec (via HasSpec): {:?}", foo.spec());
println!("Status (via HasStatus): {:?}", foo.status());
Expand Down
2 changes: 1 addition & 1 deletion examples/crd_derive_schema.rs
Expand Up @@ -250,7 +250,7 @@ async fn delete_crd(client: Client) -> Result<()> {
return Ok(());
}
}
Err(anyhow!(format!("CRD not deleted after {} seconds", timeout_secs)))
Err(anyhow!(format!("CRD not deleted after {timeout_secs} seconds")))
} else {
Ok(())
}
Expand Down
10 changes: 5 additions & 5 deletions examples/kubectl.rs
Expand Up @@ -125,7 +125,7 @@ impl App {

async fn watch(&self, api: Api<DynamicObject>, mut lp: ListParams) -> Result<()> {
if let Some(n) = &self.name {
lp = lp.fields(&format!("metadata.name={}", n));
lp = lp.fields(&format!("metadata.name={n}"));
}
// present a dumb table for it for now. kubectl does not do this anymore.
let mut stream = watcher(api, lp).applied_objects().boxed();
Expand Down Expand Up @@ -196,7 +196,7 @@ async fn main() -> Result<()> {
if let Some(resource) = &app.resource {
// Common discovery, parameters, and api configuration for a single resource
let (ar, caps) = resolve_api_resource(&discovery, resource)
.with_context(|| format!("resource {:?} not found in cluster", resource))?;
.with_context(|| format!("resource {resource:?} not found in cluster"))?;
let mut lp = ListParams::default();
if let Some(label) = &app.selector {
lp = lp.labels(label);
Expand Down Expand Up @@ -238,9 +238,9 @@ fn format_creation_since(time: Option<Time>) -> String {
}
fn format_duration(dur: Duration) -> String {
match (dur.num_days(), dur.num_hours(), dur.num_minutes()) {
(days, _, _) if days > 0 => format!("{}d", days),
(_, hours, _) if hours > 0 => format!("{}h", hours),
(_, _, mins) => format!("{}m", mins),
(days, _, _) if days > 0 => format!("{days}d"),
(_, hours, _) if hours > 0 => format!("{hours}h"),
(_, _, mins) => format!("{mins}m"),
}
}

Expand Down
2 changes: 1 addition & 1 deletion examples/node_watcher.rs
Expand Up @@ -48,7 +48,7 @@ async fn check_for_node_failures(events: &Api<Event>, o: Node) -> anyhow::Result
warn!("Unschedulable Node: {}, ({:?})", name, failed);
// Find events related to this node
let opts =
ListParams::default().fields(&format!("involvedObject.kind=Node,involvedObject.name={}", name));
ListParams::default().fields(&format!("involvedObject.kind=Node,involvedObject.name={name}"));
let evlist = events.list(&opts).await?;
for e in evlist {
warn!("Node event: {:?}", serde_json::to_string_pretty(&e)?);
Expand Down
2 changes: 1 addition & 1 deletion examples/pod_cp.rs
Expand Up @@ -77,7 +77,7 @@ async fn main() -> anyhow::Result<()> {
{
let ap = AttachParams::default().stderr(false);
let mut cat = pods
.exec("example", vec!["cat", &format!("/{}", file_name)], &ap)
.exec("example", vec!["cat", &format!("/{file_name}")], &ap)
.await?;
let mut cat_out = tokio_util::io::ReaderStream::new(cat.stdout().unwrap());
let next_stdout = cat_out.next().await.unwrap()?;
Expand Down
8 changes: 4 additions & 4 deletions examples/pod_exec.rs
Expand Up @@ -62,7 +62,7 @@ async fn main() -> anyhow::Result<()> {
)
.await?;
let output = get_output(attached).await;
println!("{}", output);
println!("{output}");
assert_eq!(output.lines().count(), 3);
}

Expand All @@ -71,7 +71,7 @@ async fn main() -> anyhow::Result<()> {
.exec("example", vec!["uptime"], &AttachParams::default().stderr(false))
.await?;
let output = get_output(attached).await;
println!("{}", output);
println!("{output}");
assert_eq!(output.lines().count(), 1);
}

Expand All @@ -89,15 +89,15 @@ async fn main() -> anyhow::Result<()> {
let next_stdout = stdout_stream.next();
stdin_writer.write_all(b"echo test string 1\n").await?;
let stdout = String::from_utf8(next_stdout.await.unwrap().unwrap().to_vec()).unwrap();
println!("{}", stdout);
println!("{stdout}");
assert_eq!(stdout, "test string 1\n");

// AttachedProcess provides access to a future that resolves with a status object.
let status = attached.take_status().unwrap();
// Send `exit 1` to get a failure status.
stdin_writer.write_all(b"exit 1\n").await?;
if let Some(status) = status.await {
println!("{:?}", status);
println!("{status:?}");
assert_eq!(status.status, Some("Failure".to_owned()));
assert_eq!(status.reason, Some("NonZeroExitCode".to_owned()));
}
Expand Down
5 changes: 2 additions & 3 deletions examples/pod_shell_crossterm.rs
@@ -1,7 +1,6 @@
use futures::{channel::mpsc::Sender, SinkExt, StreamExt};
use k8s_openapi::api::core::v1::Pod;

#[cfg(unix)] use crossterm::event::Event;
use kube::{
api::{Api, AttachParams, AttachedProcess, DeleteParams, PostParams, ResourceExt, TerminalSize},
runtime::wait::{await_condition, conditions::is_pod_running},
Expand All @@ -19,7 +18,7 @@ async fn handle_terminal_size(mut channel: Sender<TerminalSize>) -> Result<(), a
// create a stream to catch SIGWINCH signal
let mut sig = signal::unix::signal(signal::unix::SignalKind::window_change())?;
loop {
if sig.recv().await == None {
if (sig.recv().await).is_none() {
return Ok(());
}

Expand Down Expand Up @@ -127,6 +126,6 @@ async fn main() -> anyhow::Result<()> {
assert_eq!(pdel.name_any(), "example");
});

println!("");
println!();
Ok(())
}
2 changes: 1 addition & 1 deletion examples/secret_syncer.rs
Expand Up @@ -32,7 +32,7 @@ type Result<T, E = Error> = std::result::Result<T, E>;

fn secret_name_for_configmap(cm: &ConfigMap) -> Result<String> {
let name = cm.metadata.name.as_deref().ok_or(Error::NoName)?;
Ok(format!("cmsyncer-{}", name))
Ok(format!("cmsyncer-{name}"))
}

async fn apply(cm: Arc<ConfigMap>, secrets: &kube::Api<Secret>) -> Result<Action> {
Expand Down
4 changes: 2 additions & 2 deletions kube-client/Cargo.toml
Expand Up @@ -37,7 +37,7 @@ rustdoc-args = ["--cfg", "docsrs"]

[dependencies]
base64 = { version = "0.13.0", optional = true }
chrono = { version = "0.4.19", optional = true, default-features = false }
chrono = { version = "0.4.23", optional = true, default-features = false }
dirs = { package = "dirs-next", optional = true, version = "2.0.0" }
serde = { version = "1.0.130", features = ["derive"] }
serde_json = "1.0.68"
Expand All @@ -58,7 +58,7 @@ jsonpath_lib = { version = "0.3.0", optional = true }
tokio-util = { version = "0.7.0", optional = true, features = ["io", "codec"] }
hyper = { version = "0.14.13", optional = true, features = ["client", "http1", "stream", "tcp"] }
hyper-rustls = { version = "0.23.0", optional = true }
tokio-tungstenite = { version = "0.17.1", optional = true }
tokio-tungstenite = { version = "0.18.0", optional = true }
tower = { version = "0.4.6", optional = true, features = ["buffer", "filter", "util"] }
tower-http = { version = "0.3.2", optional = true, features = ["auth", "map-response-body", "trace"] }
hyper-timeout = {version = "0.4.1", optional = true }
Expand Down
4 changes: 2 additions & 2 deletions kube-client/src/api/entry.rs
Expand Up @@ -429,11 +429,11 @@ mod tests {

let mut entry = match api.entry(object_name).await? {
Entry::Occupied(entry) => entry,
entry => panic!("entry for existing object must be occupied: {:?}", entry),
entry => panic!("entry for existing object must be occupied: {entry:?}"),
};
let mut entry2 = match api.entry(object_name).await? {
Entry::Occupied(entry) => entry,
entry => panic!("entry for existing object must be occupied: {:?}", entry),
entry => panic!("entry for existing object must be occupied: {entry:?}"),
};

// Entry is up-to-date, modify cleanly
Expand Down
3 changes: 1 addition & 2 deletions kube-client/src/api/util/mod.rs
Expand Up @@ -161,8 +161,7 @@ mod test {
assert_eq!(
tokenreviewstatus.user.unwrap().username,
Some(format!(
"system:serviceaccount:{}:{}",
serviceaccount_namespace, serviceaccount_name
"system:serviceaccount:{serviceaccount_namespace}:{serviceaccount_name}"
))
);

Expand Down
44 changes: 29 additions & 15 deletions kube-client/src/client/auth/mod.rs
Expand Up @@ -94,6 +94,7 @@ pub(crate) enum Auth {
Basic(String, SecretString),
Bearer(SecretString),
RefreshableToken(RefreshableToken),
Certificate(String, SecretString),
}

// Token file reference. Reloads at least once per minute.
Expand Down Expand Up @@ -188,7 +189,7 @@ impl RefreshableToken {
if Utc::now() + Duration::seconds(60) >= locked_data.1 {
// TODO Improve refreshing exec to avoid `Auth::try_from`
match Auth::try_from(&locked_data.2)? {
Auth::None | Auth::Basic(_, _) | Auth::Bearer(_) => {
Auth::None | Auth::Basic(_, _) | Auth::Bearer(_) | Auth::Certificate(_, _) => {
return Err(Error::UnrefreshableTokenResponse);
}

Expand Down Expand Up @@ -234,7 +235,7 @@ impl RefreshableToken {
}

fn bearer_header(token: &str) -> Result<HeaderValue, Error> {
let mut value = HeaderValue::try_from(format!("Bearer {}", token)).map_err(Error::InvalidBearerToken)?;
let mut value = HeaderValue::try_from(format!("Bearer {token}")).map_err(Error::InvalidBearerToken)?;
value.set_sensitive(true);
Ok(value)
}
Expand Down Expand Up @@ -295,6 +296,11 @@ impl TryFrom<&AuthInfo> for Auth {
if let Some(exec) = &auth_info.exec {
let creds = auth_exec(exec)?;
let status = creds.status.ok_or(Error::ExecPluginFailed)?;
if let (Some(client_certificate_data), Some(client_key_data)) =
(status.client_certificate_data, status.client_key_data)
{
return Ok(Self::Certificate(client_certificate_data, client_key_data.into()));
}
let expiration = status
.expiration_timestamp
.map(|ts| ts.parse())
Expand Down Expand Up @@ -328,6 +334,9 @@ fn token_from_provider(provider: &AuthProviderConfig) -> Result<ProviderToken, E
match provider.name.as_ref() {
"oidc" => token_from_oidc_provider(provider),
"gcp" => token_from_gcp_provider(provider),
"azure" => Err(Error::AuthExec(
"The azure auth plugin is not supported; use https://github.com/Azure/kubelogin instead".into(),
)),
_ => Err(Error::AuthExec(format!(
"Authentication with provider {:} not supported",
provider.name
Expand Down Expand Up @@ -364,16 +373,23 @@ fn token_from_gcp_provider(provider: &AuthProviderConfig) -> Result<ProviderToke
// Command-based token source
if let Some(cmd) = provider.config.get("cmd-path") {
let params = provider.config.get("cmd-args").cloned().unwrap_or_default();

// NB: This property does currently not exist upstream in client-go
// See https://github.com/kube-rs/kube/issues/1060
let drop_env = provider.config.get("cmd-drop-env").cloned().unwrap_or_default();
// TODO splitting args by space is not safe
let output = Command::new(cmd)
let mut command = Command::new(cmd);
// Do not pass the following env vars to the command
for env in drop_env.trim().split(' ') {
command.env_remove(env);
}
let output = command
.args(params.trim().split(' '))
.output()
.map_err(|e| Error::AuthExec(format!("Executing {:} failed: {:?}", cmd, e)))?;
.map_err(|e| Error::AuthExec(format!("Executing {cmd:} failed: {e:?}")))?;

if !output.status.success() {
return Err(Error::AuthExecRun {
cmd: format!("{} {}", cmd, params),
cmd: format!("{cmd} {params}"),
status: output.status,
out: output,
});
Expand All @@ -394,7 +410,7 @@ fn token_from_gcp_provider(provider: &AuthProviderConfig) -> Result<ProviderToke
}
} else {
let token = std::str::from_utf8(&output.stdout)
.map_err(|e| Error::AuthExec(format!("Result is not a string {:?} ", e)))?
.map_err(|e| Error::AuthExec(format!("Result is not a string {e:?} ")))?
.to_owned();
return Ok(ProviderToken::GcpCommand(token, None));
}
Expand All @@ -418,21 +434,20 @@ fn token_from_gcp_provider(provider: &AuthProviderConfig) -> Result<ProviderToke

fn extract_value(json: &serde_json::Value, path: &str) -> Result<String, Error> {
let pure_path = path.trim_matches(|c| c == '"' || c == '{' || c == '}');
match jsonpath_select(json, &format!("${}", pure_path)) {
match jsonpath_select(json, &format!("${pure_path}")) {
Ok(v) if !v.is_empty() => {
if let serde_json::Value::String(res) = v[0] {
Ok(res.clone())
} else {
Err(Error::AuthExec(format!(
"Target value at {:} is not a string",
pure_path
"Target value at {pure_path:} is not a string"
)))
}
}

Err(e) => Err(Error::AuthExec(format!("Could not extract JSON value: {:}", e))),
Err(e) => Err(Error::AuthExec(format!("Could not extract JSON value: {e:}"))),

_ => Err(Error::AuthExec(format!("Target value {:} not found", pure_path))),
_ => Err(Error::AuthExec(format!("Target value {pure_path:} not found"))),
}
}

Expand Down Expand Up @@ -492,7 +507,7 @@ fn auth_exec(auth: &ExecConfig) -> Result<ExecCredential, Error> {
let out = cmd.output().map_err(Error::AuthExecStart)?;
if !out.status.success() {
return Err(Error::AuthExecRun {
cmd: format!("{:?}", cmd),
cmd: format!("{cmd:?}"),
status: out.status,
out,
});
Expand Down Expand Up @@ -536,8 +551,7 @@ mod test {
expiry-key: '{{.credential.token_expiry}}'
token-key: '{{.credential.access_token}}'
name: gcp
"#,
expiry = expiry
"#
);

let config: Kubeconfig = serde_yaml::from_str(&test_file).unwrap();
Expand Down
28 changes: 26 additions & 2 deletions kube-client/src/client/config_ext.rs
Expand Up @@ -144,6 +144,7 @@ impl ConfigExt for Config {
Auth::RefreshableToken(refreshable) => {
Some(AuthLayer(Either::B(AsyncFilterLayer::new(refreshable))))
}
Auth::Certificate(_client_certificate_data, _client_key_data) => None,
})
}

Expand Down Expand Up @@ -174,8 +175,9 @@ impl ConfigExt for Config {

#[cfg(feature = "rustls-tls")]
fn rustls_client_config(&self) -> Result<rustls::ClientConfig> {
let identity = self.exec_identity_pem().or_else(|| self.identity_pem());
tls::rustls_tls::rustls_client_config(
self.identity_pem().as_deref(),
identity.as_deref(),
self.root_cert.as_deref(),
self.accept_invalid_certs,
)
Expand All @@ -192,7 +194,8 @@ impl ConfigExt for Config {

#[cfg(feature = "openssl-tls")]
fn openssl_ssl_connector_builder(&self) -> Result<openssl::ssl::SslConnectorBuilder> {
tls::openssl_tls::ssl_connector_builder(self.identity_pem().as_ref(), self.root_cert.as_ref())
let identity = self.exec_identity_pem().or_else(|| self.identity_pem());
tls::openssl_tls::ssl_connector_builder(identity.as_ref(), self.root_cert.as_ref())
.map_err(|e| Error::OpensslTls(tls::openssl_tls::Error::CreateSslConnector(e)))
}

Expand Down Expand Up @@ -220,3 +223,24 @@ impl ConfigExt for Config {
Ok(https)
}
}

impl Config {
// This is necessary to retrieve an identity when an exec plugin
// returns a client certificate and key instead of a token.
// This has be to be checked on TLS configuration vs tokens
// which can be added in as an AuthLayer.
fn exec_identity_pem(&self) -> Option<Vec<u8>> {
match Auth::try_from(&self.auth_info) {
Ok(Auth::Certificate(client_certificate_data, client_key_data)) => {
const NEW_LINE: u8 = b'\n';

let mut buffer = client_key_data.expose_secret().as_bytes().to_vec();
buffer.push(NEW_LINE);
buffer.extend_from_slice(client_certificate_data.as_bytes());
buffer.push(NEW_LINE);
Some(buffer)
}
_ => None,
}
}
}

0 comments on commit 8ecbd8f

Please sign in to comment.