Skip to content

Commit

Permalink
Amend apiexportendpointslice reconciliation to filter endpoints by partition
Browse files Browse the repository at this point in the history

Signed-off-by: Frederic Giloux <fgiloux@redhat.com>
  • Loading branch information
fgiloux committed Jan 11, 2023
1 parent 8d9f71c commit e341969
Show file tree
Hide file tree
Showing 9 changed files with 319 additions and 48 deletions.
2 changes: 1 addition & 1 deletion config/crds/apis.kcp.io_apiexportendpointslices.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ spec:
scope: Cluster
versions:
- additionalPrinterColumns:
- jsonPath: .spec.apiexport.workspace.exportName
- jsonPath: .spec.export.name
name: Export
type: string
- jsonPath: .spec.partition
Expand Down
4 changes: 2 additions & 2 deletions hack/generate/crd-ref/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -99,12 +99,12 @@ source_repositories:
topics:
- workload
- synctargets
partitions.apis.kcp.io:
partitions.topology.kcp.io:
owner:
- https://github.com/kcp-dev/kcp
topics:
- topology
partitionsets.apis.kcp.io:
partitionsets.topology.kcp.io:
owner:
- https://github.com/kcp-dev/kcp
topics:
Expand Down
11 changes: 10 additions & 1 deletion pkg/apis/apis/v1alpha1/types_apiexportendpointslice.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import (
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
// +kubebuilder:subresource:status
// +kubebuilder:resource:scope=Cluster,categories=kcp,path=apiexportendpointslices,singular=apiexportendpointslice
// +kubebuilder:printcolumn:name="Export",type="string",JSONPath=".spec.apiexport.workspace.exportName"
// +kubebuilder:printcolumn:name="Export",type="string",JSONPath=".spec.export.name"
// +kubebuilder:printcolumn:name="Partition",type="string",JSONPath=".spec.partition"
// +kubebuilder:printcolumn:name="Age",type="date",JSONPath=".metadata.creationTimestamp"

Expand Down Expand Up @@ -103,7 +103,16 @@ func (in *APIExportEndpointSlice) SetConditions(conditions conditionsv1alpha1.Co
// These are valid conditions of APIExportEndpointSlice in addition to
// APIExportValid and related reasons defined with the APIBinding type.
const (
// PartitionValid is a condition for APIExportEndpointSlice that reflects the validity of the referenced Partition.
PartitionValid conditionsv1alpha1.ConditionType = "PartitionValid"

// APIExportEndpointSliceURLsReady is a condition for APIExportEndpointSlice that reflects whether
// the endpoint URLs have been populated in the slice status.
APIExportEndpointSliceURLsReady conditionsv1alpha1.ConditionType = "EndpointURLsReady"

// PartitionInvalidReferenceReason is a reason for the PartitionValid condition of APIExportEndpointSlice that the
// Partition reference is invalid.
PartitionInvalidReferenceReason = "PartitionInvalidReference"
// PartitionNotFoundReason is a reason for the PartitionValid condition that the referenced Partition is not found.
PartitionNotFoundReason = "PartitionNotFound"
)

// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,12 @@ import (
apisv1alpha1 "github.com/kcp-dev/kcp/pkg/apis/apis/v1alpha1"
"github.com/kcp-dev/kcp/pkg/apis/core"
corev1alpha1 "github.com/kcp-dev/kcp/pkg/apis/core/v1alpha1"
topologyv1alpha1 "github.com/kcp-dev/kcp/pkg/apis/topology/v1alpha1"
kcpclientset "github.com/kcp-dev/kcp/pkg/client/clientset/versioned/cluster"
apisv1alpha1client "github.com/kcp-dev/kcp/pkg/client/clientset/versioned/typed/apis/v1alpha1"
apisinformers "github.com/kcp-dev/kcp/pkg/client/informers/externalversions/apis/v1alpha1"
corev1alpha1informers "github.com/kcp-dev/kcp/pkg/client/informers/externalversions/core/v1alpha1"
topologyinformers "github.com/kcp-dev/kcp/pkg/client/informers/externalversions/topology/v1alpha1"
"github.com/kcp-dev/kcp/pkg/indexers"
"github.com/kcp-dev/kcp/pkg/logging"
"github.com/kcp-dev/kcp/pkg/reconciler/committer"
Expand All @@ -55,8 +57,9 @@ const (
// Shards and APIExports are read from the cache server.
func NewController(
apiExportEndpointSliceClusterInformer apisinformers.APIExportEndpointSliceClusterInformer,
shardClusterInformer corev1alpha1informers.ShardClusterInformer,
apiExportClusterInformer apisinformers.APIExportClusterInformer,
globalShardClusterInformer corev1alpha1informers.ShardClusterInformer,
globalAPIExportClusterInformer apisinformers.APIExportClusterInformer,
partitionClusterInformer topologyinformers.PartitionClusterInformer,
kcpClusterClient kcpclientset.ClusterInterface,
) (*controller, error) {
queue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), ControllerName)
Expand All @@ -66,20 +69,38 @@ func NewController(
listAPIExportEndpointSlices: func() ([]*apisv1alpha1.APIExportEndpointSlice, error) {
return apiExportEndpointSliceClusterInformer.Lister().List(labels.Everything())
},
listShards: func() ([]*corev1alpha1.Shard, error) {
return shardClusterInformer.Lister().List(labels.Everything())
listShards: func(selector labels.Selector) ([]*corev1alpha1.Shard, error) {
return globalShardClusterInformer.Lister().List(selector)
},
getAPIExportEndpointSlice: func(clusterName logicalcluster.Name, name string) (*apisv1alpha1.APIExportEndpointSlice, error) {
return apiExportEndpointSliceClusterInformer.Lister().Cluster(clusterName).Get(name)
},
getAPIExport: func(path logicalcluster.Path, name string) (*apisv1alpha1.APIExport, error) {
return indexers.ByPathAndName[*apisv1alpha1.APIExport](apisv1alpha1.Resource("apiexports"), apiExportClusterInformer.Informer().GetIndexer(), path, name)
return indexers.ByPathAndName[*apisv1alpha1.APIExport](apisv1alpha1.Resource("apiexports"), globalAPIExportClusterInformer.Informer().GetIndexer(), path, name)
},
getPartition: func(clusterName logicalcluster.Name, name string) (*topologyv1alpha1.Partition, error) {
return partitionClusterInformer.Lister().Cluster(clusterName).Get(name)
},
getAPIExportEndpointSlicesByPartition: func(key string) ([]*apisv1alpha1.APIExportEndpointSlice, error) {
list, err := apiExportEndpointSliceClusterInformer.Informer().GetIndexer().ByIndex(indexAPIExportEndpointSlicesByPartition, key)
if err != nil {
return nil, err
}
var slices []*apisv1alpha1.APIExportEndpointSlice
for _, obj := range list {
slice, ok := obj.(*apisv1alpha1.APIExportEndpointSlice)
if !ok {
return nil, fmt.Errorf("obj is supposed to be an APIExportEndpointSlice, but is %T", obj)
}
slices = append(slices, slice)
}
return slices, nil
},
apiExportEndpointSliceClusterInformer: apiExportEndpointSliceClusterInformer,
commit: committer.NewCommitter[*APIExportEndpointSlice, Patcher, *APIExportEndpointSliceSpec, *APIExportEndpointSliceStatus](kcpClusterClient.ApisV1alpha1().APIExportEndpointSlices()),
}

indexers.AddIfNotPresentOrDie(apiExportClusterInformer.Informer().GetIndexer(), cache.Indexers{
indexers.AddIfNotPresentOrDie(globalAPIExportClusterInformer.Informer().GetIndexer(), cache.Indexers{
indexers.ByLogicalClusterPathAndName: indexers.IndexByLogicalClusterPathAndName,
})

Expand All @@ -99,7 +120,7 @@ func NewController(
},
})

apiExportClusterInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
globalAPIExportClusterInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
c.enqueueAPIExportEndpointSlicesForAPIExport(obj)
},
Expand All @@ -108,7 +129,7 @@ func NewController(
},
})

shardClusterInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
globalShardClusterInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
c.enqueueAllAPIExportEndpointSlices(obj)
},
Expand All @@ -123,6 +144,24 @@ func NewController(
},
)

partitionClusterInformer.Informer().AddEventHandler(
cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
c.enqueuePartition(obj)
},
UpdateFunc: func(_, newObj interface{}) {
c.enqueuePartition(newObj)
},
DeleteFunc: func(obj interface{}) {
c.enqueuePartition(obj)
},
},
)

indexers.AddIfNotPresentOrDie(apiExportEndpointSliceClusterInformer.Informer().GetIndexer(), cache.Indexers{
indexAPIExportEndpointSlicesByPartition: indexAPIExportEndpointSlicesByPartitionFunc,
})

return c, nil
}

Expand All @@ -138,10 +177,12 @@ type CommitFunc = func(context.Context, *Resource, *Resource) error
type controller struct {
queue workqueue.RateLimitingInterface

listShards func() ([]*corev1alpha1.Shard, error)
listAPIExportEndpointSlices func() ([]*apisv1alpha1.APIExportEndpointSlice, error)
getAPIExportEndpointSlice func(clusterName logicalcluster.Name, name string) (*apisv1alpha1.APIExportEndpointSlice, error)
getAPIExport func(path logicalcluster.Path, name string) (*apisv1alpha1.APIExport, error)
listShards func(selector labels.Selector) ([]*corev1alpha1.Shard, error)
listAPIExportEndpointSlices func() ([]*apisv1alpha1.APIExportEndpointSlice, error)
getAPIExportEndpointSlice func(clusterName logicalcluster.Name, name string) (*apisv1alpha1.APIExportEndpointSlice, error)
getAPIExport func(path logicalcluster.Path, name string) (*apisv1alpha1.APIExport, error)
getPartition func(clusterName logicalcluster.Name, name string) (*topologyv1alpha1.Partition, error)
getAPIExportEndpointSlicesByPartition func(key string) ([]*apisv1alpha1.APIExportEndpointSlice, error)

apiExportEndpointSliceClusterInformer apisinformers.APIExportEndpointSliceClusterInformer
commit CommitFunc
Expand Down Expand Up @@ -198,7 +239,7 @@ func (c *controller) enqueueAPIExportEndpointSlicesForAPIExport(obj interface{})
continue
}
logger := logging.WithObject(logging.WithReconciler(klog.Background(), ControllerName), obj.(*apisv1alpha1.APIExport))
logging.WithQueueKey(logger, key).V(2).Info("queuing APIExportEndpointSlices because of referenced APIExport")
logging.WithQueueKey(logger, key).V(4).Info("queuing APIExportEndpointSlices because of referenced APIExport")
c.enqueueAPIExportEndpointSlice(slice)
}
}
Expand All @@ -219,11 +260,32 @@ func (c *controller) enqueueAllAPIExportEndpointSlices(shard interface{}) {
continue
}

