Skip to content

Commit

Permalink
Merge pull request #103 from JoelSpeed/termination-scheme
Browse files Browse the repository at this point in the history
BUG 1856597: Pass scheme to client creation so that it uses scheme with Machine API
  • Loading branch information
openshift-merge-robot committed Jul 16, 2020
2 parents 5c8a640 + e834e31 commit c71ec2f
Show file tree
Hide file tree
Showing 3 changed files with 109 additions and 94 deletions.
4 changes: 2 additions & 2 deletions pkg/termination/termination.go
Expand Up @@ -12,8 +12,8 @@ import (
"github.com/go-logr/logr"
machinev1 "github.com/openshift/machine-api-operator/pkg/apis/machine/v1beta1"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/rest"
"k8s.io/kubectl/pkg/scheme"
"sigs.k8s.io/controller-runtime/pkg/client"
)

Expand All @@ -30,7 +30,7 @@ type Handler interface {
// NewHandler constructs a new Handler
func NewHandler(logger logr.Logger, cfg *rest.Config, pollInterval time.Duration, namespace, nodeName string) (Handler, error) {
machinev1.AddToScheme(scheme.Scheme)
c, err := client.New(cfg, client.Options{})
c, err := client.New(cfg, client.Options{Scheme: scheme.Scheme})
if err != nil {
return nil, fmt.Errorf("error creating client: %v", err)
}
Expand Down
12 changes: 9 additions & 3 deletions pkg/termination/termination_suite_test.go
Expand Up @@ -24,7 +24,8 @@ import (
machinev1 "github.com/openshift/machine-api-operator/pkg/apis/machine/v1beta1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/apimachinery/pkg/runtime"
kubernetesscheme "k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/rest"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/envtest"
Expand Down Expand Up @@ -56,14 +57,19 @@ var _ = BeforeSuite(func() {
testEnv = &envtest.Environment{
CRDDirectoryPaths: []string{filepath.Join("..", "..", "config", "crds")},
}
machinev1.AddToScheme(scheme.Scheme)

// Use our own scheme so we don't interfere with any test cases
// that would initialise the scheme themselves.
scheme := runtime.NewScheme()
kubernetesscheme.AddToScheme(scheme)
machinev1.AddToScheme(scheme)

var err error
cfg, err = testEnv.Start()
Expect(err).ToNot(HaveOccurred())
Expect(cfg).ToNot(BeNil())

k8sClient, err = client.New(cfg, client.Options{})
k8sClient, err = client.New(cfg, client.Options{Scheme: scheme})
Expect(err).ToNot(HaveOccurred())

Expect(k8sClient.Create(ctx, &corev1.Namespace{
Expand Down
187 changes: 98 additions & 89 deletions pkg/termination/termination_test.go
Expand Up @@ -50,13 +50,18 @@ var _ = Describe("Handler Suite", func() {
httpHandler = nil
nodeName = "test-node"
httpHandler = newMockHTTPHandler(notPreempted)
stop = nil
errs = nil

h = &handler{
client: k8sClient,
pollInterval: 100 * time.Millisecond,
nodeName: nodeName,
log: klogr.New(),
}
// use NewHandler() instead of manual construction in order to test NewHandler() logic
// like checking that machine api is added to scheme
handlerInterface, err := NewHandler(klogr.New(), cfg, 100*time.Millisecond, "", nodeName)
Expect(err).ToNot(HaveOccurred())

h = handlerInterface.(*handler)

// set pollURL so we can override initial value later
h.pollURL = nil
})

JustBeforeEach(func() {
Expand All @@ -68,124 +73,128 @@ var _ = Describe("Handler Suite", func() {
Expect(err).ToNot(HaveOccurred())
h.pollURL = pollURL
}

stop, errs = StartTestHandler(h)
})

AfterEach(func() {
if !isClosed(stop) {
if stop != nil && !isClosed(stop) {
close(stop)
}
terminationServer.Close()

Expect(deleteAllMachines(k8sClient)).To(Succeed())
})

Context("when the handler is stopped", func() {
JustBeforeEach(func() {
close(stop)
})

It("should not return an error", func() {
Eventually(errs).Should(Receive(BeNil()))
})
})

Context("when no machine exists for the node", func() {
It("should return an error upon starting", func() {
Eventually(errs).Should(Receive(MatchError("error fetching machine for node (\"test-node\"): machine not found for node \"test-node\"")))
})
})

Context("when a machine exists for the node", func() {
var counter int32
var testMachine *machinev1.Machine

BeforeEach(func() {
testMachine = newTestMachine("test-machine", testNamespace, nodeName)
createMachine(testMachine)

// Ensure the polling logic is excercised in tests
httpHandler = newMockHTTPHandler(func(rw http.ResponseWriter, req *http.Request) {
if atomic.LoadInt32(&counter) == 4 {
rw.Write([]byte("TRUE"))
} else {
atomic.AddInt32(&counter, 1)
rw.Write([]byte("FALSE"))
}
})
})

Context("when running the handler", func() {
JustBeforeEach(func() {
// Ensure the polling logic is excercised in tests
for atomic.LoadInt32(&counter) < 4 {
continue
}
stop, errs = StartTestHandler(h)
})

Context("and the handler is stopped", func() {
Context("when the handler is stopped", func() {
JustBeforeEach(func() {
close(stop)
})

It("should not return an error", func() {
Eventually(errs).Should(Receive(BeNil()))
})
})

It("should not delete the machine", func() {
key := client.ObjectKey{Namespace: testMachine.Namespace, Name: testMachine.Name}
Consistently(func() error {
m := &machinev1.Machine{}
return k8sClient.Get(ctx, key, m)
}).Should(Succeed())
Context("when no machine exists for the node", func() {
It("should return an error upon starting", func() {
Eventually(errs).Should(Receive(MatchError("error fetching machine for node (\"test-node\"): machine not found for node \"test-node\"")))
})
})

Context("and the instance termination notice is fulfilled", func() {
It("should delete the machine", func() {
key := client.ObjectKey{Namespace: testMachine.Namespace, Name: testMachine.Name}
Eventually(func() error {
m := &machinev1.Machine{}
err := k8sClient.Get(ctx, key, m)
if err != nil && errors.IsNotFound(err) {
return nil
} else if err != nil {
return err
Context("when a machine exists for the node", func() {
var counter int32
var testMachine *machinev1.Machine

BeforeEach(func() {
testMachine = newTestMachine("test-machine", testNamespace, nodeName)
createMachine(testMachine)

// Ensure the polling logic is excercised in tests
httpHandler = newMockHTTPHandler(func(rw http.ResponseWriter, req *http.Request) {
if atomic.LoadInt32(&counter) == 4 {
rw.Write([]byte("TRUE"))
} else {
atomic.AddInt32(&counter, 1)
rw.Write([]byte("FALSE"))
}
return fmt.Errorf("machine not yet deleted")
}).Should(Succeed())
})
})
})

Context("and the instance termination notice is not fulfilled", func() {
BeforeEach(func() {
httpHandler = newMockHTTPHandler(notPreempted)
JustBeforeEach(func() {
// Ensure the polling logic is excercised in tests
for atomic.LoadInt32(&counter) < 4 {
continue
}
})

It("should not delete the machine", func() {
key := client.ObjectKey{Namespace: testMachine.Namespace, Name: testMachine.Name}
Consistently(func() error {
m := &machinev1.Machine{}
return k8sClient.Get(ctx, key, m)
}).Should(Succeed())
Context("and the handler is stopped", func() {
JustBeforeEach(func() {
close(stop)
})

It("should not return an error", func() {
Eventually(errs).Should(Receive(BeNil()))
})

It("should not delete the machine", func() {
key := client.ObjectKey{Namespace: testMachine.Namespace, Name: testMachine.Name}
Consistently(func() error {
m := &machinev1.Machine{}
return k8sClient.Get(ctx, key, m)
}).Should(Succeed())
})
})
})

Context("and the poll URL cannot be reached", func() {
BeforeEach(func() {
h.pollURL = &url.URL{Opaque: "abc#1://localhost"}
Context("and the instance termination notice is fulfilled", func() {
It("should delete the machine", func() {
key := client.ObjectKey{Namespace: testMachine.Namespace, Name: testMachine.Name}
Eventually(func() error {
m := &machinev1.Machine{}
err := k8sClient.Get(ctx, key, m)
if err != nil && errors.IsNotFound(err) {
return nil
} else if err != nil {
return err
}
return fmt.Errorf("machine not yet deleted")
}).Should(Succeed())
})
})

It("should return an error", func() {
Eventually(errs).Should(Receive(MatchError("error polling termination endpoint: could not get URL \"abc#1://localhost\": Get abc#1://localhost: unsupported protocol scheme \"\"")))
Context("and the instance termination notice is not fulfilled", func() {
BeforeEach(func() {
httpHandler = newMockHTTPHandler(notPreempted)
})

It("should not delete the machine", func() {
key := client.ObjectKey{Namespace: testMachine.Namespace, Name: testMachine.Name}
Consistently(func() error {
m := &machinev1.Machine{}
return k8sClient.Get(ctx, key, m)
}).Should(Succeed())
})
})

It("should not delete the machine", func() {
key := client.ObjectKey{Namespace: testMachine.Namespace, Name: testMachine.Name}
Consistently(func() error {
m := &machinev1.Machine{}
return k8sClient.Get(ctx, key, m)
}).Should(Succeed())
Context("and the poll URL cannot be reached", func() {
BeforeEach(func() {
h.pollURL = &url.URL{Opaque: "abc#1://localhost"}
})

It("should return an error", func() {
Eventually(errs).Should(Receive(MatchError("error polling termination endpoint: could not get URL \"abc#1://localhost\": Get abc#1://localhost: unsupported protocol scheme \"\"")))
})

It("should not delete the machine", func() {
key := client.ObjectKey{Namespace: testMachine.Namespace, Name: testMachine.Name}
Consistently(func() error {
m := &machinev1.Machine{}
return k8sClient.Get(ctx, key, m)
}).Should(Succeed())
})
})
})
})
Expand Down

0 comments on commit c71ec2f

Please sign in to comment.