Skip to content

Commit

Permalink
Merge pull request #2780 from ncdc/deflake-upsyncer-pv
Browse files Browse the repository at this point in the history
🌱 syncer vw: add API definition logging
  • Loading branch information
openshift-merge-robot committed Feb 13, 2023
2 parents 80a16e4 + 993f42b commit b505b8a
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -165,15 +165,15 @@ func (c *APIReconciler) enqueueAPIExport(obj interface{}, logger logr.Logger, lo
return
}

synctargets, err := c.syncTargetIndexer.ByIndex(IndexSyncTargetsByExport, key)
syncTargets, err := indexers.ByIndex[*workloadv1alpha1.SyncTarget](c.syncTargetIndexer, IndexSyncTargetsByExport, key)
if err != nil {
runtime.HandleError(err)
return
}

for _, obj := range synctargets {
logger := logging.WithObject(logger, obj.(*workloadv1alpha1.SyncTarget))
c.enqueueSyncTarget(obj, logger, " because of APIExport")
for _, syncTarget := range syncTargets {
logger := logging.WithObject(logger, syncTarget)
c.enqueueSyncTarget(syncTarget, logger, " because of APIExport")
}
}

Expand All @@ -185,15 +185,15 @@ func (c *APIReconciler) enqueueAPIResourceSchema(obj interface{}, logger logr.Lo
return
}

apiExports, err := c.apiExportIndexer.ByIndex(IndexAPIExportsByAPIResourceSchema, key)
apiExports, err := indexers.ByIndex[*apisv1alpha1.APIExport](c.apiExportIndexer, IndexAPIExportsByAPIResourceSchema, key)
if err != nil {
runtime.HandleError(err)
return
}

