Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/console/handlers/auth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use crate::types::v1alpha1::tenant::Tenant;

// curl -X POST http://localhost:9090/api/v1/login \
// -H "Content-Type: application/json" \
// -d "{\"token\": \"$TOKEN\"}"
// -d "{\"token\": \"$TOKEN\"}"
pub async fn login(
State(state): State<AppState>,
Json(req): Json<LoginRequest>,
Expand Down
53 changes: 29 additions & 24 deletions src/console/handlers/pods.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,18 +13,18 @@
// limitations under the License.

use axum::{
Extension, Json,
body::Body,
extract::{Path, Query},
response::{IntoResponse, Response},
Extension, Json,
};
use futures::TryStreamExt;
use k8s_openapi::api::core::v1 as corev1;
use kube::{
api::{DeleteParams, ListParams, LogParams},
Api, Client, ResourceExt,
api::{DeleteParams, ListParams, LogParams},
};
use snafu::ResultExt;
use futures::TryStreamExt;

use crate::console::{
error::{self, Error, Result},
Expand All @@ -42,9 +42,7 @@ pub async fn list_pods(

// 查询带有 Tenant 标签的 Pods
let pods = api
.list(
&ListParams::default().labels(&format!("rustfs.tenant={}", tenant_name)),
)
.list(&ListParams::default().labels(&format!("rustfs.tenant={}", tenant_name)))
.await
.context(error::KubeApiSnafu)?;

Expand Down Expand Up @@ -92,7 +90,11 @@ pub async fn list_pods(

// 容器就绪状态
let (ready_count, total_count) = if let Some(status) = status {
let total = status.container_statuses.as_ref().map(|c| c.len()).unwrap_or(0);
let total = status
.container_statuses
.as_ref()
.map(|c| c.len())
.unwrap_or(0);
let ready = status
.container_statuses
.as_ref()
Expand All @@ -106,12 +108,7 @@ pub async fn list_pods(
// 重启次数
let restarts = status
.and_then(|s| s.container_statuses.as_ref())
.map(|containers| {
containers
.iter()
.map(|c| c.restart_count)
.sum::<i32>()
})
.map(|containers| containers.iter().map(|c| c.restart_count).sum::<i32>())
.unwrap_or(0);

// 创建时间和 Age
Expand Down Expand Up @@ -237,7 +234,10 @@ pub async fn get_pod_details(
status: c.status.clone(),
reason: c.reason.clone(),
message: c.message.clone(),
last_transition_time: c.last_transition_time.as_ref().map(|t| t.0.to_rfc3339()),
last_transition_time: c
.last_transition_time
.as_ref()
.map(|t| t.0.to_rfc3339()),
})
.collect()
})
Expand All @@ -250,11 +250,15 @@ pub async fn get_pod_details(
};

// 容器信息
let containers = if let Some(container_statuses) = status_info.and_then(|s| s.container_statuses.as_ref()) {
let containers = if let Some(container_statuses) =
status_info.and_then(|s| s.container_statuses.as_ref())
{
container_statuses
.iter()
.map(|cs| {
let state = if let Some(running) = &cs.state.as_ref().and_then(|s| s.running.as_ref()) {
let state = if let Some(running) =
&cs.state.as_ref().and_then(|s| s.running.as_ref())
{
ContainerState::Running {
started_at: running.started_at.as_ref().map(|t| t.0.to_rfc3339()),
}
Expand All @@ -263,7 +267,9 @@ pub async fn get_pod_details(
reason: waiting.reason.clone(),
message: waiting.message.clone(),
}
} else if let Some(terminated) = &cs.state.as_ref().and_then(|s| s.terminated.as_ref()) {
} else if let Some(terminated) =
&cs.state.as_ref().and_then(|s| s.terminated.as_ref())
{
ContainerState::Terminated {
reason: terminated.reason.clone(),
exit_code: terminated.exit_code,
Expand Down Expand Up @@ -331,10 +337,7 @@ pub async fn get_pod_details(
ip: status_info.and_then(|s| s.pod_ip.clone()),
labels: pod.metadata.labels.unwrap_or_default(),
annotations: pod.metadata.annotations.unwrap_or_default(),
created_at: pod
.metadata
.creation_timestamp
.map(|ts| ts.0.to_rfc3339()),
created_at: pod.metadata.creation_timestamp.map(|ts| ts.0.to_rfc3339()),
}))
}

Expand Down Expand Up @@ -386,9 +389,11 @@ pub async fn get_pod_logs(

/// 创建 Kubernetes 客户端
async fn create_client(claims: &Claims) -> Result<Client> {
let mut config = kube::Config::infer().await.map_err(|e| Error::InternalServer {
message: format!("Failed to load kubeconfig: {}", e),
})?;
let mut config = kube::Config::infer()
.await
.map_err(|e| Error::InternalServer {
message: format!("Failed to load kubeconfig: {}", e),
})?;

config.auth_info.token = Some(claims.k8s_token.clone().into());

Expand Down
82 changes: 37 additions & 45 deletions src/console/handlers/pools.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,10 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use axum::{extract::Path, Extension, Json};
use axum::{Extension, Json, extract::Path};
use k8s_openapi::api::apps::v1 as appsv1;
use k8s_openapi::api::core::v1 as corev1;
use kube::{api::ListParams, Api, Client, ResourceExt};
use kube::{Api, Client, ResourceExt, api::ListParams};
use snafu::ResultExt;

use crate::console::{
Expand Down Expand Up @@ -46,10 +46,7 @@ pub async fn list_pools(
// 获取所有 StatefulSets
let ss_api: Api<appsv1::StatefulSet> = Api::namespaced(client, &namespace);
let statefulsets = ss_api
.list(
&ListParams::default()
.labels(&format!("rustfs.tenant={}", tenant_name)),
)
.list(&ListParams::default().labels(&format!("rustfs.tenant={}", tenant_name)))
.await
.context(error::KubeApiSnafu)?;

Expand All @@ -64,43 +61,37 @@ pub async fn list_pools(
.iter()
.find(|ss| ss.name_any() == ss_name);

let (
replicas,
ready_replicas,
updated_replicas,
current_revision,
update_revision,
state,
) = if let Some(ss) = ss {
let status = ss.status.as_ref();
let replicas = status.map(|s| s.replicas).unwrap_or(0);
let ready = status.and_then(|s| s.ready_replicas).unwrap_or(0);
let updated = status.and_then(|s| s.updated_replicas).unwrap_or(0);
let current_rev = status.and_then(|s| s.current_revision.clone());
let update_rev = status.and_then(|s| s.update_revision.clone());

let state = if ready == replicas && updated == replicas && replicas > 0 {
"Ready"
} else if updated < replicas {
"Updating"
} else if ready < replicas {
"Degraded"
let (replicas, ready_replicas, updated_replicas, current_revision, update_revision, state) =
if let Some(ss) = ss {
let status = ss.status.as_ref();
let replicas = status.map(|s| s.replicas).unwrap_or(0);
let ready = status.and_then(|s| s.ready_replicas).unwrap_or(0);
let updated = status.and_then(|s| s.updated_replicas).unwrap_or(0);
let current_rev = status.and_then(|s| s.current_revision.clone());
let update_rev = status.and_then(|s| s.update_revision.clone());

let state = if ready == replicas && updated == replicas && replicas > 0 {
"Ready"
} else if updated < replicas {
"Updating"
} else if ready < replicas {
"Degraded"
} else {
"NotReady"
};

(
replicas,
ready,
updated,
current_rev,
update_rev,
state.to_string(),
)
} else {
"NotReady"
(0, 0, 0, None, None, "NotCreated".to_string())
};

(
replicas,
ready,
updated,
current_rev,
update_rev,
state.to_string(),
)
} else {
(0, 0, 0, None, None, "NotCreated".to_string())
};

// 获取存储配置
let storage_class = pool
.persistence
Expand Down Expand Up @@ -302,8 +293,7 @@ pub async fn delete_pool(
// 检查是否为最后一个 Pool
if tenant.spec.pools.len() == 1 {
return Err(Error::BadRequest {
message: "Cannot delete the last pool. Delete the entire Tenant instead."
.to_string(),
message: "Cannot delete the last pool. Delete the entire Tenant instead.".to_string(),
});
}

Expand Down Expand Up @@ -338,9 +328,11 @@ pub async fn delete_pool(

/// 创建 Kubernetes 客户端
async fn create_client(claims: &Claims) -> Result<Client> {
let mut config = kube::Config::infer().await.map_err(|e| Error::InternalServer {
message: format!("Failed to load kubeconfig: {}", e),
})?;
let mut config = kube::Config::infer()
.await
.map_err(|e| Error::InternalServer {
message: format!("Failed to load kubeconfig: {}", e),
})?;

config.auth_info.token = Some(claims.k8s_token.clone().into());

Expand Down
6 changes: 3 additions & 3 deletions src/console/handlers/tenants.rs
Original file line number Diff line number Diff line change
Expand Up @@ -378,7 +378,7 @@ pub async fn update_tenant(
"Invalid pod_management_policy '{}', must be 'OrderedReady' or 'Parallel'",
pod_mgmt_policy
),
})
});
}
};
updated_fields.push(format!("pod_management_policy={}", pod_mgmt_policy));
Expand All @@ -396,7 +396,7 @@ pub async fn update_tenant(
"Invalid image_pull_policy '{}', must be 'Always', 'IfNotPresent', or 'Never'",
image_pull_policy
),
})
});
}
};
updated_fields.push(format!("image_pull_policy={}", image_pull_policy));
Expand All @@ -415,7 +415,7 @@ pub async fn update_tenant(
"Invalid logging type '{}', must be 'stdout', 'emptyDir', or 'persistent'",
logging.log_type
),
})
});
}
};

