From a897ef4f4c462bec6caf88471fa142c19b1aaa57 Mon Sep 17 00:00:00 2001 From: ElonMuskkkkkk Date: Thu, 10 Aug 2023 11:59:53 +0800 Subject: [PATCH] modify some codes --- controllers/licenseissuer/.env | 8 +- controllers/licenseissuer/deploy/Kubefile | 3 + .../deploy/manifests/customconfig.yaml.tmpl | 1 + .../deploy/manifests/deploy.yaml | 141 +----------------- .../internal/controller/license_controller.go | 13 +- .../internal/controller/util/const.go | 10 +- .../internal/controller/util/datasync.go | 2 +- .../internal/controller/util/init.go | 4 +- .../internal/controller/util/notice.go | 73 ++++----- .../internal/controller/util/options.go | 4 + controllers/pkg/notification/builder.go | 69 +-------- controllers/pkg/notification/manager.go | 80 +++------- controllers/pkg/notification/pool.go | 54 +++++++ 13 files changed, 147 insertions(+), 315 deletions(-) create mode 100644 controllers/pkg/notification/pool.go diff --git a/controllers/licenseissuer/.env b/controllers/licenseissuer/.env index 5b9ae184590..091390edcbd 100644 --- a/controllers/licenseissuer/.env +++ b/controllers/licenseissuer/.env @@ -1,5 +1,5 @@ # set the environment variables when local testing -MONITOR= "true" -CAN_CONNECT_TO_EXTERNAL_NETWORK= "true" -MONGO_URI= "mongodb://root:fd4m9rg6@localhost:27017" -PASSWORD_SALT= "Ym1uazlranZscmIybmRxZmNreXQ4MzU1dWVtN3F3OWtpZDFhNGRvbmprbnNzMnFmNThuendrNDJtdzAyaG1wZw==" \ No newline at end of file +MONITOR= "" +CAN_CONNECT_TO_EXTERNAL_NETWORK= "" +MONGO_URI= "" +PASSWORD_SALT= "" \ No newline at end of file diff --git a/controllers/licenseissuer/deploy/Kubefile b/controllers/licenseissuer/deploy/Kubefile index 2bca9865be5..10f3d37a4b7 100644 --- a/controllers/licenseissuer/deploy/Kubefile +++ b/controllers/licenseissuer/deploy/Kubefile @@ -9,10 +9,13 @@ ENV canConnectToExternalNetwork "true" ENV enableMonitor "true" ENV MongoURI "" ENV PasswordSalt "" +ENV Namespace "sealos-system" ENV CollectorURL "https://license.sealos.io/collector" ENV NotificationURL "https://license.sealos.io/notify" ENV RegisterURL "https://license.sealos.io/register" ENV CloudSyncURL "https://license.sealos.io/datasync" ENV LicenseMonitorURL "https://license.sealos.io/license" + + CMD ["chmod +x manifests/setup.sh && bash manifests/setup.sh"] diff --git a/controllers/licenseissuer/deploy/manifests/customconfig.yaml.tmpl b/controllers/licenseissuer/deploy/manifests/customconfig.yaml.tmpl index 19c961b3df4..a6d9edeca76 100644 --- a/controllers/licenseissuer/deploy/manifests/customconfig.yaml.tmpl +++ b/controllers/licenseissuer/deploy/manifests/customconfig.yaml.tmpl @@ -8,6 +8,7 @@ stringData: isMonitor: "{{ .enableMonitor }}" MongoURI: "{{ .MongoURI }}" PasswordSalt: "{{ .PasswordSalt }}" + Namespace: "{{ .Namespace }}" --- apiVersion: v1 data: diff --git a/controllers/licenseissuer/deploy/manifests/deploy.yaml b/controllers/licenseissuer/deploy/manifests/deploy.yaml index 3f0efc0147a..19c25a06df6 100644 --- a/controllers/licenseissuer/deploy/manifests/deploy.yaml +++ b/controllers/licenseissuer/deploy/manifests/deploy.yaml @@ -221,32 +221,6 @@ rules: - patch - update - watch -- apiGroups: - - "" - resources: - - namespaces - verbs: - - create - - get - - list - - update - - watch -- apiGroups: - - "" - resources: - - nodes - verbs: - - get - - list - - watch -- apiGroups: - - "" - resources: - - persistentvolumes - verbs: - - get - - list - - watch - apiGroups: - "" resources: @@ -259,84 +233,6 @@ rules: - patch - update - watch -- apiGroups: - - infostream.sealos.io - resources: - - cloudsyncs - verbs: - - create - - delete - - get - - list - - patch - - update - - watch -- apiGroups: - - infostream.sealos.io - resources: - - cloudsyncs/finalizers - verbs: - - update -- apiGroups: - - infostream.sealos.io - resources: - - cloudsyncs/status - verbs: - - get - - patch - - update -- apiGroups: - - infostream.sealos.io - resources: - - collectors - verbs: - - create - - delete - - get - - list - - patch - - update - - watch -- apiGroups: - - infostream.sealos.io - resources: - - collectors/finalizers - verbs: - - update -- apiGroups: - - infostream.sealos.io - resources: - - collectors/status - verbs: - - get - - patch - - update -- apiGroups: - - infostream.sealos.io - resources: - - launchers - verbs: - - create - - delete - - get - - list - - patch - - update - - watch -- apiGroups: - - infostream.sealos.io - resources: - - launchers/finalizers - verbs: - - update -- apiGroups: - - infostream.sealos.io - resources: - - launchers/status - verbs: - - get - - patch - - update - apiGroups: - infostream.sealos.io resources: @@ -363,32 +259,6 @@ rules: - get - patch - update -- apiGroups: - - infostream.sealos.io - resources: - - notifications - verbs: - - create - - delete - - get - - list - - patch - - update - - watch -- apiGroups: - - infostream.sealos.io - resources: - - notifications/finalizers - verbs: - - update -- apiGroups: - - infostream.sealos.io - resources: - - notifications/status - verbs: - - get - - patch - - update - apiGroups: - notification.sealos.io resources: @@ -583,7 +453,7 @@ spec: resources: limits: cpu: 500m - memory: 64Mi + memory: 128Mi requests: cpu: 5m memory: 64Mi @@ -619,7 +489,12 @@ spec: secretKeyRef: key: PasswordSalt name: licenseissuer-env - image: registry.cn-hangzhou.aliyuncs.com/fckc/cloud-controller:latest + - name: NAMESPACE + valueFrom: + secretKeyRef: + key: Namespace + name: licenseissuer-env + image: ghcr.io/labring/sealos-licenseissuer-controller:latest livenessProbe: httpGet: path: /healthz @@ -639,7 +514,7 @@ spec: memory: 1024Mi requests: cpu: 10m - memory: 64Mi + memory: 512Mi securityContext: allowPrivilegeEscalation: false capabilities: diff --git a/controllers/licenseissuer/internal/controller/license_controller.go b/controllers/licenseissuer/internal/controller/license_controller.go index 6607269815c..932e50b98e4 100644 --- a/controllers/licenseissuer/internal/controller/license_controller.go +++ b/controllers/licenseissuer/internal/controller/license_controller.go @@ -73,17 +73,17 @@ func (r *LicenseReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct // for notification nq := &ntf.NoticeEventQueue{} - nm := ntf.NewNotificationManager(ctx, r.Client) + nm := ntf.NewNotificationManager(ctx, r.Client, r.logger, 1, 1) nb := (&ntf.Builder{}).WithLevel(notificationv1.High). WithTitle(util.LicenseNoticeTitle).WithFrom(util.Sealos). WithType(ntf.General) - receiver := ntf.NewReceiver(ctx, r.Client).SetReceiver(req.Namespace) + receiver := ntf.NewReceiver(ctx, r.Client).AddReceiver(req.Namespace) reader := &util.Reader{} // get license - + namespace := util.GetOptions().GetEnvOptions().Namespace reader.Add(&r.license, types.NamespacedName{Namespace: req.Namespace, Name: util.LicenseName}) - reader.Add(&r.configMap, types.NamespacedName{Namespace: util.SealosNamespace, Name: util.LicenseHistory}) + reader.Add(&r.configMap, types.NamespacedName{Namespace: namespace, Name: util.LicenseHistory}) if err := reader.Read(ctx, r.Client); err != nil { r.logger.Error(err, "failed to read resources...") @@ -155,7 +155,10 @@ func (r *LicenseReconciler) Authorize(ctx context.Context) (string, error) { return message, errors.New("invalid license") } // get account - id := types.NamespacedName{Namespace: util.SealosNamespace, Name: r.license.Spec.UID} + id := types.NamespacedName{ + Namespace: util.GetOptions().GetEnvOptions().Namespace, + Name: r.license.Spec.UID, + } err := r.Client.Get(ctx, id, &r.account) if err != nil { r.logger.Error(err, "failed to get account") diff --git a/controllers/licenseissuer/internal/controller/util/const.go b/controllers/licenseissuer/internal/controller/util/const.go index 8dcfa906ead..4d6d766ce72 100644 --- a/controllers/licenseissuer/internal/controller/util/const.go +++ b/controllers/licenseissuer/internal/controller/util/const.go @@ -17,11 +17,11 @@ limitations under the License. package util const ( - SealosNamespace = "sealos-system" - ClusterInfo = "cluster-info" - URLConfig = "url-config" - LicenseHistory = "license-history" - LicenseName = "license" + // SealosNamespace = "sealos-system" + ClusterInfo = "cluster-info" + URLConfig = "url-config" + LicenseHistory = "license-history" + LicenseName = "license" // add more resource name here ) diff --git a/controllers/licenseissuer/internal/controller/util/datasync.go b/controllers/licenseissuer/internal/controller/util/datasync.go index 530d841af10..65bd0de4824 100644 --- a/controllers/licenseissuer/internal/controller/util/datasync.go +++ b/controllers/licenseissuer/internal/controller/util/datasync.go @@ -64,7 +64,7 @@ func (d *datasycn) sync(instance *TaskInstance) error { } // update configmap err = d.updateConfigMap(instance, syncResponse, types.NamespacedName{ - Namespace: SealosNamespace, + Namespace: GetOptions().GetEnvOptions().Namespace, Name: URLConfig, }) diff --git a/controllers/licenseissuer/internal/controller/util/init.go b/controllers/licenseissuer/internal/controller/util/init.go index 47051b0f26e..793151ba046 100644 --- a/controllers/licenseissuer/internal/controller/util/init.go +++ b/controllers/licenseissuer/internal/controller/util/init.go @@ -142,7 +142,7 @@ func (t initTask) checkRegister(instance *TaskInstance) (bool, error) { info := &corev1.Secret{} err := instance.Get(instance.ctx, types.NamespacedName{ Name: ClusterInfo, - Namespace: SealosNamespace, + Namespace: GetOptions().GetEnvOptions().Namespace, }, info) if err != nil && apierrors.IsNotFound(err) { return false, nil @@ -171,7 +171,7 @@ func createClusterInfo() *corev1.Secret { uuid := uuid.New().String() secret := &corev1.Secret{} secret.Name = ClusterInfo - secret.Namespace = SealosNamespace + secret.Namespace = GetOptions().GetEnvOptions().Namespace secret.Data = map[string][]byte{ "uuid": []byte(uuid), } diff --git a/controllers/licenseissuer/internal/controller/util/notice.go b/controllers/licenseissuer/internal/controller/util/notice.go index 18efaa6358c..a4450f09de7 100644 --- a/controllers/licenseissuer/internal/controller/util/notice.go +++ b/controllers/licenseissuer/internal/controller/util/notice.go @@ -19,6 +19,7 @@ package util import ( "context" "fmt" + "strings" "time" notificationv1 "github.com/labring/sealos/controllers/common/notification/api/v1" @@ -59,14 +60,9 @@ func (nr *NotificationRequest) setTimestamp(timestamp int64) *NotificationReques func (n *notice) noticeWork(instance *TaskInstance) error { // init - receiver := ntf.Receiver{ - Context: instance.ctx, - Client: instance.Client, - } - manager := ntf.NotificationManager{ - Ctx: instance.ctx, - Client: instance.Client, - } + receiver := ntf.NewReceiver(instance.ctx, instance.Client) + manager := ntf.NewNotificationManager(instance.ctx, instance.Client, + instance.logger, maxBatchSize, maxChannelSize) // get uid and url-map uid, urlMap, err := GetUIDURL(instance.ctx, instance.Client) if err != nil { @@ -91,14 +87,14 @@ func (n *notice) noticeWork(instance *TaskInstance) error { } // get receivers - err = receiver.Cache(&corev1.NamespaceList{}) + receiver.AddReceivers(n.getUserNamespace(instance, filter)) if err != nil { instance.logger.Error(err, "failed to cache namespace") return err } - manager.Load(&receiver, events).Run() + manager.Load(receiver, events).Run() return nil } @@ -118,7 +114,7 @@ func GetURL(ctx context.Context, client client.Client) (map[string]string, error urlConfigMap := &corev1.ConfigMap{} id := types.NamespacedName{ Name: URLConfig, - Namespace: SealosNamespace, + Namespace: GetOptions().GetEnvOptions().Namespace, } // get url-config from k8s err := client.Get(ctx, id, urlConfigMap) @@ -137,7 +133,7 @@ func GetUID(ctx context.Context, client client.Client) (string, error) { info := &corev1.Secret{} err := client.Get(ctx, types.NamespacedName{ Name: ClusterInfo, - Namespace: SealosNamespace, + Namespace: GetOptions().GetEnvOptions().Namespace, }, info) if err != nil { return "", fmt.Errorf("failed to get cluster-info: %w", err) @@ -243,35 +239,24 @@ func (nc *noticeCleaner) getNotificationsExpired(instance *TaskInstance) ([]noti const maxBatchSize = 100 const maxChannelSize = 500 -// The Pool proviveds a pool of goroutines that can be used to perform work. -// type Pool struct { -// work chan func() -// wg sync.WaitGroup -// } - -// func NewPool(size int) *Pool { -// return &Pool{ -// work: make(chan func(), size), -// } -// } - -// func (p *Pool) Run(workerCount int) { -// p.wg.Add(workerCount) -// for i := 0; i < workerCount; i++ { -// go func() { -// for work := range p.work { -// work() -// } -// p.wg.Done() -// }() -// } -// } - -// func (p *Pool) Add(work func()) { -// p.work <- work -// } - -// func (p *Pool) Wait() { -// close(p.work) -// p.wg.Wait() -// } +type FilterFunc func(string) bool + +func (n *notice) getUserNamespace(instance *TaskInstance, opt FilterFunc) []string { + namespaceList := &corev1.NamespaceList{} + err := instance.List(instance.ctx, namespaceList) + if err != nil { + instance.logger.Error(err, "failed to list namespace") + return nil + } + var namespaces []string + for _, namespace := range namespaceList.Items { + if opt(namespace.Name) { + namespaces = append(namespaces, namespace.Name) + } + } + return nil +} + +func filter(ns string) bool { + return strings.HasPrefix(ns, ntf.GeneralPrefix) +} diff --git a/controllers/licenseissuer/internal/controller/util/options.go b/controllers/licenseissuer/internal/controller/util/options.go index c1802e210fb..3aef42fb0ad 100644 --- a/controllers/licenseissuer/internal/controller/util/options.go +++ b/controllers/licenseissuer/internal/controller/util/options.go @@ -117,6 +117,9 @@ type EnvOptions struct { // The SaltKey is used to encrypt the password for pre-registered users. SaltKey string + + // Namespace + Namespace string } func (eo *EnvOptions) initOptions() { @@ -124,6 +127,7 @@ func (eo *EnvOptions) initOptions() { eo.MonitorConfiguration = os.Getenv("MONITOR") eo.MongoURI = os.Getenv("MONGO_URI") eo.SaltKey = os.Getenv("PASSWORD_SALT") + eo.Namespace = os.Getenv("NAMESPACE") } type RunnableOptions struct { diff --git a/controllers/pkg/notification/builder.go b/controllers/pkg/notification/builder.go index 0ad44d34370..ff14ed885ed 100644 --- a/controllers/pkg/notification/builder.go +++ b/controllers/pkg/notification/builder.go @@ -21,16 +21,12 @@ import ( "crypto/rand" "encoding/base64" "encoding/binary" - "fmt" "io" "strconv" "strings" "time" - "github.com/go-logr/logr" v1 "github.com/labring/sealos/controllers/common/notification/api/v1" - corev1 "k8s.io/api/core/v1" - ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" ) @@ -45,25 +41,6 @@ const ( GeneralPrefix = "ns-" // general notification prefix ) -// Filter is a function that takes an object -// and returns true if the object should be included in the result. -type Filter func(obj client.Object) bool - -var filters map[string]Filter - -var logger logr.Logger - -func init() { - logger = ctrl.Log.WithName("Notice") - filters = make(map[string]Filter) - filters[string(General)] = func(obj client.Object) bool { - return strings.HasPrefix(obj.GetName(), GeneralPrefix) - } - filters[string(Admin)] = func(obj client.Object) bool { - return false - } -} - const ( idLength = 12 letterBytes = "abcdefghijklmnopqrstuvwxyz0123456789" @@ -73,8 +50,7 @@ const ( type Receiver struct { context.Context client.Client - UserNamespaces []string - AdminNamespaces []string + receivers []string } func NewReceiver(ctx context.Context, client client.Client) *Receiver { @@ -84,49 +60,16 @@ func NewReceiver(ctx context.Context, client client.Client) *Receiver { } } -func (rv *Receiver) SetReceiver(receiver string, kind ...Kind) *Receiver { - if len(kind) == 0 { - rv.UserNamespaces = append(rv.UserNamespaces, receiver) - return rv - } - switch kind[0] { - case General: - rv.UserNamespaces = append(rv.UserNamespaces, receiver) - case Admin: - rv.AdminNamespaces = append(rv.AdminNamespaces, receiver) - } +func (rv *Receiver) AddReceiver(receiver string) *Receiver { + rv.receivers = append(rv.receivers, receiver) return rv } // Cache of the NamespaceCache caches the namespaces in the cluster // categorized by filters. -func (rv *Receiver) Cache(obj client.ObjectList) error { - err := rv.Client.List(rv.Context, obj) - if err != nil { - logger.Error(err, "Failed to list namespaces") - return err - } - namespaces, ok := obj.(*corev1.NamespaceList) - if !ok { - logger.Error(err, "Failed to cast to NamespaceList") - return fmt.Errorf("failed to cast to NamespaceList: %w", err) - } - for _, ns := range namespaces.Items { - if filters[string(General)](&ns) { - rv.UserNamespaces = append(rv.UserNamespaces, ns.GetName()) - } - if filters[string(Admin)](&ns) { - rv.AdminNamespaces = append(rv.AdminNamespaces, ns.GetName()) - } - } - return nil -} - -// ReCache of the NamespaceCache caches the namespaces in the cluster -func (rv *Receiver) ReCache(obj client.ObjectList) error { - rv.UserNamespaces = []string{} - rv.AdminNamespaces = []string{} - return rv.Cache(obj) +func (rv *Receiver) AddReceivers(receivers []string) *Receiver { + rv.receivers = append(rv.receivers, receivers...) + return rv } // NotificationPackage is the struct that contains the notification information. diff --git a/controllers/pkg/notification/manager.go b/controllers/pkg/notification/manager.go index c204de3b0b3..8cca9ca4565 100644 --- a/controllers/pkg/notification/manager.go +++ b/controllers/pkg/notification/manager.go @@ -18,10 +18,11 @@ package notification import ( "context" - "sync" "time" + "github.com/go-logr/logr" v1 "github.com/labring/sealos/controllers/common/notification/api/v1" + "github.com/labring/sealos/pkg/utils/logger" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "sigs.k8s.io/controller-runtime/pkg/client" @@ -59,28 +60,34 @@ const maxChannelSize = 500 // 5. allow users to focus solely on important tasks. type NotificationManager struct { - Ctx context.Context - Client client.Client - NotificationQueue []v1.Notification + ctx context.Context + client client.Client + logger logr.Logger + batchSize int + channelSize int + queue []v1.Notification } -func NewNotificationManager(ctx context.Context, client client.Client) *NotificationManager { +func NewNotificationManager(ctx context.Context, client client.Client, + logger logr.Logger, batchSize, channelSize int) *NotificationManager { return &NotificationManager{ - Ctx: ctx, - Client: client, - NotificationQueue: []v1.Notification{}, + ctx: ctx, + client: client, + logger: logger, + batchSize: batchSize, + channelSize: channelSize, } } // Run of the NotificationManager runs the notification manager. // It writes the notifications in batches func (nm *NotificationManager) Run() { - pool := NewPool(maxBatchSize) - pool.Run(maxChannelSize) - for _, notification := range nm.NotificationQueue { + pool := NewPool(nm.batchSize) + pool.Run(nm.channelSize) + for _, notification := range nm.queue { notification := notification pool.Add(func() { - err := write(nm.Ctx, nm.Client, ¬ification) + err := write(nm.ctx, nm.client, ¬ification) if err != nil { logger.Error(err, "Failed to Do Notification Write Opt") } @@ -91,23 +98,15 @@ func (nm *NotificationManager) Run() { func (nm *NotificationManager) Load(receivers *Receiver, events []Event) *NotificationManager { for _, event := range events { - switch event.Kind { - case General: - nm.NotificationQueue = loadNotification(receivers.UserNamespaces, - event, nm.NotificationQueue) - case Admin: - nm.NotificationQueue = loadNotification(receivers.AdminNamespaces, - event, nm.NotificationQueue) - } + nm.loadNotification(receivers.receivers, event) } return nm } -func loadNotification(receivers []string, event Event, queue []v1.Notification) []v1.Notification { +func (nm *NotificationManager) loadNotification(receivers []string, event Event) { for _, receiver := range receivers { - queue = append(queue, newNotification(receiver, event)) + nm.queue = append(nm.queue, newNotification(receiver, event)) } - return queue } func write(ctx context.Context, client client.Client, obj client.Object, opts ...client.CreateOption) error { @@ -133,38 +132,3 @@ func newNotification(receiver string, event Event) v1.Notification { }, } } - -type Pool struct { - wg sync.WaitGroup - work chan func() -} - -func NewPool(size int) *Pool { - p := &Pool{ - work: make(chan func(), size), - } - return p -} - -func (p *Pool) Add(f func()) { - p.work <- func() { - f() - } -} - -func (p *Pool) Wait() { - close(p.work) - p.wg.Wait() -} - -func (p *Pool) Run(size int) { - p.wg.Add(size) - for i := 0; i < size; i++ { - go func() { - for f := range p.work { - f() - } - p.wg.Done() - }() - } -} diff --git a/controllers/pkg/notification/pool.go b/controllers/pkg/notification/pool.go new file mode 100644 index 00000000000..5517058fb62 --- /dev/null +++ b/controllers/pkg/notification/pool.go @@ -0,0 +1,54 @@ +/* +Copyright 2023. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package notification + +import "sync" + +type Pool struct { + wg sync.WaitGroup + work chan func() +} + +func NewPool(size int) *Pool { + p := &Pool{ + work: make(chan func(), size), + } + return p +} + +func (p *Pool) Add(f func()) { + p.work <- func() { + f() + } +} + +func (p *Pool) Wait() { + close(p.work) + p.wg.Wait() +} + +func (p *Pool) Run(size int) { + p.wg.Add(size) + for i := 0; i < size; i++ { + go func() { + for f := range p.work { + f() + } + p.wg.Done() + }() + } +}