diff --git a/apis/milvus.io/v1beta1/dependencies_types.go b/apis/milvus.io/v1beta1/dependencies_types.go index adb39ac7..0ebff7cf 100644 --- a/apis/milvus.io/v1beta1/dependencies_types.go +++ b/apis/milvus.io/v1beta1/dependencies_types.go @@ -16,7 +16,7 @@ type MilvusDependencies struct { // +kubebuilder:validation:Optional Etcd MilvusEtcd `json:"etcd"` - // +kubebuilder:validation:Enum:={"pulsar", "kafka", "rocksmq", ""} + // +kubebuilder:validation:Enum:={"pulsar", "kafka", "rocksmq", "natsmq", ""} // +kubebuilder:validation:Optional // MsgStreamType default to pulsar for cluster, rocksmq for standalone MsgStreamType MsgStreamType `json:"msgStreamType,omitempty"` @@ -28,7 +28,10 @@ type MilvusDependencies struct { Kafka MilvusKafka `json:"kafka,omitempty"` // +kubebuilder:validation:Optional - RocksMQ MilvusRocksMQ `json:"rocksmq,omitempty"` + RocksMQ MilvusBuildInMQ `json:"rocksmq,omitempty"` + + // +kubebuilder:validation:Optional + NatsMQ MilvusBuildInMQ `json:"natsmq,omitempty"` // +kubebuilder:validation:Optional Storage MilvusStorage `json:"storage"` @@ -40,6 +43,7 @@ const ( MsgStreamTypePulsar MsgStreamType = "pulsar" MsgStreamTypeKafka MsgStreamType = "kafka" MsgStreamTypeRocksMQ MsgStreamType = "rocksmq" + MsgStreamTypeNatsMQ MsgStreamType = "natsmq" ) type MilvusEtcd struct { @@ -88,8 +92,8 @@ type MilvusStorage struct { External bool `json:"external,omitempty"` } -// MilvusRocksMQ configuration -type MilvusRocksMQ struct { +// MilvusBuildInMQ (rocksmq or natsmq) configuration +type MilvusBuildInMQ struct { Persistence Persistence `json:"persistence"` } diff --git a/apis/milvus.io/v1beta1/milvus_types.go b/apis/milvus.io/v1beta1/milvus_types.go index f1b4a1b7..8e5021dc 100644 --- a/apis/milvus.io/v1beta1/milvus_types.go +++ b/apis/milvus.io/v1beta1/milvus_types.go @@ -114,6 +114,16 @@ func (ms MilvusSpec) GetMilvusVersionByImage() (*semver.Version, error) { return provisioner.GetSemanticVersion(splited[1]) } +func (ms *MilvusSpec) GetPersistenceConfig() *Persistence { + switch ms.Dep.MsgStreamType { + case MsgStreamTypeRocksMQ: + return &ms.Dep.RocksMQ.Persistence + case MsgStreamTypeNatsMQ: + return &ms.Dep.NatsMQ.Persistence + } + return nil +} + // MilvusMode defines the mode of Milvus deployment type MilvusMode string diff --git a/apis/milvus.io/v1beta1/milvus_types_test.go b/apis/milvus.io/v1beta1/milvus_types_test.go index 231766ee..f87ef12e 100644 --- a/apis/milvus.io/v1beta1/milvus_types_test.go +++ b/apis/milvus.io/v1beta1/milvus_types_test.go @@ -216,3 +216,16 @@ func TestGetMilvusVersionByGlobalImage(t *testing.T) { _, err = m.Spec.GetMilvusVersionByImage() assert.Error(t, err) } + +func TestGetPersistenceConfig(t *testing.T) { + m := Milvus{} + m.Spec.Dep.MsgStreamType = MsgStreamTypePulsar + assert.Nil(t, m.Spec.GetPersistenceConfig()) + + m.Spec.Dep.MsgStreamType = MsgStreamTypeRocksMQ + m.Spec.Dep.NatsMQ.Persistence.Enabled = true + assert.Same(t, &m.Spec.Dep.RocksMQ.Persistence, m.Spec.GetPersistenceConfig()) + + m.Spec.Dep.MsgStreamType = MsgStreamTypeNatsMQ + assert.Same(t, &m.Spec.Dep.NatsMQ.Persistence, m.Spec.GetPersistenceConfig()) +} diff --git a/apis/milvus.io/v1beta1/zz_generated.deepcopy.go b/apis/milvus.io/v1beta1/zz_generated.deepcopy.go index f7149f22..69bc8aec 100644 --- a/apis/milvus.io/v1beta1/zz_generated.deepcopy.go +++ b/apis/milvus.io/v1beta1/zz_generated.deepcopy.go @@ -209,6 +209,22 @@ func (in *Milvus) DeepCopyObject() runtime.Object { return nil } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *MilvusBuildInMQ) DeepCopyInto(out *MilvusBuildInMQ) { + *out = *in + in.Persistence.DeepCopyInto(&out.Persistence) +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new MilvusBuildInMQ. +func (in *MilvusBuildInMQ) DeepCopy() *MilvusBuildInMQ { + if in == nil { + return nil + } + out := new(MilvusBuildInMQ) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *MilvusComponents) DeepCopyInto(out *MilvusComponents) { *out = *in @@ -338,6 +354,7 @@ func (in *MilvusDependencies) DeepCopyInto(out *MilvusDependencies) { in.Pulsar.DeepCopyInto(&out.Pulsar) in.Kafka.DeepCopyInto(&out.Kafka) in.RocksMQ.DeepCopyInto(&out.RocksMQ) + in.NatsMQ.DeepCopyInto(&out.NatsMQ) in.Storage.DeepCopyInto(&out.Storage) } @@ -618,22 +635,6 @@ func (in *MilvusReplicas) DeepCopy() *MilvusReplicas { return out } -// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. -func (in *MilvusRocksMQ) DeepCopyInto(out *MilvusRocksMQ) { - *out = *in - in.Persistence.DeepCopyInto(&out.Persistence) -} - -// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new MilvusRocksMQ. -func (in *MilvusRocksMQ) DeepCopy() *MilvusRocksMQ { - if in == nil { - return nil - } - out := new(MilvusRocksMQ) - in.DeepCopyInto(out) - return out -} - // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *MilvusRootCoord) DeepCopyInto(out *MilvusRootCoord) { *out = *in diff --git a/charts/milvus-operator/templates/crds.yaml b/charts/milvus-operator/templates/crds.yaml index 6bd47363..5641b289 100644 --- a/charts/milvus-operator/templates/crds.yaml +++ b/charts/milvus-operator/templates/crds.yaml @@ -6011,8 +6011,101 @@ spec: - pulsar - kafka - rocksmq + - natsmq - "" type: string + natsmq: + properties: + persistence: + properties: + enabled: + type: boolean + persistentVolumeClaim: + properties: + annotations: + additionalProperties: + type: string + type: object + existingClaim: + type: string + labels: + additionalProperties: + type: string + type: object + spec: + properties: + accessModes: + items: + type: string + type: array + dataSource: + properties: + apiGroup: + type: string + kind: + type: string + name: + type: string + required: + - kind + - name + type: object + resources: + properties: + limits: + additionalProperties: + anyOf: + - type: integer + - type: string + pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$ + x-kubernetes-int-or-string: true + type: object + requests: + additionalProperties: + anyOf: + - type: integer + - type: string + pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$ + x-kubernetes-int-or-string: true + type: object + type: object + selector: + properties: + matchExpressions: + items: + properties: + key: + type: string + operator: + type: string + values: + items: + type: string + type: array + required: + - key + - operator + type: object + type: array + matchLabels: + additionalProperties: + type: string + type: object + type: object + storageClassName: + type: string + volumeMode: + type: string + volumeName: + type: string + type: object + type: object + pvcDeletion: + type: boolean + type: object + required: + - persistence + type: object pulsar: properties: endpoint: @@ -6778,8 +6871,101 @@ spec: - pulsar - kafka - rocksmq + - natsmq - "" type: string + natsmq: + properties: + persistence: + properties: + enabled: + type: boolean + persistentVolumeClaim: + properties: + annotations: + additionalProperties: + type: string + type: object + existingClaim: + type: string + labels: + additionalProperties: + type: string + type: object + spec: + properties: + accessModes: + items: + type: string + type: array + dataSource: + properties: + apiGroup: + type: string + kind: + type: string + name: + type: string + required: + - kind + - name + type: object + resources: + properties: + limits: + additionalProperties: + anyOf: + - type: integer + - type: string + pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$ + x-kubernetes-int-or-string: true + type: object + requests: + additionalProperties: + anyOf: + - type: integer + - type: string + pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$ + x-kubernetes-int-or-string: true + type: object + type: object + selector: + properties: + matchExpressions: + items: + properties: + key: + type: string + operator: + type: string + values: + items: + type: string + type: array + required: + - key + - operator + type: object + type: array + matchLabels: + additionalProperties: + type: string + type: object + type: object + storageClassName: + type: string + volumeMode: + type: string + volumeName: + type: string + type: object + type: object + pvcDeletion: + type: boolean + type: object + required: + - persistence + type: object pulsar: properties: endpoint: @@ -13370,8 +13556,101 @@ spec: - pulsar - kafka - rocksmq + - natsmq - "" type: string + natsmq: + properties: + persistence: + properties: + enabled: + type: boolean + persistentVolumeClaim: + properties: + annotations: + additionalProperties: + type: string + type: object + existingClaim: + type: string + labels: + additionalProperties: + type: string + type: object + spec: + properties: + accessModes: + items: + type: string + type: array + dataSource: + properties: + apiGroup: + type: string + kind: + type: string + name: + type: string + required: + - kind + - name + type: object + resources: + properties: + limits: + additionalProperties: + anyOf: + - type: integer + - type: string + pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$ + x-kubernetes-int-or-string: true + type: object + requests: + additionalProperties: + anyOf: + - type: integer + - type: string + pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$ + x-kubernetes-int-or-string: true + type: object + type: object + selector: + properties: + matchExpressions: + items: + properties: + key: + type: string + operator: + type: string + values: + items: + type: string + type: array + required: + - key + - operator + type: object + type: array + matchLabels: + additionalProperties: + type: string + type: object + type: object + storageClassName: + type: string + volumeMode: + type: string + volumeName: + type: string + type: object + type: object + pvcDeletion: + type: boolean + type: object + required: + - persistence + type: object pulsar: properties: endpoint: diff --git a/config/crd/bases/milvus.io_milvusclusters.yaml b/config/crd/bases/milvus.io_milvusclusters.yaml index ba232624..4b31fcf9 100644 --- a/config/crd/bases/milvus.io_milvusclusters.yaml +++ b/config/crd/bases/milvus.io_milvusclusters.yaml @@ -6010,8 +6010,101 @@ spec: - pulsar - kafka - rocksmq + - natsmq - "" type: string + natsmq: + properties: + persistence: + properties: + enabled: + type: boolean + persistentVolumeClaim: + properties: + annotations: + additionalProperties: + type: string + type: object + existingClaim: + type: string + labels: + additionalProperties: + type: string + type: object + spec: + properties: + accessModes: + items: + type: string + type: array + dataSource: + properties: + apiGroup: + type: string + kind: + type: string + name: + type: string + required: + - kind + - name + type: object + resources: + properties: + limits: + additionalProperties: + anyOf: + - type: integer + - type: string + pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$ + x-kubernetes-int-or-string: true + type: object + requests: + additionalProperties: + anyOf: + - type: integer + - type: string + pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$ + x-kubernetes-int-or-string: true + type: object + type: object + selector: + properties: + matchExpressions: + items: + properties: + key: + type: string + operator: + type: string + values: + items: + type: string + type: array + required: + - key + - operator + type: object + type: array + matchLabels: + additionalProperties: + type: string + type: object + type: object + storageClassName: + type: string + volumeMode: + type: string + volumeName: + type: string + type: object + type: object + pvcDeletion: + type: boolean + type: object + required: + - persistence + type: object pulsar: properties: endpoint: diff --git a/config/crd/bases/milvus.io_milvuses.yaml b/config/crd/bases/milvus.io_milvuses.yaml index 76204c3d..08402595 100644 --- a/config/crd/bases/milvus.io_milvuses.yaml +++ b/config/crd/bases/milvus.io_milvuses.yaml @@ -446,8 +446,101 @@ spec: - pulsar - kafka - rocksmq + - natsmq - "" type: string + natsmq: + properties: + persistence: + properties: + enabled: + type: boolean + persistentVolumeClaim: + properties: + annotations: + additionalProperties: + type: string + type: object + existingClaim: + type: string + labels: + additionalProperties: + type: string + type: object + spec: + properties: + accessModes: + items: + type: string + type: array + dataSource: + properties: + apiGroup: + type: string + kind: + type: string + name: + type: string + required: + - kind + - name + type: object + resources: + properties: + limits: + additionalProperties: + anyOf: + - type: integer + - type: string + pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$ + x-kubernetes-int-or-string: true + type: object + requests: + additionalProperties: + anyOf: + - type: integer + - type: string + pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$ + x-kubernetes-int-or-string: true + type: object + type: object + selector: + properties: + matchExpressions: + items: + properties: + key: + type: string + operator: + type: string + values: + items: + type: string + type: array + required: + - key + - operator + type: object + type: array + matchLabels: + additionalProperties: + type: string + type: object + type: object + storageClassName: + type: string + volumeMode: + type: string + volumeName: + type: string + type: object + type: object + pvcDeletion: + type: boolean + type: object + required: + - persistence + type: object pulsar: properties: endpoint: @@ -7038,8 +7131,101 @@ spec: - pulsar - kafka - rocksmq + - natsmq - "" type: string + natsmq: + properties: + persistence: + properties: + enabled: + type: boolean + persistentVolumeClaim: + properties: + annotations: + additionalProperties: + type: string + type: object + existingClaim: + type: string + labels: + additionalProperties: + type: string + type: object + spec: + properties: + accessModes: + items: + type: string + type: array + dataSource: + properties: + apiGroup: + type: string + kind: + type: string + name: + type: string + required: + - kind + - name + type: object + resources: + properties: + limits: + additionalProperties: + anyOf: + - type: integer + - type: string + pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$ + x-kubernetes-int-or-string: true + type: object + requests: + additionalProperties: + anyOf: + - type: integer + - type: string + pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$ + x-kubernetes-int-or-string: true + type: object + type: object + selector: + properties: + matchExpressions: + items: + properties: + key: + type: string + operator: + type: string + values: + items: + type: string + type: array + required: + - key + - operator + type: object + type: array + matchLabels: + additionalProperties: + type: string + type: object + type: object + storageClassName: + type: string + volumeMode: + type: string + volumeName: + type: string + type: object + type: object + pvcDeletion: + type: boolean + type: object + required: + - persistence + type: object pulsar: properties: endpoint: diff --git a/deploy/manifests/deployment.yaml b/deploy/manifests/deployment.yaml index a7827adb..124d1527 100644 --- a/deploy/manifests/deployment.yaml +++ b/deploy/manifests/deployment.yaml @@ -6042,8 +6042,101 @@ spec: - pulsar - kafka - rocksmq + - natsmq - "" type: string + natsmq: + properties: + persistence: + properties: + enabled: + type: boolean + persistentVolumeClaim: + properties: + annotations: + additionalProperties: + type: string + type: object + existingClaim: + type: string + labels: + additionalProperties: + type: string + type: object + spec: + properties: + accessModes: + items: + type: string + type: array + dataSource: + properties: + apiGroup: + type: string + kind: + type: string + name: + type: string + required: + - kind + - name + type: object + resources: + properties: + limits: + additionalProperties: + anyOf: + - type: integer + - type: string + pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$ + x-kubernetes-int-or-string: true + type: object + requests: + additionalProperties: + anyOf: + - type: integer + - type: string + pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$ + x-kubernetes-int-or-string: true + type: object + type: object + selector: + properties: + matchExpressions: + items: + properties: + key: + type: string + operator: + type: string + values: + items: + type: string + type: array + required: + - key + - operator + type: object + type: array + matchLabels: + additionalProperties: + type: string + type: object + type: object + storageClassName: + type: string + volumeMode: + type: string + volumeName: + type: string + type: object + type: object + pvcDeletion: + type: boolean + type: object + required: + - persistence + type: object pulsar: properties: endpoint: @@ -6810,8 +6903,101 @@ spec: - pulsar - kafka - rocksmq + - natsmq - "" type: string + natsmq: + properties: + persistence: + properties: + enabled: + type: boolean + persistentVolumeClaim: + properties: + annotations: + additionalProperties: + type: string + type: object + existingClaim: + type: string + labels: + additionalProperties: + type: string + type: object + spec: + properties: + accessModes: + items: + type: string + type: array + dataSource: + properties: + apiGroup: + type: string + kind: + type: string + name: + type: string + required: + - kind + - name + type: object + resources: + properties: + limits: + additionalProperties: + anyOf: + - type: integer + - type: string + pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$ + x-kubernetes-int-or-string: true + type: object + requests: + additionalProperties: + anyOf: + - type: integer + - type: string + pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$ + x-kubernetes-int-or-string: true + type: object + type: object + selector: + properties: + matchExpressions: + items: + properties: + key: + type: string + operator: + type: string + values: + items: + type: string + type: array + required: + - key + - operator + type: object + type: array + matchLabels: + additionalProperties: + type: string + type: object + type: object + storageClassName: + type: string + volumeMode: + type: string + volumeName: + type: string + type: object + type: object + pvcDeletion: + type: boolean + type: object + required: + - persistence + type: object pulsar: properties: endpoint: @@ -13402,8 +13588,101 @@ spec: - pulsar - kafka - rocksmq + - natsmq - "" type: string + natsmq: + properties: + persistence: + properties: + enabled: + type: boolean + persistentVolumeClaim: + properties: + annotations: + additionalProperties: + type: string + type: object + existingClaim: + type: string + labels: + additionalProperties: + type: string + type: object + spec: + properties: + accessModes: + items: + type: string + type: array + dataSource: + properties: + apiGroup: + type: string + kind: + type: string + name: + type: string + required: + - kind + - name + type: object + resources: + properties: + limits: + additionalProperties: + anyOf: + - type: integer + - type: string + pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$ + x-kubernetes-int-or-string: true + type: object + requests: + additionalProperties: + anyOf: + - type: integer + - type: string + pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$ + x-kubernetes-int-or-string: true + type: object + type: object + selector: + properties: + matchExpressions: + items: + properties: + key: + type: string + operator: + type: string + values: + items: + type: string + type: array + required: + - key + - operator + type: object + type: array + matchLabels: + additionalProperties: + type: string + type: object + type: object + storageClassName: + type: string + volumeMode: + type: string + volumeName: + type: string + type: object + type: object + pvcDeletion: + type: boolean + type: object + required: + - persistence + type: object pulsar: properties: endpoint: diff --git a/pkg/controllers/configmaps.go b/pkg/controllers/configmaps.go index dabc57da..e18190d4 100644 --- a/pkg/controllers/configmaps.go +++ b/pkg/controllers/configmaps.go @@ -72,7 +72,14 @@ func (r *MilvusReconciler) updateConfigMap(ctx context.Context, mc v1beta1.Milvu // delete other mq config to make milvus use rocksmq delete(conf, "pulsar") delete(conf, "kafka") + default: + // we use mq.type to handle it } + _, found := conf["mq"] + if conf["mq"] == nil || !found { + conf["mq"] = map[string]interface{}{} + } + conf["mq"].(map[string]interface{})["type"] = mc.Spec.Dep.MsgStreamType conf[util.MqTypeConfigKey] = mc.Spec.Dep.MsgStreamType diff --git a/pkg/controllers/dependencies.go b/pkg/controllers/dependencies.go index e1eec5d6..6166e993 100644 --- a/pkg/controllers/dependencies.go +++ b/pkg/controllers/dependencies.go @@ -158,7 +158,7 @@ func (r *MilvusReconciler) ReconcileMsgStream(ctx context.Context, mc v1beta1.Mi switch mc.Spec.Dep.MsgStreamType { case v1beta1.MsgStreamTypeKafka: return r.ReconcileKafka(ctx, mc) - case v1beta1.MsgStreamTypeRocksMQ: + case v1beta1.MsgStreamTypeRocksMQ, v1beta1.MsgStreamTypeNatsMQ: // built in, do nothing return nil default: diff --git a/pkg/controllers/deployment_updater.go b/pkg/controllers/deployment_updater.go index 5a16189f..1cca7c59 100644 --- a/pkg/controllers/deployment_updater.go +++ b/pkg/controllers/deployment_updater.go @@ -230,10 +230,7 @@ func newMilvusDeploymentUpdater(m v1beta1.Milvus, scheme *runtime.Scheme, compon } func (m milvusDeploymentUpdater) GetPersistenceConfig() *v1beta1.Persistence { - if m.Milvus.Spec.Dep.RocksMQ.Persistence.Enabled { - return &m.Milvus.Spec.Dep.RocksMQ.Persistence - } - return nil + return m.Milvus.Spec.GetPersistenceConfig() } func (m milvusDeploymentUpdater) GetIntanceName() string { diff --git a/pkg/controllers/pvc.go b/pkg/controllers/pvc.go index 576175df..7af7fcf5 100644 --- a/pkg/controllers/pvc.go +++ b/pkg/controllers/pvc.go @@ -18,7 +18,10 @@ func getPVCNameByInstName(instName string) string { } func (r *MilvusReconciler) ReconcilePVCs(ctx context.Context, mil v1beta1.Milvus) error { - persistence := mil.Spec.Dep.RocksMQ.Persistence + persistence := mil.Spec.GetPersistenceConfig() + if persistence == nil { + return nil + } needPVC := persistence.Enabled && len(persistence.PersistentVolumeClaim.ExistingClaim) < 1 if !needPVC { return nil diff --git a/test/min-mc-feature.yaml b/test/min-mc-feature.yaml index 1cb575a6..08ae7318 100644 --- a/test/min-mc-feature.yaml +++ b/test/min-mc-feature.yaml @@ -14,7 +14,7 @@ metadata: spec: mode: "cluster" components: - image: milvusdb/milvus:master-20230103-72184eae + image: milvusdb/milvus:master-20230814-ae780e03 proxy: ingress: hosts: ["mc-sit.milvus.io"] diff --git a/test/min-milvus-feature.yaml b/test/min-milvus-feature.yaml index 2a8a011e..6e47b4e1 100644 --- a/test/min-milvus-feature.yaml +++ b/test/min-milvus-feature.yaml @@ -26,12 +26,10 @@ spec: pvcDeletion: true values: replicaCount: 1 - msgStreamType: kafka - kafka: - inCluster: - deletionPolicy: Delete - pvcDeletion: true - values: {} + msgStreamType: natsmq + natsmq: + persistence: + enabled: true storage: inCluster: deletionPolicy: Delete diff --git a/tool/merge/main.go b/tool/merge/main.go index bd726f0c..97751b00 100644 --- a/tool/merge/main.go +++ b/tool/merge/main.go @@ -13,6 +13,7 @@ import ( var mqConfigsToDelete = map[string]bool{ "kafka": true, "rocksmq": true, + "natsmq": true, "pulsar": true, }