From d0d8cf38fa0c045b596a8ee71cf3b0bab200e27c Mon Sep 17 00:00:00 2001 From: Mara Sophie Grosch Date: Wed, 7 Dec 2022 13:31:53 +0100 Subject: [PATCH] Anexia Provider: Utilize `Creating` state instead of blocking `Create` call (#1483) (#1509) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Mario Schäfer Signed-off-by: Mario Schäfer Signed-off-by: Mario Schäfer Co-authored-by: Mario Reggiori --- .../provider/anexia/helper_test.go | 23 ------ pkg/cloudprovider/provider/anexia/instance.go | 5 ++ pkg/cloudprovider/provider/anexia/provider.go | 77 +++++++------------ .../provider/anexia/provider_test.go | 32 -------- 4 files changed, 33 insertions(+), 104 deletions(-) diff --git a/pkg/cloudprovider/provider/anexia/helper_test.go b/pkg/cloudprovider/provider/anexia/helper_test.go index 0bcea21f8..38c3a37ef 100644 --- a/pkg/cloudprovider/provider/anexia/helper_test.go +++ b/pkg/cloudprovider/provider/anexia/helper_test.go @@ -18,11 +18,9 @@ package anexia import ( "encoding/json" - "net/http" "testing" "github.com/gophercloud/gophercloud/testhelper" - "go.anx.io/go-anxcloud/pkg/vsphere/search" "github.com/kubermatic/machine-controller/pkg/apis/cluster/v1alpha1" anxtypes "github.com/kubermatic/machine-controller/pkg/cloudprovider/provider/anexia/types" @@ -64,27 +62,6 @@ func getSpecsForValidationTest(t *testing.T, configCases []ConfigTestCase) []Val return testCases } -func createSearchHandler(t *testing.T, iterations int) http.HandlerFunc { - counter := 0 - return func(writer http.ResponseWriter, request *http.Request) { - test := request.URL.Query().Get("name") - testhelper.AssertEquals(t, "%-TestMachine", test) - testhelper.TestMethod(t, request, http.MethodGet) - if iterations == counter { - encoder := json.NewEncoder(writer) - testhelper.AssertNoErr(t, encoder.Encode(map[string]interface{}{ - "data": []search.VM{ - { - Name: "543053-TestMachine", - Identifier: TestIdentifier, - }, - }, - })) - } - counter++ - } -} - func newConfigVarString(str string) types.ConfigVarString { return types.ConfigVarString{ Value: str, diff --git a/pkg/cloudprovider/provider/anexia/instance.go b/pkg/cloudprovider/provider/anexia/instance.go index fa53467a1..cd67d80c5 100644 --- a/pkg/cloudprovider/provider/anexia/instance.go +++ b/pkg/cloudprovider/provider/anexia/instance.go @@ -28,6 +28,7 @@ import ( ) type anexiaInstance struct { + isCreating bool info *info.Info reservedAddresses []string } @@ -85,6 +86,10 @@ func (ai *anexiaInstance) Addresses() map[string]v1.NodeAddressType { } func (ai *anexiaInstance) Status() instance.Status { + if ai.isCreating { + return instance.StatusCreating + } + if ai.info != nil { if ai.info.Status == anxtypes.MachinePoweredOn { return instance.StatusRunning diff --git a/pkg/cloudprovider/provider/anexia/provider.go b/pkg/cloudprovider/provider/anexia/provider.go index 841723724..b89763308 100644 --- a/pkg/cloudprovider/provider/anexia/provider.go +++ b/pkg/cloudprovider/provider/anexia/provider.go @@ -23,6 +23,8 @@ import ( "errors" "fmt" "net/http" + "strings" + "sync" "time" anxclient "go.anx.io/go-anxcloud/pkg/client" @@ -45,7 +47,6 @@ import ( v1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" k8stypes "k8s.io/apimachinery/pkg/types" - "k8s.io/apimachinery/pkg/util/wait" "k8s.io/klog" ) @@ -115,16 +116,6 @@ func (p *provider) Create(ctx context.Context, machine *clusterv1alpha1.Machine, retErr = anxtypes.NewMultiError(retErr, updateMachineStatus(machine, status, data.Update)) }() - // check whether machine is already provisioning - if isAlreadyProvisioning(ctx) && status.ProvisioningID == "" { - klog.Info("ongoing provisioning detected") - err := waitForVM(ctx, client) - if err != nil { - return nil, err - } - return p.Get(ctx, machine, data) - } - // provision machine err = provisionVM(ctx, client) if err != nil { @@ -133,33 +124,6 @@ func (p *provider) Create(ctx context.Context, machine *clusterv1alpha1.Machine, return p.Get(ctx, machine, data) } -func waitForVM(ctx context.Context, client anxclient.Client) error { - reconcileContext := getReconcileContext(ctx) - api := vsphere.NewAPI(client) - var identifier string - err := wait.PollImmediate(5*time.Second, 1*time.Minute, func() (bool, error) { - klog.V(2).Info("checking for VM with name ", reconcileContext.Machine.Name) - vms, err := api.Search().ByName(ctx, fmt.Sprintf("%%-%s", reconcileContext.Machine.Name)) - if err != nil { - return false, nil - } - if len(vms) < 1 { - return false, nil - } - if len(vms) > 1 { - return false, errors.New("too many VMs returned by search") - } - identifier = vms[0].Identifier - return true, nil - }) - if err != nil { - return err - } - - reconcileContext.Status.InstanceID = identifier - return updateMachineStatus(reconcileContext.Machine, *reconcileContext.Status, reconcileContext.ProviderData.Update) -} - func provisionVM(ctx context.Context, client anxclient.Client) error { reconcileContext := getReconcileContext(ctx) vmAPI := vsphere.NewAPI(client) @@ -229,15 +193,6 @@ func provisionVM(ctx context.Context, client anxclient.Client) error { klog.V(2).Info(fmt.Sprintf("Using provisionID from machine '%s' to await completion", reconcileContext.Machine.Name)) - instanceID, err := vmAPI.Provisioning().Progress().AwaitCompletion(ctx, status.ProvisioningID) - if err != nil { - klog.Errorf("failed to await machine completion '%s'", reconcileContext.Machine.Name) - // something went wrong remove provisioning ID, so we can start from scratch - status.ProvisioningID = "" - return newError(common.CreateMachineError, "instance provisioning failed: %v", err) - } - - status.InstanceID = instanceID meta.SetStatusCondition(&status.Conditions, v1.Condition{ Type: ProvisionedType, Status: v1.ConditionTrue, @@ -248,6 +203,8 @@ func provisionVM(ctx context.Context, client anxclient.Client) error { return updateMachineStatus(reconcileContext.Machine, *status, reconcileContext.ProviderData.Update) } +var _engsup3404mutex sync.Mutex + func getIPAddress(ctx context.Context, client anxclient.Client) (string, error) { reconcileContext := getReconcileContext(ctx) status := reconcileContext.Status @@ -258,6 +215,9 @@ func getIPAddress(ctx context.Context, client anxclient.Client) (string, error) return status.ReservedIP, nil } + _engsup3404mutex.Lock() + defer _engsup3404mutex.Unlock() + klog.Info(fmt.Sprintf("Creating a new IP for machine %q", reconcileContext.Machine.Name)) addrAPI := anxaddr.NewAPI(client) config := reconcileContext.Config @@ -447,7 +407,7 @@ func (p *provider) Validate(_ context.Context, machinespec clusterv1alpha1.Machi return nil } -func (p *provider) Get(ctx context.Context, machine *clusterv1alpha1.Machine, _ *cloudprovidertypes.ProviderData) (instance.Instance, error) { +func (p *provider) Get(ctx context.Context, machine *clusterv1alpha1.Machine, pd *cloudprovidertypes.ProviderData) (instance.Instance, error) { config, _, err := p.getConfig(machine.Spec.ProviderSpec) if err != nil { return nil, newError(common.InvalidConfigurationMachineError, "failed to retrieve config: %v", err) @@ -464,10 +424,29 @@ func (p *provider) Get(ctx context.Context, machine *clusterv1alpha1.Machine, _ return nil, newError(common.InvalidConfigurationMachineError, "failed to get machine status: %v", err) } - if status.InstanceID == "" { + if status.InstanceID == "" && status.ProvisioningID == "" { return nil, cloudprovidererrors.ErrInstanceNotFound } + if status.InstanceID == "" { + progress, err := vsphereAPI.Provisioning().Progress().Get(ctx, status.ProvisioningID) + if err != nil { + return nil, fmt.Errorf("failed to get provisioning progress: %w", err) + } + if len(progress.Errors) > 0 { + return nil, fmt.Errorf("vm provisioning had errors: %s", strings.Join(progress.Errors, ",")) + } + if progress.Progress < 100 || progress.VMIdentifier == "" { + return &anexiaInstance{isCreating: true}, nil + } + + status.InstanceID = progress.VMIdentifier + + if err := updateMachineStatus(machine, status, pd.Update); err != nil { + return nil, fmt.Errorf("failed updating machine status: %w", err) + } + } + instance := anexiaInstance{} if status.IPState == anxtypes.IPStateBound && status.ReservedIP != "" { diff --git a/pkg/cloudprovider/provider/anexia/provider_test.go b/pkg/cloudprovider/provider/anexia/provider_test.go index b26d29f30..08cec3257 100644 --- a/pkg/cloudprovider/provider/anexia/provider_test.go +++ b/pkg/cloudprovider/provider/anexia/provider_test.go @@ -50,38 +50,6 @@ func TestAnexiaProvider(t *testing.T) { server.Close() }) - t.Run("Test waiting for VM", func(t *testing.T) { - t.Parallel() - - waitUntilVMIsFound := 2 - testhelper.Mux.HandleFunc("/api/vsphere/v1/search/by_name.json", createSearchHandler(t, waitUntilVMIsFound)) - - providerStatus := anxtypes.ProviderStatus{} - ctx := createReconcileContext(context.Background(), reconcileContext{ - Machine: &v1alpha1.Machine{ - ObjectMeta: metav1.ObjectMeta{Name: "TestMachine"}, - }, - Status: &providerStatus, - UserData: "", - Config: resolvedConfig{}, - - ProviderData: &cloudprovidertypes.ProviderData{ - Update: func(m *clusterv1alpha1.Machine, mod ...cloudprovidertypes.MachineModifier) error { - return nil - }, - }, - }) - - err := waitForVM(ctx, client) - if err != nil { - t.Fatal("No error was expected", err) - } - - if providerStatus.InstanceID != TestIdentifier { - t.Error("Expected InstanceID to be set") - } - }) - t.Run("Test provision VM", func(t *testing.T) { t.Parallel() testhelper.Mux.HandleFunc("/api/ipam/v1/address/reserve/ip/count.json", func(writer http.ResponseWriter, request *http.Request) {