[OCPCLOUD-802] Add Spot termination notice handler #308

Merged · 9 commits · Mar 26, 2020
3 changes: 2 additions & 1 deletion Dockerfile
@@ -1,4 +1,4 @@
FROM registry.svc.ci.openshift.org/openshift/release:golang-1.12 AS builder
FROM registry.svc.ci.openshift.org/openshift/release:golang-1.13 AS builder
WORKDIR /go/src/sigs.k8s.io/cluster-api-provider-aws
COPY . .
# VERSION env gets set in the openshift/release image and refers to the golang version, which interferes with our own
@@ -14,3 +14,4 @@ RUN INSTALL_PKGS=" \
yum clean all
COPY --from=builder /go/src/sigs.k8s.io/cluster-api-provider-aws/bin/manager /
COPY --from=builder /go/src/sigs.k8s.io/cluster-api-provider-aws/bin/machine-controller-manager /
COPY --from=builder /go/src/sigs.k8s.io/cluster-api-provider-aws/bin/termination-handler /
2 changes: 1 addition & 1 deletion Dockerfile.rhel7
@@ -1,4 +1,4 @@
FROM registry.svc.ci.openshift.org/ocp/builder:golang-1.12 AS builder
FROM registry.svc.ci.openshift.org/ocp/builder:golang-1.13 AS builder
WORKDIR /go/src/sigs.k8s.io/cluster-api-provider-aws
COPY . .
# VERSION env gets set in the openshift/release image and refers to the golang version, which interferes with our own
4 changes: 3 additions & 1 deletion Makefile
@@ -40,7 +40,7 @@ ifeq ($(NO_DOCKER), 1)
IMAGE_BUILD_CMD = imagebuilder
CGO_ENABLED = 1
else
DOCKER_CMD := docker run --rm -e CGO_ENABLED=1 -v "$(PWD)":/go/src/sigs.k8s.io/cluster-api-provider-aws:Z -w /go/src/sigs.k8s.io/cluster-api-provider-aws openshift/origin-release:golang-1.12
DOCKER_CMD := docker run --rm -e CGO_ENABLED=1 -v "$(PWD)":/go/src/sigs.k8s.io/cluster-api-provider-aws:Z -w /go/src/sigs.k8s.io/cluster-api-provider-aws openshift/origin-release:golang-1.13
IMAGE_BUILD_CMD = docker build
endif

@@ -69,6 +69,8 @@ build: ## build binaries
-ldflags "$(LD_FLAGS)" "$(REPO_PATH)/cmd/manager"
$(DOCKER_CMD) go build $(GOGCFLAGS) -o bin/manager -ldflags '-extldflags "-static"' \
"$(REPO_PATH)/vendor/github.com/openshift/machine-api-operator/cmd/machineset"
$(DOCKER_CMD) go build $(GOGCFLAGS) -o "bin/termination-handler" \
-ldflags "$(LD_FLAGS)" "$(REPO_PATH)/cmd/termination-handler"

aws-actuator:
$(DOCKER_CMD) go build $(GOGCFLAGS) -o bin/aws-actuator sigs.k8s.io/cluster-api-provider-aws/cmd/aws-actuator
73 changes: 73 additions & 0 deletions cmd/termination-handler/main.go
@@ -0,0 +1,73 @@
/*
Copyright 2020 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package main

import (
"flag"
"fmt"
"os"
"time"

"k8s.io/klog"
"k8s.io/klog/klogr"
"sigs.k8s.io/cluster-api-provider-aws/pkg/termination"
"sigs.k8s.io/cluster-api-provider-aws/pkg/version"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client/config"
)

func main() {
var printVersion bool
flag.BoolVar(&printVersion, "version", false, "print version and exit")

klog.InitFlags(nil)
logger := klogr.New()

pollIntervalSeconds := flag.Int64("poll-interval-seconds", 5, "interval in seconds at which the termination notice endpoint should be checked (Default: 5)")
nodeName := flag.String("node-name", "", "name of the node that the termination handler is running on")
namespace := flag.String("namespace", "", "namespace that the machine for the node should live in. If unspecified, look for machines across all namespaces.")
flag.Set("logtostderr", "true")
flag.Parse()

if printVersion {
fmt.Println(version.String)
os.Exit(0)
}

// Get a config to talk to the apiserver
cfg, err := config.GetConfig()
if err != nil {
logger.Error(err, "Error getting configuration")
return
}

// Get the poll interval as a duration from the `poll-interval-seconds` flag
pollInterval := time.Duration(*pollIntervalSeconds) * time.Second

// Construct a termination handler
handler, err := termination.NewHandler(logger, cfg, pollInterval, *namespace, *nodeName)
if err != nil {
logger.Error(err, "Error constructing termination handler")
return
}

// Start the termination handler
if err := handler.Run(ctrl.SetupSignalHandler()); err != nil {
logger.Error(err, "Error starting termination handler")
return
}
}
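
For context (not part of this diff), here is a minimal sketch of how the flags above might be wired up when the binary runs as a per-node container, assuming the Downward API supplies the node name. The image, namespace, and field names below are placeholders, not values taken from this PR:

package main

import (
	corev1 "k8s.io/api/core/v1"
)

// terminationHandlerContainer sketches a container spec for running the
// termination handler on every node; image and namespace are assumptions.
func terminationHandlerContainer() corev1.Container {
	return corev1.Container{
		Name:    "termination-handler",
		Image:   "example.com/cluster-api-provider-aws:latest", // placeholder image
		Command: []string{"/termination-handler"},
		Args: []string{
			"--poll-interval-seconds=5",
			"--node-name=$(NODE_NAME)",          // expanded by the kubelet from the env var below
			"--namespace=openshift-machine-api", // assumed namespace
		},
		Env: []corev1.EnvVar{{
			Name: "NODE_NAME",
			ValueFrom: &corev1.EnvVarSource{
				// Downward API: pass the name of the node the pod is scheduled to.
				FieldRef: &corev1.ObjectFieldSelector{FieldPath: "spec.nodeName"},
			},
		}},
	}
}

The $(NODE_NAME) reference in Args is expanded by the kubelet from the container's environment, which is how a DaemonSet-style deployment would typically feed the --node-name flag.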
145 changes: 145 additions & 0 deletions pkg/termination/handler.go
@@ -0,0 +1,145 @@
package termination

import (
"context"
"fmt"
"net/http"
"net/url"
"sync"
"time"

"github.com/go-logr/logr"
machinev1 "github.com/openshift/machine-api-operator/pkg/apis/machine/v1beta1"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/rest"
"sigs.k8s.io/controller-runtime/pkg/client"
)

const (
awsTerminationEndpointURL = "http://169.254.169.254/latest/meta-data/spot/termination-time"
)

// Handler represents a handler that will run to check the termination
// notice endpoint and delete the Machine if the instance's termination notice is fulfilled.
type Handler interface {
Run(stop <-chan struct{}) error
}

// NewHandler constructs a new Handler
func NewHandler(logger logr.Logger, cfg *rest.Config, pollInterval time.Duration, namespace, nodeName string) (Handler, error) {
machinev1.AddToScheme(scheme.Scheme)
c, err := client.New(cfg, client.Options{Scheme: scheme.Scheme})
if err != nil {
return nil, fmt.Errorf("error creating client: %v", err)
}

pollURL, err := url.Parse(awsTerminationEndpointURL)
if err != nil {
// This should never happen
panic(err)
}

logger = logger.WithValues("node", nodeName, "namespace", namespace)

return &handler{
client: c,
pollURL: pollURL,
pollInterval: pollInterval,
nodeName: nodeName,
namespace: namespace,
log: logger,
}, nil
}

// handler implements the logic to check the termination endpoint and delete the
// machine associated with the node
type handler struct {
client client.Client
pollURL *url.URL
pollInterval time.Duration
nodeName string
namespace string
log logr.Logger
}

// Run starts the handler and runs the termination logic
func (h *handler) Run(stop <-chan struct{}) error {
ctx, cancel := context.WithCancel(context.Background())

errs := make(chan error, 1)
wg := &sync.WaitGroup{}
wg.Add(1)
go func() {
errs <- h.run(ctx, wg)
}()

select {
case <-stop:
cancel()
// Wait for run to stop
wg.Wait()
return nil
case err := <-errs:
cancel()
return err
}
}

func (h *handler) run(ctx context.Context, wg *sync.WaitGroup) error {
defer wg.Done()

machine, err := h.getMachineForNode(ctx)
if err != nil {
return fmt.Errorf("error fetching machine for node (%q): %v", h.nodeName, err)
}

logger := h.log.WithValues("machine", machine.Name)
logger.V(1).Info("Monitoring node for machine")

if err := wait.PollImmediateUntil(h.pollInterval, func() (bool, error) {
resp, err := http.Get(h.pollURL.String())
if err != nil {
return false, fmt.Errorf("could not get URL %q: %v", h.pollURL.String(), err)
}
// Close the response body so connections are not leaked between polls
defer resp.Body.Close()
switch resp.StatusCode {
case http.StatusNotFound:
// Instance not terminated yet
logger.V(2).Info("Instance not marked for termination")
return false, nil
case http.StatusOK:
// Instance marked for termination
return true, nil
default:
// Unknown case, return an error
return false, fmt.Errorf("unexpected status: %d", resp.StatusCode)
}
}, ctx.Done()); err != nil {
return fmt.Errorf("error polling termination endpoint: %v", err)
}

// Will only get here if the termination endpoint returned 200
logger.V(1).Info("Instance marked for termination, deleting Machine")
if err := h.client.Delete(ctx, machine); err != nil {
return fmt.Errorf("error deleting machine: %v", err)
}

return nil
}

// getMachineForNode finds the Machine associated with the given node name
func (h *handler) getMachineForNode(ctx context.Context) (*machinev1.Machine, error) {
machineList := &machinev1.MachineList{}
err := h.client.List(ctx, machineList, client.InNamespace(h.namespace))
if err != nil {
return nil, fmt.Errorf("error listing machines: %v", err)
}

for _, machine := range machineList.Items {
if machine.Status.NodeRef != nil && machine.Status.NodeRef.Name == h.nodeName {
return &machine, nil
}
}

return nil, fmt.Errorf("machine not found for node %q", h.nodeName)
}
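
For context (not part of this diff), a sketch of the 404-then-200 contract the polling loop relies on, exercised against a fake metadata endpoint via httptest. It mirrors the handler's poll logic rather than driving the unexported handler type, since that hard-codes the AWS endpoint URL; names and timings are illustrative only:

package termination_test

import (
	"fmt"
	"net/http"
	"net/http/httptest"
	"testing"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
)

// TestTerminationPollingContract polls a fake spot termination-time endpoint:
// 404 while the instance is healthy, 200 once it is marked for termination.
func TestTerminationPollingContract(t *testing.T) {
	start := time.Now()
	server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		if time.Since(start) < 50*time.Millisecond {
			w.WriteHeader(http.StatusNotFound) // no termination notice yet
			return
		}
		w.WriteHeader(http.StatusOK) // termination notice issued
	}))
	defer server.Close()

	stop := make(chan struct{})
	err := wait.PollImmediateUntil(10*time.Millisecond, func() (bool, error) {
		resp, err := http.Get(server.URL)
		if err != nil {
			return false, err
		}
		defer resp.Body.Close()
		switch resp.StatusCode {
		case http.StatusNotFound:
			return false, nil // keep polling
		case http.StatusOK:
			return true, nil // instance marked for termination
		default:
			return false, fmt.Errorf("unexpected status: %d", resp.StatusCode)
		}
	}, stop)
	if err != nil {
		t.Fatalf("expected polling to observe the termination notice: %v", err)
	}
}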