diff --git a/pkg/model/domodel/BUILD.bazel b/pkg/model/domodel/BUILD.bazel index 4fcfa577fcfc4..7bbd474e784a6 100644 --- a/pkg/model/domodel/BUILD.bazel +++ b/pkg/model/domodel/BUILD.bazel @@ -13,7 +13,6 @@ go_library( "//pkg/apis/kops:go_default_library", "//pkg/dns:go_default_library", "//pkg/model:go_default_library", - "//pkg/resources/digitalocean:go_default_library", "//upup/pkg/fi:go_default_library", "//upup/pkg/fi/cloudup/do:go_default_library", "//upup/pkg/fi/cloudup/dotasks:go_default_library", diff --git a/pkg/model/domodel/droplets.go b/pkg/model/domodel/droplets.go index 30fb3c9cf99c8..76812ba983e0e 100644 --- a/pkg/model/domodel/droplets.go +++ b/pkg/model/domodel/droplets.go @@ -21,7 +21,6 @@ import ( "strings" "k8s.io/kops/pkg/model" - "k8s.io/kops/pkg/resources/digitalocean" "k8s.io/kops/upup/pkg/fi" "k8s.io/kops/upup/pkg/fi/cloudup/do" "k8s.io/kops/upup/pkg/fi/cloudup/dotasks" @@ -76,9 +75,9 @@ func (d *DropletBuilder) Build(c *fi.ModelBuilderContext) error { clusterTagIndex := do.TagKubernetesClusterIndex + ":" + "etcd-" + strconv.Itoa(masterIndexCount) droplet.Tags = append(droplet.Tags, clusterTagIndex) droplet.Tags = append(droplet.Tags, clusterMasterTag) - droplet.Tags = append(droplet.Tags, digitalocean.TagKubernetesInstanceGroup+":"+ig.Name) + droplet.Tags = append(droplet.Tags, do.TagKubernetesInstanceGroup+":"+ig.Name) } else { - droplet.Tags = append(droplet.Tags, digitalocean.TagKubernetesInstanceGroup+":"+ig.Name) + droplet.Tags = append(droplet.Tags, do.TagKubernetesInstanceGroup+":"+ig.Name) } userData, err := d.BootstrapScriptBuilder.ResourceNodeUp(c, ig) diff --git a/pkg/resources/digitalocean/BUILD.bazel b/pkg/resources/digitalocean/BUILD.bazel index 0683ec059f1e4..b50e7fa13b9b5 100644 --- a/pkg/resources/digitalocean/BUILD.bazel +++ b/pkg/resources/digitalocean/BUILD.bazel @@ -2,24 +2,16 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library") go_library( name = "go_default_library", - srcs = [ - "cloud.go", - "resources.go", - ], + srcs = ["resources.go"], importpath = "k8s.io/kops/pkg/resources/digitalocean", visibility = ["//visibility:public"], deps = [ "//dns-controller/pkg/dns:go_default_library", - "//dnsprovider/pkg/dnsprovider:go_default_library", "//pkg/apis/kops:go_default_library", - "//pkg/cloudinstances:go_default_library", "//pkg/resources:go_default_library", - "//pkg/resources/digitalocean/dns:go_default_library", - "//protokube/pkg/etcd:go_default_library", "//upup/pkg/fi:go_default_library", + "//upup/pkg/fi/cloudup/do:go_default_library", "//vendor/github.com/digitalocean/godo:go_default_library", - "//vendor/golang.org/x/oauth2:go_default_library", - "//vendor/k8s.io/api/core/v1:go_default_library", "//vendor/k8s.io/klog/v2:go_default_library", ], ) diff --git a/pkg/resources/digitalocean/cloud.go b/pkg/resources/digitalocean/cloud.go deleted file mode 100644 index ef9e0afd2adb0..0000000000000 --- a/pkg/resources/digitalocean/cloud.go +++ /dev/null @@ -1,457 +0,0 @@ -/* -Copyright 2019 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package digitalocean - -import ( - "context" - "errors" - "fmt" - "os" - "strconv" - "strings" - "time" - - "github.com/digitalocean/godo" - "golang.org/x/oauth2" - "k8s.io/klog/v2" - - v1 "k8s.io/api/core/v1" - "k8s.io/kops/dnsprovider/pkg/dnsprovider" - "k8s.io/kops/pkg/apis/kops" - "k8s.io/kops/pkg/cloudinstances" - "k8s.io/kops/pkg/resources/digitalocean/dns" - "k8s.io/kops/protokube/pkg/etcd" - "k8s.io/kops/upup/pkg/fi" -) - -const TagKubernetesClusterIndex = "k8s-index" -const TagKubernetesClusterNamePrefix = "KubernetesCluster" -const TagKubernetesInstanceGroup = "kops-instancegroup" - -// TokenSource implements oauth2.TokenSource -type TokenSource struct { - AccessToken string -} - -// Token() returns oauth2.Token -func (t *TokenSource) Token() (*oauth2.Token, error) { - token := &oauth2.Token{ - AccessToken: t.AccessToken, - } - return token, nil -} - -// Cloud exposes all the interfaces required to operate on DigitalOcean resources -type Cloud struct { - Client *godo.Client - - dns dnsprovider.Interface - - // RegionName holds the region, renamed to avoid conflict with Region() - RegionName string -} - -type DOInstanceGroup struct { - ClusterName string - InstanceGroupName string - GroupType string // will be either "master" or "worker" - Members []string // will store the droplet names that matches. -} - -var _ fi.Cloud = &Cloud{} - -// NewCloud returns a Cloud, expecting the env var DIGITALOCEAN_ACCESS_TOKEN -// NewCloud will return an err if DIGITALOCEAN_ACCESS_TOKEN is not defined -func NewCloud(region string) (*Cloud, error) { - accessToken := os.Getenv("DIGITALOCEAN_ACCESS_TOKEN") - if accessToken == "" { - return nil, errors.New("DIGITALOCEAN_ACCESS_TOKEN is required") - } - - tokenSource := &TokenSource{ - AccessToken: accessToken, - } - - oauthClient := oauth2.NewClient(context.TODO(), tokenSource) - client := godo.NewClient(oauthClient) - - return &Cloud{ - Client: client, - dns: dns.NewProvider(client), - RegionName: region, - }, nil -} - -func (c *Cloud) GetCloudGroups(cluster *kops.Cluster, instancegroups []*kops.InstanceGroup, warnUnmatched bool, nodes []v1.Node) (map[string]*cloudinstances.CloudInstanceGroup, error) { - return getCloudGroups(c, cluster, instancegroups, warnUnmatched, nodes) -} - -// DeleteGroup is not implemented yet, is a func that needs to delete a DO instance group. -func (c *Cloud) DeleteGroup(g *cloudinstances.CloudInstanceGroup) error { - klog.V(8).Info("digitalocean cloud provider DeleteGroup not implemented yet") - return fmt.Errorf("digital ocean cloud provider does not support deleting cloud groups at this time") -} - -func (c *Cloud) DeleteInstance(i *cloudinstances.CloudInstance) error { - dropletID, err := strconv.Atoi(i.ID) - if err != nil { - return fmt.Errorf("failed to convert droplet ID to int: %s", err) - } - - _, _, err = c.Client.DropletActions.Shutdown(context.TODO(), dropletID) - if err != nil { - return fmt.Errorf("error stopping instance %q: %v", dropletID, err) - } - - // Wait for 5 min to stop the instance - for i := 0; i < 5; i++ { - droplet, _, err := c.Client.Droplets.Get(context.TODO(), dropletID) - if err != nil { - return fmt.Errorf("error describing instance %q: %v", dropletID, err) - } - - klog.V(8).Infof("stopping DO instance %q, current Status: %q", droplet, droplet.Status) - - if droplet.Status == "off" { - break - } - - if i == 5 { - return fmt.Errorf("fail to stop DO instance %v in 5 mins", dropletID) - } - - time.Sleep(time.Minute * 1) - } - - _, err = c.Client.Droplets.Delete(context.TODO(), dropletID) - if err != nil { - return fmt.Errorf("error stopping instance %q: %v", dropletID, err) - } - - klog.V(8).Infof("deleted droplet instance %q", dropletID) - - return nil -} - -// DetachInstance is not implemented yet. It needs to cause a cloud instance to no longer be counted against the group's size limits. -func (c *Cloud) DetachInstance(i *cloudinstances.CloudInstance) error { - klog.V(8).Info("digitalocean cloud provider DetachInstance not implemented yet") - return fmt.Errorf("digital ocean cloud provider does not support surging") -} - -// ProviderID returns the kops api identifier for DigitalOcean cloud provider -func (c *Cloud) ProviderID() kops.CloudProviderID { - return kops.CloudProviderDO -} - -// Region returns the DO region we will target -func (c *Cloud) Region() string { - return c.RegionName -} - -// DNS returns a DO implementation for dnsprovider.Interface -func (c *Cloud) DNS() (dnsprovider.Interface, error) { - return c.dns, nil -} - -// Volumes returns an implementation of godo.StorageService -func (c *Cloud) Volumes() godo.StorageService { - return c.Client.Storage -} - -// VolumeActions returns an implementation of godo.StorageActionsService -func (c *Cloud) VolumeActions() godo.StorageActionsService { - return c.Client.StorageActions -} - -func (c *Cloud) Droplets() godo.DropletsService { - return c.Client.Droplets -} - -func (c *Cloud) DropletActions() godo.DropletActionsService { - return c.Client.DropletActions -} - -func (c *Cloud) LoadBalancers() godo.LoadBalancersService { - return c.Client.LoadBalancers -} - -func (c *Cloud) GetAllLoadBalancers() ([]godo.LoadBalancer, error) { - return getAllLoadBalancers(c) -} - -// FindVPCInfo is not implemented, it's only here to satisfy the fi.Cloud interface -func (c *Cloud) FindVPCInfo(id string) (*fi.VPCInfo, error) { - return nil, errors.New("not implemented") -} - -func (c *Cloud) GetApiIngressStatus(cluster *kops.Cluster) ([]fi.ApiIngressStatus, error) { - var ingresses []fi.ApiIngressStatus - if cluster.Spec.MasterPublicName != "" { - // Note that this must match Digital Ocean's lb name - klog.V(2).Infof("Querying DO to find Loadbalancers for API (%q)", cluster.Name) - - loadBalancers, err := getAllLoadBalancers(c) - if err != nil { - return nil, fmt.Errorf("LoadBalancers.List returned error: %v", err) - } - - lbName := "api-" + strings.Replace(cluster.Name, ".", "-", -1) - - for _, lb := range loadBalancers { - if lb.Name == lbName { - klog.V(10).Infof("Matching LB name found for API (%q)", cluster.Name) - - if lb.Status != "active" { - return nil, fmt.Errorf("load-balancer is not yet active (current status: %s)", lb.Status) - } - - address := lb.IP - ingresses = append(ingresses, fi.ApiIngressStatus{IP: address}) - - return ingresses, nil - } - } - } - - return nil, nil -} - -// FindClusterStatus discovers the status of the cluster, by looking for the tagged etcd volumes -func (c *Cloud) FindClusterStatus(cluster *kops.Cluster) (*kops.ClusterStatus, error) { - etcdStatus, err := findEtcdStatus(c, cluster) - if err != nil { - return nil, err - } - status := &kops.ClusterStatus{ - EtcdClusters: etcdStatus, - } - klog.V(2).Infof("Cluster status (from cloud): %v", fi.DebugAsJsonString(status)) - return status, nil -} - -// findEtcdStatus discovers the status of etcd, by looking for the tagged etcd volumes -func findEtcdStatus(c *Cloud, cluster *kops.Cluster) ([]kops.EtcdClusterStatus, error) { - statusMap := make(map[string]*kops.EtcdClusterStatus) - volumes, err := getAllVolumesByRegion(c, c.RegionName) - - if err != nil { - return nil, fmt.Errorf("failed to get all volumes by region from %s: %v", c.RegionName, err) - } - - for _, volume := range volumes { - volumeID := volume.ID - - etcdClusterName := "" - var etcdClusterSpec *etcd.EtcdClusterSpec - - for _, myTag := range volume.Tags { - klog.V(8).Infof("findEtcdStatus status (from cloud): checking if volume with tag %q belongs to cluster", myTag) - // check if volume belongs to this cluster. - // tag will be in the format "KubernetesCluster:dev5-k8s-local" (where clusterName is dev5.k8s.local) - clusterName := strings.Replace(cluster.Name, ".", "-", -1) - if strings.Contains(myTag, fmt.Sprintf("%s:%s", TagKubernetesClusterNamePrefix, clusterName)) { - klog.V(10).Infof("findEtcdStatus cluster comparison matched for tag: %v", myTag) - // this volume belongs to our cluster, add this to our etcdClusterSpec. - // loop through the tags again and - for _, volumeTag := range volume.Tags { - if strings.Contains(volumeTag, TagKubernetesClusterIndex) { - volumeTagParts := strings.Split(volumeTag, ":") - if len(volumeTagParts) < 2 { - return nil, fmt.Errorf("volume tag split failed, too few components for tag %q on volume %q", volumeTag, volume) - } - dropletIndex := volumeTagParts[1] - etcdClusterSpec, err = c.getEtcdClusterSpec(volume.Name, dropletIndex) - if err != nil { - return nil, fmt.Errorf("error parsing etcd cluster tag %q on volume %q: %v", volumeTag, volumeID, err) - } - - klog.V(10).Infof("findEtcdStatus etcdClusterSpec: %v", fi.DebugAsJsonString(etcdClusterSpec)) - etcdClusterName = etcdClusterSpec.ClusterKey - status := statusMap[etcdClusterName] - if status == nil { - status = &kops.EtcdClusterStatus{ - Name: etcdClusterName, - } - statusMap[etcdClusterName] = status - } - - memberName := etcdClusterSpec.NodeName - status.Members = append(status.Members, &kops.EtcdMemberStatus{ - Name: memberName, - VolumeId: volume.ID, - }) - } - } - } - } - } - - status := make([]kops.EtcdClusterStatus, 0, len(statusMap)) - for _, v := range statusMap { - status = append(status, *v) - } - - return status, nil -} - -func (c *Cloud) getEtcdClusterSpec(volumeName string, dropletName string) (*etcd.EtcdClusterSpec, error) { - var clusterKey string - if strings.Contains(volumeName, "etcd-main") { - clusterKey = "main" - } else if strings.Contains(volumeName, "etcd-events") { - clusterKey = "events" - } else { - return nil, fmt.Errorf("could not determine etcd cluster type for volume: %s", volumeName) - } - - return &etcd.EtcdClusterSpec{ - ClusterKey: clusterKey, - NodeName: dropletName, - NodeNames: []string{dropletName}, - }, nil -} - -func getCloudGroups(c *Cloud, cluster *kops.Cluster, instancegroups []*kops.InstanceGroup, warnUnmatched bool, nodes []v1.Node) (map[string]*cloudinstances.CloudInstanceGroup, error) { - nodeMap := cloudinstances.GetNodeMap(nodes, cluster) - - groups := make(map[string]*cloudinstances.CloudInstanceGroup) - instanceGroups, err := FindInstanceGroups(c, cluster.ObjectMeta.Name) - if err != nil { - return nil, fmt.Errorf("unable to find autoscale groups: %v", err) - } - - for _, doGroup := range instanceGroups { - name := doGroup.InstanceGroupName - - instancegroup, err := matchInstanceGroup(name, cluster.ObjectMeta.Name, instancegroups) - if err != nil { - return nil, fmt.Errorf("error getting instance group for doGroup %q", name) - } - if instancegroup == nil { - if warnUnmatched { - klog.Warningf("Found doGroup with no corresponding instance group %q", name) - } - continue - } - - groups[instancegroup.ObjectMeta.Name], err = buildCloudInstanceGroup(c, instancegroup, doGroup, nodeMap) - if err != nil { - return nil, fmt.Errorf("error getting cloud instance group %q: %v", instancegroup.ObjectMeta.Name, err) - } - } - - klog.V(8).Infof("Cloud Instance Group Info = %v", groups) - return groups, nil - -} - -// FindInstanceGroups finds instance groups matching the specified tags -func FindInstanceGroups(c *Cloud, clusterName string) ([]DOInstanceGroup, error) { - var result []DOInstanceGroup - instanceGroupMap := make(map[string][]string) // map of instance group name with droplet ids - - clusterTag := "KubernetesCluster:" + strings.Replace(clusterName, ".", "-", -1) - droplets, err := getAllDropletsByTag(c, clusterTag) - if err != nil { - return nil, fmt.Errorf("get all droplets for tag %s returned error. Error=%v", clusterTag, err) - } - - instanceGroupName := "" - for _, droplet := range droplets { - doInstanceGroup, err := getDropletInstanceGroup(droplet.Tags) - if err != nil { - return nil, fmt.Errorf("get droplets Instance group for tags %v returned error. Error=%v", droplet.Tags, err) - } - - instanceGroupName = fmt.Sprintf("%s-%s", clusterName, doInstanceGroup) - instanceGroupMap[instanceGroupName] = append(instanceGroupMap[instanceGroupName], strconv.Itoa(droplet.ID)) - - result = append(result, DOInstanceGroup{ - InstanceGroupName: instanceGroupName, - GroupType: instanceGroupName, - ClusterName: clusterName, - Members: instanceGroupMap[instanceGroupName], - }) - } - - klog.V(8).Infof("InstanceGroup Info = %v", result) - - return result, nil -} - -func getDropletInstanceGroup(tags []string) (string, error) { - for _, tag := range tags { - klog.V(8).Infof("Check tag = %s", tag) - if strings.Contains(strings.ToLower(tag), TagKubernetesInstanceGroup) { - tagParts := strings.Split(tag, ":") - if len(tagParts) < 2 { - return "", fmt.Errorf("tag split failed, too few components for tag %q", tag) - } - return tagParts[1], nil - } - } - - return "", fmt.Errorf("Didn't find k8s-instancegroup for tag %v", tags) -} - -// matchInstanceGroup filters a list of instancegroups for recognized cloud groups -func matchInstanceGroup(name string, clusterName string, instancegroups []*kops.InstanceGroup) (*kops.InstanceGroup, error) { - var instancegroup *kops.InstanceGroup - for _, g := range instancegroups { - var groupName string - - switch g.Spec.Role { - case kops.InstanceGroupRoleMaster, kops.InstanceGroupRoleNode: - groupName = clusterName + "-" + g.ObjectMeta.Name - default: - klog.Warningf("Ignoring InstanceGroup of unknown role %q", g.Spec.Role) - continue - } - - if name == groupName { - if instancegroup != nil { - return nil, fmt.Errorf("found multiple instance groups matching servergrp %q", groupName) - } - instancegroup = g - } - } - - return instancegroup, nil -} - -func buildCloudInstanceGroup(c *Cloud, ig *kops.InstanceGroup, g DOInstanceGroup, nodeMap map[string]*v1.Node) (*cloudinstances.CloudInstanceGroup, error) { - cg := &cloudinstances.CloudInstanceGroup{ - HumanName: g.InstanceGroupName, - InstanceGroup: ig, - Raw: g, - MinSize: int(fi.Int32Value(ig.Spec.MinSize)), - TargetSize: int(fi.Int32Value(ig.Spec.MinSize)), - MaxSize: int(fi.Int32Value(ig.Spec.MaxSize)), - } - - for _, member := range g.Members { - - // TODO use a hash of the godo.DropletCreateRequest fields to calculate the second parameter. - _, err := cg.NewCloudInstance(member, cloudinstances.CloudInstanceStatusUpToDate, nodeMap[member]) - if err != nil { - return nil, fmt.Errorf("error creating cloud instance group member: %v", err) - } - } - - return cg, nil -} diff --git a/pkg/resources/digitalocean/resources.go b/pkg/resources/digitalocean/resources.go index b5f65cebd6423..48ccfe0a3a249 100644 --- a/pkg/resources/digitalocean/resources.go +++ b/pkg/resources/digitalocean/resources.go @@ -31,6 +31,7 @@ import ( "k8s.io/kops/pkg/apis/kops" "k8s.io/kops/pkg/resources" "k8s.io/kops/upup/pkg/fi" + "k8s.io/kops/upup/pkg/fi/cloudup/do" ) const ( @@ -42,7 +43,7 @@ const ( type listFn func(fi.Cloud, string) ([]*resources.Resource, error) -func ListResources(cloud *Cloud, clusterName string) (map[string]*resources.Resource, error) { +func ListResources(cloud do.DOCloud, clusterName string) (map[string]*resources.Resource, error) { resourceTrackers := make(map[string]*resources.Resource) listFunctions := []listFn{ @@ -66,12 +67,12 @@ func ListResources(cloud *Cloud, clusterName string) (map[string]*resources.Reso } func listDroplets(cloud fi.Cloud, clusterName string) ([]*resources.Resource, error) { - c := cloud.(*Cloud) + c := cloud.(do.DOCloud) var resourceTrackers []*resources.Resource clusterTag := "KubernetesCluster:" + strings.Replace(clusterName, ".", "-", -1) - droplets, err := getAllDropletsByTag(c, clusterTag) + droplets, err := c.GetAllDropletsByTag(clusterTag) if err != nil { return nil, fmt.Errorf("failed to list droplets: %v", err) } @@ -92,40 +93,13 @@ func listDroplets(cloud fi.Cloud, clusterName string) ([]*resources.Resource, er return resourceTrackers, nil } -func getAllDropletsByTag(cloud *Cloud, tag string) ([]godo.Droplet, error) { - allDroplets := []godo.Droplet{} - - opt := &godo.ListOptions{} - for { - droplets, resp, err := cloud.Droplets().ListByTag(context.TODO(), tag, opt) - if err != nil { - return nil, err - } - - allDroplets = append(allDroplets, droplets...) - - if resp.Links == nil || resp.Links.IsLastPage() { - break - } - - page, err := resp.Links.CurrentPage() - if err != nil { - return nil, err - } - - opt.Page = page + 1 - } - - return allDroplets, nil -} - func listVolumes(cloud fi.Cloud, clusterName string) ([]*resources.Resource, error) { - c := cloud.(*Cloud) + c := cloud.(do.DOCloud) var resourceTrackers []*resources.Resource volumeMatch := strings.Replace(clusterName, ".", "-", -1) - volumes, err := getAllVolumesByRegion(c, c.Region()) + volumes, err := c.GetAllVolumesByRegion() if err != nil { return nil, fmt.Errorf("failed to list volumes: %s", err) } @@ -153,41 +127,9 @@ func listVolumes(cloud fi.Cloud, clusterName string) ([]*resources.Resource, err return resourceTrackers, nil } -func getAllVolumesByRegion(cloud *Cloud, region string) ([]godo.Volume, error) { - allVolumes := []godo.Volume{} - - opt := &godo.ListOptions{} - for { - volumes, resp, err := cloud.Volumes().ListVolumes(context.TODO(), &godo.ListVolumeParams{ - Region: region, - ListOptions: opt, - }) - - if err != nil { - return nil, err - } - - allVolumes = append(allVolumes, volumes...) - - if resp.Links == nil || resp.Links.IsLastPage() { - break - } - - page, err := resp.Links.CurrentPage() - if err != nil { - return nil, err - } - - opt.Page = page + 1 - } - - return allVolumes, nil -} - func listDNS(cloud fi.Cloud, clusterName string) ([]*resources.Resource, error) { - c := cloud.(*Cloud) - - domains, _, err := c.Client.Domains.List(context.TODO(), &godo.ListOptions{}) + c := cloud.(do.DOCloud) + domains, _, err := c.DomainService().List(context.TODO(), &godo.ListOptions{}) if err != nil { return nil, fmt.Errorf("failed to list domains: %s", err) } @@ -242,12 +184,12 @@ func listDNS(cloud fi.Cloud, clusterName string) ([]*resources.Resource, error) return resourceTrackers, nil } -func getAllRecordsByDomain(cloud *Cloud, domain string) ([]godo.DomainRecord, error) { +func getAllRecordsByDomain(cloud do.DOCloud, domain string) ([]godo.DomainRecord, error) { allRecords := []godo.DomainRecord{} opt := &godo.ListOptions{} for { - records, resp, err := cloud.Client.Domains.Records(context.TODO(), domain, opt) + records, resp, err := cloud.DomainService().Records(context.TODO(), domain, opt) if err != nil { return nil, err } @@ -270,12 +212,12 @@ func getAllRecordsByDomain(cloud *Cloud, domain string) ([]godo.DomainRecord, er } func listLoadBalancers(cloud fi.Cloud, clusterName string) ([]*resources.Resource, error) { - c := cloud.(*Cloud) + c := cloud.(do.DOCloud) var resourceTrackers []*resources.Resource clusterTag := "KubernetesCluster-Master:" + strings.Replace(clusterName, ".", "-", -1) - lbs, err := getAllLoadBalancers(c) + lbs, err := c.GetAllLoadBalancers() if err != nil { return nil, fmt.Errorf("failed to list lbs: %v", err) } @@ -303,42 +245,14 @@ func listLoadBalancers(cloud fi.Cloud, clusterName string) ([]*resources.Resourc return resourceTrackers, nil } -func getAllLoadBalancers(cloud *Cloud) ([]godo.LoadBalancer, error) { - allLoadBalancers := []godo.LoadBalancer{} - - opt := &godo.ListOptions{} - for { - lbs, resp, err := cloud.LoadBalancers().List(context.TODO(), opt) - if err != nil { - return nil, err - } - - allLoadBalancers = append(allLoadBalancers, lbs...) - - if resp.Links == nil || resp.Links.IsLastPage() { - break - } - - page, err := resp.Links.CurrentPage() - if err != nil { - return nil, err - } - - opt.Page = page + 1 - } - - return allLoadBalancers, nil -} - func deleteDroplet(cloud fi.Cloud, t *resources.Resource) error { - c := cloud.(*Cloud) - + c := cloud.(do.DOCloud) dropletID, err := strconv.Atoi(t.ID) if err != nil { return fmt.Errorf("failed to convert droplet ID to int: %s", err) } - _, err = c.Droplets().Delete(context.TODO(), dropletID) + _, err = c.DropletsService().Delete(context.TODO(), dropletID) if err != nil { return fmt.Errorf("failed to delete droplet: %d, err: %s", dropletID, err) } @@ -347,11 +261,10 @@ func deleteDroplet(cloud fi.Cloud, t *resources.Resource) error { } func deleteVolume(cloud fi.Cloud, t *resources.Resource) error { - c := cloud.(*Cloud) - + c := cloud.(do.DOCloud) volume := t.Obj.(godo.Volume) for _, dropletID := range volume.DropletIDs { - action, _, err := c.VolumeActions().DetachByDropletID(context.TODO(), volume.ID, dropletID) + action, _, err := c.VolumeActionService().DetachByDropletID(context.TODO(), volume.ID, dropletID) if err != nil { return fmt.Errorf("failed to detach volume: %s, err: %s", volume.ID, err) } @@ -360,7 +273,7 @@ func deleteVolume(cloud fi.Cloud, t *resources.Resource) error { } } - _, err := c.Volumes().DeleteVolume(context.TODO(), t.ID) + _, err := c.VolumeService().DeleteVolume(context.TODO(), t.ID) if err != nil { return fmt.Errorf("failed to delete volume: %s, err: %s", t.ID, err) } @@ -369,10 +282,10 @@ func deleteVolume(cloud fi.Cloud, t *resources.Resource) error { } func deleteRecord(cloud fi.Cloud, domain string, t *resources.Resource) error { - c := cloud.(*Cloud) + c := cloud.(do.DOCloud) record := t.Obj.(godo.DomainRecord) - _, err := c.Client.Domains.DeleteRecord(context.TODO(), domain, record.ID) + _, err := c.DomainService().DeleteRecord(context.TODO(), domain, record.ID) if err != nil { return fmt.Errorf("failed to delete record for domain %s: %d", domain, record.ID) } @@ -381,9 +294,9 @@ func deleteRecord(cloud fi.Cloud, domain string, t *resources.Resource) error { } func deleteLoadBalancer(cloud fi.Cloud, t *resources.Resource) error { - c := cloud.(*Cloud) + c := cloud.(do.DOCloud) lb := t.Obj.(godo.LoadBalancer) - _, err := c.Client.LoadBalancers.Delete(context.TODO(), lb.ID) + _, err := c.LoadBalancersService().Delete(context.TODO(), lb.ID) if err != nil { return fmt.Errorf("failed to delete load balancer with name %s %v", lb.Name, err) @@ -392,7 +305,7 @@ func deleteLoadBalancer(cloud fi.Cloud, t *resources.Resource) error { return nil } -func waitForDetach(cloud *Cloud, action *godo.Action) error { +func waitForDetach(cloud do.DOCloud, action *godo.Action) error { timeout := time.After(10 * time.Second) ticker := time.NewTicker(500 * time.Millisecond) defer ticker.Stop() @@ -401,7 +314,7 @@ func waitForDetach(cloud *Cloud, action *godo.Action) error { case <-timeout: return errors.New("timed out waiting for volume to detach") case <-ticker.C: - updatedAction, _, err := cloud.Client.Actions.Get(context.TODO(), action.ID) + updatedAction, _, err := cloud.ActionsService().Get(context.TODO(), action.ID) if err != nil { return err } diff --git a/pkg/resources/ops/BUILD.bazel b/pkg/resources/ops/BUILD.bazel index 011f55a732e6c..40906d06db828 100644 --- a/pkg/resources/ops/BUILD.bazel +++ b/pkg/resources/ops/BUILD.bazel @@ -21,6 +21,7 @@ go_library( "//upup/pkg/fi/cloudup/aliup:go_default_library", "//upup/pkg/fi/cloudup/awsup:go_default_library", "//upup/pkg/fi/cloudup/azure:go_default_library", + "//upup/pkg/fi/cloudup/do:go_default_library", "//upup/pkg/fi/cloudup/gce:go_default_library", "//upup/pkg/fi/cloudup/openstack:go_default_library", "//vendor/k8s.io/klog/v2:go_default_library", diff --git a/pkg/resources/ops/collector.go b/pkg/resources/ops/collector.go index 0c5462911bcbb..aafadb83a60e0 100644 --- a/pkg/resources/ops/collector.go +++ b/pkg/resources/ops/collector.go @@ -31,6 +31,7 @@ import ( cloudali "k8s.io/kops/upup/pkg/fi/cloudup/aliup" "k8s.io/kops/upup/pkg/fi/cloudup/awsup" cloudazure "k8s.io/kops/upup/pkg/fi/cloudup/azure" + clouddo "k8s.io/kops/upup/pkg/fi/cloudup/do" cloudgce "k8s.io/kops/upup/pkg/fi/cloudup/gce" cloudopenstack "k8s.io/kops/upup/pkg/fi/cloudup/openstack" ) @@ -42,7 +43,7 @@ func ListResources(cloud fi.Cloud, cluster *kops.Cluster, region string) (map[st case kops.CloudProviderAWS: return aws.ListResourcesAWS(cloud.(awsup.AWSCloud), clusterName) case kops.CloudProviderDO: - return digitalocean.ListResources(cloud.(*digitalocean.Cloud), clusterName) + return digitalocean.ListResources(cloud.(clouddo.DOCloud), clusterName) case kops.CloudProviderGCE: return gce.ListResourcesGCE(cloud.(cloudgce.GCECloud), clusterName, region) case kops.CloudProviderOpenstack: diff --git a/protokube/pkg/gossip/do/BUILD.bazel b/protokube/pkg/gossip/do/BUILD.bazel index 2436f75cd5beb..011c5bc741573 100644 --- a/protokube/pkg/gossip/do/BUILD.bazel +++ b/protokube/pkg/gossip/do/BUILD.bazel @@ -6,8 +6,8 @@ go_library( importpath = "k8s.io/kops/protokube/pkg/gossip/do", visibility = ["//visibility:public"], deps = [ - "//pkg/resources/digitalocean:go_default_library", "//protokube/pkg/gossip:go_default_library", + "//vendor/github.com/digitalocean/godo:go_default_library", "//vendor/k8s.io/klog/v2:go_default_library", ], ) diff --git a/protokube/pkg/gossip/do/seeds.go b/protokube/pkg/gossip/do/seeds.go index 25c4027791231..5d473eca95bbc 100644 --- a/protokube/pkg/gossip/do/seeds.go +++ b/protokube/pkg/gossip/do/seeds.go @@ -21,14 +21,14 @@ import ( "fmt" "strings" + "github.com/digitalocean/godo" "k8s.io/klog/v2" - "k8s.io/kops/pkg/resources/digitalocean" "k8s.io/kops/protokube/pkg/gossip" ) type SeedProvider struct { - cloud *digitalocean.Cloud - tag string + godoClient *godo.Client + tag string } var _ gossip.SeedProvider = &SeedProvider{} @@ -36,7 +36,7 @@ var _ gossip.SeedProvider = &SeedProvider{} func (p *SeedProvider) GetSeeds() ([]string, error) { var seeds []string - droplets, _, err := p.cloud.Droplets().List(context.TODO(), nil) + droplets, _, err := p.godoClient.Droplets.List(context.TODO(), nil) if err != nil { return nil, fmt.Errorf("Droplets.ListByTag returned error: %v", err) @@ -64,11 +64,11 @@ func (p *SeedProvider) GetSeeds() ([]string, error) { return seeds, nil } -func NewSeedProvider(cloud *digitalocean.Cloud, tag string) (*SeedProvider, error) { +func NewSeedProvider(godoClient *godo.Client, tag string) (*SeedProvider, error) { klog.V(4).Infof("Trying new seed provider with cluster tag:%s", tag) return &SeedProvider{ - cloud: cloud, - tag: tag, + godoClient: godoClient, + tag: tag, }, nil } diff --git a/protokube/pkg/protokube/BUILD.bazel b/protokube/pkg/protokube/BUILD.bazel index c191545812d09..50d8916155108 100644 --- a/protokube/pkg/protokube/BUILD.bazel +++ b/protokube/pkg/protokube/BUILD.bazel @@ -32,7 +32,6 @@ go_library( "//pkg/k8scodecs:go_default_library", "//pkg/kubemanifest:go_default_library", "//pkg/nodelabels:go_default_library", - "//pkg/resources/digitalocean:go_default_library", "//protokube/pkg/etcd:go_default_library", "//protokube/pkg/gossip:go_default_library", "//protokube/pkg/gossip/ali:go_default_library", @@ -63,6 +62,7 @@ go_library( "//vendor/github.com/digitalocean/godo:go_default_library", "//vendor/github.com/gophercloud/gophercloud/openstack/blockstorage/v3/volumes:go_default_library", "//vendor/github.com/gophercloud/gophercloud/openstack/compute/v2/extensions/volumeattach:go_default_library", + "//vendor/golang.org/x/oauth2:go_default_library", "//vendor/google.golang.org/api/compute/v1:go_default_library", "//vendor/k8s.io/api/core/v1:go_default_library", "//vendor/k8s.io/api/rbac/v1beta1:go_default_library", diff --git a/protokube/pkg/protokube/do_volume.go b/protokube/pkg/protokube/do_volume.go index ef5713baf217d..f6085f534a5e8 100644 --- a/protokube/pkg/protokube/do_volume.go +++ b/protokube/pkg/protokube/do_volume.go @@ -18,6 +18,7 @@ package protokube import ( "context" + "errors" "fmt" "io/ioutil" "net" @@ -28,8 +29,8 @@ import ( "time" "github.com/digitalocean/godo" + "golang.org/x/oauth2" - "k8s.io/kops/pkg/resources/digitalocean" "k8s.io/kops/protokube/pkg/etcd" "k8s.io/kops/protokube/pkg/gossip" gossipdo "k8s.io/kops/protokube/pkg/gossip/do" @@ -44,9 +45,14 @@ const ( localDevicePrefix = "/dev/disk/by-id/scsi-0DO_Volume_" ) +// TokenSource implements oauth2.TokenSource +type TokenSource struct { + AccessToken string +} + type DOVolumes struct { - ClusterID string - Cloud *digitalocean.Cloud + ClusterID string + godoClient *godo.Client region string dropletName string @@ -103,7 +109,7 @@ func NewDOVolumes() (*DOVolumes, error) { return nil, fmt.Errorf("failed to get droplet name: %s", err) } - cloud, err := digitalocean.NewCloud(region) + godoClient, err := NewDOCloud() if err != nil { return nil, fmt.Errorf("failed to initialize digitalocean cloud: %s", err) } @@ -119,7 +125,7 @@ func NewDOVolumes() (*DOVolumes, error) { } return &DOVolumes{ - Cloud: cloud, + godoClient: godoClient, ClusterID: clusterID, dropletID: dropletIDInt, dropletName: dropletName, @@ -128,9 +134,33 @@ func NewDOVolumes() (*DOVolumes, error) { }, nil } +// Token() returns oauth2.Token +func (t *TokenSource) Token() (*oauth2.Token, error) { + token := &oauth2.Token{ + AccessToken: t.AccessToken, + } + return token, nil +} + +func NewDOCloud() (*godo.Client, error) { + accessToken := os.Getenv("DIGITALOCEAN_ACCESS_TOKEN") + if accessToken == "" { + return nil, errors.New("DIGITALOCEAN_ACCESS_TOKEN is required") + } + + tokenSource := &TokenSource{ + AccessToken: accessToken, + } + + oauthClient := oauth2.NewClient(context.TODO(), tokenSource) + client := godo.NewClient(oauthClient) + + return client, nil +} + func (d *DOVolumes) AttachVolume(volume *Volume) error { for { - action, _, err := d.Cloud.VolumeActions().Attach(context.TODO(), volume.ID, d.dropletID) + action, _, err := d.godoClient.StorageActions.Attach(context.TODO(), volume.ID, d.dropletID) if err != nil { return fmt.Errorf("error attaching volume: %s", err) } @@ -158,7 +188,7 @@ func (d *DOVolumes) AttachVolume(volume *Volume) error { } func (d *DOVolumes) FindVolumes() ([]*Volume, error) { - doVolumes, err := getAllVolumesByRegion(d.Cloud, d.region) + doVolumes, err := getAllVolumesByRegion(d.godoClient, d.region) if err != nil { return nil, fmt.Errorf("failed to list volumes: %s", err) } @@ -195,12 +225,12 @@ func (d *DOVolumes) FindVolumes() ([]*Volume, error) { return volumes, nil } -func getAllVolumesByRegion(cloud *digitalocean.Cloud, region string) ([]godo.Volume, error) { +func getAllVolumesByRegion(godoClient *godo.Client, region string) ([]godo.Volume, error) { allVolumes := []godo.Volume{} opt := &godo.ListOptions{} for { - volumes, resp, err := cloud.Volumes().ListVolumes(context.TODO(), &godo.ListVolumeParams{ + volumes, resp, err := godoClient.Storage.ListVolumes(context.TODO(), &godo.ListVolumeParams{ Region: region, ListOptions: opt, }) @@ -243,7 +273,7 @@ func (d *DOVolumes) FindMountedVolume(volume *Volume) (string, error) { } func (d *DOVolumes) getVolumeByID(id string) (*godo.Volume, error) { - vol, _, err := d.Cloud.Volumes().GetVolume(context.TODO(), id) + vol, _, err := d.godoClient.Storage.GetVolume(context.TODO(), id) return vol, err } @@ -280,7 +310,7 @@ func getLocalDeviceName(vol *godo.Volume) string { func (d *DOVolumes) GossipSeeds() (gossip.SeedProvider, error) { for _, dropletTag := range d.dropletTags { if strings.Contains(dropletTag, strings.Replace(d.ClusterID, ".", "-", -1)) { - return gossipdo.NewSeedProvider(d.Cloud, dropletTag) + return gossipdo.NewSeedProvider(d.godoClient, dropletTag) } } diff --git a/upup/pkg/fi/cloudup/BUILD.bazel b/upup/pkg/fi/cloudup/BUILD.bazel index 727ecf84559a3..d53a2d41c4dc2 100644 --- a/upup/pkg/fi/cloudup/BUILD.bazel +++ b/upup/pkg/fi/cloudup/BUILD.bazel @@ -53,7 +53,6 @@ go_library( "//pkg/model/gcemodel:go_default_library", "//pkg/model/iam:go_default_library", "//pkg/model/openstackmodel:go_default_library", - "//pkg/resources/digitalocean:go_default_library", "//pkg/resources/spotinst:go_default_library", "//pkg/templates:go_default_library", "//pkg/util/subnet:go_default_library", diff --git a/upup/pkg/fi/cloudup/apply_cluster.go b/upup/pkg/fi/cloudup/apply_cluster.go index e0e2c689c6e6a..ba30cef61180c 100644 --- a/upup/pkg/fi/cloudup/apply_cluster.go +++ b/upup/pkg/fi/cloudup/apply_cluster.go @@ -53,7 +53,6 @@ import ( "k8s.io/kops/pkg/model/gcemodel" "k8s.io/kops/pkg/model/iam" "k8s.io/kops/pkg/model/openstackmodel" - "k8s.io/kops/pkg/resources/digitalocean" "k8s.io/kops/pkg/templates" "k8s.io/kops/pkg/wellknownports" "k8s.io/kops/upup/models" @@ -681,7 +680,7 @@ func (c *ApplyClusterCmd) Run(ctx context.Context) error { case kops.CloudProviderAWS: target = awsup.NewAWSAPITarget(cloud.(awsup.AWSCloud)) case kops.CloudProviderDO: - target = do.NewDOAPITarget(cloud.(*digitalocean.Cloud)) + target = do.NewDOAPITarget(cloud.(do.DOCloud)) case kops.CloudProviderOpenstack: target = openstack.NewOpenstackAPITarget(cloud.(openstack.OpenstackCloud)) case kops.CloudProviderALI: diff --git a/upup/pkg/fi/cloudup/do/BUILD.bazel b/upup/pkg/fi/cloudup/do/BUILD.bazel index 73c2e71491491..47ada11d02c01 100644 --- a/upup/pkg/fi/cloudup/do/BUILD.bazel +++ b/upup/pkg/fi/cloudup/do/BUILD.bazel @@ -5,11 +5,21 @@ go_library( srcs = [ "api_target.go", "cloud.go", + "mock_do_cloud.go", + "utils.go", ], importpath = "k8s.io/kops/upup/pkg/fi/cloudup/do", visibility = ["//visibility:public"], deps = [ - "//pkg/resources/digitalocean:go_default_library", + "//dnsprovider/pkg/dnsprovider:go_default_library", + "//pkg/apis/kops:go_default_library", + "//pkg/cloudinstances:go_default_library", + "//pkg/resources/digitalocean/dns:go_default_library", + "//protokube/pkg/etcd:go_default_library", "//upup/pkg/fi:go_default_library", + "//vendor/github.com/digitalocean/godo:go_default_library", + "//vendor/golang.org/x/oauth2:go_default_library", + "//vendor/k8s.io/api/core/v1:go_default_library", + "//vendor/k8s.io/klog/v2:go_default_library", ], ) diff --git a/upup/pkg/fi/cloudup/do/api_target.go b/upup/pkg/fi/cloudup/do/api_target.go index 4ecf93b1cc904..10557c0811128 100644 --- a/upup/pkg/fi/cloudup/do/api_target.go +++ b/upup/pkg/fi/cloudup/do/api_target.go @@ -17,17 +17,16 @@ limitations under the License. package do import ( - "k8s.io/kops/pkg/resources/digitalocean" "k8s.io/kops/upup/pkg/fi" ) type DOAPITarget struct { - Cloud *digitalocean.Cloud + Cloud DOCloud } var _ fi.Target = &DOAPITarget{} -func NewDOAPITarget(cloud *digitalocean.Cloud) *DOAPITarget { +func NewDOAPITarget(cloud DOCloud) *DOAPITarget { return &DOAPITarget{ Cloud: cloud, } diff --git a/upup/pkg/fi/cloudup/do/cloud.go b/upup/pkg/fi/cloudup/do/cloud.go index 1fa57a677ea66..2287f15388d13 100644 --- a/upup/pkg/fi/cloudup/do/cloud.go +++ b/upup/pkg/fi/cloudup/do/cloud.go @@ -17,9 +17,23 @@ limitations under the License. package do import ( + "context" + "errors" + "fmt" + "os" + "strconv" "strings" + "time" - "k8s.io/kops/pkg/resources/digitalocean" + "github.com/digitalocean/godo" + "golang.org/x/oauth2" + v1 "k8s.io/api/core/v1" + "k8s.io/klog/v2" + "k8s.io/kops/dnsprovider/pkg/dnsprovider" + "k8s.io/kops/pkg/apis/kops" + "k8s.io/kops/pkg/cloudinstances" + "k8s.io/kops/pkg/resources/digitalocean/dns" + "k8s.io/kops/protokube/pkg/etcd" "k8s.io/kops/upup/pkg/fi" ) @@ -28,13 +42,524 @@ const TagNameEtcdClusterPrefix = "etcdCluster-" const TagNameRolePrefix = "k8s.io/role/" const TagKubernetesClusterNamePrefix = "KubernetesCluster" const TagKubernetesClusterMasterPrefix = "KubernetesCluster-Master" +const TagKubernetesInstanceGroup = "kops-instancegroup" -func SafeClusterName(clusterName string) string { - // DO does not support . in tags / names - safeClusterName := strings.Replace(clusterName, ".", "-", -1) - return safeClusterName +type DOInstanceGroup struct { + ClusterName string + InstanceGroupName string + GroupType string // will be either "master" or "worker" + Members []string // will store the droplet names that matches. } -func NewDOCloud(region string) (fi.Cloud, error) { - return digitalocean.NewCloud(region) +// TokenSource implements oauth2.TokenSource +type TokenSource struct { + AccessToken string +} + +// DOCloud exposes all the interfaces required to operate on DigitalOcean resources +type DOCloud interface { + fi.Cloud + DropletsService() godo.DropletsService + DropletActionService() godo.DropletActionsService + VolumeService() godo.StorageService + VolumeActionService() godo.StorageActionsService + LoadBalancersService() godo.LoadBalancersService + DomainService() godo.DomainsService + ActionsService() godo.ActionsService + FindClusterStatus(cluster *kops.Cluster) (*kops.ClusterStatus, error) + GetAllLoadBalancers() ([]godo.LoadBalancer, error) + GetAllDropletsByTag(tag string) ([]godo.Droplet, error) + GetAllVolumesByRegion() ([]godo.Volume, error) +} + +// static compile time check to validate DOCloud's fi.Cloud Interface. +var _ fi.Cloud = &doCloudImplementation{} + +// doCloudImplementation holds the godo client object to interact with DO resources. +type doCloudImplementation struct { + Client *godo.Client + + dns dnsprovider.Interface + + // region holds the DO region. + region string +} + +// Token() returns oauth2.Token +func (t *TokenSource) Token() (*oauth2.Token, error) { + token := &oauth2.Token{ + AccessToken: t.AccessToken, + } + return token, nil +} + +// NewCloud returns a Cloud, expecting the env var DIGITALOCEAN_ACCESS_TOKEN +// NewCloud will return an err if DIGITALOCEAN_ACCESS_TOKEN is not defined +func NewDOCloud(region string) (DOCloud, error) { + accessToken := os.Getenv("DIGITALOCEAN_ACCESS_TOKEN") + if accessToken == "" { + return nil, errors.New("DIGITALOCEAN_ACCESS_TOKEN is required") + } + + tokenSource := &TokenSource{ + AccessToken: accessToken, + } + + oauthClient := oauth2.NewClient(context.TODO(), tokenSource) + client := godo.NewClient(oauthClient) + + return &doCloudImplementation{ + Client: client, + dns: dns.NewProvider(client), + region: region, + }, nil +} + +func (c *doCloudImplementation) GetCloudGroups(cluster *kops.Cluster, instancegroups []*kops.InstanceGroup, warnUnmatched bool, nodes []v1.Node) (map[string]*cloudinstances.CloudInstanceGroup, error) { + return getCloudGroups(c, cluster, instancegroups, warnUnmatched, nodes) +} + +// DeleteGroup is not implemented yet, is a func that needs to delete a DO instance group. +func (c *doCloudImplementation) DeleteGroup(g *cloudinstances.CloudInstanceGroup) error { + klog.V(8).Info("digitalocean cloud provider DeleteGroup not implemented yet") + return fmt.Errorf("digital ocean cloud provider does not support deleting cloud groups at this time") +} + +func (c *doCloudImplementation) DeleteInstance(i *cloudinstances.CloudInstance) error { + dropletID, err := strconv.Atoi(i.ID) + if err != nil { + return fmt.Errorf("failed to convert droplet ID to int: %s", err) + } + + _, _, err = c.Client.DropletActions.Shutdown(context.TODO(), dropletID) + if err != nil { + return fmt.Errorf("error stopping instance %q: %v", dropletID, err) + } + + // Wait for 5 min to stop the instance + for i := 0; i < 5; i++ { + droplet, _, err := c.Client.Droplets.Get(context.TODO(), dropletID) + if err != nil { + return fmt.Errorf("error describing instance %q: %v", dropletID, err) + } + + klog.V(8).Infof("stopping DO instance %q, current Status: %q", droplet, droplet.Status) + + if droplet.Status == "off" { + break + } + + if i == 5 { + return fmt.Errorf("fail to stop DO instance %v in 5 mins", dropletID) + } + + time.Sleep(time.Minute * 1) + } + + _, err = c.Client.Droplets.Delete(context.TODO(), dropletID) + if err != nil { + return fmt.Errorf("error stopping instance %q: %v", dropletID, err) + } + + klog.V(8).Infof("deleted droplet instance %q", dropletID) + + return nil +} + +// DetachInstance is not implemented yet. It needs to cause a cloud instance to no longer be counted against the group's size limits. +func (c *doCloudImplementation) DetachInstance(i *cloudinstances.CloudInstance) error { + klog.V(8).Info("digitalocean cloud provider DetachInstance not implemented yet") + return fmt.Errorf("digital ocean cloud provider does not support surging") +} + +// ProviderID returns the kops api identifier for DigitalOcean cloud provider +func (c *doCloudImplementation) ProviderID() kops.CloudProviderID { + return kops.CloudProviderDO +} + +// Region returns the DO region we will target +func (c *doCloudImplementation) Region() string { + return c.region +} + +func (c *doCloudImplementation) DNS() (dnsprovider.Interface, error) { + return c.dns, nil +} + +// Volumes returns an implementation of godo.StorageService +func (c *doCloudImplementation) VolumeService() godo.StorageService { + return c.Client.Storage +} + +// VolumeActions returns an implementation of godo.StorageActionsService +func (c *doCloudImplementation) VolumeActionService() godo.StorageActionsService { + return c.Client.StorageActions +} + +// DropletsService returns the droplets client interface. +func (c *doCloudImplementation) DropletsService() godo.DropletsService { + return c.Client.Droplets +} + +func (c *doCloudImplementation) DropletActionService() godo.DropletActionsService { + return c.Client.DropletActions +} + +func (c *doCloudImplementation) LoadBalancersService() godo.LoadBalancersService { + return c.Client.LoadBalancers +} + +func (c *doCloudImplementation) DomainService() godo.DomainsService { + return c.Client.Domains +} + +func (c *doCloudImplementation) ActionsService() godo.ActionsService { + return c.Client.Actions +} + +// FindVPCInfo is not implemented, it's only here to satisfy the fi.Cloud interface +func (c *doCloudImplementation) FindVPCInfo(id string) (*fi.VPCInfo, error) { + return nil, errors.New("not implemented") +} + +func (c *doCloudImplementation) GetApiIngressStatus(cluster *kops.Cluster) ([]fi.ApiIngressStatus, error) { + var ingresses []fi.ApiIngressStatus + if cluster.Spec.MasterPublicName != "" { + // Note that this must match Digital Ocean's lb name + klog.V(2).Infof("Querying DO to find Loadbalancers for API (%q)", cluster.Name) + + loadBalancers, err := c.GetAllLoadBalancers() + if err != nil { + return nil, fmt.Errorf("LoadBalancers.List returned error: %v", err) + } + + lbName := "api-" + strings.Replace(cluster.Name, ".", "-", -1) + + for _, lb := range loadBalancers { + if lb.Name == lbName { + klog.V(10).Infof("Matching LB name found for API (%q)", cluster.Name) + + if lb.Status != "active" { + return nil, fmt.Errorf("load-balancer is not yet active (current status: %s)", lb.Status) + } + + address := lb.IP + ingresses = append(ingresses, fi.ApiIngressStatus{IP: address}) + + return ingresses, nil + } + } + } + + return nil, nil +} + +// FindClusterStatus discovers the status of the cluster, by looking for the tagged etcd volumes +func (c *doCloudImplementation) FindClusterStatus(cluster *kops.Cluster) (*kops.ClusterStatus, error) { + etcdStatus, err := findEtcdStatus(c, cluster) + if err != nil { + return nil, err + } + status := &kops.ClusterStatus{ + EtcdClusters: etcdStatus, + } + klog.V(2).Infof("Cluster status (from cloud): %v", fi.DebugAsJsonString(status)) + return status, nil +} + +// findEtcdStatus discovers the status of etcd, by looking for the tagged etcd volumes +func findEtcdStatus(c *doCloudImplementation, cluster *kops.Cluster) ([]kops.EtcdClusterStatus, error) { + statusMap := make(map[string]*kops.EtcdClusterStatus) + volumes, err := c.GetAllVolumesByRegion() + + if err != nil { + return nil, fmt.Errorf("failed to get all volumes by region from %s: %v", c.Region(), err) + } + + for _, volume := range volumes { + volumeID := volume.ID + + etcdClusterName := "" + var etcdClusterSpec *etcd.EtcdClusterSpec + + for _, myTag := range volume.Tags { + klog.V(8).Infof("findEtcdStatus status (from cloud): checking if volume with tag %q belongs to cluster", myTag) + // check if volume belongs to this cluster. + // tag will be in the format "KubernetesCluster:dev5-k8s-local" (where clusterName is dev5.k8s.local) + clusterName := strings.Replace(cluster.Name, ".", "-", -1) + if strings.Contains(myTag, fmt.Sprintf("%s:%s", TagKubernetesClusterNamePrefix, clusterName)) { + klog.V(10).Infof("findEtcdStatus cluster comparison matched for tag: %v", myTag) + // this volume belongs to our cluster, add this to our etcdClusterSpec. + // loop through the tags again and + for _, volumeTag := range volume.Tags { + if strings.Contains(volumeTag, TagKubernetesClusterIndex) { + volumeTagParts := strings.Split(volumeTag, ":") + if len(volumeTagParts) < 2 { + return nil, fmt.Errorf("volume tag split failed, too few components for tag %q on volume %q", volumeTag, volume) + } + dropletIndex := volumeTagParts[1] + etcdClusterSpec, err = c.getEtcdClusterSpec(volume.Name, dropletIndex) + if err != nil { + return nil, fmt.Errorf("error parsing etcd cluster tag %q on volume %q: %v", volumeTag, volumeID, err) + } + + klog.V(10).Infof("findEtcdStatus etcdClusterSpec: %v", fi.DebugAsJsonString(etcdClusterSpec)) + etcdClusterName = etcdClusterSpec.ClusterKey + status := statusMap[etcdClusterName] + if status == nil { + status = &kops.EtcdClusterStatus{ + Name: etcdClusterName, + } + statusMap[etcdClusterName] = status + } + + memberName := etcdClusterSpec.NodeName + status.Members = append(status.Members, &kops.EtcdMemberStatus{ + Name: memberName, + VolumeId: volume.ID, + }) + } + } + } + } + } + + status := make([]kops.EtcdClusterStatus, 0, len(statusMap)) + for _, v := range statusMap { + status = append(status, *v) + } + + return status, nil +} + +func (c *doCloudImplementation) getEtcdClusterSpec(volumeName string, dropletName string) (*etcd.EtcdClusterSpec, error) { + var clusterKey string + if strings.Contains(volumeName, "etcd-main") { + clusterKey = "main" + } else if strings.Contains(volumeName, "etcd-events") { + clusterKey = "events" + } else { + return nil, fmt.Errorf("could not determine etcd cluster type for volume: %s", volumeName) + } + + return &etcd.EtcdClusterSpec{ + ClusterKey: clusterKey, + NodeName: dropletName, + NodeNames: []string{dropletName}, + }, nil +} + +func getCloudGroups(c *doCloudImplementation, cluster *kops.Cluster, instancegroups []*kops.InstanceGroup, warnUnmatched bool, nodes []v1.Node) (map[string]*cloudinstances.CloudInstanceGroup, error) { + nodeMap := cloudinstances.GetNodeMap(nodes, cluster) + + groups := make(map[string]*cloudinstances.CloudInstanceGroup) + instanceGroups, err := findInstanceGroups(c, cluster.ObjectMeta.Name) + if err != nil { + return nil, fmt.Errorf("unable to find autoscale groups: %v", err) + } + + for _, doGroup := range instanceGroups { + name := doGroup.InstanceGroupName + + instancegroup, err := matchInstanceGroup(name, cluster.ObjectMeta.Name, instancegroups) + if err != nil { + return nil, fmt.Errorf("error getting instance group for doGroup %q", name) + } + if instancegroup == nil { + if warnUnmatched { + klog.Warningf("Found doGroup with no corresponding instance group %q", name) + } + continue + } + + groups[instancegroup.ObjectMeta.Name], err = buildCloudInstanceGroup(c, instancegroup, doGroup, nodeMap) + if err != nil { + return nil, fmt.Errorf("error getting cloud instance group %q: %v", instancegroup.ObjectMeta.Name, err) + } + } + + klog.V(8).Infof("Cloud Instance Group Info = %v", groups) + return groups, nil + +} + +// findInstanceGroups finds instance groups matching the specified tags +func findInstanceGroups(c *doCloudImplementation, clusterName string) ([]DOInstanceGroup, error) { + var result []DOInstanceGroup + instanceGroupMap := make(map[string][]string) // map of instance group name with droplet ids + + clusterTag := "KubernetesCluster:" + strings.Replace(clusterName, ".", "-", -1) + droplets, err := c.GetAllDropletsByTag(clusterTag) + if err != nil { + return nil, fmt.Errorf("get all droplets for tag %s returned error. Error=%v", clusterTag, err) + } + + instanceGroupName := "" + for _, droplet := range droplets { + doInstanceGroup, err := getDropletInstanceGroup(droplet.Tags) + if err != nil { + return nil, fmt.Errorf("get droplets Instance group for tags %v returned error. Error=%v", droplet.Tags, err) + } + + instanceGroupName = fmt.Sprintf("%s-%s", clusterName, doInstanceGroup) + instanceGroupMap[instanceGroupName] = append(instanceGroupMap[instanceGroupName], strconv.Itoa(droplet.ID)) + + result = append(result, DOInstanceGroup{ + InstanceGroupName: instanceGroupName, + GroupType: instanceGroupName, + ClusterName: clusterName, + Members: instanceGroupMap[instanceGroupName], + }) + } + + klog.V(8).Infof("InstanceGroup Info = %v", result) + + return result, nil +} + +func getDropletInstanceGroup(tags []string) (string, error) { + for _, tag := range tags { + klog.V(8).Infof("Check tag = %s", tag) + if strings.Contains(strings.ToLower(tag), TagKubernetesInstanceGroup) { + tagParts := strings.Split(tag, ":") + if len(tagParts) < 2 { + return "", fmt.Errorf("tag split failed, too few components for tag %q", tag) + } + return tagParts[1], nil + } + } + + return "", fmt.Errorf("Didn't find k8s-instancegroup for tag %v", tags) +} + +// matchInstanceGroup filters a list of instancegroups for recognized cloud groups +func matchInstanceGroup(name string, clusterName string, instancegroups []*kops.InstanceGroup) (*kops.InstanceGroup, error) { + var instancegroup *kops.InstanceGroup + for _, g := range instancegroups { + var groupName string + + switch g.Spec.Role { + case kops.InstanceGroupRoleMaster, kops.InstanceGroupRoleNode: + groupName = clusterName + "-" + g.ObjectMeta.Name + default: + klog.Warningf("Ignoring InstanceGroup of unknown role %q", g.Spec.Role) + continue + } + + if name == groupName { + if instancegroup != nil { + return nil, fmt.Errorf("found multiple instance groups matching servergrp %q", groupName) + } + instancegroup = g + } + } + + return instancegroup, nil +} + +func buildCloudInstanceGroup(c *doCloudImplementation, ig *kops.InstanceGroup, g DOInstanceGroup, nodeMap map[string]*v1.Node) (*cloudinstances.CloudInstanceGroup, error) { + cg := &cloudinstances.CloudInstanceGroup{ + HumanName: g.InstanceGroupName, + InstanceGroup: ig, + Raw: g, + MinSize: int(fi.Int32Value(ig.Spec.MinSize)), + TargetSize: int(fi.Int32Value(ig.Spec.MinSize)), + MaxSize: int(fi.Int32Value(ig.Spec.MaxSize)), + } + + for _, member := range g.Members { + + // TODO use a hash of the godo.DropletCreateRequest fields to calculate the second parameter. + _, err := cg.NewCloudInstance(member, cloudinstances.CloudInstanceStatusUpToDate, nodeMap[member]) + if err != nil { + return nil, fmt.Errorf("error creating cloud instance group member: %v", err) + } + } + + return cg, nil +} + +func (c *doCloudImplementation) GetAllLoadBalancers() ([]godo.LoadBalancer, error) { + allLoadBalancers := []godo.LoadBalancer{} + + opt := &godo.ListOptions{} + for { + lbs, resp, err := c.LoadBalancersService().List(context.TODO(), opt) + if err != nil { + return nil, err + } + + allLoadBalancers = append(allLoadBalancers, lbs...) + + if resp.Links == nil || resp.Links.IsLastPage() { + break + } + + page, err := resp.Links.CurrentPage() + if err != nil { + return nil, err + } + + opt.Page = page + 1 + } + + return allLoadBalancers, nil +} + +func (c *doCloudImplementation) GetAllDropletsByTag(tag string) ([]godo.Droplet, error) { + allDroplets := []godo.Droplet{} + + opt := &godo.ListOptions{} + for { + droplets, resp, err := c.DropletsService().ListByTag(context.TODO(), tag, opt) + if err != nil { + return nil, err + } + + allDroplets = append(allDroplets, droplets...) + + if resp.Links == nil || resp.Links.IsLastPage() { + break + } + + page, err := resp.Links.CurrentPage() + if err != nil { + return nil, err + } + + opt.Page = page + 1 + } + + return allDroplets, nil +} + +func (c *doCloudImplementation) GetAllVolumesByRegion() ([]godo.Volume, error) { + allVolumes := []godo.Volume{} + + opt := &godo.ListOptions{} + for { + volumes, resp, err := c.VolumeService().ListVolumes(context.TODO(), &godo.ListVolumeParams{ + Region: c.Region(), + ListOptions: opt, + }) + + if err != nil { + return nil, err + } + + allVolumes = append(allVolumes, volumes...) + + if resp.Links == nil || resp.Links.IsLastPage() { + break + } + + page, err := resp.Links.CurrentPage() + if err != nil { + return nil, err + } + + opt.Page = page + 1 + } + + return allVolumes, nil } diff --git a/upup/pkg/fi/cloudup/do/mock_do_cloud.go b/upup/pkg/fi/cloudup/do/mock_do_cloud.go new file mode 100644 index 0000000000000..2f12969cd9e7c --- /dev/null +++ b/upup/pkg/fi/cloudup/do/mock_do_cloud.go @@ -0,0 +1,127 @@ +/* +Copyright 2021 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package do + +import ( + "errors" + "fmt" + + "github.com/digitalocean/godo" + v1 "k8s.io/api/core/v1" + "k8s.io/kops/dnsprovider/pkg/dnsprovider" + "k8s.io/kops/pkg/apis/kops" + "k8s.io/kops/pkg/cloudinstances" + "k8s.io/kops/pkg/resources/digitalocean/dns" + "k8s.io/kops/upup/pkg/fi" +) + +var _ fi.Cloud = (*doCloudMockImplementation)(nil) + +type doCloudMockImplementation struct { + Client *godo.Client + + region string +} + +func BuildMockDOCloud(region string) *doCloudMockImplementation { + return &doCloudMockImplementation{region: region, Client: godo.NewClient(nil)} +} + +func (c *doCloudMockImplementation) ProviderID() kops.CloudProviderID { + return kops.CloudProviderDO +} + +// Region returns the DO region we will target +func (c *doCloudMockImplementation) Region() string { + return c.region +} + +func (c *doCloudMockImplementation) DNS() (dnsprovider.Interface, error) { + provider := dns.NewProvider(c.Client) + return provider, nil +} + +// FindVPCInfo is not implemented, it's only here to satisfy the fi.Cloud interface +func (c *doCloudMockImplementation) FindVPCInfo(id string) (*fi.VPCInfo, error) { + return nil, errors.New("not implemented") +} + +// DeleteGroup is not implemented yet, is a func that needs to delete a DO instance group. +func (c *doCloudMockImplementation) DeleteGroup(g *cloudinstances.CloudInstanceGroup) error { + return fmt.Errorf("digital ocean cloud provider does not support deleting cloud groups at this time") +} + +func (c *doCloudMockImplementation) DeleteInstance(instance *cloudinstances.CloudInstance) error { + return errors.New("not tested") +} + +// DetachInstance is not implemented yet. It needs to cause a cloud instance to no longer be counted against the group's size limits. +func (c *doCloudMockImplementation) DetachInstance(i *cloudinstances.CloudInstance) error { + return fmt.Errorf("digital ocean cloud provider does not support surging") +} + +func (c *doCloudMockImplementation) GetCloudGroups(cluster *kops.Cluster, instancegroups []*kops.InstanceGroup, warnUnmatched bool, nodes []v1.Node) (map[string]*cloudinstances.CloudInstanceGroup, error) { + return nil, errors.New("not tested") +} + +// FindClusterStatus discovers the status of the cluster, by inspecting the cloud objects +func (c *doCloudMockImplementation) FindClusterStatus(cluster *kops.Cluster) (*kops.ClusterStatus, error) { + return nil, errors.New("not tested") +} + +func (c *doCloudMockImplementation) GetApiIngressStatus(cluster *kops.Cluster) ([]fi.ApiIngressStatus, error) { + return nil, errors.New("not tested") +} + +func (c *doCloudMockImplementation) DropletsService() godo.DropletsService { + return c.Client.Droplets +} + +func (c *doCloudMockImplementation) DropletActionService() godo.DropletActionsService { + return c.Client.DropletActions +} + +func (c *doCloudMockImplementation) VolumeService() godo.StorageService { + return c.Client.Storage +} + +func (c *doCloudMockImplementation) VolumeActionService() godo.StorageActionsService { + return c.Client.StorageActions +} + +func (c *doCloudMockImplementation) LoadBalancersService() godo.LoadBalancersService { + return c.Client.LoadBalancers +} + +func (c *doCloudMockImplementation) DomainService() godo.DomainsService { + return c.Client.Domains +} + +func (c *doCloudMockImplementation) ActionsService() godo.ActionsService { + return c.Client.Actions +} + +func (c *doCloudMockImplementation) GetAllLoadBalancers() ([]godo.LoadBalancer, error) { + return nil, nil +} + +func (c *doCloudMockImplementation) GetAllDropletsByTag(tag string) ([]godo.Droplet, error) { + return nil, nil +} +func (c *doCloudMockImplementation) GetAllVolumesByRegion() ([]godo.Volume, error) { + return nil, nil +} diff --git a/upup/pkg/fi/cloudup/do/utils.go b/upup/pkg/fi/cloudup/do/utils.go new file mode 100644 index 0000000000000..31c9c53e1227d --- /dev/null +++ b/upup/pkg/fi/cloudup/do/utils.go @@ -0,0 +1,25 @@ +/* +Copyright 2021 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package do + +import "strings" + +func SafeClusterName(clusterName string) string { + // DO does not support . in tags / names + safeClusterName := strings.Replace(clusterName, ".", "-", -1) + return safeClusterName +} diff --git a/upup/pkg/fi/cloudup/dotasks/BUILD.bazel b/upup/pkg/fi/cloudup/dotasks/BUILD.bazel index e64264781ed4c..e5dd1a71485bc 100644 --- a/upup/pkg/fi/cloudup/dotasks/BUILD.bazel +++ b/upup/pkg/fi/cloudup/dotasks/BUILD.bazel @@ -13,7 +13,6 @@ go_library( importpath = "k8s.io/kops/upup/pkg/fi/cloudup/dotasks", visibility = ["//visibility:public"], deps = [ - "//pkg/resources/digitalocean:go_default_library", "//upup/pkg/fi:go_default_library", "//upup/pkg/fi/cloudup/do:go_default_library", "//upup/pkg/fi/cloudup/terraform:go_default_library", @@ -27,8 +26,8 @@ go_test( srcs = ["volume_test.go"], embed = [":go_default_library"], deps = [ - "//pkg/resources/digitalocean:go_default_library", "//upup/pkg/fi:go_default_library", + "//upup/pkg/fi/cloudup/do:go_default_library", "//vendor/github.com/digitalocean/godo:go_default_library", ], ) diff --git a/upup/pkg/fi/cloudup/dotasks/droplet.go b/upup/pkg/fi/cloudup/dotasks/droplet.go index b983462a9ff1e..252c247ca8921 100644 --- a/upup/pkg/fi/cloudup/dotasks/droplet.go +++ b/upup/pkg/fi/cloudup/dotasks/droplet.go @@ -23,7 +23,6 @@ import ( "github.com/digitalocean/godo" "k8s.io/klog/v2" - "k8s.io/kops/pkg/resources/digitalocean" "k8s.io/kops/upup/pkg/fi" "k8s.io/kops/upup/pkg/fi/cloudup/do" _ "k8s.io/kops/upup/pkg/fi/cloudup/terraform" @@ -53,7 +52,7 @@ func (d *Droplet) CompareWithID() *string { } func (d *Droplet) Find(c *fi.Context) (*Droplet, error) { - cloud := c.Cloud.(*digitalocean.Cloud) + cloud := c.Cloud.(do.DOCloud) droplets, err := listDroplets(cloud) if err != nil { @@ -88,12 +87,12 @@ func (d *Droplet) Find(c *fi.Context) (*Droplet, error) { }, nil } -func listDroplets(cloud *digitalocean.Cloud) ([]godo.Droplet, error) { +func listDroplets(cloud do.DOCloud) ([]godo.Droplet, error) { allDroplets := []godo.Droplet{} opt := &godo.ListOptions{} for { - droplets, resp, err := cloud.Droplets().List(context.TODO(), opt) + droplets, resp, err := cloud.DropletsService().List(context.TODO(), opt) if err != nil { return nil, err } @@ -145,7 +144,7 @@ func (_ *Droplet) RenderDO(t *do.DOAPITarget, a, e, changes *Droplet) error { } for i := 0; i < newDropletCount; i++ { - _, _, err = t.Cloud.Droplets().Create(context.TODO(), &godo.DropletCreateRequest{ + _, _, err = t.Cloud.DropletsService().Create(context.TODO(), &godo.DropletCreateRequest{ Name: fi.StringValue(e.Name), Region: fi.StringValue(e.Region), Size: fi.StringValue(e.Size), diff --git a/upup/pkg/fi/cloudup/dotasks/loadbalancer.go b/upup/pkg/fi/cloudup/dotasks/loadbalancer.go index fe3658020ad19..8e13550fd7c12 100644 --- a/upup/pkg/fi/cloudup/dotasks/loadbalancer.go +++ b/upup/pkg/fi/cloudup/dotasks/loadbalancer.go @@ -27,7 +27,6 @@ import ( "github.com/digitalocean/godo" "k8s.io/klog/v2" - "k8s.io/kops/pkg/resources/digitalocean" "k8s.io/kops/upup/pkg/fi" "k8s.io/kops/upup/pkg/fi/cloudup/do" ) @@ -58,8 +57,8 @@ func (lb *LoadBalancer) Find(c *fi.Context) (*LoadBalancer, error) { return nil, nil } - cloud := c.Cloud.(*digitalocean.Cloud) - lbService := cloud.LoadBalancers() + cloud := c.Cloud.(do.DOCloud) + lbService := cloud.LoadBalancersService() loadbalancer, _, err := lbService.Get(context.TODO(), fi.StringValue(lb.ID)) if err != nil { @@ -150,7 +149,7 @@ func (_ *LoadBalancer) RenderDO(t *do.DOAPITarget, a, e, changes *LoadBalancer) // load balancer doesn't exist. Create one. klog.V(10).Infof("Creating load balancer for DO") - loadBalancerService := t.Cloud.LoadBalancers() + loadBalancerService := t.Cloud.LoadBalancersService() loadbalancer, _, err := loadBalancerService.Create(context.TODO(), &godo.LoadBalancerRequest{ Name: fi.StringValue(e.Name), Region: fi.StringValue(e.Region), @@ -175,8 +174,8 @@ func (lb *LoadBalancer) IsForAPIServer() bool { } func (lb *LoadBalancer) FindIPAddress(c *fi.Context) (*string, error) { - cloud := c.Cloud.(*digitalocean.Cloud) - loadBalancerService := cloud.LoadBalancers() + cloud := c.Cloud.(do.DOCloud) + loadBalancerService := cloud.LoadBalancersService() if len(fi.StringValue(lb.ID)) > 0 { // able to retrieve ID. diff --git a/upup/pkg/fi/cloudup/dotasks/volume.go b/upup/pkg/fi/cloudup/dotasks/volume.go index 68a49462adf19..028b027860948 100644 --- a/upup/pkg/fi/cloudup/dotasks/volume.go +++ b/upup/pkg/fi/cloudup/dotasks/volume.go @@ -23,7 +23,6 @@ import ( "github.com/digitalocean/godo" "k8s.io/klog/v2" - "k8s.io/kops/pkg/resources/digitalocean" "k8s.io/kops/upup/pkg/fi" "k8s.io/kops/upup/pkg/fi/cloudup/do" "k8s.io/kops/upup/pkg/fi/cloudup/terraform" @@ -47,8 +46,8 @@ func (v *Volume) CompareWithID() *string { } func (v *Volume) Find(c *fi.Context) (*Volume, error) { - cloud := c.Cloud.(*digitalocean.Cloud) - volService := cloud.Volumes() + cloud := c.Cloud.(do.DOCloud) + volService := cloud.VolumeService() volumes, _, err := volService.ListVolumes(context.TODO(), &godo.ListVolumeParams{ Region: cloud.Region(), @@ -119,7 +118,7 @@ func (_ *Volume) RenderDO(t *do.DOAPITarget, a, e, changes *Volume) error { tagArray = append(tagArray, fmt.Sprintf("%s:%s", k, v)) } - volService := t.Cloud.Volumes() + volService := t.Cloud.VolumeService() _, _, err := volService.CreateVolume(context.TODO(), &godo.VolumeCreateRequest{ Name: fi.StringValue(e.Name), Region: fi.StringValue(e.Region), diff --git a/upup/pkg/fi/cloudup/dotasks/volume_test.go b/upup/pkg/fi/cloudup/dotasks/volume_test.go index e4f53ea882bb8..28d98e9874b27 100644 --- a/upup/pkg/fi/cloudup/dotasks/volume_test.go +++ b/upup/pkg/fi/cloudup/dotasks/volume_test.go @@ -23,8 +23,8 @@ import ( "testing" "github.com/digitalocean/godo" - "k8s.io/kops/pkg/resources/digitalocean" "k8s.io/kops/upup/pkg/fi" + "k8s.io/kops/upup/pkg/fi/cloudup/do" ) type fakeStorageClient struct { @@ -70,13 +70,6 @@ func (f fakeStorageClient) DeleteSnapshot(ctx context.Context, id string) (*godo return f.deleteSnapshotFn(ctx, id) } -func newCloud(client *godo.Client) *digitalocean.Cloud { - return &digitalocean.Cloud{ - Client: client, - RegionName: "nyc1", - } -} - func newContext(cloud fi.Cloud) *fi.Context { return &fi.Context{ Cloud: cloud, @@ -155,7 +148,7 @@ func Test_Find(t *testing.T) { for _, tc := range testcases { t.Run(tc.name, func(t *testing.T) { - cloud := newCloud(godo.NewClient(nil)) + cloud := do.BuildMockDOCloud("nyc1") cloud.Client.Storage = tc.storage ctx := newContext(cloud)