Skip to content

Commit

Permalink
Merge pull request #78140 from zhan849/aws-get-instance-by-id
Browse files Browse the repository at this point in the history
Cloud provider AWS library should query instance by ID when possible
  • Loading branch information
k8s-ci-robot committed Jul 12, 2019
2 parents aaad86c + 6ae7620 commit 416a717
Show file tree
Hide file tree
Showing 3 changed files with 106 additions and 4 deletions.
5 changes: 5 additions & 0 deletions staging/src/k8s.io/legacy-cloud-providers/aws/BUILD
Expand Up @@ -33,10 +33,13 @@ go_library(
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//staging/src/k8s.io/client-go/informers:go_default_library",
"//staging/src/k8s.io/client-go/informers/core/v1:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes/scheme:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes/typed/core/v1:go_default_library",
"//staging/src/k8s.io/client-go/pkg/version:go_default_library",
"//staging/src/k8s.io/client-go/tools/cache:go_default_library",
"//staging/src/k8s.io/client-go/tools/record:go_default_library",
"//staging/src/k8s.io/cloud-provider:go_default_library",
"//staging/src/k8s.io/cloud-provider/node/helpers:go_default_library",
Expand Down Expand Up @@ -82,6 +85,8 @@ go_test(
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//staging/src/k8s.io/client-go/informers:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes/fake:go_default_library",
"//staging/src/k8s.io/cloud-provider/volume:go_default_library",
"//vendor/github.com/aws/aws-sdk-go/aws:go_default_library",
"//vendor/github.com/aws/aws-sdk-go/service/ec2:go_default_library",
Expand Down
54 changes: 50 additions & 4 deletions staging/src/k8s.io/legacy-cloud-providers/aws/aws.go
Expand Up @@ -53,10 +53,13 @@ import (
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/informers"
informercorev1 "k8s.io/client-go/informers/core/v1"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/pkg/version"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/cloud-provider"
nodehelpers "k8s.io/cloud-provider/node/helpers"
Expand Down Expand Up @@ -509,8 +512,13 @@ type Cloud struct {

instanceCache instanceCache

clientBuilder cloudprovider.ControllerClientBuilder
kubeClient clientset.Interface
clientBuilder cloudprovider.ControllerClientBuilder
kubeClient clientset.Interface

nodeInformer informercorev1.NodeInformer
// Extract the function out to make it easier to test
nodeInformerHasSynced cache.InformerSynced

eventBroadcaster record.EventBroadcaster
eventRecorder record.EventRecorder

Expand Down Expand Up @@ -748,6 +756,14 @@ func (p *awsSDKProvider) getCrossRequestRetryDelay(regionName string) *CrossRequ
return delayer
}

// SetInformers implements the InformerUser interface. It takes the node
// informer from the supplied shared informer factory and stores both the
// informer and its HasSynced function on the Cloud, so that later lookups can
// be served from the Kubernetes API cache.
func (c *Cloud) SetInformers(informerFactory informers.SharedInformerFactory) {
	klog.Infof("Setting up informers for Cloud")

	nodes := informerFactory.Core().V1().Nodes()
	c.nodeInformer = nodes
	// HasSynced is kept in its own field so tests can substitute it.
	c.nodeInformerHasSynced = nodes.Informer().HasSynced
}

func (p *awsSDKProvider) Compute(regionName string) (EC2, error) {
awsConfig := &aws.Config{
Region: &regionName,
Expand Down Expand Up @@ -1289,7 +1305,6 @@ func newAWSCloud(cfg CloudConfig, awsServices Services) (*Cloud, error) {
return nil, err
}
}

return awsCloud, nil
}

Expand Down Expand Up @@ -4520,7 +4535,18 @@ func (c *Cloud) findInstanceByNodeName(nodeName types.NodeName) (*ec2.Instance,
// Returns the instance with the specified node name
// Like findInstanceByNodeName, but returns error if node not found
func (c *Cloud) getInstanceByNodeName(nodeName types.NodeName) (*ec2.Instance, error) {
instance, err := c.findInstanceByNodeName(nodeName)
var instance *ec2.Instance

// Try the node cache first to obtain the node's provider ID: fetching an
// instance by ID is far more efficient on AWS than querying by filters.
// Fall back to the filter-based lookup when the ID cannot be resolved.
awsID, err := c.nodeNameToProviderID(nodeName)
if err != nil {
klog.V(3).Infof("Unable to convert node name %q to aws instanceID, fall back to findInstanceByNodeName: %v", nodeName, err)
instance, err = c.findInstanceByNodeName(nodeName)
} else {
instance, err = c.getInstanceByID(string(awsID))
}
if err == nil && instance == nil {
return nil, cloudprovider.InstanceNotFound
}
Expand All @@ -4540,6 +4566,26 @@ func (c *Cloud) getFullInstance(nodeName types.NodeName) (*awsInstance, *ec2.Ins
return awsInstance, instance, err
}

// nodeNameToProviderID looks up the node called nodeName in the node
// informer's lister and converts its Spec.ProviderID into an AWS instance ID
// via MapToAWSInstanceID.
//
// An error is returned when nodeName is empty, when no synced-check function
// is set or it reports false, when the lister lookup fails, or when the node
// carries an empty provider ID.
func (c *Cloud) nodeNameToProviderID(nodeName types.NodeName) (InstanceID, error) {
	if nodeName == "" {
		return "", fmt.Errorf("no nodeName provided")
	}

	// The cache is only trusted once a synced-check exists and reports true.
	if hasSynced := c.nodeInformerHasSynced; !(hasSynced != nil && hasSynced()) {
		return "", fmt.Errorf("node informer has not synced yet")
	}

	lister := c.nodeInformer.Lister()
	node, err := lister.Get(string(nodeName))
	if err != nil {
		return "", err
	}

	providerID := node.Spec.ProviderID
	if providerID == "" {
		return "", fmt.Errorf("node has no providerID")
	}

	return KubernetesInstanceID(providerID).MapToAWSInstanceID()
}

func setNodeDisk(
nodeDiskMap map[types.NodeName]map[KubernetesVolumeID]bool,
volumeID KubernetesVolumeID,
Expand Down
51 changes: 51 additions & 0 deletions staging/src/k8s.io/legacy-cloud-providers/aws/aws_test.go
Expand Up @@ -36,6 +36,8 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes/fake"
cloudvolume "k8s.io/cloud-provider/volume"
)

Expand Down Expand Up @@ -552,6 +554,7 @@ func mockInstancesResp(selfInstance *ec2.Instance, instances []*ec2.Instance) (*
if err != nil {
panic(err)
}
awsCloud.kubeClient = fake.NewSimpleClientset()
return awsCloud, awsServices
}

Expand All @@ -561,6 +564,7 @@ func mockAvailabilityZone(availabilityZone string) *Cloud {
if err != nil {
panic(err)
}
awsCloud.kubeClient = fake.NewSimpleClientset()
return awsCloud
}

Expand Down Expand Up @@ -1910,6 +1914,53 @@ func TestRegionIsValid(t *testing.T) {
assert.False(t, isRegionValid("pl-fake-991a", fake.metadata), "expected region 'pl-fake-991' to be invalid but it was not")
}

// TestNodeNameToProviderID walks nodeNameToProviderID through each of its
// outcomes: empty node name, informer not synced, node missing from the
// cache, and finally a cached node whose provider ID resolves successfully.
func TestNodeNameToProviderID(t *testing.T) {
	testNodeName := types.NodeName("ip-10-0-0-1.ec2.internal")
	testProviderID := "aws:///us-east-1c/i-02bce90670bb0c7cd"
	fakeAWS := newMockedFakeAWSServices(TestClusterID)
	c, err := newAWSCloud(CloudConfig{}, fakeAWS)
	if err != nil {
		// c is unusable when construction fails; stop here instead of
		// dereferencing it in the steps below.
		t.Fatalf("newAWSCloud failed: %v", err)
	}

	fakeClient := &fake.Clientset{}
	fakeInformerFactory := informers.NewSharedInformerFactory(fakeClient, 0)
	c.SetInformers(fakeInformerFactory)

	// no node name
	_, err = c.nodeNameToProviderID("")
	assert.Error(t, err)

	// informer has not synced
	c.nodeInformerHasSynced = informerNotSynced
	_, err = c.nodeNameToProviderID(testNodeName)
	assert.Error(t, err)

	// informer has synced but node not found
	c.nodeInformerHasSynced = informerSynced
	_, err = c.nodeNameToProviderID(testNodeName)
	assert.Error(t, err)

	// we are able to find the node in cache
	err = c.nodeInformer.Informer().GetStore().Add(&v1.Node{
		ObjectMeta: metav1.ObjectMeta{
			Name: string(testNodeName),
		},
		Spec: v1.NodeSpec{
			ProviderID: testProviderID,
		},
	})
	assert.NoError(t, err)
	awsID, err := c.nodeNameToProviderID(testNodeName)
	assert.NoError(t, err)
	// The resolved ID is expected to be the trailing instance-ID segment of
	// testProviderID; checking it guards against a silently wrong mapping.
	assert.Equal(t, InstanceID("i-02bce90670bb0c7cd"), awsID)
}

// informerSynced is a stand-in synced-check that always reports true.
func informerSynced() bool { return true }

// informerNotSynced is a stand-in synced-check that always reports false.
func informerNotSynced() bool { return false }

func newMockedFakeAWSServices(id string) *FakeAWSServices {
s := NewFakeAWSServices(id)
s.ec2 = &MockedFakeEC2{FakeEC2Impl: s.ec2.(*FakeEC2Impl)}
Expand Down

0 comments on commit 416a717

Please sign in to comment.