Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Version checker for Resume #3999

Merged
merged 10 commits into from
May 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 49 additions & 0 deletions crates/fluvio-cluster/src/check/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ pub mod render;
use anyhow::Result;
use colored::Colorize;
use fluvio_future::timer::sleep;
use fluvio_types::config_file::SaveLoadConfig;
use indicatif::style::TemplateError;
use tracing::{error, debug};
use async_trait::async_trait;
Expand All @@ -25,6 +26,7 @@ use crate::charts::{DEFAULT_HELM_VERSION, APP_CHART_NAME};
use crate::progress::ProgressBarFactory;
use crate::render::ProgressRenderer;
use crate::charts::{ChartConfig, ChartInstaller, ChartInstallError, SYS_CHART_NAME};
use crate::LocalConfig;

const KUBE_VERSION: &str = "1.7.0";
const RESOURCE_SERVICE: &str = "service";
Expand Down Expand Up @@ -233,6 +235,15 @@ pub enum UnrecoverableCheckStatus {
#[error("Local Fluvio cluster wasn't deleted. Use 'resume' to resume created cluster or 'delete' before starting a new one")]
CreateLocalConfigError,

/// The installed version of the local cluster is incompatible
#[error("Check Versions match failed: cannot resume a {installed} cluster with fluvio version {required}.")]
IncompatibleLocalClusterVersion {
/// The currently-installed version
installed: String,
/// The required version
required: String,
},

#[error("Helm client error")]
HelmClientError,

Expand Down Expand Up @@ -723,6 +734,40 @@ impl ClusterCheck for CleanLocalClusterCheck {
}
}

/// Check that the local cluster is installed with a compatible version.
///
/// The wrapped [`Version`] is the version the installed cluster's platform
/// version is compared against.
#[derive(Debug)]
struct LocalClusterVersionCheck(Version);

#[async_trait]
impl ClusterCheck for LocalClusterVersionCheck {
async fn perform_check(&self, _pb: &ProgressRenderer) -> CheckResult {
use crate::start::local::LOCAL_CONFIG_PATH;

let installed_version = LOCAL_CONFIG_PATH
.as_ref()
.and_then(|p| LocalConfig::load_from(p).ok())
.map(|conf| conf.platform_version().clone())
.ok_or(anyhow::Error::msg(
"Could not load local config's platform version",
))?;

if installed_version != self.0 {
Ok(CheckStatus::Unrecoverable(
UnrecoverableCheckStatus::IncompatibleLocalClusterVersion {
installed: installed_version.to_string(),
required: self.0.to_string(),
},
))
} else {
Ok(CheckStatus::pass("Platform versions match"))
}
}

fn label(&self) -> &str {
"Versions match"
}
}

/// Manages all cluster check operations
///
/// A `ClusterChecker` can be configured with different sets of checks to run.
Expand Down Expand Up @@ -790,6 +835,10 @@ impl ClusterChecker {
self.with_check(CleanLocalClusterCheck)
}

/// Adds a check that compares the installed local cluster's platform version
/// against `version`.
pub fn with_local_cluster_version(self, version: Version) -> Self {
    let version_check = LocalClusterVersionCheck(version);
    self.with_check(version_check)
}

/// Adds all checks required for starting a cluster on minikube.
///
/// Note that no checks are run until the [`run`] method is invoked.
Expand Down
9 changes: 7 additions & 2 deletions crates/fluvio-cluster/src/cli/resume.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use crate::{InstallationType, cli::get_installation_type};
pub struct ResumeOpt;