for _, obj := range apiExports {
logger := logging.WithObject(logger, obj.(*apisv1alpha1.APIExport))
c.enqueueAPIExport(obj, logger, " because of APIResourceSchema")
for _, apiExport := range apiExports {
logger := logging.WithObject(logger, apiExport)
c.enqueueAPIExport(apiExport, logger, " because of APIResourceSchema")
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@ limitations under the License.
package apireconciler

import (
"fmt"

"github.com/kcp-dev/logicalcluster/v3"

apisv1alpha1 "github.com/kcp-dev/kcp/pkg/apis/apis/v1alpha1"
Expand All @@ -29,10 +27,7 @@ import (

// IndexAPIExportsByAPIResourceSchemas is an index function that maps an APIExport to its spec.latestResourceSchemas.
func IndexAPIExportsByAPIResourceSchemas(obj interface{}) ([]string, error) {
apiExport, ok := obj.(*apisv1alpha1.APIExport)
if !ok {
return []string{}, fmt.Errorf("obj is supposed to be an APIExport, but is %T", obj)
}
apiExport := obj.(*apisv1alpha1.APIExport)

ret := make([]string, len(apiExport.Spec.LatestResourceSchemas))
for i := range apiExport.Spec.LatestResourceSchemas {
Expand All @@ -43,23 +38,20 @@ func IndexAPIExportsByAPIResourceSchemas(obj interface{}) ([]string, error) {
}

func IndexSyncTargetsByExports(obj interface{}) ([]string, error) {
synctarget, ok := obj.(*workloadv1alpha1.SyncTarget)
if !ok {
return []string{}, fmt.Errorf("obj is supposed to be a SyncTarget, but is %T", obj)
}
syncTarget := obj.(*workloadv1alpha1.SyncTarget)

clusterName := logicalcluster.From(synctarget)
if len(synctarget.Spec.SupportedAPIExports) == 0 {
clusterName := logicalcluster.From(syncTarget)
if len(syncTarget.Spec.SupportedAPIExports) == 0 {
return []string{client.ToClusterAwareKey(clusterName.Path(), reconcilerapiexport.TemporaryComputeServiceExportName)}, nil
}

keys := make([]string, 0, len(synctarget.Spec.SupportedAPIExports))
for _, export := range synctarget.Spec.SupportedAPIExports {
if len(export.Export) == 0 {
keys = append(keys, client.ToClusterAwareKey(clusterName.Path(), export.Export))
continue
keys := make([]string, 0, len(syncTarget.Spec.SupportedAPIExports))
for _, export := range syncTarget.Spec.SupportedAPIExports {
path := export.Path
if path == "" {
path = clusterName.String()
}
keys = append(keys, client.ToClusterAwareKey(logicalcluster.Name(export.Path).Path(), export.Export))
keys = append(keys, client.ToClusterAwareKey(logicalcluster.NewPath(path), export.Export))
}

return keys, nil
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func (c *APIReconciler) reconcile(ctx context.Context, apiDomainKey dynamicconte
logger := klog.FromContext(ctx)

// collect APIResourceSchemas by syncTarget.
apiResourceSchemas, schemaIdentites, err := c.getAllAcceptedResourceSchemas(syncTarget)
apiResourceSchemas, schemaIdentites, err := c.getAllAcceptedResourceSchemas(ctx, syncTarget)
if err != nil {
return err
}
Expand Down Expand Up @@ -150,40 +150,63 @@ func gvrString(gvr schema.GroupVersionResource) string {

// getAllAcceptedResourceSchemas return all resourceSchemas from APIExports defined in this syncTarget filtered by the status.syncedResource
// of syncTarget such that only resources with accepted state is returned, together with their identityHash.
func (c *APIReconciler) getAllAcceptedResourceSchemas(syncTarget *workloadv1alpha1.SyncTarget) (map[schema.GroupResource]*apisv1alpha1.APIResourceSchema, map[schema.GroupResource]string, error) {
func (c *APIReconciler) getAllAcceptedResourceSchemas(ctx context.Context, syncTarget *workloadv1alpha1.SyncTarget) (map[schema.GroupResource]*apisv1alpha1.APIResourceSchema, map[schema.GroupResource]string, error) {
apiResourceSchemas := map[schema.GroupResource]*apisv1alpha1.APIResourceSchema{}

identityHashByGroupResource := map[schema.GroupResource]string{}

logger := klog.FromContext(ctx)
logger.V(4).Info("getting identity hashes for compatible APIs", "count", len(syncTarget.Status.SyncedResources))

// get all identityHash for compatible APIs
for _, syncedResource := range syncTarget.Status.SyncedResources {
logger := logger.WithValues(
"group", syncedResource.Group,
"resource", syncedResource.Resource,
"identity", syncedResource.IdentityHash,
)
if syncedResource.State == workloadv1alpha1.ResourceSchemaAcceptedState {
logger.V(4).Info("including synced resource because it is accepted")
identityHashByGroupResource[schema.GroupResource{
Group: syncedResource.Group,
Resource: syncedResource.Resource,
}] = syncedResource.IdentityHash
} else {
logger.V(4).Info("excluding synced resource because it is unaccepted")
}
}

logger.V(4).Info("processing supported APIExports", "count", len(syncTarget.Spec.SupportedAPIExports))
var errs []error
for _, exportRef := range syncTarget.Spec.SupportedAPIExports {
logger.V(4).Info("looking at export", "path", exportRef.Path, "name", exportRef.Export)

path := logicalcluster.NewPath(exportRef.Path)
if path.Empty() {
logger.V(4).Info("falling back to sync target's logical cluster for path")
path = logicalcluster.From(syncTarget).Path()
}

logger := logger.WithValues("path", path, "name", exportRef.Export)
logger.V(4).Info("getting APIExport")
apiExport, err := indexers.ByPathAndName[*apisv1alpha1.APIExport](apisv1alpha1.Resource("apiexports"), c.apiExportIndexer, path, exportRef.Export)
if err != nil {
logger.V(4).Error(err, "error getting APIExport")
errs = append(errs, err)
continue
}

logger.V(4).Info("checking APIExport's schemas", "count", len(apiExport.Spec.LatestResourceSchemas))
for _, schemaName := range apiExport.Spec.LatestResourceSchemas {
logger := logger.WithValues("schema", schemaName)
logger.V(4).Info("getting APIResourceSchema")
apiResourceSchema, err := c.apiResourceSchemaLister.Cluster(logicalcluster.From(apiExport)).Get(schemaName)
if apierrors.IsNotFound(err) {
logger.V(4).Info("APIResourceSchema not found")
continue
}
if err != nil {
logger.V(4).Error(err, "error getting APIResourceSchema")
errs = append(errs, err)
continue
}
Expand All @@ -193,9 +216,14 @@ func (c *APIReconciler) getAllAcceptedResourceSchemas(syncTarget *workloadv1alph
Resource: apiResourceSchema.Spec.Names.Plural,
}

logger = logger.WithValues("group", gr.Group, "resource", gr.Resource)

// if identityHash does not exist, it is not a compatible API.
if _, ok := identityHashByGroupResource[gr]; ok {
logger.V(4).Info("identity found, including resource")
apiResourceSchemas[gr] = apiResourceSchema
} else {
logger.V(4).Info("identity not found, excluding resource")
}
}
}
Expand Down

0 comments on commit b505b8a

Please sign in to comment.