logging.WithQueueKey(logger, key).V(2).Info("queuing APIExportEndpointSlice because Shard changed")
logging.WithQueueKey(logger, key).V(4).Info("queuing APIExportEndpointSlice because Shard changed")
c.queue.Add(key)
}
}

// enqueuePartition maps a Partition to APIExportEndpointSlices for enqueuing.
// It is registered for add, update and delete events of the Partition informer:
// every slice indexed under the Partition's key is re-enqueued so its filtered
// endpoints can be reconciled.
func (c *controller) enqueuePartition(obj interface{}) {
	key, err := kcpcache.DeletionHandlingMetaClusterNamespaceKeyFunc(obj)
	if err != nil {
		runtime.HandleError(err)
		return
	}

	slices, err := c.getAPIExportEndpointSlicesByPartition(key)
	if err != nil {
		runtime.HandleError(err)
		return
	}

	// On delete events the informer may hand us a tombstone instead of the
	// Partition itself; unwrap it so the type assertion below cannot panic.
	if tombstone, ok := obj.(cache.DeletedFinalStateUnknown); ok {
		obj = tombstone.Obj
	}
	partition, ok := obj.(*topologyv1alpha1.Partition)
	if !ok {
		runtime.HandleError(fmt.Errorf("obj is supposed to be a Partition, but is %T", obj))
		return
	}

	logger := logging.WithObject(logging.WithReconciler(klog.Background(), ControllerName), partition)
	logging.WithQueueKey(logger, key).V(4).Info("queuing APIExportEndpointSlices because Partition changed")
	for _, slice := range slices {
		c.enqueueAPIExportEndpointSlice(slice)
	}
}

// Start starts the controller, which stops when ctx.Done() is closed.
func (c *controller) Start(ctx context.Context, numThreads int) {
defer runtime.HandleCrash()
Expand Down
Loading

0 comments on commit e341969

Please sign in to comment.