diff --git a/console-web/app/(dashboard)/tenants/[namespace]/[name]/tenant-detail-client.tsx b/console-web/app/(dashboard)/tenants/[namespace]/[name]/tenant-detail-client.tsx index 0f3e51b..e55a3fd 100644 --- a/console-web/app/(dashboard)/tenants/[namespace]/[name]/tenant-detail-client.tsx +++ b/console-web/app/(dashboard)/tenants/[namespace]/[name]/tenant-detail-client.tsx @@ -33,6 +33,7 @@ import type { AddPoolRequest, EncryptionInfoResponse, UpdateEncryptionRequest, + ProvisioningItemStatus, } from "@/types/api" import { ApiError } from "@/lib/api-client" @@ -65,7 +66,7 @@ function normalizeTab(value?: string | null): Tab { } } -function formatTimestamp(value: string | null): string { +function formatTimestamp(value: string | null | undefined): string { if (!value) return "-" const date = new Date(value) if (Number.isNaN(date.getTime())) return value @@ -95,6 +96,25 @@ function podLastExitSummary(pod: PodListItem, exitedLabel: string): string { return message ? `${summary}: ${message}` : summary } +function provisioningGroups(tenant: TenantDetailsResponse) { + const provisioning = tenant.provisioning + if (!provisioning) return [] + return [ + { type: "Policy", items: provisioning.policies ?? [] }, + { type: "User", items: provisioning.users ?? [] }, + { type: "Bucket", items: provisioning.buckets ?? [] }, + ].filter((group) => group.items.length > 0) +} + +function provisioningItemDetails(item: ProvisioningItemStatus): string { + const details: string[] = [] + if (item.policies && item.policies.length > 0) details.push(`policies=${item.policies.join(",")}`) + if (item.region) details.push(`region=${item.region}`) + if (item.objectLock != null) details.push(`objectLock=${item.objectLock ? "true" : "false"}`) + if (item.lastAppliedGeneration != null) details.push(`generation=${item.lastAppliedGeneration}`) + return details.length > 0 ? details.join(" ") : "-" +} + export function TenantDetailClient({ namespace, name, initialTab, initialYamlEditable }: TenantDetailClientProps) { const router = useRouter() const { t } = useTranslation() @@ -521,6 +541,8 @@ export function TenantDetailClient({ namespace, name, initialTab, initialYamlEdi const statusReason = statusSummary.primary_reason || tenant.state const statusMessage = statusSummary.primary_message || "-" const statusNextActions = statusSummary.next_actions.length > 0 ? statusSummary.next_actions : tenant.next_actions + const provisioning = tenant.provisioning + const provisioningStatusGroups = provisioningGroups(tenant) return ( @@ -606,6 +628,53 @@ export function TenantDetailClient({ namespace, name, initialTab, initialYamlEdi

+ {provisioningStatusGroups.length > 0 && ( + + + {t("Provisioning")} + + {t("Phase")}: {provisioning?.phase ?? "-"} + {provisioning?.observedGeneration != null + ? ` · ${t("Observed generation")}: ${provisioning.observedGeneration}` + : ""} + + + + + + + {t("Type")} + {t("Name")} + {t("Status")} + {t("Reason")} + {t("Details")} + {t("Message")} + {t("Last transition")} + + + + {provisioningStatusGroups.flatMap((group) => + group.items.map((item) => ( + + {t(group.type)} + {item.name} + {item.state} + {item.reason || "-"} + + {provisioningItemDetails(item)} + + + {item.message || "-"} + + {formatTimestamp(item.lastTransitionTime)} + + )), + )} + +
+
+
+ )} {tenant.services.length > 0 && ( diff --git a/console-web/app/(dashboard)/tenants/new/page.tsx b/console-web/app/(dashboard)/tenants/new/page.tsx index 3940e3f..e5da08f 100644 --- a/console-web/app/(dashboard)/tenants/new/page.tsx +++ b/console-web/app/(dashboard)/tenants/new/page.tsx @@ -16,7 +16,13 @@ import { Label } from "@/components/ui/label" import { Spinner } from "@/components/ui/spinner" import { routes } from "@/lib/routes" import * as api from "@/lib/api" -import type { CreatePoolRequest, CreateTenantRequest } from "@/types/api" +import type { + CreatePoolRequest, + CreateTenantRequest, + ProvisioningBucket, + ProvisioningPolicy, + ProvisioningUser, +} from "@/types/api" import { ApiError } from "@/lib/api-client" type CreateMode = "form" | "yaml" @@ -29,19 +35,26 @@ const defaultPool: CreatePoolRequest = { storage_class: "", } -const defaultTenantYaml = `apiVersion: rustfs.io/v1alpha1 +const defaultTenantYaml = `apiVersion: rustfs.com/v1alpha1 kind: Tenant metadata: name: my-tenant namespace: default spec: image: rustfs/rustfs:latest - credsSecret: rustfs-creds + credsSecret: + name: rustfs-creds pools: - name: pool-0 servers: 2 - volumesPerServer: 2 - storageSize: 10Gi + persistence: + volumesPerServer: 2 + volumeClaimTemplate: + accessModes: + - ReadWriteOnce + resources: + requests: + storage: 10Gi ` function asRecord(value: unknown): Record | null { @@ -64,6 +77,17 @@ function asPositiveInt(value: unknown): number | undefined { return undefined } +function asBoolean(value: unknown): boolean | undefined { + if (typeof value === "boolean") return value + return undefined +} + +function asStringArray(value: unknown): string[] | undefined { + if (!Array.isArray(value)) return undefined + const values = value.map(asString) + return values.every((item): item is string => !!item) ? values : undefined +} + export default function TenantCreatePage() { const { t } = useTranslation() const router = useRouter() @@ -119,9 +143,15 @@ export default function TenantCreatePage() { const metadata = asRecord(root.metadata) const spec = asRecord(root.spec) + const apiVersion = asString(root.apiVersion) + const kind = asString(root.kind) const parsedName = asString(metadata?.name) const parsedNamespace = asString(metadata?.namespace) + if (apiVersion !== "rustfs.com/v1alpha1" || kind !== "Tenant") { + throw new Error(t("YAML must be a rustfs.com/v1alpha1 Tenant")) + } + if (!parsedName || !parsedNamespace) { throw new Error(t("YAML must include metadata.name and metadata.namespace")) } @@ -133,9 +163,18 @@ export default function TenantCreatePage() { const parsedPools: CreatePoolRequest[] = poolsRaw.map((poolItem, index) => { const pool = asRecord(poolItem) + const persistence = asRecord(pool?.persistence) + const volumeClaimTemplate = asRecord(persistence?.volumeClaimTemplate ?? persistence?.volume_claim_template) + const resources = asRecord(volumeClaimTemplate?.resources) + const requests = asRecord(resources?.requests) const servers = asPositiveInt(pool?.servers) - const volumesPerServer = asPositiveInt(pool?.volumesPerServer ?? pool?.volumes_per_server) - const storageSize = asString(pool?.storageSize ?? pool?.storage_size ?? pool?.size) + const volumesPerServer = asPositiveInt( + pool?.volumesPerServer ?? + pool?.volumes_per_server ?? + persistence?.volumesPerServer ?? + persistence?.volumes_per_server, + ) + const storageSize = asString(pool?.storageSize ?? pool?.storage_size ?? pool?.size ?? requests?.storage) if (!pool || !servers || !volumesPerServer || !storageSize) { throw new Error(t("YAML pool fields are invalid")) @@ -146,11 +185,18 @@ export default function TenantCreatePage() { servers, volumes_per_server: volumesPerServer, storage_size: storageSize, - storage_class: asString(pool.storageClass ?? pool.storage_class) || undefined, + storage_class: + asString( + pool.storageClass ?? + pool.storage_class ?? + volumeClaimTemplate?.storageClassName ?? + volumeClaimTemplate?.storage_class_name, + ) || undefined, } }) const specSc = asRecord(spec?.securityContext ?? spec?.security_context) + const credsSecretRef = asRecord(spec?.credsSecret ?? spec?.creds_secret) const security_context = specSc ? { runAsUser: asPositiveInt(specSc.runAsUser ?? specSc.run_as_user), @@ -165,13 +211,72 @@ export default function TenantCreatePage() { } : undefined + const policiesRaw = spec?.policies + const policies = Array.isArray(policiesRaw) + ? policiesRaw.map((item): ProvisioningPolicy => { + const policy = asRecord(item) + const document = asRecord(policy?.document) + const configMapKeyRef = asRecord(document?.configMapKeyRef ?? document?.config_map_key_ref) + const policyName = asString(policy?.name) + const configMapName = asString(configMapKeyRef?.name) + const key = asString(configMapKeyRef?.key) + if (!policy || !policyName || !configMapName || !key) { + throw new Error(t("YAML policy provisioning fields are invalid")) + } + return { + name: policyName, + document: { + configMapKeyRef: { + name: configMapName, + key, + }, + }, + } + }) + : undefined + + const usersRaw = spec?.users + const users = Array.isArray(usersRaw) + ? usersRaw.map((item): ProvisioningUser => { + const user = asRecord(item) + const userName = asString(user?.name) + const userPolicies = asStringArray(user?.policies) + if (!user || !userName || !userPolicies || userPolicies.length === 0) { + throw new Error(t("YAML user provisioning fields are invalid")) + } + return { + name: userName, + policies: userPolicies, + } + }) + : undefined + + const bucketsRaw = spec?.buckets + const buckets = Array.isArray(bucketsRaw) + ? bucketsRaw.map((item): ProvisioningBucket => { + const bucket = asRecord(item) + const bucketName = asString(bucket?.name) + if (!bucket || !bucketName) { + throw new Error(t("YAML bucket provisioning fields are invalid")) + } + return { + name: bucketName, + region: asString(bucket.region), + objectLock: asBoolean(bucket.objectLock ?? bucket.object_lock), + } + }) + : undefined + return { name: parsedName, namespace: parsedNamespace, pools: parsedPools, image: asString(spec?.image), mount_path: asString(spec?.mountPath ?? spec?.mount_path), - creds_secret: asString(spec?.credsSecret ?? spec?.creds_secret), + creds_secret: asString(spec?.credsSecret ?? spec?.creds_secret) ?? asString(credsSecretRef?.name), + policies, + users, + buckets, security_context, } } diff --git a/console-web/i18n/locales/en-US.json b/console-web/i18n/locales/en-US.json index b8ac98f..f7794cf 100755 --- a/console-web/i18n/locales/en-US.json +++ b/console-web/i18n/locales/en-US.json @@ -137,9 +137,13 @@ "Paste tenant YAML and create directly.": "Paste tenant YAML and create directly.", "YAML Content": "YAML Content", "YAML format is invalid": "YAML format is invalid", + "YAML must be a rustfs.com/v1alpha1 Tenant": "YAML must be a rustfs.com/v1alpha1 Tenant", "YAML must include metadata.name and metadata.namespace": "YAML must include metadata.name and metadata.namespace", "YAML must include spec.pools with at least one item": "YAML must include spec.pools with at least one item", "YAML pool fields are invalid": "YAML pool fields are invalid", + "YAML policy provisioning fields are invalid": "YAML policy provisioning fields are invalid", + "YAML user provisioning fields are invalid": "YAML user provisioning fields are invalid", + "YAML bucket provisioning fields are invalid": "YAML bucket provisioning fields are invalid", "Tenant name is required": "Tenant name is required", "Namespace is required": "Namespace is required", "Delete tenant \"{{name}}\"? This cannot be undone.": "Delete tenant \"{{name}}\"? This cannot be undone.", diff --git a/console-web/i18n/locales/zh-CN.json b/console-web/i18n/locales/zh-CN.json index fa5bff9..038c74b 100755 --- a/console-web/i18n/locales/zh-CN.json +++ b/console-web/i18n/locales/zh-CN.json @@ -137,9 +137,13 @@ "Paste tenant YAML and create directly.": "粘贴租户 YAML 后可直接创建。", "YAML Content": "YAML 内容", "YAML format is invalid": "YAML 格式无效", + "YAML must be a rustfs.com/v1alpha1 Tenant": "YAML 必须是 rustfs.com/v1alpha1 Tenant", "YAML must include metadata.name and metadata.namespace": "YAML 必须包含 metadata.name 和 metadata.namespace", "YAML must include spec.pools with at least one item": "YAML 必须包含至少一个 spec.pools 项", "YAML pool fields are invalid": "YAML 中 pool 字段不合法", + "YAML policy provisioning fields are invalid": "YAML 中 policy provisioning 字段不合法", + "YAML user provisioning fields are invalid": "YAML 中 user provisioning 字段不合法", + "YAML bucket provisioning fields are invalid": "YAML 中 bucket provisioning 字段不合法", "Tenant name is required": "请输入租户名称", "Namespace is required": "请输入命名空间", "Delete tenant \"{{name}}\"? This cannot be undone.": "确定删除租户「{{name}}」?此操作不可恢复。", diff --git a/console-web/lib/api.ts b/console-web/lib/api.ts index 5c686e1..36a690d 100644 --- a/console-web/lib/api.ts +++ b/console-web/lib/api.ts @@ -82,6 +82,9 @@ export async function updateTenant( if (body.env !== undefined) payload.env = body.env if (body.pod_management_policy !== undefined) payload.podManagementPolicy = body.pod_management_policy if (body.image_pull_policy !== undefined) payload.imagePullPolicy = body.image_pull_policy + if (body.policies !== undefined) payload.policies = body.policies + if (body.users !== undefined) payload.users = body.users + if (body.buckets !== undefined) payload.buckets = body.buckets if (body.logging !== undefined) payload.logging = body.logging return apiClient.put(`${tenant(namespace, name)}`, Object.keys(payload).length ? payload : undefined) } diff --git a/console-web/types/api.ts b/console-web/types/api.ts index 104d998..4f374d2 100644 --- a/console-web/types/api.ts +++ b/console-web/types/api.ts @@ -89,12 +89,66 @@ export interface TenantDetailsResponse { status_summary: TenantStatusSummary conditions: TenantCondition[] next_actions: string[] + provisioning?: ProvisioningStatus image: string | null mount_path: string | null created_at: string | null services: ServiceInfo[] } +export type ProvisioningDeletionPolicy = "Retain" + +export interface ConfigMapKeyReference { + name: string + key: string +} + +export interface PolicyDocumentSource { + configMapKeyRef: ConfigMapKeyReference +} + +export interface ProvisioningPolicy { + name: string + document: PolicyDocumentSource + deletionPolicy?: ProvisioningDeletionPolicy +} + +export interface ProvisioningUser { + name: string + policies: string[] + deletionPolicy?: ProvisioningDeletionPolicy +} + +export interface ProvisioningBucket { + name: string + region?: string + objectLock?: boolean + deletionPolicy?: ProvisioningDeletionPolicy +} + +export interface ProvisioningItemStatus { + name: string + state: string + reason: string + message?: string | null + lastTransitionTime?: string | null + desiredHash?: string | null + lastAppliedHash?: string | null + lastAppliedGeneration?: number | null + observedSecretResourceVersion?: string | null + policies?: string[] + region?: string | null + objectLock?: boolean | null +} + +export interface ProvisioningStatus { + observedGeneration?: number | null + phase?: "Pending" | "Ready" | "Failed" + policies?: ProvisioningItemStatus[] + users?: ProvisioningItemStatus[] + buckets?: ProvisioningItemStatus[] +} + export interface CreatePoolRequest { name: string servers: number @@ -117,6 +171,9 @@ export interface CreateTenantRequest { image?: string mount_path?: string creds_secret?: string + policies?: ProvisioningPolicy[] + users?: ProvisioningUser[] + buckets?: ProvisioningBucket[] security_context?: CreateSecurityContextRequest } @@ -127,6 +184,9 @@ export interface UpdateTenantRequest { creds_secret?: string pod_management_policy?: string image_pull_policy?: string + policies?: ProvisioningPolicy[] + users?: ProvisioningUser[] + buckets?: ProvisioningBucket[] logging?: { logType: string volumeSize?: string diff --git a/deploy/rustfs-operator/README.md b/deploy/rustfs-operator/README.md index 6a0a46f..e11c639 100755 --- a/deploy/rustfs-operator/README.md +++ b/deploy/rustfs-operator/README.md @@ -77,6 +77,33 @@ STS only issues credentials for TLS-enabled Tenants. For Tenant upstream calls, Operator STS does not present a client certificate when calling the Tenant. Tenants configured with `spec.tls.certManager.caTrust.clientCaSecretRef` continue to run with server-side mTLS enabled, but Operator STS rejects those Tenants with HTTP 400 and `TenantTlsClientCertificateUnsupported`. +### Tenant Provisioning + +Tenants can declare RustFS canned policies, regular users, and buckets directly in `spec.policies`, `spec.users`, and `spec.buckets`. Provisioning starts only after the Tenant workload is ready, uses `spec.credsSecret` as the RustFS admin credential source, and reports progress under `status.provisioning`. + +User provisioning requires a non-empty direct policy mapping: + +```yaml +spec: + credsSecret: + name: rustfs-admin-creds + policies: + - name: app-readwrite + document: + configMapKeyRef: + name: app-policy + key: policy.json + users: + - name: app-user + policies: + - app-readwrite + buckets: + - name: app-data + objectLock: true +``` + +Policy ConfigMaps and user Secrets must live in the Tenant namespace. If they are created outside the Operator Console, add `rustfs.tenant=` so changes to those resources enqueue the owning Tenant. Provisioned resources are retained when removed from the Tenant spec. + ### RBAC Configuration | Parameter | Description | Default | diff --git a/deploy/rustfs-operator/crds/tenant-crd.yaml b/deploy/rustfs-operator/crds/tenant-crd.yaml index 967db03..b7c6ec8 100644 --- a/deploy/rustfs-operator/crds/tenant-crd.yaml +++ b/deploy/rustfs-operator/crds/tenant-crd.yaml @@ -27,6 +27,32 @@ spec: properties: spec: properties: + buckets: + description: Buckets that should exist in the RustFS tenant. + items: + properties: + deletionPolicy: + enum: + - Retain + type: string + name: + type: string + x-kubernetes-validations: + - message: bucket name must be a valid RustFS/S3 bucket name + rule: self.size() >= 3 && self.size() <= 63 && self != 'rustfs' && !self.matches('^(\\d+\\.){3}\\d+$') && !self.contains('..') && !self.contains('.-') && !self.contains('-.') && self.matches('^[a-z0-9][a-z0-9.-]{1,61}[a-z0-9]$') + objectLock: + nullable: true + type: boolean + region: + nullable: true + type: string + required: + - name + type: object + type: array + x-kubernetes-validations: + - message: bucket names must be unique + rule: self.all(x, self.filter(y, y.name == x.name).size() == 1) createServiceAccountRbac: nullable: true type: boolean @@ -422,6 +448,48 @@ spec: - null nullable: true type: string + policies: + description: Canned policies that should be applied to the RustFS tenant. + items: + properties: + deletionPolicy: + enum: + - Retain + type: string + document: + properties: + configMapKeyRef: + properties: + key: + type: string + x-kubernetes-validations: + - message: configMapKeyRef key must be not empty + rule: self != '' + name: + type: string + x-kubernetes-validations: + - message: configMapKeyRef name must be not empty + rule: self != '' + required: + - key + - name + type: object + required: + - configMapKeyRef + type: object + name: + type: string + x-kubernetes-validations: + - message: policy name must be not empty and must not contain whitespace + rule: self != '' && !self.matches('.*\\s.*') + required: + - document + - name + type: object + type: array + x-kubernetes-validations: + - message: policy names must be unique + rule: self.all(x, self.filter(y, y.name == x.name).size() == 1) poolLifecycle: description: Explicit lifecycle requests for pool decommissioning. nullable: true @@ -1453,6 +1521,38 @@ spec: - HotReload type: string type: object + users: + description: Regular users that should exist in the RustFS tenant. + items: + properties: + deletionPolicy: + enum: + - Retain + type: string + name: + type: string + x-kubernetes-validations: + - message: user Secret name must be not empty and must not contain whitespace + rule: self != '' && !self.matches('.*\\s.*') + policies: + description: Canned policies to map directly to this user. + items: + type: string + type: array + x-kubernetes-validations: + - message: user policy names must be not empty and must not contain whitespace + rule: self.all(x, x != '' && !x.matches('.*\\s.*')) + - message: user policy names must be unique + rule: self.all(x, self.filter(y, y == x).size() == 1) + required: + - name + type: object + type: array + x-kubernetes-validations: + - message: user Secret names must be unique + rule: self.all(x, self.filter(y, y.name == x.name).size() == 1) + - message: user policies must contain at least one policy + rule: self.all(x, has(x.policies) && x.policies.size() > 0) required: - pools type: object @@ -1776,6 +1876,153 @@ spec: - state type: object type: array + provisioning: + properties: + buckets: + items: + properties: + desiredHash: + nullable: true + type: string + lastAppliedGeneration: + format: int64 + nullable: true + type: integer + lastAppliedHash: + nullable: true + type: string + lastTransitionTime: + nullable: true + type: string + message: + nullable: true + type: string + name: + type: string + objectLock: + nullable: true + type: boolean + observedSecretResourceVersion: + nullable: true + type: string + policies: + items: + type: string + type: array + reason: + type: string + region: + nullable: true + type: string + state: + type: string + required: + - name + - reason + - state + type: object + type: array + observedGeneration: + format: int64 + nullable: true + type: integer + phase: + enum: + - Pending + - Ready + - Failed + - null + nullable: true + type: string + policies: + items: + properties: + desiredHash: + nullable: true + type: string + lastAppliedGeneration: + format: int64 + nullable: true + type: integer + lastAppliedHash: + nullable: true + type: string + lastTransitionTime: + nullable: true + type: string + message: + nullable: true + type: string + name: + type: string + objectLock: + nullable: true + type: boolean + observedSecretResourceVersion: + nullable: true + type: string + policies: + items: + type: string + type: array + reason: + type: string + region: + nullable: true + type: string + state: + type: string + required: + - name + - reason + - state + type: object + type: array + users: + items: + properties: + desiredHash: + nullable: true + type: string + lastAppliedGeneration: + format: int64 + nullable: true + type: integer + lastAppliedHash: + nullable: true + type: string + lastTransitionTime: + nullable: true + type: string + message: + nullable: true + type: string + name: + type: string + objectLock: + nullable: true + type: boolean + observedSecretResourceVersion: + nullable: true + type: string + policies: + items: + type: string + type: array + reason: + type: string + region: + nullable: true + type: string + state: + type: string + required: + - name + - reason + - state + type: object + type: array + type: object required: - availableReplicas - currentState @@ -1789,4 +2036,3 @@ spec: storage: true subresources: status: {} ---- diff --git a/deploy/rustfs-operator/crds/tenant.yaml b/deploy/rustfs-operator/crds/tenant.yaml index 967db03..b7c6ec8 100755 --- a/deploy/rustfs-operator/crds/tenant.yaml +++ b/deploy/rustfs-operator/crds/tenant.yaml @@ -27,6 +27,32 @@ spec: properties: spec: properties: + buckets: + description: Buckets that should exist in the RustFS tenant. + items: + properties: + deletionPolicy: + enum: + - Retain + type: string + name: + type: string + x-kubernetes-validations: + - message: bucket name must be a valid RustFS/S3 bucket name + rule: self.size() >= 3 && self.size() <= 63 && self != 'rustfs' && !self.matches('^(\\d+\\.){3}\\d+$') && !self.contains('..') && !self.contains('.-') && !self.contains('-.') && self.matches('^[a-z0-9][a-z0-9.-]{1,61}[a-z0-9]$') + objectLock: + nullable: true + type: boolean + region: + nullable: true + type: string + required: + - name + type: object + type: array + x-kubernetes-validations: + - message: bucket names must be unique + rule: self.all(x, self.filter(y, y.name == x.name).size() == 1) createServiceAccountRbac: nullable: true type: boolean @@ -422,6 +448,48 @@ spec: - null nullable: true type: string + policies: + description: Canned policies that should be applied to the RustFS tenant. + items: + properties: + deletionPolicy: + enum: + - Retain + type: string + document: + properties: + configMapKeyRef: + properties: + key: + type: string + x-kubernetes-validations: + - message: configMapKeyRef key must be not empty + rule: self != '' + name: + type: string + x-kubernetes-validations: + - message: configMapKeyRef name must be not empty + rule: self != '' + required: + - key + - name + type: object + required: + - configMapKeyRef + type: object + name: + type: string + x-kubernetes-validations: + - message: policy name must be not empty and must not contain whitespace + rule: self != '' && !self.matches('.*\\s.*') + required: + - document + - name + type: object + type: array + x-kubernetes-validations: + - message: policy names must be unique + rule: self.all(x, self.filter(y, y.name == x.name).size() == 1) poolLifecycle: description: Explicit lifecycle requests for pool decommissioning. nullable: true @@ -1453,6 +1521,38 @@ spec: - HotReload type: string type: object + users: + description: Regular users that should exist in the RustFS tenant. + items: + properties: + deletionPolicy: + enum: + - Retain + type: string + name: + type: string + x-kubernetes-validations: + - message: user Secret name must be not empty and must not contain whitespace + rule: self != '' && !self.matches('.*\\s.*') + policies: + description: Canned policies to map directly to this user. + items: + type: string + type: array + x-kubernetes-validations: + - message: user policy names must be not empty and must not contain whitespace + rule: self.all(x, x != '' && !x.matches('.*\\s.*')) + - message: user policy names must be unique + rule: self.all(x, self.filter(y, y == x).size() == 1) + required: + - name + type: object + type: array + x-kubernetes-validations: + - message: user Secret names must be unique + rule: self.all(x, self.filter(y, y.name == x.name).size() == 1) + - message: user policies must contain at least one policy + rule: self.all(x, has(x.policies) && x.policies.size() > 0) required: - pools type: object @@ -1776,6 +1876,153 @@ spec: - state type: object type: array + provisioning: + properties: + buckets: + items: + properties: + desiredHash: + nullable: true + type: string + lastAppliedGeneration: + format: int64 + nullable: true + type: integer + lastAppliedHash: + nullable: true + type: string + lastTransitionTime: + nullable: true + type: string + message: + nullable: true + type: string + name: + type: string + objectLock: + nullable: true + type: boolean + observedSecretResourceVersion: + nullable: true + type: string + policies: + items: + type: string + type: array + reason: + type: string + region: + nullable: true + type: string + state: + type: string + required: + - name + - reason + - state + type: object + type: array + observedGeneration: + format: int64 + nullable: true + type: integer + phase: + enum: + - Pending + - Ready + - Failed + - null + nullable: true + type: string + policies: + items: + properties: + desiredHash: + nullable: true + type: string + lastAppliedGeneration: + format: int64 + nullable: true + type: integer + lastAppliedHash: + nullable: true + type: string + lastTransitionTime: + nullable: true + type: string + message: + nullable: true + type: string + name: + type: string + objectLock: + nullable: true + type: boolean + observedSecretResourceVersion: + nullable: true + type: string + policies: + items: + type: string + type: array + reason: + type: string + region: + nullable: true + type: string + state: + type: string + required: + - name + - reason + - state + type: object + type: array + users: + items: + properties: + desiredHash: + nullable: true + type: string + lastAppliedGeneration: + format: int64 + nullable: true + type: integer + lastAppliedHash: + nullable: true + type: string + lastTransitionTime: + nullable: true + type: string + message: + nullable: true + type: string + name: + type: string + objectLock: + nullable: true + type: boolean + observedSecretResourceVersion: + nullable: true + type: string + policies: + items: + type: string + type: array + reason: + type: string + region: + nullable: true + type: string + state: + type: string + required: + - name + - reason + - state + type: object + type: array + type: object required: - availableReplicas - currentState @@ -1789,4 +2036,3 @@ spec: storage: true subresources: status: {} ---- diff --git a/examples/README.md b/examples/README.md index 273e0cc..10bdd44 100755 --- a/examples/README.md +++ b/examples/README.md @@ -11,6 +11,7 @@ This directory contains example Tenant configurations for the RustFS Kubernetes | [minimal-dev-tenant.yaml](./minimal-dev-tenant.yaml) | Development/Learning | ⭐ Simple | 40Gi | **Start here** if new | | [simple-tenant.yaml](./simple-tenant.yaml) | Documentation Reference | ⭐⭐ Moderate | Configurable | Learning all options | | [secret-credentials-tenant.yaml](./secret-credentials-tenant.yaml) | Secret-based Credentials | ⭐ Simple | Configurable | **Production credential security** | +| [provisioning-tenant.yaml](./provisioning-tenant.yaml) | Policy/User/Bucket Provisioning | ⭐⭐ Moderate | 40Gi | Tenant bootstrap automation | | [multi-pool-tenant.yaml](./multi-pool-tenant.yaml) | Multiple Pools | ⭐⭐ Moderate | ~160Gi | Multi-pool setups | | [production-ha-tenant.yaml](./production-ha-tenant.yaml) | Production HA | ⭐⭐⭐ Advanced | 6.4TB | HA with zone distribution | | [cluster-expansion-tenant.yaml](./cluster-expansion-tenant.yaml) | Capacity Expansion | ⭐⭐⭐ Advanced | 384TB | Growing cluster capacity | @@ -121,6 +122,27 @@ kubectl describe pod secure-tenant-pool-0-0 | grep -A5 "Environment:" --- +### [provisioning-tenant.yaml](./provisioning-tenant.yaml) **Tenant Provisioning** + +Demonstrates operator-managed RustFS canned policies, regular users, and buckets. + +**Features demonstrated:** +- Tenant admin credentials through `spec.credsSecret` +- Policy document stored in a labeled ConfigMap +- User credentials stored in a labeled Secret +- Required non-empty direct policy mapping for each user +- Bucket creation with object lock verification + +**Deployment:** +```bash +kubectl apply -f examples/provisioning-tenant.yaml +kubectl wait --for=condition=Ready tenant/provisioning-demo --timeout=10m +``` + +ConfigMaps and user Secrets should carry `rustfs.tenant=` when they are managed outside the Console so updates enqueue the owning Tenant. + +--- + ### 4. [multi-pool-tenant.yaml](./multi-pool-tenant.yaml) 🔄 **Multi-Pool** Multiple storage pools within a single tenant. diff --git a/examples/provisioning-tenant.yaml b/examples/provisioning-tenant.yaml new file mode 100644 index 0000000..59b0e1f --- /dev/null +++ b/examples/provisioning-tenant.yaml @@ -0,0 +1,89 @@ +# Tenant provisioning example +# +# Demonstrates operator-managed RustFS policy, user, and bucket provisioning. +# Demo credentials are intentionally simple; replace them with externally +# managed Secrets before using this pattern outside a test cluster. + +apiVersion: v1 +kind: Secret +metadata: + name: provisioning-admin-creds + namespace: default +type: Opaque +stringData: + accesskey: admin123 + secretkey: admin12345 + +--- +apiVersion: v1 +kind: ConfigMap +metadata: + name: provisioning-app-policy + namespace: default + labels: + rustfs.tenant: provisioning-demo +data: + policy.json: | + { + "Version": "2012-10-17", + "Statement": [ + { + "Effect": "Allow", + "Action": [ + "s3:ListBucket", + "s3:GetObject", + "s3:PutObject", + "s3:DeleteObject" + ], + "Resource": [ + "arn:aws:s3:::provisioning-demo-data", + "arn:aws:s3:::provisioning-demo-data/*" + ] + } + ] + } + +--- +apiVersion: v1 +kind: Secret +metadata: + name: provisioning-app-user + namespace: default + labels: + rustfs.tenant: provisioning-demo +type: Opaque +stringData: + accesskey: appuser01 + secretkey: appuser01secret + +--- +apiVersion: rustfs.com/v1alpha1 +kind: Tenant +metadata: + name: provisioning-demo + namespace: default +spec: + image: rustfs/rustfs:latest + credsSecret: + name: provisioning-admin-creds + pools: + - name: pool-0 + servers: 1 + persistence: + volumesPerServer: 4 + + policies: + - name: app-readwrite + document: + configMapKeyRef: + name: provisioning-app-policy + key: policy.json + + users: + - name: provisioning-app-user + policies: + - app-readwrite + + buckets: + - name: provisioning-demo-data + objectLock: true diff --git a/src/console/handlers/tenants.rs b/src/console/handlers/tenants.rs index a5e4c55..3dd2437 100755 --- a/src/console/handlers/tenants.rs +++ b/src/console/handlers/tenants.rs @@ -18,15 +18,21 @@ use crate::console::{ state::Claims, }; use crate::types::v1alpha1::{ - encryption::PodSecurityContextOverride, persistence::PersistenceConfig, pool::Pool, - tenant::Tenant, + encryption::PodSecurityContextOverride, + persistence::PersistenceConfig, + pool::Pool, + tenant::{Tenant, TenantSpec}, }; use axum::{ Extension, Json, extract::{Path, Query}, }; use k8s_openapi::api::core::v1 as corev1; -use kube::{Api, Client, ResourceExt, api::ListParams}; +use kube::{ + Api, Client, ResourceExt, + api::{ListParams, Patch, PatchParams}, +}; +use serde_json::json; // curl -s -X POST http://localhost:9090/api/v1/login \ // -H "Content-Type: application/json" \ @@ -91,7 +97,7 @@ pub async fn get_tenant_state_counts_by_namespace( Extension(claims): Extension, ) -> Result> { let client = create_client(&claims).await?; - let api: Api = Api::namespaced(client, &namespace); + let api: Api = Api::namespaced(client.clone(), &namespace); let tenants = api .list(&ListParams::default()) @@ -164,6 +170,11 @@ pub async fn get_tenant_details( let conditions = tenant_conditions(&tenant); let next_actions = status_summary.next_actions.clone(); let certificates = tenant_certificates(&tenant); + let provisioning = tenant + .status + .as_ref() + .map(|status| status.provisioning.clone()) + .unwrap_or_default(); Ok(Json(TenantDetailsResponse { name: tenant.name_any(), @@ -183,6 +194,7 @@ pub async fn get_tenant_details( conditions, next_actions, certificates, + provisioning, image: tenant.spec.image.clone(), mount_path: tenant.spec.mount_path.clone(), created_at: tenant @@ -284,18 +296,23 @@ pub async fn create_tenant( creds_secret: req .creds_secret .map(|name| corev1::LocalObjectReference { name }), + policies: req.policies.unwrap_or_default(), + users: req.users.unwrap_or_default(), + buckets: req.buckets.unwrap_or_default(), security_context, ..Default::default() }, status: None, }; - let api: Api = Api::namespaced(client, &req.namespace); + let api: Api = Api::namespaced(client.clone(), &req.namespace); let created = api .create(&Default::default(), &tenant) .await .map_err(|e| error::map_kube_error(e, format!("Tenant '{}'", req.name)))?; + label_provisioning_references(&client, &req.namespace, &req.name, &created.spec).await; + let item = tenant_to_list_item(created); Ok(Json(item)) @@ -326,7 +343,7 @@ pub async fn update_tenant( Json(req): Json, ) -> Result> { let client = create_client(&claims).await?; - let api: Api = Api::namespaced(client, &namespace); + let api: Api = Api::namespaced(client.clone(), &namespace); // Load current object let mut tenant = api @@ -432,6 +449,21 @@ pub async fn update_tenant( updated_fields.push(format!("logging={}", logging.log_type)); } + if let Some(policies) = req.policies { + tenant.spec.policies = policies; + updated_fields.push("policies".to_string()); + } + + if let Some(users) = req.users { + tenant.spec.users = users; + updated_fields.push("users".to_string()); + } + + if let Some(buckets) = req.buckets { + tenant.spec.buckets = buckets; + updated_fields.push("buckets".to_string()); + } + if updated_fields.is_empty() { return Err(Error::BadRequest { message: "No fields to update".to_string(), @@ -444,6 +476,8 @@ pub async fn update_tenant( .await .map_err(|e| error::map_kube_error(e, format!("Tenant '{}'", name)))?; + label_provisioning_references(&client, &namespace, &name, &updated_tenant.spec).await; + Ok(Json(UpdateTenantResponse { success: true, message: format!("Tenant updated: {}", updated_fields.join(", ")), @@ -616,6 +650,58 @@ fn summarize_tenant_states(tenants: &[Tenant]) -> TenantStateCountsResponse { } } +async fn label_provisioning_references( + client: &Client, + namespace: &str, + tenant_name: &str, + spec: &TenantSpec, +) { + let patch = json!({ + "metadata": { + "labels": { + "rustfs.tenant": tenant_name, + }, + }, + }); + let params = PatchParams::default(); + + let config_maps: std::collections::BTreeSet<_> = spec + .policies + .iter() + .map(|policy| policy.document.config_map_key_ref.name.as_str()) + .collect(); + let config_map_api: Api = Api::namespaced(client.clone(), namespace); + for name in config_maps { + if let Err(error) = config_map_api + .patch(name, ¶ms, &Patch::Merge(&patch)) + .await + { + tracing::debug!( + namespace, + tenant = tenant_name, + config_map = name, + %error, + "Failed to label provisioning policy ConfigMap" + ); + } + } + + let secrets: std::collections::BTreeSet<_> = + spec.users.iter().map(|user| user.name.as_str()).collect(); + let secret_api: Api = Api::namespaced(client.clone(), namespace); + for name in secrets { + if let Err(error) = secret_api.patch(name, ¶ms, &Patch::Merge(&patch)).await { + tracing::debug!( + namespace, + tenant = tenant_name, + secret = name, + %error, + "Failed to label provisioning user Secret" + ); + } + } +} + #[cfg(test)] mod tests { use super::state_matches_filter; diff --git a/src/console/models/tenant.rs b/src/console/models/tenant.rs index 0412c3a..7f794b2 100755 --- a/src/console/models/tenant.rs +++ b/src/console/models/tenant.rs @@ -13,9 +13,10 @@ // limitations under the License. use crate::types::v1alpha1::{ + provisioning::{ProvisioningBucket, ProvisioningPolicy, ProvisioningUser}, status::{ ConditionStatus, ConditionType, CurrentState, Reason, Status, canonical_filter_state, - canonical_state, certificate, next_actions_for_reason, primary_condition, + canonical_state, certificate, next_actions_for_reason, primary_condition, provisioning, summarize_current_state, }, tenant::Tenant, @@ -83,6 +84,8 @@ pub struct TenantDetailsResponse { pub next_actions: Vec, #[serde(skip_serializing_if = "certificate::Status::is_empty")] pub certificates: certificate::Status, + #[serde(skip_serializing_if = "provisioning::ProvisioningStatus::is_empty")] + pub provisioning: provisioning::ProvisioningStatus, pub image: Option, pub mount_path: Option, pub created_at: Option, @@ -148,6 +151,9 @@ pub struct CreateTenantRequest { pub image: Option, pub mount_path: Option, pub creds_secret: Option, + pub policies: Option>, + pub users: Option>, + pub buckets: Option>, /// Optional Pod SecurityContext override (runAsUser, runAsGroup, fsGroup, runAsNonRoot). pub security_context: Option, } @@ -193,6 +199,15 @@ pub struct UpdateTenantRequest { /// Logging sidecar / volume settings pub logging: Option, + + /// Replace canned policy provisioning declarations. + pub policies: Option>, + + /// Replace regular user provisioning declarations. + pub users: Option>, + + /// Replace bucket provisioning declarations. + pub buckets: Option>, } /// Key/value environment variable diff --git a/src/console/openapi.rs b/src/console/openapi.rs index 649af5d..baca60b 100644 --- a/src/console/openapi.rs +++ b/src/console/openapi.rs @@ -48,6 +48,13 @@ use crate::console::models::topology::{ TopologyCluster, TopologyClusterSummary, TopologyNamespace, TopologyNode, TopologyOverviewResponse, TopologyPod, TopologyPool, TopologyTenant, TopologyTenantSummary, }; +use crate::types::v1alpha1::provisioning::{ + ConfigMapKeyReference, PolicyDocumentSource, ProvisioningBucket, ProvisioningDeletionPolicy, + ProvisioningPolicy, ProvisioningUser, +}; +use crate::types::v1alpha1::status::provisioning::{ + ProvisioningItemState, ProvisioningItemStatus, ProvisioningPhase, ProvisioningStatus, +}; #[derive(OpenApi)] #[openapi( @@ -96,6 +103,16 @@ use crate::console::models::topology::{ TenantCondition, TenantStatusSummary, TenantDetailsResponse, + ProvisioningStatus, + ProvisioningPhase, + ProvisioningItemStatus, + ProvisioningItemState, + ProvisioningPolicy, + ProvisioningUser, + ProvisioningBucket, + ProvisioningDeletionPolicy, + PolicyDocumentSource, + ConfigMapKeyReference, CreateTenantRequest, CreatePoolRequest, PoolInfo, @@ -407,4 +424,30 @@ mod tests { ); } } + + #[test] + fn tenant_api_documents_provisioning_fields() { + let spec = serde_json::to_value(ApiDoc::openapi()).expect("OpenAPI spec serializes"); + let schemas = spec + .pointer("/components/schemas") + .and_then(Value::as_object) + .expect("schemas exist"); + + assert!(schemas.contains_key("ProvisioningStatus")); + assert_eq!( + spec.pointer("/components/schemas/TenantDetailsResponse/properties/provisioning/$ref") + .and_then(Value::as_str), + Some("#/components/schemas/ProvisioningStatus") + ); + assert_eq!( + spec.pointer("/components/schemas/CreateTenantRequest/properties/policies/items/$ref") + .and_then(Value::as_str), + Some("#/components/schemas/ProvisioningPolicy") + ); + assert_eq!( + spec.pointer("/components/schemas/UpdateTenantRequest/properties/buckets/items/$ref") + .and_then(Value::as_str), + Some("#/components/schemas/ProvisioningBucket") + ); + } } diff --git a/src/lib.rs b/src/lib.rs index aebb0f6..55f1480 100755 --- a/src/lib.rs +++ b/src/lib.rs @@ -102,9 +102,10 @@ pub async fn run() -> Result<(), Box> { let tenant_client = Api::::all(client.clone()); let context = Context::new(client.clone()); let controller = Controller::new(tenant_client, watcher::Config::default()) - .owns( + .watches( Api::::all(client.clone()), watcher::Config::default(), + tenant_refs_for_config_map, ) .watches( Api::::all(client.clone()), @@ -287,6 +288,14 @@ fn tenant_refs_for_secret(secret: corev1::Secret) -> Vec> { ) } +fn tenant_refs_for_config_map(config_map: corev1::ConfigMap) -> Vec> { + tenant_refs_from_metadata( + config_map.metadata.namespace.as_deref(), + config_map.metadata.owner_references.as_deref(), + config_map.metadata.labels.as_ref(), + ) +} + fn tenant_refs_for_cert_manager_certificate(certificate: DynamicObject) -> Vec> { tenant_refs_from_metadata( certificate.metadata.namespace.as_deref(), @@ -435,6 +444,38 @@ mod controller_watch_tests { assert_single_ref(&refs, "tenant-b", "storage"); } + #[test] + fn config_map_mapper_uses_owner_reference_or_label() { + let owned = corev1::ConfigMap { + metadata: metav1::ObjectMeta { + name: Some("policy".to_string()), + namespace: Some("storage".to_string()), + owner_references: Some(vec![tenant_owner_ref("tenant-policy")]), + ..Default::default() + }, + ..Default::default() + }; + + let refs = tenant_refs_for_config_map(owned); + assert_single_ref(&refs, "tenant-policy", "storage"); + + let labeled = corev1::ConfigMap { + metadata: metav1::ObjectMeta { + name: Some("policy".to_string()), + namespace: Some("storage".to_string()), + labels: Some(BTreeMap::from([( + "rustfs.tenant".to_string(), + "tenant-policy-label".to_string(), + )])), + ..Default::default() + }, + ..Default::default() + }; + + let refs = tenant_refs_for_config_map(labeled); + assert_single_ref(&refs, "tenant-policy-label", "storage"); + } + #[test] fn cert_manager_certificate_mapper_uses_owner_reference_or_label() { let resource = cert_manager_certificate_api_resource(); diff --git a/src/reconcile.rs b/src/reconcile.rs index ca0c805..5cbf2cd 100755 --- a/src/reconcile.rs +++ b/src/reconcile.rs @@ -29,6 +29,7 @@ use tracing::{debug, error, info, warn}; mod phases; mod pool_lifecycle; +mod provisioning; mod tls; use phases::{ diff --git a/src/reconcile/phases.rs b/src/reconcile/phases.rs index 2a3f250..dcd30c9 100644 --- a/src/reconcile/phases.rs +++ b/src/reconcile/phases.rs @@ -13,6 +13,7 @@ // limitations under the License. use super::pool_lifecycle::{PoolLifecycleDecision, PoolLifecycleDecisions}; +use super::provisioning::{ProvisioningOutcome, reconcile_provisioning}; use super::{ Error, cleanup_stuck_terminating_pods_on_down_nodes, context, context_result, patch_status_and_record, patch_status_error, statefulset_owned_by_tenant, types_result, @@ -815,16 +816,41 @@ pub(super) async fn finalize_tenant_status( "StatefulSet rollout in progress".to_string(), ) } else if summary.ready_replicas == summary.total_replicas && summary.total_replicas > 0 { - builder.finish_success(); - ( - ConditionType::Ready, - Reason::ReconcileSucceeded, - EventType::Normal, - format!( - "{}/{} pods ready", - summary.ready_replicas, summary.total_replicas - ), - ) + let namespace = tenant.namespace()?; + let provisioning = reconcile_provisioning(ctx, tenant, &namespace).await; + builder.set_provisioning_status(provisioning.status); + match provisioning.outcome { + ProvisioningOutcome::Ready => { + builder.finish_provisioning_ready(); + ( + ConditionType::Ready, + Reason::ReconcileSucceeded, + EventType::Normal, + format!( + "{}/{} pods ready", + summary.ready_replicas, summary.total_replicas + ), + ) + } + ProvisioningOutcome::Pending { message } => { + builder.finish_provisioning_pending(message.clone()); + ( + ConditionType::ProvisioningReady, + Reason::ProvisioningPending, + EventType::Normal, + message, + ) + } + ProvisioningOutcome::Failed { reason, message } => { + builder.finish_provisioning_failed(reason, message.clone()); + ( + ConditionType::ProvisioningReady, + reason, + EventType::Warning, + message, + ) + } + } } else { builder.finish_reconciling( Reason::PodsNotReady, diff --git a/src/reconcile/provisioning.rs b/src/reconcile/provisioning.rs new file mode 100644 index 0000000..91fde2d --- /dev/null +++ b/src/reconcile/provisioning.rs @@ -0,0 +1,1155 @@ +// Copyright 2025 RustFS Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use crate::context::{self, Context}; +use crate::sts::rustfs_client::{CreateBucketResult, RustfsAdminClient, RustfsClientError}; +use crate::types::v1alpha1::provisioning::{ + ProvisioningBucket, ProvisioningPolicy, ProvisioningUser, +}; +use crate::types::v1alpha1::status::Reason; +use crate::types::v1alpha1::status::provisioning::{ + ProvisioningItemState, ProvisioningItemStatus, ProvisioningPhase, ProvisioningStatus, +}; +use crate::types::v1alpha1::tenant::Tenant; +use k8s_openapi::ByteString; +use k8s_openapi::api::core::v1::{ConfigMap, Secret}; +use serde_json::Value; +use sha2::{Digest, Sha256}; +use std::collections::{BTreeMap, BTreeSet}; + +pub(super) struct ProvisioningReconcileResult { + pub status: ProvisioningStatus, + pub outcome: ProvisioningOutcome, +} + +pub(super) enum ProvisioningOutcome { + Ready, + Pending { message: String }, + Failed { reason: Reason, message: String }, +} + +struct ProvisioningRun<'a> { + ctx: &'a Context, + tenant: &'a Tenant, + namespace: &'a str, + previous: ProvisioningStatus, + now: String, + status: ProvisioningStatus, + failures: Vec<(Reason, String)>, +} + +struct UserCredentials { + access_key: String, + secret_key: String, + resource_version: Option, +} + +impl ProvisioningRun<'_> { + fn previous_policy(&self, name: &str) -> Option<&ProvisioningItemStatus> { + self.previous.policies.iter().find(|item| item.name == name) + } + + fn previous_user(&self, name: &str) -> Option<&ProvisioningItemStatus> { + self.previous.users.iter().find(|item| item.name == name) + } + + fn previous_bucket(&self, name: &str) -> Option<&ProvisioningItemStatus> { + self.previous.buckets.iter().find(|item| item.name == name) + } + + fn push_policy(&mut self, item: ProvisioningItemStatus) { + if item.state == ProvisioningItemState::Failed.as_str() { + self.failures + .push((reason_from_str(&item.reason), item_message(&item))); + } + self.status.policies.push(item); + } + + fn push_user(&mut self, item: ProvisioningItemStatus) { + if item.state == ProvisioningItemState::Failed.as_str() { + self.failures + .push((reason_from_str(&item.reason), item_message(&item))); + } + self.status.users.push(item); + } + + fn push_bucket(&mut self, item: ProvisioningItemStatus) { + if item.state == ProvisioningItemState::Failed.as_str() { + self.failures + .push((reason_from_str(&item.reason), item_message(&item))); + } + self.status.buckets.push(item); + } + + fn item( + &self, + previous: Option<&ProvisioningItemStatus>, + name: &str, + state: ProvisioningItemState, + reason: Reason, + message: impl Into, + ) -> ProvisioningItemStatus { + let message = message.into(); + let mut item = ProvisioningItemStatus::new(name, state, reason.as_str()); + item.message = Some(message.clone()); + item.last_transition_time = match previous { + Some(previous) + if previous.state == item.state + && previous.reason == item.reason + && previous.message.as_deref() == Some(message.as_str()) => + { + previous.last_transition_time.clone() + } + _ => Some(self.now.clone()), + }; + item + } + + fn retained_item(&self, previous: &ProvisioningItemStatus) -> ProvisioningItemStatus { + let mut item = self.item( + Some(previous), + &previous.name, + ProvisioningItemState::Retained, + Reason::ProvisioningConfigured, + "Item was removed from spec and retained in RustFS", + ); + item.desired_hash = previous.desired_hash.clone(); + item.last_applied_hash = previous.last_applied_hash.clone(); + item.last_applied_generation = previous.last_applied_generation; + item.observed_secret_resource_version = previous.observed_secret_resource_version.clone(); + item.policies = previous.policies.clone(); + item.region = previous.region.clone(); + item.object_lock = previous.object_lock; + item + } + + fn mark_all_active(&mut self, state: ProvisioningItemState, reason: Reason, message: &str) { + for policy in &self.tenant.spec.policies { + let mut item = self.item( + self.previous_policy(&policy.name), + &policy.name, + state.clone(), + reason, + message, + ); + if let Some(previous) = self.previous_policy(&policy.name) { + item.desired_hash = previous.desired_hash.clone(); + item.last_applied_hash = previous.last_applied_hash.clone(); + item.last_applied_generation = previous.last_applied_generation; + } + self.push_policy(item); + } + for user in &self.tenant.spec.users { + let mut item = self.item( + self.previous_user(&user.name), + &user.name, + state.clone(), + reason, + message, + ); + if let Some(previous) = self.previous_user(&user.name) { + item.observed_secret_resource_version = + previous.observed_secret_resource_version.clone(); + item.policies = previous.policies.clone(); + } + self.push_user(item); + } + for bucket in &self.tenant.spec.buckets { + let item = self.item( + self.previous_bucket(&bucket.name), + &bucket.name, + state.clone(), + reason, + message, + ); + self.push_bucket(item); + } + } + + fn fail_all_active(&mut self, reason: Reason, message: &str) { + self.mark_all_active(ProvisioningItemState::Failed, reason, message); + } + + fn add_retained_items(&mut self) { + let policies = desired_names(self.tenant.spec.policies.iter().map(|policy| &policy.name)); + for previous in &self.previous.policies { + if !policies.contains(&previous.name) { + self.status.policies.push(self.retained_item(previous)); + } + } + + let users = desired_names(self.tenant.spec.users.iter().map(|user| &user.name)); + for previous in &self.previous.users { + if !users.contains(&previous.name) { + self.status.users.push(self.retained_item(previous)); + } + } + + let buckets = desired_names(self.tenant.spec.buckets.iter().map(|bucket| &bucket.name)); + for previous in &self.previous.buckets { + if !buckets.contains(&previous.name) { + self.status.buckets.push(self.retained_item(previous)); + } + } + } + + fn prepare_status(&mut self, phase: ProvisioningPhase) { + self.add_retained_items(); + self.status.policies.sort_by(|a, b| a.name.cmp(&b.name)); + self.status.users.sort_by(|a, b| a.name.cmp(&b.name)); + self.status.buckets.sort_by(|a, b| a.name.cmp(&b.name)); + if !self.status.is_empty() { + self.status.observed_generation = self.tenant.metadata.generation; + self.status.phase = Some(phase); + } + } + + fn finish(mut self) -> ProvisioningReconcileResult { + let outcome = self + .failures + .first() + .map(|(reason, message)| ProvisioningOutcome::Failed { + reason: *reason, + message: message.clone(), + }) + .unwrap_or(ProvisioningOutcome::Ready); + let phase = match &outcome { + ProvisioningOutcome::Ready => ProvisioningPhase::Ready, + ProvisioningOutcome::Pending { .. } => ProvisioningPhase::Pending, + ProvisioningOutcome::Failed { .. } => ProvisioningPhase::Failed, + }; + self.prepare_status(phase); + + ProvisioningReconcileResult { + status: self.status, + outcome, + } + } +} + +pub(super) async fn reconcile_provisioning( + ctx: &Context, + tenant: &Tenant, + namespace: &str, +) -> ProvisioningReconcileResult { + let previous = tenant + .status + .as_ref() + .map(|status| status.provisioning.clone()) + .unwrap_or_default(); + let now = chrono::Utc::now().to_rfc3339_opts(chrono::SecondsFormat::Secs, true); + let mut run = ProvisioningRun { + ctx, + tenant, + namespace, + previous, + now, + status: ProvisioningStatus::default(), + failures: Vec::new(), + }; + + if !has_active_spec(tenant) { + run.prepare_status(ProvisioningPhase::Ready); + return ProvisioningReconcileResult { + status: run.status, + outcome: ProvisioningOutcome::Ready, + }; + } + + let client = match rustfs_admin_client(ctx, tenant).await { + Ok(client) => client, + Err(error) => { + let (reason, message, pending) = client_error_outcome(error); + if pending { + run.mark_all_active(ProvisioningItemState::Pending, reason, &message); + } else { + run.fail_all_active(reason, &message); + } + let phase = if pending { + ProvisioningPhase::Pending + } else { + ProvisioningPhase::Failed + }; + run.prepare_status(phase); + return ProvisioningReconcileResult { + status: run.status, + outcome: if pending { + ProvisioningOutcome::Pending { message } + } else { + ProvisioningOutcome::Failed { reason, message } + }, + }; + } + }; + + let mut live_policies = match load_live_policies(&client, tenant).await { + Ok(policies) => policies, + Err(message) => { + run.fail_all_active(Reason::PolicyApplyFailed, &message); + run.prepare_status(ProvisioningPhase::Failed); + return ProvisioningReconcileResult { + status: run.status, + outcome: ProvisioningOutcome::Failed { + reason: Reason::PolicyApplyFailed, + message, + }, + }; + } + }; + + reconcile_policies(&mut run, &client, &mut live_policies).await; + reconcile_users(&mut run, &client, &live_policies).await; + reconcile_buckets(&mut run, &client).await; + run.finish() +} + +async fn rustfs_admin_client( + ctx: &Context, + tenant: &Tenant, +) -> Result { + let credentials = RustfsAdminClient::load_tenant_credentials(&ctx.client, tenant).await?; + if tenant.spec.tls.as_ref().is_some_and(|tls| tls.is_enabled()) { + RustfsAdminClient::from_tls_tenant_for_sts(&ctx.client, tenant, credentials).await + } else { + RustfsAdminClient::from_tenant(tenant, credentials) + } +} + +fn client_error_outcome(error: RustfsClientError) -> (Reason, String, bool) { + match error { + RustfsClientError::MissingCredsSecret => ( + Reason::ProvisioningUnsupported, + "configure spec.credsSecret before enabling provisioning".to_string(), + false, + ), + RustfsClientError::TenantTlsClientCertificateRequired => ( + Reason::ProvisioningUnsupported, + "tenant TLS client certificate authentication is not supported for provisioning yet" + .to_string(), + false, + ), + RustfsClientError::TenantTlsNotReady => ( + Reason::ProvisioningPending, + "tenant TLS is not ready for provisioning".to_string(), + true, + ), + error => ( + Reason::ProvisioningFailed, + format!("failed to create RustFS admin client: {error}"), + false, + ), + } +} + +async fn load_live_policies( + client: &RustfsAdminClient, + tenant: &Tenant, +) -> Result, String> { + if tenant.spec.policies.is_empty() + && tenant + .spec + .users + .iter() + .all(|user| user.policies.is_empty()) + { + return Ok(BTreeMap::new()); + } + + client + .list_canned_policies() + .await + .map_err(|error| format!("failed to list RustFS canned policies: {error}")) +} + +async fn reconcile_policies( + run: &mut ProvisioningRun<'_>, + client: &RustfsAdminClient, + live_policies: &mut BTreeMap, +) { + for policy in &run.tenant.spec.policies { + let item = reconcile_policy(run, client, live_policies, policy).await; + run.push_policy(item); + } +} + +async fn reconcile_policy( + run: &ProvisioningRun<'_>, + client: &RustfsAdminClient, + live_policies: &mut BTreeMap, + policy: &ProvisioningPolicy, +) -> ProvisioningItemStatus { + let previous = run.previous_policy(&policy.name); + let document = match load_policy_document(run, policy).await { + Ok(document) => document, + Err((reason, message)) => { + return run.item( + previous, + &policy.name, + ProvisioningItemState::Failed, + reason, + message, + ); + } + }; + + let desired_hash = hash_document(&document); + let mut item = match live_policies.get(&policy.name) { + Some(live_document) => { + let live_hash = hash_document(live_document); + match previous.and_then(|item| item.last_applied_hash.as_deref()) { + None if live_hash == desired_hash => run.item( + previous, + &policy.name, + ProvisioningItemState::Ready, + Reason::ProvisioningConfigured, + "Existing RustFS policy matches spec and was adopted", + ), + None => run.item( + previous, + &policy.name, + ProvisioningItemState::Failed, + Reason::PolicyConflict, + "Live RustFS policy differs from spec and is not owned by this status", + ), + Some(last_applied_hash) if last_applied_hash == live_hash => { + if live_hash == desired_hash { + run.item( + previous, + &policy.name, + ProvisioningItemState::Ready, + Reason::ProvisioningConfigured, + "RustFS policy already matches spec", + ) + } else { + match apply_policy(client, live_policies, &policy.name, &document).await { + Ok(applied_hash) => { + let mut item = run.item( + previous, + &policy.name, + ProvisioningItemState::Ready, + Reason::ProvisioningConfigured, + "RustFS policy was applied", + ); + item.last_applied_hash = Some(applied_hash); + item + } + Err(message) => run.item( + previous, + &policy.name, + ProvisioningItemState::Failed, + Reason::PolicyApplyFailed, + message, + ), + } + } + } + Some(_) => run.item( + previous, + &policy.name, + ProvisioningItemState::Failed, + Reason::PolicyConflict, + "Live RustFS policy changed since the operator last applied it", + ), + } + } + None => match apply_policy(client, live_policies, &policy.name, &document).await { + Ok(applied_hash) => { + let mut item = run.item( + previous, + &policy.name, + ProvisioningItemState::Ready, + Reason::ProvisioningConfigured, + "RustFS policy was created", + ); + item.last_applied_hash = Some(applied_hash); + item + } + Err(message) => run.item( + previous, + &policy.name, + ProvisioningItemState::Failed, + Reason::PolicyApplyFailed, + message, + ), + }, + }; + + item.desired_hash = Some(desired_hash); + if item.last_applied_hash.is_none() && item.state == ProvisioningItemState::Ready.as_str() { + item.last_applied_hash = item.desired_hash.clone(); + } + if item.last_applied_hash.is_none() { + item.last_applied_hash = previous.and_then(|item| item.last_applied_hash.clone()); + } + item.last_applied_generation = match ( + item.last_applied_hash.as_deref(), + previous.and_then(|item| item.last_applied_hash.as_deref()), + ) { + (Some(current), Some(previous_hash)) if current == previous_hash => { + previous.and_then(|item| item.last_applied_generation) + } + (Some(_), _) if item.state == ProvisioningItemState::Ready.as_str() => { + run.tenant.metadata.generation + } + _ => previous.and_then(|item| item.last_applied_generation), + }; + item +} + +async fn load_policy_document( + run: &ProvisioningRun<'_>, + policy: &ProvisioningPolicy, +) -> Result { + let reference = &policy.document.config_map_key_ref; + let config_map: ConfigMap = + run.ctx + .get(&reference.name, run.namespace) + .await + .map_err(|error| { + if context::is_kube_not_found(&error) { + ( + Reason::PolicyDocumentConfigMapNotFound, + format!("policy ConfigMap '{}' was not found", reference.name), + ) + } else { + ( + Reason::PolicyApplyFailed, + format!( + "failed to read policy ConfigMap '{}': {error}", + reference.name + ), + ) + } + })?; + + let raw = config_map + .data + .as_ref() + .and_then(|data| data.get(&reference.key)) + .ok_or_else(|| { + ( + Reason::PolicyDocumentKeyNotFound, + format!( + "policy ConfigMap '{}' is missing key '{}'", + reference.name, reference.key + ), + ) + })?; + + canonical_json_document(raw).map_err(|message| (Reason::PolicyApplyFailed, message)) +} + +async fn apply_policy( + client: &RustfsAdminClient, + live_policies: &mut BTreeMap, + name: &str, + document: &str, +) -> Result { + client + .add_canned_policy(name, document) + .await + .map_err(|error| format!("failed to apply RustFS policy '{name}': {error}"))?; + + let live_document = client + .get_canned_policy(name) + .await + .map_err(|error| format!("failed to read RustFS policy '{name}' after apply: {error}"))?; + let live_document = canonical_json_document(&live_document)?; + let live_hash = hash_document(&live_document); + live_policies.insert(name.to_string(), live_document); + Ok(live_hash) +} + +async fn reconcile_users( + run: &mut ProvisioningRun<'_>, + client: &RustfsAdminClient, + live_policies: &BTreeMap, +) { + let failed_spec_policies = run + .status + .policies + .iter() + .filter(|item| item.state == ProvisioningItemState::Failed.as_str()) + .map(|item| item.name.clone()) + .collect::>(); + + for user in &run.tenant.spec.users { + let item = reconcile_user(run, client, live_policies, &failed_spec_policies, user).await; + run.push_user(item); + } +} + +async fn reconcile_user( + run: &ProvisioningRun<'_>, + client: &RustfsAdminClient, + live_policies: &BTreeMap, + failed_spec_policies: &BTreeSet, + user: &ProvisioningUser, +) -> ProvisioningItemStatus { + let previous = run.previous_user(&user.name); + if let Err(message) = validate_user_policies(user) { + let item = run.item( + previous, + &user.name, + ProvisioningItemState::Failed, + Reason::UserPolicyInvalid, + message, + ); + return annotate_user_item(item, user, None); + } + + let credentials = match load_user_secret(run, user).await { + Ok(credentials) => credentials, + Err(message) => { + let item = run.item( + previous, + &user.name, + ProvisioningItemState::Failed, + Reason::UserSecretInvalid, + message, + ); + return annotate_user_item(item, user, None); + } + }; + + if let Some(policy_name) = user + .policies + .iter() + .find(|policy_name| failed_spec_policies.contains(*policy_name)) + { + let item = run.item( + previous, + &user.name, + ProvisioningItemState::Failed, + Reason::UserPolicySetFailed, + format!("referenced policy '{policy_name}' is not ready"), + ); + return annotate_user_item(item, user, credentials.resource_version); + } + + if let Some(policy_name) = user + .policies + .iter() + .find(|policy_name| !live_policies.contains_key(*policy_name)) + { + let item = run.item( + previous, + &user.name, + ProvisioningItemState::Failed, + Reason::UserPolicyNotFound, + format!("referenced policy '{policy_name}' does not exist"), + ); + return annotate_user_item(item, user, credentials.resource_version); + } + + let exists = match client.user_exists(&credentials.access_key).await { + Ok(exists) => exists, + Err(error) => { + let item = run.item( + previous, + &user.name, + ProvisioningItemState::Failed, + Reason::UserSecretInvalid, + format!("failed to query RustFS user: {error}"), + ); + return annotate_user_item(item, user, credentials.resource_version); + } + }; + + if !exists + && let Err(error) = client + .add_user(&credentials.access_key, &credentials.secret_key) + .await + { + let item = run.item( + previous, + &user.name, + ProvisioningItemState::Failed, + Reason::UserSecretInvalid, + format!("failed to create RustFS user: {error}"), + ); + return annotate_user_item(item, user, credentials.resource_version); + } + + if let Err(error) = client + .set_user_policy(&credentials.access_key, &user.policies) + .await + { + let item = run.item( + previous, + &user.name, + ProvisioningItemState::Failed, + Reason::UserPolicySetFailed, + format!("failed to set RustFS user policy mapping: {error}"), + ); + return annotate_user_item(item, user, credentials.resource_version); + } + + let reason = if exists { + "UserAlreadyExistsPolicySet" + } else { + Reason::ProvisioningConfigured.as_str() + }; + let mut item = + ProvisioningItemStatus::new(&user.name, ProvisioningItemState::Ready, reason.to_string()); + let message = if exists { + "RustFS user already existed; direct policy mapping was applied" + } else { + "RustFS user was created and direct policy mapping was applied" + }; + item.message = Some(message.to_string()); + item.last_transition_time = match previous { + Some(previous) + if previous.state == item.state + && previous.reason == item.reason + && previous.message.as_deref() == item.message.as_deref() => + { + previous.last_transition_time.clone() + } + _ => Some(run.now.clone()), + }; + annotate_user_item(item, user, credentials.resource_version) +} + +fn annotate_user_item( + mut item: ProvisioningItemStatus, + user: &ProvisioningUser, + resource_version: Option, +) -> ProvisioningItemStatus { + item.observed_secret_resource_version = resource_version; + item.policies = user.policies.clone(); + item +} + +async fn load_user_secret( + run: &ProvisioningRun<'_>, + user: &ProvisioningUser, +) -> Result { + let secret: Secret = run + .ctx + .get(&user.name, run.namespace) + .await + .map_err(|error| { + if context::is_kube_not_found(&error) { + format!("user Secret '{}' was not found", user.name) + } else { + format!("failed to read user Secret '{}': {error}", user.name) + } + })?; + let data = secret + .data + .as_ref() + .ok_or_else(|| format!("user Secret '{}' has no data", user.name))?; + + let access_key = read_compatible_secret_value( + data, + "accesskey", + "CONSOLE_ACCESS_KEY", + &user.name, + "access key", + )?; + let secret_key = read_compatible_secret_value( + data, + "secretkey", + "CONSOLE_SECRET_KEY", + &user.name, + "secret key", + )?; + + validate_user_access_key(&access_key)?; + validate_user_secret_key(&secret_key)?; + + Ok(UserCredentials { + access_key, + secret_key, + resource_version: secret.metadata.resource_version, + }) +} + +fn read_compatible_secret_value( + data: &BTreeMap, + native_key: &'static str, + minio_key: &'static str, + secret_name: &str, + label: &str, +) -> Result { + let native = read_optional_secret_value(data, native_key, secret_name)?; + let minio = read_optional_secret_value(data, minio_key, secret_name)?; + + match (native, minio) { + (Some(native), Some(minio)) if native == minio => Ok(native), + (Some(_), Some(_)) => Err(format!( + "user Secret '{secret_name}' has conflicting {label} values" + )), + (Some(value), None) | (None, Some(value)) => Ok(value), + (None, None) => Err(format!( + "user Secret '{secret_name}' is missing '{native_key}' or '{minio_key}'" + )), + } +} + +fn read_optional_secret_value( + data: &BTreeMap, + key: &'static str, + secret_name: &str, +) -> Result, String> { + let Some(raw) = data.get(key) else { + return Ok(None); + }; + let value = String::from_utf8(raw.0.clone()) + .map_err(|_| format!("user Secret '{secret_name}' key '{key}' must be valid UTF-8"))?; + Ok(Some(value.trim().to_string())) +} + +fn validate_user_access_key(access_key: &str) -> Result<(), String> { + if access_key.len() < 8 { + return Err("user access key must be at least 8 characters".to_string()); + } + if access_key.chars().any(char::is_whitespace) { + return Err("user access key must not contain whitespace".to_string()); + } + if access_key.contains('=') || access_key.contains(',') { + return Err("user access key must not contain reserved characters '=' or ','".to_string()); + } + Ok(()) +} + +fn validate_user_policies(user: &ProvisioningUser) -> Result<(), String> { + if user.policies.is_empty() { + return Err("user must reference at least one policy".to_string()); + } + Ok(()) +} + +fn validate_user_secret_key(secret_key: &str) -> Result<(), String> { + if secret_key.len() < 8 { + return Err("user secret key must be at least 8 characters".to_string()); + } + Ok(()) +} + +async fn reconcile_buckets(run: &mut ProvisioningRun<'_>, client: &RustfsAdminClient) { + for bucket in &run.tenant.spec.buckets { + let item = reconcile_bucket(run, client, bucket).await; + run.push_bucket(item); + } +} + +async fn reconcile_bucket( + run: &ProvisioningRun<'_>, + client: &RustfsAdminClient, + bucket: &ProvisioningBucket, +) -> ProvisioningItemStatus { + let previous = run.previous_bucket(&bucket.name); + if let Err(message) = validate_bucket_name(&bucket.name) { + let item = run.item( + previous, + &bucket.name, + ProvisioningItemState::Failed, + Reason::BucketCreateFailed, + message, + ); + return annotate_bucket_item(item, bucket); + } + + let create_result = match client + .create_bucket( + &bucket.name, + bucket.region.as_deref(), + bucket.object_lock_enabled(), + ) + .await + { + Ok(result) => result, + Err(error) => { + let item = run.item( + previous, + &bucket.name, + ProvisioningItemState::Failed, + Reason::BucketCreateFailed, + format!("failed to create RustFS bucket: {error}"), + ); + return annotate_bucket_item(item, bucket); + } + }; + + if bucket.object_lock_enabled() { + match client.bucket_object_lock_enabled(&bucket.name).await { + Ok(true) => { + let message = match create_result { + CreateBucketResult::Created => { + "RustFS bucket was created with object lock enabled" + } + CreateBucketResult::AlreadyExists => { + "Bucket already existed with object lock enabled" + } + }; + let item = run.item( + previous, + &bucket.name, + ProvisioningItemState::Ready, + Reason::ProvisioningConfigured, + message, + ); + return annotate_bucket_item(item, bucket); + } + Ok(false) => { + let message = match create_result { + CreateBucketResult::Created => { + "Bucket was created but object lock is not enabled" + } + CreateBucketResult::AlreadyExists => { + "Bucket already exists but object lock is not enabled" + } + }; + let item = run.item( + previous, + &bucket.name, + ProvisioningItemState::Failed, + Reason::BucketObjectLockConflict, + message, + ); + return annotate_bucket_item(item, bucket); + } + Err(error) => { + let message = match create_result { + CreateBucketResult::Created => { + format!("failed to verify created bucket object lock: {error}") + } + CreateBucketResult::AlreadyExists => { + format!("failed to verify existing bucket object lock: {error}") + } + }; + let item = run.item( + previous, + &bucket.name, + ProvisioningItemState::Failed, + Reason::BucketObjectLockConflict, + message, + ); + return annotate_bucket_item(item, bucket); + } + } + } + + let message = match create_result { + CreateBucketResult::Created => "RustFS bucket was created", + CreateBucketResult::AlreadyExists => "RustFS bucket already exists", + }; + let item = run.item( + previous, + &bucket.name, + ProvisioningItemState::Ready, + Reason::ProvisioningConfigured, + message, + ); + annotate_bucket_item(item, bucket) +} + +fn annotate_bucket_item( + mut item: ProvisioningItemStatus, + bucket: &ProvisioningBucket, +) -> ProvisioningItemStatus { + item.region = bucket.region.clone(); + item.object_lock = Some(bucket.object_lock_enabled()); + item +} + +fn has_active_spec(tenant: &Tenant) -> bool { + !tenant.spec.policies.is_empty() + || !tenant.spec.users.is_empty() + || !tenant.spec.buckets.is_empty() +} + +fn desired_names<'a>(names: impl Iterator) -> BTreeSet { + names.cloned().collect() +} + +fn validate_bucket_name(bucket_name: &str) -> Result<(), String> { + if bucket_name.trim() != bucket_name { + return Err("bucket name must not contain leading or trailing whitespace".to_string()); + } + if bucket_name.is_empty() { + return Err("bucket name cannot be empty".to_string()); + } + if bucket_name.len() < 3 { + return Err("bucket name cannot be shorter than 3 characters".to_string()); + } + if bucket_name.len() > 63 { + return Err("bucket name cannot be longer than 63 characters".to_string()); + } + if bucket_name == "rustfs" { + return Err("bucket name cannot be rustfs".to_string()); + } + if is_ipv4_address_like(bucket_name) { + return Err("bucket name cannot be an IP address".to_string()); + } + if bucket_name.contains("..") || bucket_name.contains(".-") || bucket_name.contains("-.") { + return Err("bucket name contains invalid dot or hyphen sequence".to_string()); + } + let mut chars = bucket_name.chars(); + let Some(first) = chars.next() else { + return Err("bucket name cannot be empty".to_string()); + }; + let Some(last) = bucket_name.chars().next_back() else { + return Err("bucket name cannot be empty".to_string()); + }; + if !first.is_ascii_lowercase() && !first.is_ascii_digit() { + return Err("bucket name must start with a lowercase letter or digit".to_string()); + } + if !last.is_ascii_lowercase() && !last.is_ascii_digit() { + return Err("bucket name must end with a lowercase letter or digit".to_string()); + } + if !bucket_name + .chars() + .all(|ch| ch.is_ascii_lowercase() || ch.is_ascii_digit() || ch == '.' || ch == '-') + { + return Err( + "bucket name must contain only lowercase letters, digits, dots, or hyphens".to_string(), + ); + } + Ok(()) +} + +fn is_ipv4_address_like(value: &str) -> bool { + let mut parts = value.split('.'); + (0..4).all(|_| { + parts + .next() + .is_some_and(|part| !part.is_empty() && part.chars().all(|ch| ch.is_ascii_digit())) + }) && parts.next().is_none() +} + +fn canonical_json_document(document: &str) -> Result { + let value = serde_json::from_str::(document) + .map_err(|error| format!("policy document must be valid JSON: {error}"))?; + serde_json::to_string(&value) + .map_err(|error| format!("failed to canonicalize policy document: {error}")) +} + +fn hash_document(document: &str) -> String { + let mut hasher = Sha256::new(); + hasher.update(document.as_bytes()); + format!("sha256:{}", hex::encode(hasher.finalize())) +} + +fn item_message(item: &ProvisioningItemStatus) -> String { + item.message + .clone() + .unwrap_or_else(|| "Tenant provisioning failed".to_string()) +} + +fn reason_from_str(reason: &str) -> Reason { + match reason { + "ProvisioningUnsupported" => Reason::ProvisioningUnsupported, + "PolicyDocumentConfigMapNotFound" => Reason::PolicyDocumentConfigMapNotFound, + "PolicyDocumentKeyNotFound" => Reason::PolicyDocumentKeyNotFound, + "PolicyApplyFailed" => Reason::PolicyApplyFailed, + "PolicyConflict" => Reason::PolicyConflict, + "UserSecretInvalid" => Reason::UserSecretInvalid, + "UserPolicyNotFound" => Reason::UserPolicyNotFound, + "UserPolicyInvalid" => Reason::UserPolicyInvalid, + "UserPolicySetFailed" => Reason::UserPolicySetFailed, + "BucketCreateFailed" => Reason::BucketCreateFailed, + "BucketObjectLockConflict" => Reason::BucketObjectLockConflict, + _ => Reason::ProvisioningFailed, + } +} + +#[cfg(test)] +mod tests { + use super::*; + use k8s_openapi::ByteString; + + #[test] + fn compatible_secret_values_are_trimmed_and_must_match() { + let data = BTreeMap::from([ + ("accesskey".to_string(), ByteString(b" app ".to_vec())), + ( + "CONSOLE_ACCESS_KEY".to_string(), + ByteString(b"app".to_vec()), + ), + ]); + + let value = + read_compatible_secret_value(&data, "accesskey", "CONSOLE_ACCESS_KEY", "user", "ak") + .expect("trimmed values should match"); + + assert_eq!(value, "app"); + } + + #[test] + fn access_key_rejects_reserved_characters() { + let error = validate_user_access_key("app=user") + .expect_err("reserved characters should be rejected"); + + assert!(error.contains("reserved characters")); + } + + #[test] + fn access_key_requires_security_baseline_length() { + let error = + validate_user_access_key("app").expect_err("short access keys should be rejected"); + + assert!(error.contains("at least 8 characters")); + } + + #[test] + fn user_policy_list_must_not_be_empty() { + let user = ProvisioningUser { + name: "app-user".to_string(), + policies: Vec::new(), + deletion_policy: Default::default(), + }; + + let error = + validate_user_policies(&user).expect_err("empty policy list should be rejected"); + + assert!(error.contains("at least one policy")); + } + + #[test] + fn policy_document_hash_uses_compact_json() { + let canonical = canonical_json_document( + r#"{ + "Version": "2012-10-17", + "Statement": [] + }"#, + ) + .expect("policy should canonicalize"); + + assert_eq!(canonical, r#"{"Statement":[],"Version":"2012-10-17"}"#); + assert!(hash_document(&canonical).starts_with("sha256:")); + } + + #[test] + fn bucket_name_validation_matches_rustfs_strict_rules() { + assert!(validate_bucket_name("app-data").is_ok()); + assert!(validate_bucket_name("my.bucket.name").is_ok()); + + for invalid in [ + "ab", + "rustfs", + "192.168.1.1", + "MyBucket", + "my_bucket", + "my..bucket", + ] { + assert!( + validate_bucket_name(invalid).is_err(), + "{invalid} should be rejected" + ); + } + } +} diff --git a/src/status.rs b/src/status.rs index 9696b49..2ce6582 100644 --- a/src/status.rs +++ b/src/status.rs @@ -244,6 +244,13 @@ impl StatusBuilder { } } + pub fn set_provisioning_status( + &mut self, + provisioning: crate::types::v1alpha1::status::provisioning::ProvisioningStatus, + ) { + self.next.provisioning = provisioning; + } + pub fn mark_started(&mut self) { self.set_condition( ConditionType::Ready, @@ -419,6 +426,84 @@ impl StatusBuilder { self.set_condition(condition_type, ConditionStatus::False, reason, message); } + pub fn finish_provisioning_ready(&mut self) { + self.finish_success(); + self.set_condition( + ConditionType::ProvisioningReady, + ConditionStatus::True, + Reason::ProvisioningConfigured, + "Tenant provisioning is configured".to_string(), + ); + } + + pub fn finish_provisioning_pending(&mut self, message: String) { + self.mark_default_components_ready(); + self.set_condition( + ConditionType::Ready, + ConditionStatus::False, + Reason::ProvisioningPending, + message.clone(), + ); + self.set_condition( + ConditionType::Reconciling, + ConditionStatus::True, + Reason::ProvisioningPending, + message.clone(), + ); + self.set_condition( + ConditionType::Degraded, + ConditionStatus::False, + Reason::ProvisioningPending, + "Tenant provisioning is progressing".to_string(), + ); + self.set_condition( + ConditionType::WorkloadsReady, + ConditionStatus::True, + Reason::ReconcileSucceeded, + "WorkloadsReady is ready".to_string(), + ); + self.set_condition( + ConditionType::ProvisioningReady, + ConditionStatus::False, + Reason::ProvisioningPending, + message, + ); + } + + pub fn finish_provisioning_failed(&mut self, reason: Reason, message: String) { + self.mark_default_components_ready(); + self.set_condition( + ConditionType::Ready, + ConditionStatus::False, + reason, + message.clone(), + ); + self.set_condition( + ConditionType::Reconciling, + ConditionStatus::False, + reason, + "Reconcile is blocked by provisioning failure".to_string(), + ); + self.set_condition( + ConditionType::Degraded, + ConditionStatus::True, + reason, + message.clone(), + ); + self.set_condition( + ConditionType::WorkloadsReady, + ConditionStatus::True, + Reason::ReconcileSucceeded, + "WorkloadsReady is ready".to_string(), + ); + self.set_condition( + ConditionType::ProvisioningReady, + ConditionStatus::False, + reason, + message, + ); + } + pub fn build(mut self) -> Status { self.next.observed_generation = self.generation; self.next.current_state = summarize_current_state(&self.next); @@ -433,6 +518,7 @@ impl StatusBuilder { ConditionType::KmsReady, ConditionType::PoolsReady, ConditionType::WorkloadsReady, + ConditionType::ProvisioningReady, ] { self.set_condition( condition_type, diff --git a/src/sts/rustfs_client.rs b/src/sts/rustfs_client.rs index bf7aa11..bac307a 100644 --- a/src/sts/rustfs_client.rs +++ b/src/sts/rustfs_client.rs @@ -30,6 +30,10 @@ use crate::sts::types::StsAssumeRoleCredentials; const FORM_CONTENT_TYPE: &str = "application/x-www-form-urlencoded"; const JSON_CONTENT_TYPE: &str = "application/json"; const ASSUME_ROLE_PATH: &str = "/"; +const ADD_USER_PATH: &str = "/rustfs/admin/v3/add-user"; +const USER_INFO_PATH: &str = "/rustfs/admin/v3/user-info"; +const SET_POLICY_PATH: &str = "/rustfs/admin/v3/set-policy"; +const LIST_CANNED_POLICIES_PATH: &str = "/rustfs/admin/v3/list-canned-policies"; const ADD_CANNED_POLICY_PATH: &str = "/rustfs/admin/v3/add-canned-policy"; const INFO_CANNED_POLICY_PATH: &str = "/rustfs/admin/v3/info-canned-policy"; const POOLS_LIST_PATH: &str = "/rustfs/admin/v3/pools/list"; @@ -78,6 +82,12 @@ pub struct RustfsPoolStatus { pub decommission: Option, } +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum CreateBucketResult { + Created, + AlreadyExists, +} + #[derive(Debug, Clone, Default, serde::Deserialize, PartialEq)] pub struct RustfsPoolDecommissionInfo { #[serde(rename = "startTime")] @@ -457,6 +467,215 @@ impl RustfsAdminClient { Ok(()) } + pub async fn list_canned_policies( + &self, + ) -> Result, RustfsClientError> { + let body = self + .send_admin_request("GET", LIST_CANNED_POLICIES_PATH, "", "", None) + .await?; + let policies = serde_json::from_str::>(&body) + .map_err(|_| RustfsClientError::ParseResponseFailed)?; + + policies + .into_iter() + .map(|(name, policy)| { + serde_json::to_string(&policy) + .map(|document| (name, document)) + .map_err(|_| RustfsClientError::ParseResponseFailed) + }) + .collect() + } + + pub async fn user_exists(&self, access_key: &str) -> Result { + if access_key.trim().is_empty() { + return Err(RustfsClientError::InvalidCredentialValue { key: "accesskey" }); + } + + let query = build_query_pairs(&[("accessKey", access_key)]); + let path = USER_INFO_PATH; + let url = format!("{}{}?{query}", self.base_url.trim_end_matches('/'), path); + let signed = self.sign_request("GET", path, &query, "", None, ADMIN_SIGNING_SERVICE)?; + let host = self.host()?; + + let response = self + .http_client + .get(url) + .header("x-amz-date", &signed.amz_date) + .header("x-amz-content-sha256", &signed.payload_hash) + .header("authorization", &signed.authorization) + .header("host", host) + .send() + .await + .map_err(|_| RustfsClientError::RequestFailed)?; + + if response.status().is_success() { + return Ok(true); + } + + let status = response.status(); + let body = response.text().await.unwrap_or_default(); + if status == StatusCode::NOT_FOUND || body_mentions_not_found(&body) { + return Ok(false); + } + + Err(RustfsClientError::UnexpectedStatus(status)) + } + + pub async fn add_user( + &self, + access_key: &str, + secret_key: &str, + ) -> Result<(), RustfsClientError> { + if access_key.trim().is_empty() { + return Err(RustfsClientError::InvalidCredentialValue { key: "accesskey" }); + } + if secret_key.is_empty() { + return Err(RustfsClientError::EmptyCredentialValue { key: "secretkey" }); + } + + let body = serde_json::json!({ + "secretKey": secret_key, + "status": "enabled", + }) + .to_string(); + let query = build_query_pairs(&[("accessKey", access_key)]); + + self.send_admin_request("PUT", ADD_USER_PATH, &query, &body, Some(JSON_CONTENT_TYPE)) + .await + .map(|_| ()) + } + + pub async fn set_user_policy( + &self, + access_key: &str, + policies: &[String], + ) -> Result<(), RustfsClientError> { + if access_key.trim().is_empty() { + return Err(RustfsClientError::InvalidCredentialValue { key: "accesskey" }); + } + if policies.is_empty() || policies.iter().any(|policy| policy.trim().is_empty()) { + return Err(RustfsClientError::InvalidPolicyName); + } + + let policy_names = policies.join(","); + let query = build_query_pairs(&[ + ("isGroup", "false"), + ("policyName", policy_names.as_str()), + ("userOrGroup", access_key), + ]); + + self.send_admin_request("PUT", SET_POLICY_PATH, &query, "", None) + .await + .map(|_| ()) + } + + pub async fn create_bucket( + &self, + bucket: &str, + region: Option<&str>, + object_lock: bool, + ) -> Result { + if bucket.trim().is_empty() { + return Err(RustfsClientError::RequestBuildFailed); + } + + let path = format!("/{bucket}"); + let body = create_bucket_body(region); + let content_type = (!body.is_empty()).then_some("application/xml"); + let mut extra_headers = Vec::new(); + if let Some(content_type) = content_type { + extra_headers.push(("content-type", content_type)); + } + if object_lock { + extra_headers.push(("x-amz-bucket-object-lock-enabled", "true")); + } + let signed = self.sign_request_with_extra_headers( + "PUT", + &path, + "", + &body, + ADMIN_SIGNING_SERVICE, + &extra_headers, + )?; + let host = self.host()?; + + let mut request = self + .http_client + .put(format!("{}{}", self.base_url.trim_end_matches('/'), path)) + .header("x-amz-date", &signed.amz_date) + .header("x-amz-content-sha256", &signed.payload_hash) + .header("authorization", &signed.authorization) + .header("host", host); + + for (name, value) in &extra_headers { + request = request.header(*name, *value); + } + if !body.is_empty() { + request = request.body(body); + } + + let response = request + .send() + .await + .map_err(|_| RustfsClientError::RequestFailed)?; + + if response.status().is_success() { + return Ok(CreateBucketResult::Created); + } + + let status = response.status(); + let body = response.text().await.unwrap_or_default(); + if bucket_already_exists(status, &body) { + return Ok(CreateBucketResult::AlreadyExists); + } + + Err(RustfsClientError::UnexpectedStatus(status)) + } + + pub async fn bucket_object_lock_enabled( + &self, + bucket: &str, + ) -> Result { + if bucket.trim().is_empty() { + return Err(RustfsClientError::RequestBuildFailed); + } + + let path = format!("/{bucket}"); + let query = build_query_pairs(&[("object-lock", "")]); + let signed = self.sign_request("GET", &path, &query, "", None, ADMIN_SIGNING_SERVICE)?; + let host = self.host()?; + + let response = self + .http_client + .get(format!( + "{}{}?{query}", + self.base_url.trim_end_matches('/'), + path + )) + .header("x-amz-date", &signed.amz_date) + .header("x-amz-content-sha256", &signed.payload_hash) + .header("authorization", &signed.authorization) + .header("host", host) + .send() + .await + .map_err(|_| RustfsClientError::RequestFailed)?; + + if !response.status().is_success() { + let status = response.status(); + let body = response.text().await.unwrap_or_default(); + if status == StatusCode::NOT_FOUND || body_mentions_not_found(&body) { + return Ok(false); + } + return Err(RustfsClientError::UnexpectedStatus(status)); + } + + let body = response + .text() + .await + .map_err(|_| RustfsClientError::RequestFailed)?; + Ok(body.contains("Enabled")) + } + pub async fn list_pools(&self) -> Result, RustfsClientError> { let body = self .send_admin_request("GET", POOLS_LIST_PATH, "", "", None) @@ -628,6 +847,28 @@ impl RustfsAdminClient { payload: &str, content_type: Option<&str>, service: &str, + ) -> Result { + let extra_headers = content_type + .map(|content_type| vec![("content-type", content_type)]) + .unwrap_or_default(); + self.sign_request_with_extra_headers( + method, + path, + canonical_query, + payload, + service, + &extra_headers, + ) + } + + fn sign_request_with_extra_headers( + &self, + method: &str, + path: &str, + canonical_query: &str, + payload: &str, + service: &str, + extra_headers: &[(&str, &str)], ) -> Result { let now = chrono::Utc::now(); let amz_date = now.format("%Y%m%dT%H%M%SZ").to_string(); @@ -641,21 +882,19 @@ impl RustfsAdminClient { ("x-amz-date", amz_date.as_str()), ]; - if let Some(content_type) = content_type { - signed_headers.push(("content-type", content_type)); - } + signed_headers.extend(extra_headers.iter().copied()); signed_headers.sort_by_key(|(name, _)| *name); let canonical_headers: String = signed_headers .iter() - .map(|(key, value)| format!("{key}:{value}\n")) + .map(|(key, value)| format!("{}:{}\n", key.to_ascii_lowercase(), value.trim())) .collect(); let mut signed_header_names = String::new(); for (index, (name, _)) in signed_headers.iter().enumerate() { if index > 0 { signed_header_names.push(';'); } - signed_header_names.push_str(name); + signed_header_names.push_str(&name.to_ascii_lowercase()); } let canonical_request = format!( @@ -755,6 +994,50 @@ fn build_query_pairs(params: &[(&str, &str)]) -> String { serializer.finish() } +fn create_bucket_body(region: Option<&str>) -> String { + let Some(region) = region.map(str::trim).filter(|region| !region.is_empty()) else { + return String::new(); + }; + + if region == "us-east-1" { + return String::new(); + } + + format!( + "{}", + escape_xml(region) + ) +} + +fn escape_xml(value: &str) -> String { + value + .replace('&', "&") + .replace('<', "<") + .replace('>', ">") + .replace('"', """) + .replace('\'', "'") +} + +fn body_mentions_not_found(body: &str) -> bool { + let body = body.to_ascii_lowercase(); + body.contains("nosuchuser") + || body.contains("no such user") + || body.contains("user not exist") + || body.contains("nosuchpolicy") + || body.contains("no such policy") + || body.contains("objectlockconfigurationnotfound") + || body.contains("not found") +} + +fn bucket_already_exists(status: StatusCode, body: &str) -> bool { + if status == StatusCode::CONFLICT { + let body = body.to_ascii_lowercase(); + return body.contains("bucketalreadyexists") || body.contains("bucketalreadyownedbyyou"); + } + + false +} + fn extract_canned_policy_document(body: &str) -> Result { let value = serde_json::from_str::(body) .map_err(|_| RustfsClientError::InvalidPolicyDocument)?; @@ -924,6 +1207,7 @@ mod tests { query: Arc>, body: Arc>, authorization: Arc>, + object_lock_header: Arc>, } #[tokio::test] @@ -1249,6 +1533,184 @@ mod tests { server.abort(); } + #[tokio::test] + async fn add_user_uses_expected_path_query_and_body() { + let capture = Capture::default(); + let route_capture = capture.clone(); + + let router = Router::new() + .route( + ADD_USER_PATH, + put( + move |State(c): State, req: Request| async move { + *c.path.lock().await = req.uri().path().to_string(); + *c.query.lock().await = req.uri().query().unwrap_or("").to_string(); + let body_bytes = axum::body::to_bytes(req.into_body(), usize::MAX) + .await + .unwrap(); + *c.body.lock().await = String::from_utf8(body_bytes.to_vec()).unwrap(); + StatusCode::OK + }, + ), + ) + .with_state(route_capture.clone()); + + let listener = tokio::net::TcpListener::bind(("127.0.0.1", 0)) + .await + .unwrap(); + let addr = listener.local_addr().unwrap(); + let server = tokio::spawn(async move { axum::serve(listener, router).await.unwrap() }); + + let client = + RustfsAdminClient::new_with_base_url(format!("http://{addr}"), "access", "secret"); + client.add_user("app-user", "secret123").await.unwrap(); + + assert_eq!(&*capture.path.lock().await, ADD_USER_PATH); + assert_eq!(&*capture.query.lock().await, "accessKey=app-user"); + assert_eq!( + &*capture.body.lock().await, + r#"{"secretKey":"secret123","status":"enabled"}"# + ); + + server.abort(); + } + + #[tokio::test] + async fn set_user_policy_uses_single_authoritative_mapping_call() { + let capture = Capture::default(); + let route_capture = capture.clone(); + + let router = Router::new() + .route( + SET_POLICY_PATH, + put( + move |State(c): State, req: Request| async move { + *c.path.lock().await = req.uri().path().to_string(); + *c.query.lock().await = req.uri().query().unwrap_or("").to_string(); + StatusCode::OK + }, + ), + ) + .with_state(route_capture.clone()); + + let listener = tokio::net::TcpListener::bind(("127.0.0.1", 0)) + .await + .unwrap(); + let addr = listener.local_addr().unwrap(); + let server = tokio::spawn(async move { axum::serve(listener, router).await.unwrap() }); + + let client = + RustfsAdminClient::new_with_base_url(format!("http://{addr}"), "access", "secret"); + client + .set_user_policy( + "app-user", + &["app-readwrite".to_string(), "diagnostics".to_string()], + ) + .await + .unwrap(); + + assert_eq!(&*capture.path.lock().await, SET_POLICY_PATH); + assert_eq!( + &*capture.query.lock().await, + "isGroup=false&policyName=app-readwrite%2Cdiagnostics&userOrGroup=app-user" + ); + + server.abort(); + } + + #[tokio::test] + async fn set_user_policy_rejects_empty_policy_list() { + let client = RustfsAdminClient::new_with_base_url("http://127.0.0.1:1", "access", "secret"); + + let err = client + .set_user_policy("app-user", &[]) + .await + .expect_err("empty policy list should be rejected before request"); + + assert!(matches!(err, RustfsClientError::InvalidPolicyName)); + } + + #[tokio::test] + async fn bucket_object_lock_enabled_parses_enabled_response() { + let router = Router::new().route( + "/app-data", + get(|req: Request| async move { + assert_eq!(req.uri().query().unwrap_or(""), "object-lock="); + ( + StatusCode::OK, + "Enabled", + ) + }), + ); + + let listener = tokio::net::TcpListener::bind(("127.0.0.1", 0)) + .await + .unwrap(); + let addr = listener.local_addr().unwrap(); + let server = tokio::spawn(async move { axum::serve(listener, router).await.unwrap() }); + + let client = + RustfsAdminClient::new_with_base_url(format!("http://{addr}"), "access", "secret"); + + assert!(client.bucket_object_lock_enabled("app-data").await.unwrap()); + + server.abort(); + } + + #[tokio::test] + async fn create_bucket_sends_object_lock_header_and_region_body() { + let capture = Capture::default(); + let route_capture = capture.clone(); + + let router = Router::new() + .route( + "/app-data", + put( + move |State(c): State, req: Request| async move { + *c.path.lock().await = req.uri().path().to_string(); + *c.object_lock_header.lock().await = req + .headers() + .get("x-amz-bucket-object-lock-enabled") + .and_then(|value| value.to_str().ok()) + .unwrap_or("") + .to_string(); + let body_bytes = axum::body::to_bytes(req.into_body(), usize::MAX) + .await + .unwrap(); + *c.body.lock().await = String::from_utf8(body_bytes.to_vec()).unwrap(); + StatusCode::OK + }, + ), + ) + .with_state(route_capture.clone()); + + let listener = tokio::net::TcpListener::bind(("127.0.0.1", 0)) + .await + .unwrap(); + let addr = listener.local_addr().unwrap(); + let server = tokio::spawn(async move { axum::serve(listener, router).await.unwrap() }); + + let client = + RustfsAdminClient::new_with_base_url(format!("http://{addr}"), "access", "secret"); + let result = client + .create_bucket("app-data", Some("us-west-2"), true) + .await + .unwrap(); + + assert_eq!(result, CreateBucketResult::Created); + assert_eq!(&*capture.path.lock().await, "/app-data"); + assert_eq!(&*capture.object_lock_header.lock().await, "true"); + assert!( + capture + .body + .lock() + .await + .contains("us-west-2") + ); + + server.abort(); + } + #[test] fn extract_canned_policy_document_accepts_raw_policy_document() { let raw_policy = diff --git a/src/types/v1alpha1.rs b/src/types/v1alpha1.rs index 68e2d95..b68403f 100755 --- a/src/types/v1alpha1.rs +++ b/src/types/v1alpha1.rs @@ -19,6 +19,7 @@ pub mod persistence; pub mod policy_binding; pub mod pool; pub mod pool_lifecycle; +pub mod provisioning; pub mod status; pub mod tenant; pub mod tls; @@ -133,3 +134,58 @@ mod policy_binding_tests { assert_eq!(PolicyBinding::plural(&()), "policybindings"); } } + +#[cfg(test)] +mod tenant_provisioning_crd_tests { + use super::tenant::Tenant; + use kube::CustomResourceExt; + use serde_json::json; + + #[test] + fn tenant_crd_includes_provisioning_spec_status_and_uniqueness_rules() { + let crd = serde_json::to_value(Tenant::crd()).expect("Tenant CRD serializes"); + let schema = &crd["spec"]["versions"][0]["schema"]["openAPIV3Schema"]; + let spec = &schema["properties"]["spec"]; + let status = &schema["properties"]["status"]; + + assert_eq!(spec["properties"]["policies"]["type"], json!("array")); + assert_eq!(spec["properties"]["users"]["type"], json!("array")); + assert_eq!(spec["properties"]["buckets"]["type"], json!("array")); + assert_eq!( + status["properties"]["provisioning"]["properties"]["policies"]["type"], + json!("array") + ); + assert_eq!( + status["properties"]["provisioning"]["properties"]["phase"]["enum"], + json!(["Pending", "Ready", "Failed", null]) + ); + + assert_eq!( + spec["properties"]["policies"]["x-kubernetes-validations"][0]["message"], + json!("policy names must be unique") + ); + let user_validations = &spec["properties"]["users"]["x-kubernetes-validations"]; + assert!( + user_validations + .as_array() + .expect("user validations are present") + .iter() + .any(|rule| rule["message"] + == json!("user policies must contain at least one policy")) + ); + let user_policy_validations = &spec["properties"]["users"]["items"]["properties"]["policies"] + ["x-kubernetes-validations"]; + assert!( + user_policy_validations + .as_array() + .expect("user policy validations are present") + .iter() + .any(|rule| rule["message"] == json!("user policy names must be unique")) + ); + assert_eq!( + spec["properties"]["buckets"]["items"]["properties"]["name"]["x-kubernetes-validations"] + [0]["message"], + json!("bucket name must be a valid RustFS/S3 bucket name") + ); + } +} diff --git a/src/types/v1alpha1/provisioning.rs b/src/types/v1alpha1/provisioning.rs new file mode 100644 index 0000000..9b72a71 --- /dev/null +++ b/src/types/v1alpha1/provisioning.rs @@ -0,0 +1,95 @@ +// Copyright 2025 RustFS Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use kube::KubeSchema; +use schemars::JsonSchema; +use serde::{Deserialize, Serialize}; +use utoipa::ToSchema; + +#[derive(Deserialize, Serialize, Clone, Debug, JsonSchema, ToSchema, Default, PartialEq, Eq)] +#[serde(rename_all = "PascalCase")] +pub enum ProvisioningDeletionPolicy { + #[default] + Retain, +} + +pub fn is_retain(policy: &ProvisioningDeletionPolicy) -> bool { + matches!(policy, ProvisioningDeletionPolicy::Retain) +} + +#[derive(Deserialize, Serialize, Clone, Debug, KubeSchema, ToSchema, Default, PartialEq, Eq)] +#[serde(rename_all = "camelCase")] +pub struct ConfigMapKeyReference { + #[x_kube(validation = Rule::new("self != ''").message("configMapKeyRef name must be not empty"))] + pub name: String, + + #[x_kube(validation = Rule::new("self != ''").message("configMapKeyRef key must be not empty"))] + pub key: String, +} + +#[derive(Deserialize, Serialize, Clone, Debug, KubeSchema, ToSchema, Default, PartialEq, Eq)] +#[serde(rename_all = "camelCase")] +pub struct PolicyDocumentSource { + pub config_map_key_ref: ConfigMapKeyReference, +} + +#[derive(Deserialize, Serialize, Clone, Debug, KubeSchema, ToSchema, Default, PartialEq, Eq)] +#[serde(rename_all = "camelCase")] +pub struct ProvisioningPolicy { + #[x_kube(validation = Rule::new("self != '' && !self.matches('.*\\\\s.*')").message("policy name must be not empty and must not contain whitespace"))] + pub name: String, + + pub document: PolicyDocumentSource, + + #[serde(default, skip_serializing_if = "is_retain")] + pub deletion_policy: ProvisioningDeletionPolicy, +} + +#[derive(Deserialize, Serialize, Clone, Debug, KubeSchema, ToSchema, Default, PartialEq, Eq)] +#[serde(rename_all = "camelCase")] +pub struct ProvisioningUser { + #[x_kube(validation = Rule::new("self != '' && !self.matches('.*\\\\s.*')").message("user Secret name must be not empty and must not contain whitespace"))] + pub name: String, + + /// Canned policies to map directly to this user. + #[x_kube(validation = Rule::new("self.all(x, x != '' && !x.matches('.*\\\\s.*'))").message("user policy names must be not empty and must not contain whitespace"))] + #[x_kube(validation = Rule::new("self.all(x, self.filter(y, y == x).size() == 1)").message("user policy names must be unique"))] + #[serde(default, skip_serializing_if = "Vec::is_empty")] + pub policies: Vec, + + #[serde(default, skip_serializing_if = "is_retain")] + pub deletion_policy: ProvisioningDeletionPolicy, +} + +#[derive(Deserialize, Serialize, Clone, Debug, KubeSchema, ToSchema, Default, PartialEq, Eq)] +#[serde(rename_all = "camelCase")] +pub struct ProvisioningBucket { + #[x_kube(validation = Rule::new("self.size() >= 3 && self.size() <= 63 && self != 'rustfs' && !self.matches('^(\\\\d+\\\\.){3}\\\\d+$') && !self.contains('..') && !self.contains('.-') && !self.contains('-.') && self.matches('^[a-z0-9][a-z0-9.-]{1,61}[a-z0-9]$')").message("bucket name must be a valid RustFS/S3 bucket name"))] + pub name: String, + + #[serde(default, skip_serializing_if = "Option::is_none")] + pub region: Option, + + #[serde(default, skip_serializing_if = "Option::is_none")] + pub object_lock: Option, + + #[serde(default, skip_serializing_if = "is_retain")] + pub deletion_policy: ProvisioningDeletionPolicy, +} + +impl ProvisioningBucket { + pub fn object_lock_enabled(&self) -> bool { + self.object_lock.unwrap_or(false) + } +} diff --git a/src/types/v1alpha1/status.rs b/src/types/v1alpha1/status.rs index c3ff8bb..efb0087 100755 --- a/src/types/v1alpha1/status.rs +++ b/src/types/v1alpha1/status.rs @@ -13,6 +13,7 @@ // limitations under the License. pub mod certificate; pub mod pool; +pub mod provisioning; pub mod state; use schemars::JsonSchema; @@ -29,6 +30,7 @@ pub enum ConditionType { TlsReady, PoolsReady, WorkloadsReady, + ProvisioningReady, } impl ConditionType { @@ -43,6 +45,7 @@ impl ConditionType { Self::TlsReady => "TlsReady", Self::PoolsReady => "PoolsReady", Self::WorkloadsReady => "WorkloadsReady", + Self::ProvisioningReady => "ProvisioningReady", } } @@ -57,6 +60,7 @@ impl ConditionType { Self::TlsReady, Self::PoolsReady, Self::WorkloadsReady, + Self::ProvisioningReady, ] .iter() .position(|condition_type| condition_type.as_str() == type_) @@ -144,6 +148,20 @@ pub enum Reason { RolloutInProgress, PodsNotReady, PoolDegraded, + ProvisioningConfigured, + ProvisioningPending, + ProvisioningFailed, + ProvisioningUnsupported, + PolicyDocumentConfigMapNotFound, + PolicyDocumentKeyNotFound, + PolicyApplyFailed, + PolicyConflict, + UserSecretInvalid, + UserPolicyNotFound, + UserPolicyInvalid, + UserPolicySetFailed, + BucketCreateFailed, + BucketObjectLockConflict, KubernetesApiError, StatusPatchFailed, ObservedGenerationStale, @@ -191,6 +209,20 @@ impl Reason { Self::RolloutInProgress => "RolloutInProgress", Self::PodsNotReady => "PodsNotReady", Self::PoolDegraded => "PoolDegraded", + Self::ProvisioningConfigured => "ProvisioningConfigured", + Self::ProvisioningPending => "ProvisioningPending", + Self::ProvisioningFailed => "ProvisioningFailed", + Self::ProvisioningUnsupported => "ProvisioningUnsupported", + Self::PolicyDocumentConfigMapNotFound => "PolicyDocumentConfigMapNotFound", + Self::PolicyDocumentKeyNotFound => "PolicyDocumentKeyNotFound", + Self::PolicyApplyFailed => "PolicyApplyFailed", + Self::PolicyConflict => "PolicyConflict", + Self::UserSecretInvalid => "UserSecretInvalid", + Self::UserPolicyNotFound => "UserPolicyNotFound", + Self::UserPolicyInvalid => "UserPolicyInvalid", + Self::UserPolicySetFailed => "UserPolicySetFailed", + Self::BucketCreateFailed => "BucketCreateFailed", + Self::BucketObjectLockConflict => "BucketObjectLockConflict", Self::KubernetesApiError => "KubernetesApiError", Self::StatusPatchFailed => "StatusPatchFailed", Self::ObservedGenerationStale => "ObservedGenerationStale", @@ -252,6 +284,12 @@ pub struct Status { #[serde(default, skip_serializing_if = "certificate::Status::is_empty")] pub certificates: certificate::Status, + + #[serde( + default, + skip_serializing_if = "provisioning::ProvisioningStatus::is_empty" + )] + pub provisioning: provisioning::ProvisioningStatus, } impl Status { @@ -433,6 +471,17 @@ pub fn is_blocked_reason(reason: &str) -> bool { | "PoolDecommissionCanceled" | "PoolDecommissionFailed" | "StatefulSetUpdateValidationFailed" + | "ProvisioningUnsupported" + | "PolicyDocumentConfigMapNotFound" + | "PolicyDocumentKeyNotFound" + | "PolicyApplyFailed" + | "PolicyConflict" + | "UserSecretInvalid" + | "UserPolicyNotFound" + | "UserPolicyInvalid" + | "UserPolicySetFailed" + | "BucketCreateFailed" + | "BucketObjectLockConflict" ) } @@ -499,6 +548,18 @@ pub fn next_actions_for_reason(reason: &str) -> Vec<&'static str> { "inspectEvents", "inspectOperatorLogs", ], + "ProvisioningPending" => vec!["waitForProvisioning"], + "ProvisioningUnsupported" => vec!["fixTenantProvisioningConfig"], + "PolicyDocumentConfigMapNotFound" => vec!["createPolicyConfigMap"], + "PolicyDocumentKeyNotFound" => vec!["addPolicyConfigMapKey"], + "PolicyApplyFailed" => vec!["fixPolicyDocument", "inspectOperatorLogs"], + "PolicyConflict" => vec!["inspectLivePolicy", "updatePolicySpec"], + "UserSecretInvalid" => vec!["fixUserSecret"], + "UserPolicyNotFound" => vec!["createPolicy", "fixUserPolicyList"], + "UserPolicyInvalid" => vec!["fixUserPolicyList"], + "UserPolicySetFailed" => vec!["inspectUserPolicyMapping", "inspectOperatorLogs"], + "BucketCreateFailed" => vec!["inspectBucket", "inspectOperatorLogs"], + "BucketObjectLockConflict" => vec!["createObjectLockBucket", "fixBucketSpec"], "KubernetesApiError" => vec!["retry", "inspectOperatorLogs"], "ObservedGenerationStale" => vec!["waitForReconcile"], _ => Vec::new(), diff --git a/src/types/v1alpha1/status/provisioning.rs b/src/types/v1alpha1/status/provisioning.rs new file mode 100644 index 0000000..11d0f0a --- /dev/null +++ b/src/types/v1alpha1/status/provisioning.rs @@ -0,0 +1,122 @@ +// Copyright 2025 RustFS Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use schemars::JsonSchema; +use serde::{Deserialize, Serialize}; +use utoipa::ToSchema; + +#[derive(Deserialize, Serialize, Clone, Debug, JsonSchema, ToSchema, Default, PartialEq, Eq)] +#[serde(rename_all = "camelCase")] +pub struct ProvisioningStatus { + #[serde(default, skip_serializing_if = "Option::is_none")] + pub observed_generation: Option, + + #[serde(default, skip_serializing_if = "Option::is_none")] + pub phase: Option, + + #[serde(default, skip_serializing_if = "Vec::is_empty")] + pub policies: Vec, + + #[serde(default, skip_serializing_if = "Vec::is_empty")] + pub users: Vec, + + #[serde(default, skip_serializing_if = "Vec::is_empty")] + pub buckets: Vec, +} + +impl ProvisioningStatus { + pub fn is_empty(&self) -> bool { + self.policies.is_empty() && self.users.is_empty() && self.buckets.is_empty() + } +} + +#[derive(Deserialize, Serialize, Clone, Debug, JsonSchema, ToSchema, PartialEq, Eq)] +#[serde(rename_all = "PascalCase")] +pub enum ProvisioningPhase { + Pending, + Ready, + Failed, +} + +#[derive(Deserialize, Serialize, Clone, Debug, JsonSchema, ToSchema, PartialEq, Eq)] +#[serde(rename_all = "PascalCase")] +pub enum ProvisioningItemState { + Pending, + Ready, + Failed, + Retained, +} + +impl ProvisioningItemState { + pub const fn as_str(&self) -> &'static str { + match self { + Self::Pending => "Pending", + Self::Ready => "Ready", + Self::Failed => "Failed", + Self::Retained => "Retained", + } + } +} + +#[derive(Deserialize, Serialize, Clone, Debug, JsonSchema, ToSchema, Default, PartialEq, Eq)] +#[serde(rename_all = "camelCase")] +pub struct ProvisioningItemStatus { + pub name: String, + + pub state: String, + + pub reason: String, + + #[serde(default, skip_serializing_if = "Option::is_none")] + pub message: Option, + + #[serde(default, skip_serializing_if = "Option::is_none")] + pub last_transition_time: Option, + + #[serde(default, skip_serializing_if = "Option::is_none")] + pub desired_hash: Option, + + #[serde(default, skip_serializing_if = "Option::is_none")] + pub last_applied_hash: Option, + + #[serde(default, skip_serializing_if = "Option::is_none")] + pub last_applied_generation: Option, + + #[serde(default, skip_serializing_if = "Option::is_none")] + pub observed_secret_resource_version: Option, + + #[serde(default, skip_serializing_if = "Vec::is_empty")] + pub policies: Vec, + + #[serde(default, skip_serializing_if = "Option::is_none")] + pub region: Option, + + #[serde(default, skip_serializing_if = "Option::is_none")] + pub object_lock: Option, +} + +impl ProvisioningItemStatus { + pub fn new( + name: impl Into, + state: ProvisioningItemState, + reason: impl Into, + ) -> Self { + Self { + name: name.into(), + state: state.as_str().to_string(), + reason: reason.into(), + ..Default::default() + } + } +} diff --git a/src/types/v1alpha1/tenant.rs b/src/types/v1alpha1/tenant.rs index 4af84e6..030d1b7 100755 --- a/src/types/v1alpha1/tenant.rs +++ b/src/types/v1alpha1/tenant.rs @@ -17,6 +17,9 @@ use crate::types::v1alpha1::k8s; use crate::types::v1alpha1::logging::LoggingConfig; use crate::types::v1alpha1::pool::Pool; use crate::types::v1alpha1::pool_lifecycle::PoolLifecycleSpec; +use crate::types::v1alpha1::provisioning::{ + ProvisioningBucket, ProvisioningPolicy, ProvisioningUser, +}; use crate::types::v1alpha1::tls::TlsConfig; use crate::types::{self, error::NoNamespaceSnafu}; use k8s_openapi::api::core::v1 as corev1; @@ -151,6 +154,22 @@ pub struct TenantSpec { #[serde(default, skip_serializing_if = "Option::is_none")] pub creds_secret: Option, + /// Canned policies that should be applied to the RustFS tenant. + #[x_kube(validation = Rule::new("self.all(x, self.filter(y, y.name == x.name).size() == 1)").message("policy names must be unique"))] + #[serde(default, skip_serializing_if = "Vec::is_empty")] + pub policies: Vec, + + /// Regular users that should exist in the RustFS tenant. + #[x_kube(validation = Rule::new("self.all(x, self.filter(y, y.name == x.name).size() == 1)").message("user Secret names must be unique"))] + #[x_kube(validation = Rule::new("self.all(x, has(x.policies) && x.policies.size() > 0)").message("user policies must contain at least one policy"))] + #[serde(default, skip_serializing_if = "Vec::is_empty")] + pub users: Vec, + + /// Buckets that should exist in the RustFS tenant. + #[x_kube(validation = Rule::new("self.all(x, self.filter(y, y.name == x.name).size() == 1)").message("bucket names must be unique"))] + #[serde(default, skip_serializing_if = "Vec::is_empty")] + pub buckets: Vec, + /// Encryption / KMS configuration for server-side encryption. /// When enabled, the operator injects KMS environment variables and mounts /// secrets into RustFS pods so the in-process `rustfs-kms` library is configured.