Expand Down
2 changes: 1 addition & 1 deletion src/console/models/pod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ pub struct PodListItem {
pub status: String,
pub phase: String,
pub node: Option<String>,
pub ready: String, // e.g., "1/1"
pub ready: String, // e.g., "1/1"
pub restarts: i32,
pub age: String,
pub created_at: Option<String>,
Expand Down
2 changes: 1 addition & 1 deletion src/console/models/tenant.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ pub struct EnvVar {
#[derive(Debug, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct LoggingConfig {
pub log_type: String, // "stdout" | "emptyDir" | "persistent"
pub log_type: String, // "stdout" | "emptyDir" | "persistent"
pub volume_size: Option<String>,
pub storage_class: Option<String>,
}
Expand Down
10 changes: 3 additions & 7 deletions src/console/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@
// limitations under the License.

use crate::console::{routes, state::AppState};
use axum::http::{header, HeaderValue, Method, StatusCode};
use axum::{middleware, response::IntoResponse, routing::get, Router};
use axum::http::{HeaderValue, Method, StatusCode, header};
use axum::{Router, middleware, response::IntoResponse, routing::get};
use tower_http::{compression::CompressionLayer, cors::CorsLayer, trace::TraceLayer};

/// Build CORS allowed origins from env or default.
Expand All @@ -38,11 +38,7 @@ fn cors_allowed_origins() -> Vec<HeaderValue> {
.filter(|o| !o.is_empty())
.filter_map(|o| o.parse().ok())
.collect();
if parsed.is_empty() {
default
} else {
parsed
}
if parsed.is_empty() { default } else { parsed }
}

/// 启动 Console HTTP Server
Expand Down
4 changes: 3 additions & 1 deletion src/types/v1alpha1/tenant/workloads.rs
Original file line number Diff line number Diff line change
Expand Up @@ -323,7 +323,9 @@ impl Tenant {

let container = corev1::Container {
name: "rustfs".to_owned(),
image: Some(super::helper::get_rustfs_image_or_default(self.spec.image.as_ref())),
image: Some(super::helper::get_rustfs_image_or_default(
self.spec.image.as_ref(),
)),
env: if env_vars.is_empty() {
None
} else {
Expand Down