Skip to content

Commit

Permalink
Watches instance state changes and reconcile AWSMachine if necessary
Browse files Browse the repository at this point in the history
- Reconciles an SQS queue per cluster
- Reconciles rule per cluster on default event bus
- Custom event buses can't accept aws service events from the same account
- Adds queue as a target to the cluster's rule
- Adds instances to rule as instances are created
- Creates a policy for rule on the queue so rule can publish to the SQS queue
- Adds instance state controller to track queue URLs of all available clusters
- global scope that isn't tied to a cluster
- triggers reconcile on AWSMachine when consuming messages
- Increase cluster wait time for e2e
- Removes instance from tracked instances in event bridge rule during deletion of a machine
  • Loading branch information
Gab Satchi committed Feb 1, 2021
1 parent 1b76801 commit 6504b52
Show file tree
Hide file tree
Showing 49 changed files with 5,896 additions and 48 deletions.
1 change: 1 addition & 0 deletions api/v1alpha2/awsmachine_conversion.go
Expand Up @@ -50,6 +50,7 @@ func (src *AWSMachine) ConvertTo(dstRaw conversion.Hub) error {
func restoreAWSMachineSpec(restored, dst *infrav1alpha3.AWSMachineSpec, src *AWSMachineSpec) {
dst.ImageLookupFormat = restored.ImageLookupFormat
dst.ImageLookupBaseOS = restored.ImageLookupBaseOS
dst.InstanceID = restored.InstanceID

// Note this may override the manual conversion in Convert_v1alpha2_AWSMachineSpec_To_v1alpha3_AWSMachineSpec.
if restored.RootVolume != nil {
Expand Down
1 change: 1 addition & 0 deletions api/v1alpha2/zz_generated.conversion.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions api/v1alpha3/awsmachine_types.go
Expand Up @@ -44,6 +44,9 @@ type AWSMachineSpec struct {
// ProviderID is the unique identifier as specified by the cloud provider.
ProviderID *string `json:"providerID,omitempty"`

// InstanceID is the EC2 instance ID for this machine.
InstanceID *string `json:"instanceID,omitempty"`

// AMI is the reference to the AMI from which to create the machine instance.
AMI AWSResourceReference `json:"ami,omitempty"`

Expand Down
4 changes: 4 additions & 0 deletions api/v1alpha3/awsmachine_webhook.go
Expand Up @@ -84,6 +84,10 @@ func (r *AWSMachine) ValidateUpdate(old runtime.Object) error {
delete(oldAWSMachineSpec, "providerID")
delete(newAWSMachineSpec, "providerID")

// allow changes to instanceID
delete(oldAWSMachineSpec, "instanceID")
delete(newAWSMachineSpec, "instanceID")

// allow changes to additionalTags
delete(oldAWSMachineSpec, "additionalTags")
delete(newAWSMachineSpec, "additionalTags")
Expand Down
5 changes: 5 additions & 0 deletions api/v1alpha3/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions cmd/clusterawsadm/api/bootstrap/v1alpha1/defaults.go
Expand Up @@ -58,6 +58,11 @@ func SetDefaults_AWSIAMConfigurationSpec(obj *AWSIAMConfigurationSpec) { //nolin
} else if obj.EKS.Enable {
obj.Nodes.EC2ContainerRegistryReadOnly = true
}
if obj.EventBridge == nil {
obj.EventBridge = &EventBridgeConfig{
Enable: false,
}
}
if obj.EKS.ManagedMachinePool == nil {
obj.EKS.ManagedMachinePool = &AWSIAMRoleSpec{
Disable: true,
Expand Down
10 changes: 10 additions & 0 deletions cmd/clusterawsadm/api/bootstrap/v1alpha1/types.go
Expand Up @@ -107,6 +107,13 @@ type EKSConfig struct {
ManagedMachinePool *AWSIAMRoleSpec `json:"managedMachinePool,omitempty"`
}

// EventBridgeConfig represents configuration for enabling experimental feature to consume
// EventBridge EC2 events
type EventBridgeConfig struct {
// Enable controls whether permissions are granted to consume EC2 events
Enable bool `json:"enable,omitempty"`
}

// ClusterAPIControllers controls the configuration of the AWS IAM role for
// the Kubernetes Cluster API Provider AWS controller.
type ClusterAPIControllers struct {
Expand Down Expand Up @@ -174,6 +181,9 @@ type AWSIAMConfigurationSpec struct {
// and nodes roles
EKS *EKSConfig `json:"eks,omitempty"`

// EventBridge controls configuration for consuming EventBridge events
EventBridge *EventBridgeConfig `json:"eventBridge,omitempty"`

// SecureSecretsBackend, when set to parameter-store will create AWS Systems Manager
// Parameter Storage policies. By default or with the value of secrets-manager,
// will generate AWS Secrets Manager policies instead.
Expand Down
20 changes: 20 additions & 0 deletions cmd/clusterawsadm/api/bootstrap/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion cmd/clusterawsadm/api/iam/v1alpha1/types.go
Expand Up @@ -66,7 +66,7 @@ const (
type PolicyDocument struct {
Version string
Statement Statements
ID string `json:"id,omitempty"`
ID string `json:"Id,omitempty"`
}

// StatementEntry represents each "statement" block in an AWS IAM policy document.
Expand Down
Expand Up @@ -347,6 +347,28 @@ func (t Template) controllersPolicy() *iamv1.PolicyDocument {

}

if t.Spec.EventBridge.Enable {
statement = append(statement, iamv1.StatementEntry{
Effect: iamv1.EffectAllow,
Resource: iamv1.Resources{iamv1.Any},
Action: iamv1.Actions{
"events:DeleteRule",
"events:DescribeRule",
"events:ListTargetsByRule",
"events:PutRule",
"events:PutTargets",
"events:RemoveTargets",
"sqs:CreateQueue",
"sqs:DeleteMessage",
"sqs:DeleteQueue",
"sqs:GetQueueAttributes",
"sqs:GetQueueUrl",
"sqs:ReceiveMessage",
"sqs:SetQueueAttributes",
},
})
}

return &iamv1.PolicyDocument{
Version: iamv1.CurrentVersion,
Statement: statement,
Expand Down
Expand Up @@ -25,8 +25,8 @@ import (
"github.com/awslabs/goformation/v4/cloudformation"
"github.com/sergi/go-diff/diffmatchpatch"
"k8s.io/utils/pointer"
iamv1 "sigs.k8s.io/cluster-api-provider-aws/cmd/clusterawsadm/api/iam/v1alpha1"
infrav1 "sigs.k8s.io/cluster-api-provider-aws/api/v1alpha3"
iamv1 "sigs.k8s.io/cluster-api-provider-aws/cmd/clusterawsadm/api/iam/v1alpha1"
"sigs.k8s.io/yaml"
)

Expand Down
Expand Up @@ -345,6 +345,9 @@ spec:
imageLookupOrg:
description: ImageLookupOrg is the AWS Organization ID to use for image lookup if AMI is not set.
type: string
instanceID:
description: InstanceID is the EC2 instance ID for this machine.
type: string
instanceType:
description: 'InstanceType is the type of instance to create. Example: m4.xlarge'
type: string
Expand Down
Expand Up @@ -308,6 +308,9 @@ spec:
imageLookupOrg:
description: ImageLookupOrg is the AWS Organization ID to use for image lookup if AMI is not set.
type: string
instanceID:
description: InstanceID is the EC2 instance ID for this machine.
type: string
instanceType:
description: 'InstanceType is the type of instance to create. Example: m4.xlarge'
type: string
Expand Down
2 changes: 1 addition & 1 deletion config/manager/manager.yaml
Expand Up @@ -19,7 +19,7 @@ spec:
containers:
- args:
- --enable-leader-election
- "--feature-gates=EKS=${EXP_EKS:=false},EKSEnableIAM=${EXP_EKS_IAM:=false},MachinePool=${EXP_MACHINE_POOL:=false}"
- "--feature-gates=EKS=${EXP_EKS:=false},EKSEnableIAM=${EXP_EKS_IAM:=false},MachinePool=${EXP_MACHINE_POOL:=false},EventBridgeInstanceState=${EVENT_BRIDGE_INSTANCE_STATE:=false}"
image: controller:latest
imagePullPolicy: Always
name: manager
Expand Down
2 changes: 1 addition & 1 deletion config/manager/manager_auth_proxy_patch.yaml
Expand Up @@ -23,4 +23,4 @@ spec:
args:
- "--metrics-addr=127.0.0.1:8080"
- "--enable-leader-election"
- "--feature-gates=EKS=${EXP_EKS:=false},MachinePool=${EXP_MACHINE_POOL:=false}"
- "--feature-gates=EKS=${EXP_EKS:=false},MachinePool=${EXP_MACHINE_POOL:=false},EventBridgeInstanceState=${EVENT_BRIDGE_INSTANCE_STATE:=false}"
18 changes: 17 additions & 1 deletion controllers/awscluster_controller.go
Expand Up @@ -42,9 +42,11 @@ import (
"sigs.k8s.io/controller-runtime/pkg/source"

infrav1 "sigs.k8s.io/cluster-api-provider-aws/api/v1alpha3"
"sigs.k8s.io/cluster-api-provider-aws/feature"
"sigs.k8s.io/cluster-api-provider-aws/pkg/cloud/scope"
"sigs.k8s.io/cluster-api-provider-aws/pkg/cloud/services/ec2"
"sigs.k8s.io/cluster-api-provider-aws/pkg/cloud/services/elb"
"sigs.k8s.io/cluster-api-provider-aws/pkg/cloud/services/instancestate"
"sigs.k8s.io/cluster-api-provider-aws/pkg/cloud/services/network"
"sigs.k8s.io/cluster-api-provider-aws/pkg/cloud/services/securitygroup"
)
Expand Down Expand Up @@ -133,6 +135,13 @@ func reconcileDelete(clusterScope *scope.ClusterScope) (reconcile.Result, error)

awsCluster := clusterScope.AWSCluster

if feature.Gates.Enabled(feature.EventBridgeInstanceState) {
instancestateSvc := instancestate.NewService(clusterScope)
if err := instancestateSvc.DeleteEC2Events(); err != nil {
return reconcile.Result{}, errors.Wrapf(err, "failed to delete EventBridge notifications for AWSCluster %s/%s", awsCluster.Namespace, awsCluster.Name)
}
}

if err := elbsvc.DeleteLoadbalancers(); err != nil {
return reconcile.Result{}, errors.Wrapf(err, "error deleting load balancer for AWSCluster %s/%s", awsCluster.Namespace, awsCluster.Name)
}
Expand Down Expand Up @@ -192,6 +201,13 @@ func reconcileNormal(clusterScope *scope.ClusterScope) (reconcile.Result, error)
return reconcile.Result{}, errors.Wrapf(err, "failed to reconcile bastion host for AWSCluster %s/%s", awsCluster.Namespace, awsCluster.Name)
}

if feature.Gates.Enabled(feature.EventBridgeInstanceState) {
instancestateSvc := instancestate.NewService(clusterScope)
if err := instancestateSvc.ReconcileEC2Events(); err != nil {
return reconcile.Result{}, errors.Wrapf(err, "failed to reconcile EventBridge notifications for AWSCluster %s/%s", awsCluster.Namespace, awsCluster.Name)
}
}

if err := elbService.ReconcileLoadbalancers(); err != nil {
conditions.MarkFalse(awsCluster, infrav1.LoadBalancerReadyCondition, infrav1.LoadBalancerFailedReason, clusterv1.ConditionSeverityError, err.Error())
return reconcile.Result{}, errors.Wrapf(err, "failed to reconcile load balancers for AWSCluster %s/%s", awsCluster.Namespace, awsCluster.Name)
Expand Down Expand Up @@ -237,7 +253,7 @@ func (r *AWSClusterReconciler) SetupWithManager(mgr ctrl.Manager, options contro
controller, err := ctrl.NewControllerManagedBy(mgr).
WithOptions(options).
For(&infrav1.AWSCluster{}).
WithEventFilter(pausedPredicates(r.Log)).
WithEventFilter(PausedPredicates(r.Log)).
WithEventFilter(
predicate.Funcs{
// Avoid reconciling if the event triggering the reconciliation is related to incremental status updates
Expand Down
44 changes: 42 additions & 2 deletions controllers/awsmachine_controller.go
Expand Up @@ -18,12 +18,14 @@ package controllers

import (
"context"
"fmt"
"reflect"

"github.com/go-logr/logr"
"github.com/pkg/errors"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/tools/record"
"k8s.io/utils/pointer"
clusterv1 "sigs.k8s.io/cluster-api/api/v1alpha3"
Expand All @@ -42,16 +44,20 @@ import (

infrav1 "sigs.k8s.io/cluster-api-provider-aws/api/v1alpha3"
ekscontrolplanev1 "sigs.k8s.io/cluster-api-provider-aws/controlplane/eks/api/v1alpha3"
"sigs.k8s.io/cluster-api-provider-aws/feature"
"sigs.k8s.io/cluster-api-provider-aws/pkg/cloud"
"sigs.k8s.io/cluster-api-provider-aws/pkg/cloud/scope"
"sigs.k8s.io/cluster-api-provider-aws/pkg/cloud/services"
"sigs.k8s.io/cluster-api-provider-aws/pkg/cloud/services/ec2"
"sigs.k8s.io/cluster-api-provider-aws/pkg/cloud/services/elb"
"sigs.k8s.io/cluster-api-provider-aws/pkg/cloud/services/instancestate"
"sigs.k8s.io/cluster-api-provider-aws/pkg/cloud/services/secretsmanager"
"sigs.k8s.io/cluster-api-provider-aws/pkg/cloud/services/ssm"
"sigs.k8s.io/cluster-api-provider-aws/pkg/cloud/services/userdata"
)

const InstanceIDIndex = ".spec.instanceID"

// AWSMachineReconciler reconciles a AwsMachine object
type AWSMachineReconciler struct {
client.Client
Expand Down Expand Up @@ -208,7 +214,7 @@ func (r *AWSMachineReconciler) SetupWithManager(mgr ctrl.Manager, options contro
&source.Kind{Type: &infrav1.AWSCluster{}},
&handler.EnqueueRequestsFromMapFunc{ToRequests: handler.ToRequestsFunc(r.AWSClusterToAWSMachines)},
).
WithEventFilter(pausedPredicates(r.Log)).
WithEventFilter(PausedPredicates(r.Log)).
WithEventFilter(
predicate.Funcs{
// Avoid reconciling if the event triggering the reconciliation is related to incremental status updates
Expand Down Expand Up @@ -236,6 +242,14 @@ func (r *AWSMachineReconciler) SetupWithManager(mgr ctrl.Manager, options contro
return err
}

// Add index to AWSMachine to find by providerID
if err := mgr.GetFieldIndexer().IndexField(&infrav1.AWSMachine{},
InstanceIDIndex,
r.indexAWSMachineByInstanceID,
); err != nil {
return errors.Wrap(err, "error setting index fields")
}

return controller.Watch(
&source.Kind{Type: &clusterv1.Cluster{}},
&handler.EnqueueRequestsFromMapFunc{
Expand Down Expand Up @@ -339,6 +353,11 @@ func (r *AWSMachineReconciler) reconcileDelete(machineScope *scope.MachineScope,
conditions.MarkFalse(machineScope.AWSMachine, infrav1.ELBAttachedCondition, clusterv1.DeletedReason, clusterv1.ConditionSeverityInfo, "")
}

if feature.Gates.Enabled(feature.EventBridgeInstanceState) {
instancestateSvc := instancestate.NewService(ec2Scope)
instancestateSvc.RemoveInstanceFromEventPattern(instance.ID)
}

// Check the instance state. If it's already shutting down or terminated,
// do nothing. Otherwise attempt to delete it.
// This decision is based on the ec2-instance-lifecycle graph at
Expand Down Expand Up @@ -490,9 +509,16 @@ func (r *AWSMachineReconciler) reconcileNormal(_ context.Context, machineScope *
return ctrl.Result{}, err
}
}
if feature.Gates.Enabled(feature.EventBridgeInstanceState) {
instancestateSvc := instancestate.NewService(ec2Scope)
if err := instancestateSvc.AddInstanceToEventPattern(instance.ID); err != nil {
return ctrl.Result{}, errors.Wrap(err, "failed to add instance to Event Bridge instance state rule")
}
}

// Make sure Spec.ProviderID is always set.
// Make sure Spec.ProviderID and Spec.InstanceID are always set.
machineScope.SetProviderID(instance.ID, instance.AvailabilityZone)
machineScope.SetInstanceID(instance.ID)

// See https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/ec2-instance-lifecycle.html

Expand Down Expand Up @@ -851,3 +877,17 @@ func (r *AWSMachineReconciler) getInfraCluster(ctx context.Context, log logr.Log

return clusterScope, nil
}

func (r *AWSMachineReconciler) indexAWSMachineByInstanceID(o runtime.Object) []string {
awsMachine, ok := o.(*infrav1.AWSMachine)
if !ok {
r.Log.Error(errors.New("incorrect type"), "expected an AWSMachine", "type", fmt.Sprintf("%T", o))
return nil
}

if awsMachine.Spec.InstanceID != nil {
return []string{*awsMachine.Spec.InstanceID}
}

return nil
}
2 changes: 1 addition & 1 deletion controllers/awsmachine_controller_test.go
Expand Up @@ -161,7 +161,7 @@ var _ = Describe("AWSMachineReconciler", func() {
return secretSvc
},
Recorder: recorder,
Log: klogr.New(),
Log: klogr.New(),
}
})
AfterEach(func() {
Expand Down
2 changes: 1 addition & 1 deletion controllers/helpers.go
Expand Up @@ -27,7 +27,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/predicate"
)

func pausedPredicates(logger logr.Logger) predicate.Funcs {
func PausedPredicates(logger logr.Logger) predicate.Funcs {
return predicate.Funcs{
UpdateFunc: func(e event.UpdateEvent) bool {
return processIfUnpaused(logger.WithValues("predicate", "updateEvent"), e.ObjectNew, e.MetaNew)
Expand Down

0 comments on commit 6504b52

Please sign in to comment.