Skip to content

Commit

Permalink
Merge pull request #2784 from qiujian16/fix-location-placement
Browse files Browse the repository at this point in the history
🐛 Fix location cannot be found by placement during scheduling
  • Loading branch information
openshift-merge-robot committed Feb 20, 2023
2 parents 3730234 + a9340d9 commit 109688a
Show file tree
Hide file tree
Showing 3 changed files with 87 additions and 14 deletions.
30 changes: 20 additions & 10 deletions pkg/reconciler/scheduling/placement/placement_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
kcpcache "github.com/kcp-dev/apimachinery/v2/pkg/cache"
kcpcorev1informers "github.com/kcp-dev/client-go/informers/core/v1"
corev1listers "github.com/kcp-dev/client-go/listers/core/v1"
"github.com/kcp-dev/logicalcluster/v3"

corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
Expand All @@ -37,6 +38,7 @@ import (
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"

"github.com/kcp-dev/kcp/pkg/apis/core"
schedulingv1alpha1 "github.com/kcp-dev/kcp/pkg/apis/scheduling/v1alpha1"
kcpclientset "github.com/kcp-dev/kcp/pkg/client/clientset/versioned/cluster"
schedulingv1alpha1client "github.com/kcp-dev/kcp/pkg/client/clientset/versioned/typed/scheduling/v1alpha1"
Expand Down Expand Up @@ -216,32 +218,40 @@ func (c *controller) enqueueNamespace(obj interface{}) {

func (c *controller) enqueueLocation(obj interface{}) {
logger := logging.WithReconciler(klog.Background(), ControllerName)
key, err := kcpcache.DeletionHandlingMetaClusterNamespaceKeyFunc(obj)
if err != nil {
runtime.HandleError(err)
return
if tombstone, ok := obj.(cache.DeletedFinalStateUnknown); ok {
obj = tombstone.Obj
}
clusterName, _, _, err := kcpcache.SplitMetaClusterNamespaceKey(key)
if err != nil {
runtime.HandleError(err)

location, ok := obj.(*schedulingv1alpha1.Location)
if !ok {
runtime.HandleError(fmt.Errorf("unexpected object type: %T", obj))
return
}

placements, err := c.placementIndexer.ByIndex(byLocationWorkspace, clusterName.String())
// placements referencing by cluster name
placements, err := c.placementIndexer.ByIndex(byLocationWorkspace, logicalcluster.From(location).String())
if err != nil {
runtime.HandleError(err)
return
}
if path := location.Annotations[core.LogicalClusterPathAnnotationKey]; path != "" {
// placements referencing by path
placementsByPath, err := c.placementIndexer.ByIndex(byLocationWorkspace, path)
if err != nil {
runtime.HandleError(err)
return
}
placements = append(placements, placementsByPath...)
}

for _, obj := range placements {
placement := obj.(*schedulingv1alpha1.Placement)
locationKey := key
key, err := kcpcache.MetaClusterNamespaceKeyFunc(placement)
if err != nil {
runtime.HandleError(err)
continue
}
logging.WithQueueKey(logger, key).V(2).Info("queueing Placement because Location changed", "Location", locationKey)
logging.WithQueueKey(logger, key).V(2).Info("queueing Placement because Location changed")
c.queue.Add(key)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func (r *placementReconciler) reconcile(ctx context.Context, placement *scheduli
locationWorkspace = logicalcluster.From(placement).Path()
}

validLocationNames, err := r.validLocationNames(placement, locationWorkspace)
locationWorkspace, validLocationNames, err := r.validLocationNames(placement, locationWorkspace)
if err != nil {
conditions.MarkFalse(placement, schedulingv1alpha1.PlacementReady, schedulingv1alpha1.LocationNotFoundReason, conditionsv1alpha1.ConditionSeverityError, err.Error())
return reconcileStatusContinue, placement, err
Expand Down Expand Up @@ -108,18 +108,20 @@ func (r *placementReconciler) reconcile(ctx context.Context, placement *scheduli
return reconcileStatusContinue, placement, nil
}

func (r *placementReconciler) validLocationNames(placement *schedulingv1alpha1.Placement, locationWorkspace logicalcluster.Path) (sets.String, error) {
func (r *placementReconciler) validLocationNames(placement *schedulingv1alpha1.Placement, locationWorkspace logicalcluster.Path) (logicalcluster.Path, sets.String, error) {
var locationCluster logicalcluster.Path
selectedLocations := sets.NewString()

locations, err := r.listLocationsByPath(locationWorkspace)
if err != nil {
return selectedLocations, err
return logicalcluster.None, selectedLocations, err
}

for _, loc := range locations {
if loc.Spec.Resource != placement.Spec.LocationResource {
continue
}
locationCluster = logicalcluster.From(loc).Path()

for i := range placement.Spec.LocationSelectors {
s := placement.Spec.LocationSelectors[i]
Expand All @@ -135,7 +137,7 @@ func (r *placementReconciler) validLocationNames(placement *schedulingv1alpha1.P
}
}

return selectedLocations, nil
return locationCluster, selectedLocations, nil
}

func isValidLocationSelected(placement *schedulingv1alpha1.Placement, cluster logicalcluster.Path, validLocationNames sets.String) bool {
Expand Down
61 changes: 61 additions & 0 deletions test/e2e/reconciler/scheduling/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (

apisv1alpha1 "github.com/kcp-dev/kcp/pkg/apis/apis/v1alpha1"
schedulingv1alpha1 "github.com/kcp-dev/kcp/pkg/apis/scheduling/v1alpha1"
"github.com/kcp-dev/kcp/pkg/apis/third_party/conditions/util/conditions"
workloadv1alpha1 "github.com/kcp-dev/kcp/pkg/apis/workload/v1alpha1"
kcpclientset "github.com/kcp-dev/kcp/pkg/client/clientset/versioned/cluster"
"github.com/kcp-dev/kcp/test/e2e/framework"
Expand Down Expand Up @@ -257,3 +258,63 @@ func TestScheduling(t *testing.T) {
return found, fmt.Sprintf("no %s annotation:\n%s", schedulingv1alpha1.PlacementAnnotationKey, ns.Annotations)
}, wait.ForeverTestTimeout, time.Millisecond*100)
}

// TestSchedulingWhenLocationIsMissing create placement at first when location is missing and created later.
func TestSchedulingWhenLocationIsMissing(t *testing.T) {
t.Parallel()
framework.Suite(t, "transparent-multi-cluster")

ctx, cancelFunc := context.WithCancel(context.Background())
t.Cleanup(cancelFunc)

source := framework.SharedKcpServer(t)

orgPath, _ := framework.NewOrganizationFixture(t, source, framework.TODO_WithoutMultiShardSupport())
locationPath, _ := framework.NewWorkspaceFixture(t, source, orgPath, framework.TODO_WithoutMultiShardSupport())
userPath, userWorkspace := framework.NewWorkspaceFixture(t, source, orgPath, framework.TODO_WithoutMultiShardSupport())

kcpClusterClient, err := kcpclientset.NewForConfig(source.BaseConfig(t))
require.NoError(t, err)

// create a placement at first
newPlacement := &schedulingv1alpha1.Placement{
ObjectMeta: metav1.ObjectMeta{
Name: "new-placement",
},
Spec: schedulingv1alpha1.PlacementSpec{
LocationSelectors: []metav1.LabelSelector{{}},
NamespaceSelector: &metav1.LabelSelector{},
LocationResource: schedulingv1alpha1.GroupVersionResource{
Group: "workload.kcp.io",
Version: "v1alpha1",
Resource: "synctargets",
},
LocationWorkspace: locationPath.String(),
},
}
_, err = kcpClusterClient.Cluster(userPath).SchedulingV1alpha1().Placements().Create(ctx, newPlacement, metav1.CreateOptions{})
require.NoError(t, err)

t.Logf("Placement should turn to pending phase")
framework.Eventually(t, func() (bool, string) {
placement, err := kcpClusterClient.Cluster(userPath).SchedulingV1alpha1().Placements().Get(ctx, newPlacement.Name, metav1.GetOptions{})
if err != nil {
return false, fmt.Sprintf("Failed to get placement: %v", err)
}

return placement.Status.Phase == schedulingv1alpha1.PlacementPending, ""
}, wait.ForeverTestTimeout, time.Millisecond*100)

syncTargetName := "synctarget"
t.Logf("Creating a SyncTarget and syncer in %s", locationPath)
_ = framework.NewSyncerFixture(t, source, locationPath,
framework.WithExtraResources("services"),
framework.WithSyncTargetName(syncTargetName),
framework.WithSyncedUserWorkspaces(userWorkspace),
).CreateSyncTargetAndApplyToDownstream(t).StartAPIImporter(t).StartHeartBeat(t)

t.Logf("Wait for placement to be ready")
framework.EventuallyCondition(t, func() (conditions.Getter, error) {
return kcpClusterClient.Cluster(userPath).SchedulingV1alpha1().Placements().Get(ctx, newPlacement.Name, metav1.GetOptions{})
}, framework.Is(schedulingv1alpha1.PlacementReady))
}

0 comments on commit 109688a

Please sign in to comment.