Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add context to all relevant cloud APIs #59287

Merged
merged 1 commit into from Feb 7, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 2 additions & 1 deletion cmd/kubelet/app/server.go
Expand Up @@ -18,6 +18,7 @@ limitations under the License.
package app

import (
"context"
"crypto/tls"
"errors"
"fmt"
Expand Down Expand Up @@ -712,7 +713,7 @@ func getNodeName(cloud cloudprovider.Interface, hostname string) (types.NodeName
return "", fmt.Errorf("failed to get instances from cloud provider")
}

nodeName, err := instances.CurrentNodeName(hostname)
nodeName, err := instances.CurrentNodeName(context.TODO(), hostname)
if err != nil {
return "", fmt.Errorf("error fetching current node name from cloud provider: %v", err)
}
Expand Down
49 changes: 25 additions & 24 deletions pkg/cloudprovider/cloud.go
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package cloudprovider

import (
"context"
"errors"
"fmt"
"strings"
Expand Down Expand Up @@ -56,9 +57,9 @@ type InformerUser interface {
// Clusters is an abstract, pluggable interface for clusters of containers.
type Clusters interface {
// ListClusters lists the names of the available clusters.
ListClusters() ([]string, error)
ListClusters(ctx context.Context) ([]string, error)
// Master gets back the address (either DNS name or IP address) of the master node for the cluster.
Master(clusterName string) (string, error)
Master(ctx context.Context, clusterName string) (string, error)
}

// TODO(#6812): Use a shorter name that's less likely to be longer than cloud
Expand All @@ -75,12 +76,12 @@ func GetLoadBalancerName(service *v1.Service) string {
}

// GetInstanceProviderID builds a ProviderID for a node in a cloud.
func GetInstanceProviderID(cloud Interface, nodeName types.NodeName) (string, error) {
func GetInstanceProviderID(ctx context.Context, cloud Interface, nodeName types.NodeName) (string, error) {
instances, ok := cloud.Instances()
if !ok {
return "", fmt.Errorf("failed to get instances from cloud provider")
}
instanceID, err := instances.InstanceID(nodeName)
instanceID, err := instances.InstanceID(ctx, nodeName)
if err != nil {
return "", fmt.Errorf("failed to get instance ID from cloud provider: %v", err)
}
Expand All @@ -94,17 +95,17 @@ type LoadBalancer interface {
// if so, what its status is.
// Implementations must treat the *v1.Service parameter as read-only and not modify it.
// Parameter 'clusterName' is the name of the cluster as presented to kube-controller-manager
GetLoadBalancer(clusterName string, service *v1.Service) (status *v1.LoadBalancerStatus, exists bool, err error)
GetLoadBalancer(ctx context.Context, clusterName string, service *v1.Service) (status *v1.LoadBalancerStatus, exists bool, err error)
// EnsureLoadBalancer creates a new load balancer 'name', or updates the existing one. Returns the status of the balancer
// Implementations must treat the *v1.Service and *v1.Node
// parameters as read-only and not modify them.
// Parameter 'clusterName' is the name of the cluster as presented to kube-controller-manager
EnsureLoadBalancer(clusterName string, service *v1.Service, nodes []*v1.Node) (*v1.LoadBalancerStatus, error)
EnsureLoadBalancer(ctx context.Context, clusterName string, service *v1.Service, nodes []*v1.Node) (*v1.LoadBalancerStatus, error)
// UpdateLoadBalancer updates hosts under the specified load balancer.
// Implementations must treat the *v1.Service and *v1.Node
// parameters as read-only and not modify them.
// Parameter 'clusterName' is the name of the cluster as presented to kube-controller-manager
UpdateLoadBalancer(clusterName string, service *v1.Service, nodes []*v1.Node) error
UpdateLoadBalancer(ctx context.Context, clusterName string, service *v1.Service, nodes []*v1.Node) error
// EnsureLoadBalancerDeleted deletes the specified load balancer if it
// exists, returning nil if the load balancer specified either didn't exist or
// was successfully deleted.
Expand All @@ -113,7 +114,7 @@ type LoadBalancer interface {
// doesn't exist even if some part of it is still laying around.
// Implementations must treat the *v1.Service parameter as read-only and not modify it.
// Parameter 'clusterName' is the name of the cluster as presented to kube-controller-manager
EnsureLoadBalancerDeleted(clusterName string, service *v1.Service) error
EnsureLoadBalancerDeleted(ctx context.Context, clusterName string, service *v1.Service) error
}

// Instances is an abstract, pluggable interface for sets of instances.
Expand All @@ -122,31 +123,31 @@ type Instances interface {
// TODO(roberthbailey): This currently is only used in such a way that it
// returns the address of the calling instance. We should do a rename to
// make this clearer.
NodeAddresses(name types.NodeName) ([]v1.NodeAddress, error)
NodeAddresses(ctx context.Context, name types.NodeName) ([]v1.NodeAddress, error)
// NodeAddressesByProviderID returns the addresses of the specified instance.
// The instance is specified using the providerID of the node. The
// ProviderID is a unique identifier of the node. This will not be called
// from the node whose nodeaddresses are being queried. i.e. local metadata
// services cannot be used in this method to obtain nodeaddresses
NodeAddressesByProviderID(providerID string) ([]v1.NodeAddress, error)
NodeAddressesByProviderID(ctx context.Context, providerID string) ([]v1.NodeAddress, error)
// ExternalID returns the cloud provider ID of the node with the specified NodeName.
// Note that if the instance does not exist or is no longer running, we must return ("", cloudprovider.InstanceNotFound)
ExternalID(nodeName types.NodeName) (string, error)
ExternalID(ctx context.Context, nodeName types.NodeName) (string, error)
// InstanceID returns the cloud provider ID of the node with the specified NodeName.
InstanceID(nodeName types.NodeName) (string, error)
InstanceID(ctx context.Context, nodeName types.NodeName) (string, error)
// InstanceType returns the type of the specified instance.
InstanceType(name types.NodeName) (string, error)
InstanceType(ctx context.Context, name types.NodeName) (string, error)
// InstanceTypeByProviderID returns the type of the specified instance.
InstanceTypeByProviderID(providerID string) (string, error)
InstanceTypeByProviderID(ctx context.Context, providerID string) (string, error)
// AddSSHKeyToAllInstances adds an SSH public key as a legal identity for all instances
// expected format for the key is standard ssh-keygen format: <protocol> <blob>
AddSSHKeyToAllInstances(user string, keyData []byte) error
AddSSHKeyToAllInstances(ctx context.Context, user string, keyData []byte) error
// CurrentNodeName returns the name of the node we are currently running on
// On most clouds (e.g. GCE) this is the hostname, so we provide the hostname
CurrentNodeName(hostname string) (types.NodeName, error)
CurrentNodeName(ctx context.Context, hostname string) (types.NodeName, error)
// InstanceExistsByProviderID returns true if the instance for the given provider id still is running.
// If false is returned with no error, the instance will be immediately deleted by the cloud controller manager.
InstanceExistsByProviderID(providerID string) (bool, error)
InstanceExistsByProviderID(ctx context.Context, providerID string) (bool, error)
}

// Route is a representation of an advanced routing rule.
Expand All @@ -167,14 +168,14 @@ type Route struct {
// Routes is an abstract, pluggable interface for advanced routing rules.
type Routes interface {
// ListRoutes lists all managed routes that belong to the specified clusterName
ListRoutes(clusterName string) ([]*Route, error)
ListRoutes(ctx context.Context, clusterName string) ([]*Route, error)
// CreateRoute creates the described managed route
// route.Name will be ignored, although the cloud-provider may use nameHint
// to create a more user-meaningful name.
CreateRoute(clusterName string, nameHint string, route *Route) error
CreateRoute(ctx context.Context, clusterName string, nameHint string, route *Route) error
// DeleteRoute deletes the specified managed route
// Route should be as returned by ListRoutes
DeleteRoute(clusterName string, route *Route) error
DeleteRoute(ctx context.Context, clusterName string, route *Route) error
}

var (
Expand All @@ -195,20 +196,20 @@ type Zones interface {
// In most cases, this method is called from the kubelet querying a local metadata service to aquire its zone.
// For the case of external cloud providers, use GetZoneByProviderID or GetZoneByNodeName since GetZone
// can no longer be called from the kubelets.
GetZone() (Zone, error)
GetZone(ctx context.Context) (Zone, error)

// GetZoneByProviderID returns the Zone containing the current zone and locality region of the node specified by providerId
// This method is particularly used in the context of external cloud providers where node initialization must be down
// outside the kubelets.
GetZoneByProviderID(providerID string) (Zone, error)
GetZoneByProviderID(ctx context.Context, providerID string) (Zone, error)

// GetZoneByNodeName returns the Zone containing the current zone and locality region of the node specified by node name
// This method is particularly used in the context of external cloud providers where node initialization must be down
// outside the kubelets.
GetZoneByNodeName(nodeName types.NodeName) (Zone, error)
GetZoneByNodeName(ctx context.Context, nodeName types.NodeName) (Zone, error)
}

// PVLabeler is an abstract, pluggable interface for fetching labels for volumes
type PVLabeler interface {
GetLabelsForVolume(pv *v1.PersistentVolume) (map[string]string, error)
GetLabelsForVolume(ctx context.Context, pv *v1.PersistentVolume) (map[string]string, error)
}
37 changes: 19 additions & 18 deletions pkg/cloudprovider/providers/aws/aws.go
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package aws

import (
"context"
"errors"
"fmt"
"io"
Expand Down Expand Up @@ -731,12 +732,12 @@ func newEc2Filter(name string, values ...string) *ec2.Filter {
}

// AddSSHKeyToAllInstances is currently not implemented.
func (c *Cloud) AddSSHKeyToAllInstances(user string, keyData []byte) error {
func (c *Cloud) AddSSHKeyToAllInstances(ctx context.Context, user string, keyData []byte) error {
return cloudprovider.NotImplemented
}

// CurrentNodeName returns the name of the current node
func (c *Cloud) CurrentNodeName(hostname string) (types.NodeName, error) {
func (c *Cloud) CurrentNodeName(ctx context.Context, hostname string) (types.NodeName, error) {
return c.selfAWSInstance.nodeName, nil
}

Expand Down Expand Up @@ -1156,7 +1157,7 @@ func (c *Cloud) HasClusterID() bool {
}

// NodeAddresses is an implementation of Instances.NodeAddresses.
func (c *Cloud) NodeAddresses(name types.NodeName) ([]v1.NodeAddress, error) {
func (c *Cloud) NodeAddresses(ctx context.Context, name types.NodeName) ([]v1.NodeAddress, error) {
if c.selfAWSInstance.nodeName == name || len(name) == 0 {
addresses := []v1.NodeAddress{}

Expand Down Expand Up @@ -1273,7 +1274,7 @@ func extractNodeAddresses(instance *ec2.Instance) ([]v1.NodeAddress, error) {
// NodeAddressesByProviderID returns the node addresses of an instances with the specified unique providerID
// This method will not be called from the node that is requesting this ID. i.e. metadata service
// and other local methods cannot be used here
func (c *Cloud) NodeAddressesByProviderID(providerID string) ([]v1.NodeAddress, error) {
func (c *Cloud) NodeAddressesByProviderID(ctx context.Context, providerID string) ([]v1.NodeAddress, error) {
instanceID, err := kubernetesInstanceID(providerID).mapToAWSInstanceID()
if err != nil {
return nil, err
Expand All @@ -1288,7 +1289,7 @@ func (c *Cloud) NodeAddressesByProviderID(providerID string) ([]v1.NodeAddress,
}

// ExternalID returns the cloud provider ID of the node with the specified nodeName (deprecated).
func (c *Cloud) ExternalID(nodeName types.NodeName) (string, error) {
func (c *Cloud) ExternalID(ctx context.Context, nodeName types.NodeName) (string, error) {
if c.selfAWSInstance.nodeName == nodeName {
// We assume that if this is run on the instance itself, the instance exists and is alive
return c.selfAWSInstance.awsID, nil
Expand All @@ -1307,7 +1308,7 @@ func (c *Cloud) ExternalID(nodeName types.NodeName) (string, error) {

// InstanceExistsByProviderID returns true if the instance with the given provider id still exists and is running.
// If false is returned with no error, the instance will be immediately deleted by the cloud controller manager.
func (c *Cloud) InstanceExistsByProviderID(providerID string) (bool, error) {
func (c *Cloud) InstanceExistsByProviderID(ctx context.Context, providerID string) (bool, error) {
instanceID, err := kubernetesInstanceID(providerID).mapToAWSInstanceID()
if err != nil {
return false, err
Expand Down Expand Up @@ -1338,7 +1339,7 @@ func (c *Cloud) InstanceExistsByProviderID(providerID string) (bool, error) {
}

// InstanceID returns the cloud provider ID of the node with the specified nodeName.
func (c *Cloud) InstanceID(nodeName types.NodeName) (string, error) {
func (c *Cloud) InstanceID(ctx context.Context, nodeName types.NodeName) (string, error) {
// In the future it is possible to also return an endpoint as:
// <endpoint>/<zone>/<instanceid>
if c.selfAWSInstance.nodeName == nodeName {
Expand All @@ -1354,7 +1355,7 @@ func (c *Cloud) InstanceID(nodeName types.NodeName) (string, error) {
// InstanceTypeByProviderID returns the cloudprovider instance type of the node with the specified unique providerID
// This method will not be called from the node that is requesting this ID. i.e. metadata service
// and other local methods cannot be used here
func (c *Cloud) InstanceTypeByProviderID(providerID string) (string, error) {
func (c *Cloud) InstanceTypeByProviderID(ctx context.Context, providerID string) (string, error) {
instanceID, err := kubernetesInstanceID(providerID).mapToAWSInstanceID()
if err != nil {
return "", err
Expand All @@ -1369,7 +1370,7 @@ func (c *Cloud) InstanceTypeByProviderID(providerID string) (string, error) {
}

// InstanceType returns the type of the node with the specified nodeName.
func (c *Cloud) InstanceType(nodeName types.NodeName) (string, error) {
func (c *Cloud) InstanceType(ctx context.Context, nodeName types.NodeName) (string, error) {
if c.selfAWSInstance.nodeName == nodeName {
return c.selfAWSInstance.instanceType, nil
}
Expand Down Expand Up @@ -1430,7 +1431,7 @@ func (c *Cloud) getCandidateZonesForDynamicVolume() (sets.String, error) {
}

// GetZone implements Zones.GetZone
func (c *Cloud) GetZone() (cloudprovider.Zone, error) {
func (c *Cloud) GetZone(ctx context.Context) (cloudprovider.Zone, error) {
return cloudprovider.Zone{
FailureDomain: c.selfAWSInstance.availabilityZone,
Region: c.region,
Expand All @@ -1440,7 +1441,7 @@ func (c *Cloud) GetZone() (cloudprovider.Zone, error) {
// GetZoneByProviderID implements Zones.GetZoneByProviderID
// This is particularly useful in external cloud providers where the kubelet
// does not initialize node data.
func (c *Cloud) GetZoneByProviderID(providerID string) (cloudprovider.Zone, error) {
func (c *Cloud) GetZoneByProviderID(ctx context.Context, providerID string) (cloudprovider.Zone, error) {
instanceID, err := kubernetesInstanceID(providerID).mapToAWSInstanceID()
if err != nil {
return cloudprovider.Zone{}, err
Expand All @@ -1461,7 +1462,7 @@ func (c *Cloud) GetZoneByProviderID(providerID string) (cloudprovider.Zone, erro
// GetZoneByNodeName implements Zones.GetZoneByNodeName
// This is particularly useful in external cloud providers where the kubelet
// does not initialize node data.
func (c *Cloud) GetZoneByNodeName(nodeName types.NodeName) (cloudprovider.Zone, error) {
func (c *Cloud) GetZoneByNodeName(ctx context.Context, nodeName types.NodeName) (cloudprovider.Zone, error) {
instance, err := c.getInstanceByNodeName(nodeName)
if err != nil {
return cloudprovider.Zone{}, err
Expand Down Expand Up @@ -2249,7 +2250,7 @@ func (c *Cloud) checkIfAvailable(disk *awsDisk, opName string, instance string)
return true, nil
}

func (c *Cloud) GetLabelsForVolume(pv *v1.PersistentVolume) (map[string]string, error) {
func (c *Cloud) GetLabelsForVolume(ctx context.Context, pv *v1.PersistentVolume) (map[string]string, error) {
// Ignore any volumes that are being provisioned
if pv.Spec.AWSElasticBlockStore.VolumeID == volume.ProvisionedVolumeName {
return nil, nil
Expand Down Expand Up @@ -3162,7 +3163,7 @@ func buildListener(port v1.ServicePort, annotations map[string]string, sslPorts
}

// EnsureLoadBalancer implements LoadBalancer.EnsureLoadBalancer
func (c *Cloud) EnsureLoadBalancer(clusterName string, apiService *v1.Service, nodes []*v1.Node) (*v1.LoadBalancerStatus, error) {
func (c *Cloud) EnsureLoadBalancer(ctx context.Context, clusterName string, apiService *v1.Service, nodes []*v1.Node) (*v1.LoadBalancerStatus, error) {
annotations := apiService.Annotations
glog.V(2).Infof("EnsureLoadBalancer(%v, %v, %v, %v, %v, %v, %v)",
clusterName, apiService.Namespace, apiService.Name, c.region, apiService.Spec.LoadBalancerIP, apiService.Spec.Ports, annotations)
Expand Down Expand Up @@ -3533,7 +3534,7 @@ func (c *Cloud) EnsureLoadBalancer(clusterName string, apiService *v1.Service, n
}

// GetLoadBalancer is an implementation of LoadBalancer.GetLoadBalancer
func (c *Cloud) GetLoadBalancer(clusterName string, service *v1.Service) (*v1.LoadBalancerStatus, bool, error) {
func (c *Cloud) GetLoadBalancer(ctx context.Context, clusterName string, service *v1.Service) (*v1.LoadBalancerStatus, bool, error) {
loadBalancerName := cloudprovider.GetLoadBalancerName(service)

if isNLB(service.Annotations) {
Expand Down Expand Up @@ -3793,7 +3794,7 @@ func (c *Cloud) updateInstanceSecurityGroupsForLoadBalancer(lb *elb.LoadBalancer
}

// EnsureLoadBalancerDeleted implements LoadBalancer.EnsureLoadBalancerDeleted.
func (c *Cloud) EnsureLoadBalancerDeleted(clusterName string, service *v1.Service) error {
func (c *Cloud) EnsureLoadBalancerDeleted(ctx context.Context, clusterName string, service *v1.Service) error {
loadBalancerName := cloudprovider.GetLoadBalancerName(service)

if isNLB(service.Annotations) {
Expand Down Expand Up @@ -4026,7 +4027,7 @@ func (c *Cloud) EnsureLoadBalancerDeleted(clusterName string, service *v1.Servic
}

// UpdateLoadBalancer implements LoadBalancer.UpdateLoadBalancer
func (c *Cloud) UpdateLoadBalancer(clusterName string, service *v1.Service, nodes []*v1.Node) error {
func (c *Cloud) UpdateLoadBalancer(ctx context.Context, clusterName string, service *v1.Service, nodes []*v1.Node) error {
instances, err := c.findInstancesForELB(nodes)
if err != nil {
return err
Expand All @@ -4041,7 +4042,7 @@ func (c *Cloud) UpdateLoadBalancer(clusterName string, service *v1.Service, node
if lb == nil {
return fmt.Errorf("Load balancer not found")
}
_, err = c.EnsureLoadBalancer(clusterName, service, nodes)
_, err = c.EnsureLoadBalancer(ctx, clusterName, service, nodes)
return err
}
lb, err := c.describeLoadBalancer(loadBalancerName)
Expand Down
7 changes: 4 additions & 3 deletions pkg/cloudprovider/providers/aws/aws_routes.go
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package aws

import (
"context"
"fmt"

"github.com/aws/aws-sdk-go/aws"
Expand Down Expand Up @@ -65,7 +66,7 @@ func (c *Cloud) findRouteTable(clusterName string) (*ec2.RouteTable, error) {

// ListRoutes implements Routes.ListRoutes
// List all routes that match the filter
func (c *Cloud) ListRoutes(clusterName string) ([]*cloudprovider.Route, error) {
func (c *Cloud) ListRoutes(ctx context.Context, clusterName string) ([]*cloudprovider.Route, error) {
table, err := c.findRouteTable(clusterName)
if err != nil {
return nil, err
Expand Down Expand Up @@ -138,7 +139,7 @@ func (c *Cloud) configureInstanceSourceDestCheck(instanceID string, sourceDestCh

// CreateRoute implements Routes.CreateRoute
// Create the described route
func (c *Cloud) CreateRoute(clusterName string, nameHint string, route *cloudprovider.Route) error {
func (c *Cloud) CreateRoute(ctx context.Context, clusterName string, nameHint string, route *cloudprovider.Route) error {
instance, err := c.getInstanceByNodeName(route.TargetNode)
if err != nil {
return err
Expand Down Expand Up @@ -198,7 +199,7 @@ func (c *Cloud) CreateRoute(clusterName string, nameHint string, route *cloudpro

// DeleteRoute implements Routes.DeleteRoute
// Delete the specified route
func (c *Cloud) DeleteRoute(clusterName string, route *cloudprovider.Route) error {
func (c *Cloud) DeleteRoute(ctx context.Context, clusterName string, route *cloudprovider.Route) error {
table, err := c.findRouteTable(clusterName)
if err != nil {
return err
Expand Down