Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
111 changes: 111 additions & 0 deletions controllers/aga/eventhandlers/resource_events.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
package eventhandlers

import (
"context"
"github.com/go-logr/logr"
corev1 "k8s.io/api/core/v1"
networking "k8s.io/api/networking/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/util/workqueue"
"sigs.k8s.io/aws-load-balancer-controller/pkg/aga"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
gwv1 "sigs.k8s.io/gateway-api/apis/v1"
)

// NewEnqueueRequestsForResourceEvent creates a new handler for generic resource events
func NewEnqueueRequestsForResourceEvent(
resourceType aga.ResourceType,
referenceTracker *aga.ReferenceTracker,
logger logr.Logger,
) handler.EventHandler {
return &enqueueRequestsForResourceEvent{
resourceType: resourceType,
referenceTracker: referenceTracker,
logger: logger,
}
}

// enqueueRequestsForResourceEvent handles resource events and enqueues reconcile requests for GlobalAccelerators
// that reference the resource
type enqueueRequestsForResourceEvent struct {
resourceType aga.ResourceType
referenceTracker *aga.ReferenceTracker
logger logr.Logger
}

// The following methods implement handler.TypedEventHandler interface

// Create handles Create events with the typed API
func (h *enqueueRequestsForResourceEvent) Create(ctx context.Context, evt event.TypedCreateEvent[client.Object], queue workqueue.TypedRateLimitingInterface[reconcile.Request]) {
h.handleResource(ctx, evt.Object, "created", queue)
}

// Update handles Update events with the typed API
func (h *enqueueRequestsForResourceEvent) Update(ctx context.Context, evt event.TypedUpdateEvent[client.Object], queue workqueue.TypedRateLimitingInterface[reconcile.Request]) {
h.handleResource(ctx, evt.ObjectNew, "updated", queue)
}

// Delete handles Delete events with the typed API
func (h *enqueueRequestsForResourceEvent) Delete(ctx context.Context, evt event.TypedDeleteEvent[client.Object], queue workqueue.TypedRateLimitingInterface[reconcile.Request]) {
h.handleResource(ctx, evt.Object, "deleted", queue)
}

// Generic handles Generic events with the typed API
func (h *enqueueRequestsForResourceEvent) Generic(ctx context.Context, evt event.TypedGenericEvent[client.Object], queue workqueue.TypedRateLimitingInterface[reconcile.Request]) {
h.handleResource(ctx, evt.Object, "generic event", queue)
}

