Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
  • Loading branch information
hzxuzhonghu committed Apr 7, 2024
1 parent 92a9480 commit 9e46a77
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 26 deletions.
2 changes: 1 addition & 1 deletion pilot/pkg/bootstrap/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -884,7 +884,7 @@ func (s *Server) initRegistryEventHandlers() {
log.Debugf("Handle event %s for configuration %s", event, curr.Key())
// For update events, trigger push only if spec has changed.
if event == model.EventUpdate && !needsPush(prev, curr) {
log.Debugf("skipping push for %s as spec has not changed", prev.Key())
log.Debugf("skipping push for %s as spec has not changed", curr.Key())
return
}
pushReq := &model.PushRequest{
Expand Down
55 changes: 30 additions & 25 deletions pilot/pkg/config/kube/ingress/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"fmt"
"sort"
"sync"
"time"

corev1 "k8s.io/api/core/v1"
knetworking "k8s.io/api/networking/v1"
Expand All @@ -40,6 +41,7 @@ import (
"istio.io/istio/pkg/kube"
"istio.io/istio/pkg/kube/controllers"
"istio.io/istio/pkg/kube/kclient"
"istio.io/istio/pkg/queue"
"istio.io/istio/pkg/util/sets"
)

Expand Down Expand Up @@ -77,7 +79,8 @@ type controller struct {
meshWatcher mesh.Holder
domainSuffix string

queue controllers.Queue
queue queue.Instance

virtualServiceHandlers []model.EventHandler
gatewayHandlers []model.EventHandler

Expand Down Expand Up @@ -110,10 +113,12 @@ func NewController(client kube.Client, meshWatcher mesh.Holder,
classes: classes,
services: services,
}
c.queue = controllers.NewQueue("ingress",
controllers.WithReconciler(c.onEvent),
controllers.WithMaxAttempts(5))
c.ingress.AddEventHandler(controllers.ObjectHandler(c.queue.AddObject))
c.queue = queue.NewQueueWithID(time.Second, "ingress")
c.ingress.AddEventHandler(controllers.FromEventHandler(func(o controllers.Event) {
c.queue.Push(func() error {
return c.onIngressEvent(o)
})
}))

// We watch service changes to detect service port number change to trigger
// re-convert ingress to new-vs.
Expand Down Expand Up @@ -168,48 +173,43 @@ func (c *controller) shouldProcessIngressUpdate(ing *knetworking.Ingress) bool {
return preProcessed
}

func (c *controller) onEvent(item types.NamespacedName) error {
event := model.EventUpdate
ing := c.ingress.Get(item.Name, item.Namespace)
if ing == nil {
event = model.EventDelete
func (c *controller) onIngressEvent(event controllers.Event) error {
ingress := event.Latest().(*knetworking.Ingress)
if event.Event == controllers.EventDelete {
c.mutex.Lock()
ing = c.ingresses[item]
delete(c.ingresses, item)
delete(c.ingresses, config.NamespacedName(ingress))
c.mutex.Unlock()
if ing == nil {
// It was a delete and we didn't have an existing known ingress, no action
return nil
}
}

// we should check need process only when event is not delete,
// if it is delete event, and previously processed, we need to process too.
if event != model.EventDelete {
shouldProcess := c.shouldProcessIngressUpdate(ing)
if event.Event != controllers.EventDelete {
shouldProcess := c.shouldProcessIngressUpdate(ingress)
if !shouldProcess {
return nil
}
}

vsmetadata := config.Meta{
Name: item.Name + "-" + "virtualservice",
Namespace: item.Namespace,
Name: ingress.Name + "-" + "virtualservice",
Namespace: ingress.Namespace,
GroupVersionKind: gvk.VirtualService,
Annotations: map[string]string{constants.InternalRouteSemantics: constants.RouteSemanticsIngress},
}
gatewaymetadata := config.Meta{
Name: item.Name + "-" + "gateway",
Namespace: item.Namespace,
Name: ingress.Name + "-" + "gateway",
Namespace: ingress.Namespace,
GroupVersionKind: gvk.Gateway,
Annotations: map[string]string{constants.InternalRouteSemantics: constants.RouteSemanticsIngress},
}

// Trigger updates for Gateway and VirtualService
// TODO: we could be smarter here and only trigger when real changes were found
for _, f := range c.virtualServiceHandlers {
f(config.Config{Meta: vsmetadata}, config.Config{Meta: vsmetadata}, event)
f(config.Config{Meta: vsmetadata}, config.Config{Meta: vsmetadata}, model.Event(event.Event))
}
for _, f := range c.gatewayHandlers {
f(config.Config{Meta: gatewaymetadata}, config.Config{Meta: gatewaymetadata}, event)
f(config.Config{Meta: gatewaymetadata}, config.Config{Meta: gatewaymetadata}, model.Event(event.Event))
}

return nil
Expand All @@ -236,7 +236,12 @@ func (c *controller) onServiceEvent(input any) {
for _, ingress := range c.ingress.List(curSvc.GetNamespace(), klabels.Everything()) {
referredSvcSet := extractServicesByPortNameType(ingress)
if referredSvcSet.Contains(namespacedName) {
c.queue.AddObject(ingress)
c.queue.Push(func() error {
return c.onIngressEvent(controllers.Event{
Event: controllers.EventUpdate,
New: ingress,
})
})
}
}
}
Expand Down

0 comments on commit 9e46a77

Please sign in to comment.