impl ResumeOpt {
pub async fn process(self, _platform_version: Version) -> Result<()> {
pub async fn process(self, platform_version: Version) -> Result<()> {
let pb_factory = ProgressBarFactory::new(false);

let pb = match pb_factory.create() {
Expand All @@ -32,7 +32,10 @@ impl ResumeOpt {

let resume_result = match installation_type {
InstallationType::Local | InstallationType::ReadOnly => {
let resume = LocalResume { pb_factory };
let resume = LocalResume {
pb_factory,
platform_version,
};
resume.resume().await
}
_ => {
Expand All @@ -52,6 +55,7 @@ impl ResumeOpt {
/// State needed to resume a local cluster installation.
#[derive(Debug)]
struct LocalResume {
/// Progress bar factory handed to the cluster checker when running preflight checks.
pb_factory: ProgressBarFactory,
/// Platform version passed to the local-cluster version preflight check.
platform_version: Version,
}

impl LocalResume {
Expand Down Expand Up @@ -80,6 +84,7 @@ impl LocalResume {
async fn preflight_check(&self) -> Result<()> {
ClusterChecker::empty()
.without_installed_local_cluster()
.with_local_cluster_version(self.platform_version.clone())
.run(&self.pb_factory, false)
.await?;
Ok(())
Expand Down
106 changes: 67 additions & 39 deletions crates/fluvio-cluster/src/start/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,13 @@ use fluvio_controlplane_metadata::spu::SpuSpec;
use k8_client::SharedK8Client;
use once_cell::sync::Lazy;
use semver::Version;
use tracing::{debug, error, instrument, warn};
use tracing::{debug, error, info, instrument, warn};

use fluvio::{Fluvio, FluvioConfig};
use fluvio_future::timer::sleep;
use fluvio_future::{
retry::{retry, ExponentialBackoff, RetryExt},
timer::sleep,
};

use crate::render::ProgressRenderer;

Expand All @@ -20,6 +23,18 @@ static MAX_SC_LOOP: Lazy<u8> = Lazy::new(|| {
var_value.parse().unwrap_or(120)
});

#[derive(Debug)]
enum TryConnectError {
Timeout,
#[allow(dead_code)]
sehz marked this conversation as resolved.
Show resolved Hide resolved
UnexpectedVersion {
expected: Version,
current: Version,
},
#[allow(dead_code)]
Unexpected(anyhow::Error),
}

/// try connection to SC
#[instrument]
pub async fn try_connect_to_sc(
Expand All @@ -30,40 +45,36 @@ pub async fn try_connect_to_sc(
async fn try_connect_sc(
fluvio_config: &FluvioConfig,
expected_version: &Version,
) -> Option<Fluvio> {
use tokio::select;

select! {
_ = &mut sleep(Duration::from_secs(10)) => {
debug!("timer expired");
None
},

connection = Fluvio::connect_with_config(fluvio_config) => {

match connection {
Ok(fluvio) => {
let current_version = fluvio.platform_version();
if current_version == expected_version {
debug!("Got updated SC Version{}", &expected_version);
Some(fluvio)
} else {
warn!("Current Version {} is not same as expected: {}",current_version,expected_version);
None
}
}
Err(err) => {
debug!("couldn't connect: {:#?}", err);
None
}
) -> Result<Fluvio, TryConnectError> {
match Fluvio::connect_with_config(fluvio_config).await {
Ok(fluvio) => {
let current_version = fluvio.platform_version();
if current_version == expected_version {
debug!(version = %current_version, "Got updated SC Version");
Ok(fluvio)
} else {
warn!(
"Current Version {} is not same as expected: {}",
current_version, expected_version
);
Err(TryConnectError::UnexpectedVersion {
expected: expected_version.clone(),
current: current_version.clone(),
})
}

}
Err(err) => {
warn!("couldn't connect: {:#?}", err);
Err(TryConnectError::Unexpected(err))
}
}
}

let mut attempt = 0u16;
let time = SystemTime::now();
for attempt in 0..*MAX_SC_LOOP {
let operation = || {
attempt += 1;

debug!(
"Trying to connect to sc at: {}, attempt: {}",
config.endpoint, attempt
Expand All @@ -74,17 +85,34 @@ pub async fn try_connect_to_sc(
config.endpoint,
elapsed.as_secs()
));
if let Some(fluvio) = try_connect_sc(config, platform_version).await {
debug!("Connection to sc succeed!");
return Some(fluvio);
} else if attempt < *MAX_SC_LOOP - 1 {
debug!("Connection failed. sleeping 10 seconds");
sleep(Duration::from_secs(1)).await;
async move {
let retry_timeout = 10;
match try_connect_sc(config, platform_version)
.timeout(Duration::from_secs(retry_timeout))
.await
.unwrap_or(Err(TryConnectError::Timeout))
{
Ok(fluvio) => {
info!("Connection to sc succeed!");
Ok(fluvio)
}
Err(err) => {
warn!("Connection failed with {:?}", err);
Err(err)
}
}
}
}
};

error!("fail to connect to sc at: {}", config.endpoint);
None
retry(
ExponentialBackoff::from_millis(2)
.max_delay(Duration::from_secs(10))
.take(*MAX_SC_LOOP as usize),
operation,
)
.await
.map_err(|_| error!("fail to connect to sc at: {}", config.endpoint))
.ok()
}

// hack
Expand Down
14 changes: 7 additions & 7 deletions crates/fluvio-cluster/src/start/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,10 @@ impl LocalConfig {
builder
}

/// Returns a reference to the platform version stored in this config.
pub fn platform_version(&self) -> &Version {
&self.platform_version
}

/// Returns the launcher path stored in this config, or `None` when no launcher is set.
pub fn launcher_path(&self) -> Option<&Path> {
self.launcher.as_deref()
}
Expand Down Expand Up @@ -524,13 +528,9 @@ impl LocalInstaller {
let cluster_config =
FluvioConfig::new(LOCAL_SC_ADDRESS).with_tls(self.config.client_tls_policy.clone());

if let Some(fluvio) =
try_connect_to_sc(&cluster_config, &self.config.platform_version, pb).await
{
Ok(fluvio)
} else {
Err(LocalInstallError::SCServiceTimeout.into())
}
try_connect_to_sc(&cluster_config, &self.config.platform_version, pb)
.await
.ok_or(LocalInstallError::SCServiceTimeout.into())
}

/// set local profile
Expand Down
Loading