// handleTypedResource handles resource events for the typed interface
func (h *enqueueRequestsForResourceEvent) handleResource(_ context.Context, obj interface{}, eventType string, queue workqueue.TypedRateLimitingInterface[reconcile.Request]) {
var namespace, name string

// Extract namespace and name based on the object type
switch res := obj.(type) {
case *corev1.Service:
namespace = res.Namespace
name = res.Name
case *networking.Ingress:
namespace = res.Namespace
name = res.Name
case *gwv1.Gateway:
namespace = res.Namespace
name = res.Name
case *unstructured.Unstructured:
namespace = res.GetNamespace()
name = res.GetName()
default:
h.logger.Error(nil, "Unknown resource type", "type", h.resourceType)
return
}

resourceKey := aga.ResourceKey{
Type: h.resourceType,
Name: types.NamespacedName{
Namespace: namespace,
Name: name,
},
}

// If this resource is not referenced by any GA, no need to queue reconciles
if !h.referenceTracker.IsResourceReferenced(resourceKey) {
return
}

// Get all GAs that reference this resource
gaRefs := h.referenceTracker.GetGAsForResource(resourceKey)

// Queue reconcile for affected GAs
for _, gaRef := range gaRefs {
h.logger.V(1).Info("Enqueueing GA for reconcile due to resource event",
"resourceType", h.resourceType,
"resourceName", resourceKey.Name,
"eventType", eventType,
"ga", gaRef)

queue.Add(reconcile.Request{NamespacedName: gaRef})
}
}
151 changes: 146 additions & 5 deletions controllers/aga/globalaccelerator_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,12 @@ import (
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"sigs.k8s.io/controller-runtime/pkg/source"

agaapi "sigs.k8s.io/aws-load-balancer-controller/apis/aga/v1beta1"
"sigs.k8s.io/aws-load-balancer-controller/controllers/aga/eventhandlers"
"sigs.k8s.io/aws-load-balancer-controller/pkg/aga"
"sigs.k8s.io/aws-load-balancer-controller/pkg/config"
"sigs.k8s.io/aws-load-balancer-controller/pkg/deploy"
Expand All @@ -50,6 +53,7 @@ import (
"sigs.k8s.io/aws-load-balancer-controller/pkg/model/core"
"sigs.k8s.io/aws-load-balancer-controller/pkg/runtime"
agastatus "sigs.k8s.io/aws-load-balancer-controller/pkg/status/aga"
gwclientset "sigs.k8s.io/gateway-api/pkg/client/clientset/versioned"
)

const (
Expand All @@ -64,6 +68,9 @@ const (
requeueMessage = "Monitoring provisioning state"
statusUpdateRequeueTime = 1 * time.Minute

// Status reason constants
EndpointLoadFailed = "EndpointLoadFailed"

// Metric stage constants
MetricStageFetchGlobalAccelerator = "fetch_globalAccelerator"
MetricStageAddFinalizers = "add_finalizers"
Expand Down Expand Up @@ -108,6 +115,18 @@ func NewGlobalAcceleratorReconciler(k8sClient client.Client, eventRecorder recor
// Create status updater
statusUpdater := agastatus.NewStatusUpdater(k8sClient, logger)

// Create reference tracker for endpoint tracking
referenceTracker := aga.NewReferenceTracker(logger.WithName("reference-tracker"))

// Create DNS resolver
dnsToLoadBalancerResolver, err := aga.NewDNSToLoadBalancerResolver(cloud.ELBV2())
if err != nil {
logger.Error(err, "Failed to create DNS resolver")
}

// Create unified endpoint loader
endpointLoader := aga.NewEndpointLoader(k8sClient, dnsToLoadBalancerResolver, logger.WithName("endpoint-loader"))

return &globalAcceleratorReconciler{
k8sClient: k8sClient,
eventRecorder: eventRecorder,
Expand All @@ -120,6 +139,13 @@ func NewGlobalAcceleratorReconciler(k8sClient client.Client, eventRecorder recor
metricsCollector: metricsCollector,
reconcileTracker: reconcileCounters.IncrementAGA,

// Components for endpoint reference tracking
referenceTracker: referenceTracker,
dnsToLoadBalancerResolver: dnsToLoadBalancerResolver,

// Unified endpoint loader
endpointLoader: endpointLoader,

maxConcurrentReconciles: config.GlobalAcceleratorMaxConcurrentReconciles,
maxExponentialBackoffDelay: config.GlobalAcceleratorMaxExponentialBackoffDelay,
}
Expand All @@ -138,6 +164,21 @@ type globalAcceleratorReconciler struct {
metricsCollector lbcmetrics.MetricCollector
reconcileTracker func(namespaceName ktypes.NamespacedName)

// Components for endpoint reference tracking
referenceTracker *aga.ReferenceTracker
dnsToLoadBalancerResolver *aga.DNSToLoadBalancerResolver

// Unified endpoint loader
endpointLoader aga.EndpointLoader

// Resources manager for dedicated endpoint resource watchers
endpointResourcesManager aga.EndpointResourcesManager

// Event channels for dedicated watchers
serviceEventChan chan event.GenericEvent
ingressEventChan chan event.GenericEvent
gatewayEventChan chan event.GenericEvent

maxConcurrentReconciles int
maxExponentialBackoffDelay time.Duration
}
Expand Down Expand Up @@ -194,6 +235,13 @@ func (r *globalAcceleratorReconciler) reconcileGlobalAccelerator(ctx context.Con

func (r *globalAcceleratorReconciler) cleanupGlobalAccelerator(ctx context.Context, ga *agaapi.GlobalAccelerator) error {
if k8s.HasFinalizer(ga, shared_constants.GlobalAcceleratorFinalizer) {
// Clean up references in the reference tracker
gaKey := k8s.NamespacedName(ga)
r.referenceTracker.RemoveGA(gaKey)

// Clean up resource watches
r.endpointResourcesManager.RemoveGA(gaKey)

// TODO: Implement cleanup logic for AWS Global Accelerator resources (Only cleaning up accelerator for now)
if err := r.cleanupGlobalAcceleratorResources(ctx, ga); err != nil {
r.eventRecorder.Event(ga, corev1.EventTypeWarning, k8s.GlobalAcceleratorEventReasonFailedCleanup, fmt.Sprintf("Failed cleanup due to %v", err))
Expand Down Expand Up @@ -224,6 +272,29 @@ func (r *globalAcceleratorReconciler) buildModel(ctx context.Context, ga *agaapi

func (r *globalAcceleratorReconciler) reconcileGlobalAcceleratorResources(ctx context.Context, ga *agaapi.GlobalAccelerator) error {
r.logger.Info("Reconciling GlobalAccelerator resources", "globalAccelerator", k8s.NamespacedName(ga))

// Get all endpoints from GA
endpoints := aga.GetAllEndpointsFromGA(ga)

// Track referenced endpoints
r.referenceTracker.UpdateReferencesForGA(ga, endpoints)

// Update resource watches with the endpointResourcesManager
r.endpointResourcesManager.MonitorEndpointResources(ga, endpoints)

// Validate and load endpoint status using the endpoint loader
_, fatalErrors := r.endpointLoader.LoadEndpoints(ctx, ga, endpoints)
if len(fatalErrors) > 0 {
err := fmt.Errorf("failed to load endpoints: %v", fatalErrors[0])
r.logger.Error(err, "Fatal error loading endpoints")

// Handle other endpoint loading errors
if statusErr := r.statusUpdater.UpdateStatusFailure(ctx, ga, EndpointLoadFailed, err.Error()); statusErr != nil {
r.logger.Error(statusErr, "Failed to update GlobalAccelerator status after endpoint load failure")
}
return err
}

var stack core.Stack
var accelerator *agamodel.Accelerator
var err error
Expand Down Expand Up @@ -335,21 +406,91 @@ func (r *globalAcceleratorReconciler) SetupWithManager(ctx context.Context, mgr
return nil
}

if err := r.setupIndexes(ctx, mgr.GetFieldIndexer()); err != nil {
// Create event channels for dedicated watchers
r.serviceEventChan = make(chan event.GenericEvent)
r.ingressEventChan = make(chan event.GenericEvent)
r.gatewayEventChan = make(chan event.GenericEvent)

// Initialize Gateway API client using the same config
gwClient, err := gwclientset.NewForConfig(mgr.GetConfig())
if err != nil {
r.logger.Error(err, "Failed to create Gateway API client")
return err
}

// TODO: Add event handlers for Services, Ingresses, and Gateways
// that are referenced by GlobalAccelerator endpoints
// Initialize the endpoint resources manager with clients
r.endpointResourcesManager = aga.NewEndpointResourcesManager(
clientSet,
gwClient,
r.serviceEventChan,
r.ingressEventChan,
r.gatewayEventChan,
r.logger.WithName("endpoint-resources-manager"),
)

return ctrl.NewControllerManagedBy(mgr).
if err := r.setupIndexes(ctx, mgr.GetFieldIndexer()); err != nil {
return err
}

// Set up the controller builder
ctrl, err := ctrl.NewControllerManagedBy(mgr).
For(&agaapi.GlobalAccelerator{}).
Named(controllerName).
WithOptions(controller.Options{
MaxConcurrentReconciles: r.maxConcurrentReconciles,
RateLimiter: workqueue.NewTypedItemExponentialFailureRateLimiter[reconcile.Request](5*time.Second, r.maxExponentialBackoffDelay),
}).
Complete(r)
Build(r)

if err != nil {
return err
}

// Setup watches for resource events
if err := r.setupGlobalAcceleratorWatches(ctrl); err != nil {
return err
}

return nil
}

// setupGlobalAcceleratorWatches sets up watches for resources that can trigger reconciliation of GlobalAccelerator objects
func (r *globalAcceleratorReconciler) setupGlobalAcceleratorWatches(c controller.Controller) error {
loggerPrefix := r.logger.WithName("eventHandlers")

// Create handlers for our dedicated watchers
serviceHandler := eventhandlers.NewEnqueueRequestsForResourceEvent(
aga.ServiceResourceType,
r.referenceTracker,
loggerPrefix.WithName("service-handler"),
)

ingressHandler := eventhandlers.NewEnqueueRequestsForResourceEvent(
aga.IngressResourceType,
r.referenceTracker,
loggerPrefix.WithName("ingress-handler"),
)

gatewayHandler := eventhandlers.NewEnqueueRequestsForResourceEvent(
aga.GatewayResourceType,
r.referenceTracker,
loggerPrefix.WithName("gateway-handler"),
)

// Add watches using the channel sources with event handlers
if err := c.Watch(source.Channel(r.serviceEventChan, serviceHandler)); err != nil {
return err
}

if err := c.Watch(source.Channel(r.ingressEventChan, ingressHandler)); err != nil {
return err
}

if err := c.Watch(source.Channel(r.gatewayEventChan, gatewayHandler)); err != nil {
return err
}

return nil
}

func (r *globalAcceleratorReconciler) setupIndexes(ctx context.Context, fieldIndexer client.FieldIndexer) error {
Expand Down
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ require (
github.com/gregjones/httpcache v0.0.0-20190611155906-901d90724c79 // indirect
github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/hashicorp/go-multierror v1.1.1 // indirect
github.com/hashicorp/golang-lru v1.0.2 // indirect
github.com/huandu/xstrings v1.5.0 // indirect
github.com/imkira/go-interpol v1.1.0 // indirect
github.com/inconshreveable/mousetrap v1.1.0 // indirect
Expand Down Expand Up @@ -148,6 +149,7 @@ require (
github.com/sirupsen/logrus v1.9.3 // indirect
github.com/spf13/cast v1.7.0 // indirect
github.com/spf13/cobra v1.9.1 // indirect
github.com/stretchr/objx v0.5.2 // indirect
github.com/valyala/bytebufferpool v1.0.0 // indirect
github.com/valyala/fasthttp v1.34.0 // indirect
github.com/x448/float16 v0.8.4 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,8 @@ github.com/hashicorp/errwrap v1.1.0 h1:OxrOeh75EUXMY8TBjag2fzXGZ40LB6IKw45YeGUDY
github.com/hashicorp/errwrap v1.1.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
github.com/hashicorp/go-multierror v1.1.1 h1:H5DkEtf6CXdFp0N0Em5UCwQpXMWke8IA0+lD48awMYo=
github.com/hashicorp/go-multierror v1.1.1/go.mod h1:iw975J/qwKPdAO1clOe2L8331t/9/fmwbPZ6JB6eMoM=
github.com/hashicorp/golang-lru v1.0.2 h1:dV3g9Z/unq5DpblPpw+Oqcv4dU/1omnb4Ok8iPY6p1c=
github.com/hashicorp/golang-lru v1.0.2/go.mod h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uGBxy+E8yxSoD4=
github.com/hashicorp/golang-lru/arc/v2 v2.0.5 h1:l2zaLDubNhW4XO3LnliVj0GXO3+/CGNJAg1dcN2Fpfw=
github.com/hashicorp/golang-lru/arc/v2 v2.0.5/go.mod h1:ny6zBSQZi2JxIeYcv7kt2sH2PXJtirBN7RDhRpxPkxU=
github.com/hashicorp/golang-lru/v2 v2.0.5 h1:wW7h1TG88eUIJ2i69gaE3uNVtEPIagzhGvHgwfx2Vm4=
Expand Down
Loading
Loading