k8s: remove dead container watching code (#4365)
nicks committed Mar 25, 2021
1 parent 910f16e commit 6f35c78
Showing 8 changed files with 0 additions and 337 deletions.
7 changes: 0 additions & 7 deletions internal/k8s/client.go
@@ -19,7 +19,6 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/version"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/discovery"
"k8s.io/client-go/discovery/cached/memory"
"k8s.io/client-go/dynamic"
@@ -82,12 +81,6 @@ type Client interface {
GetMetaByReference(ctx context.Context, ref v1.ObjectReference) (ObjectMeta, error)
ListMeta(ctx context.Context, gvk schema.GroupVersionKind, ns Namespace) ([]ObjectMeta, error)

PodByID(ctx context.Context, podID PodID, n Namespace) (*v1.Pod, error)

// Creates a channel where all changes to the pod are broadcast.
// Takes a pod as input, to indicate the version of the pod where we start watching.
WatchPod(ctx context.Context, pod *v1.Pod) (watch.Interface, error)

// Streams the container logs
ContainerLogs(ctx context.Context, podID PodID, cName container.Name, n Namespace, startTime time.Time) (io.ReadCloser, error)

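For context on what was deleted: WatchPod was a thin wrapper over client-go's watch API, scoped to one pod and resumed from that pod's ResourceVersion. A minimal sketch of the equivalent call built directly on a plain kubernetes.Interface clientset (the helper name is hypothetical, not from the repo):

package sketch

import (
	"context"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/fields"
	"k8s.io/apimachinery/pkg/watch"
	"k8s.io/client-go/kubernetes"
)

// watchPod mirrors the removed interface method: watch a single pod,
// resuming from its ResourceVersion so no update between the initial
// GET and the start of the watch is missed.
func watchPod(ctx context.Context, client kubernetes.Interface, pod *v1.Pod) (watch.Interface, error) {
	opts := metav1.ListOptions{
		FieldSelector:   fields.OneTermEqualSelector("metadata.name", pod.Name).String(),
		ResourceVersion: pod.ResourceVersion,
	}
	return client.CoreV1().Pods(pod.Namespace).Watch(ctx, opts)
}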
36 changes: 0 additions & 36 deletions internal/k8s/client_test.go
@@ -15,8 +15,6 @@ import (
"helm.sh/helm/v3/pkg/kube"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/cli-runtime/pkg/resource"
"k8s.io/client-go/kubernetes/fake"
@@ -257,37 +255,3 @@ func newClientTestFixture(t *testing.T) *clientTestFixture {
func (c clientTestFixture) k8sUpsert(ctx context.Context, entities []K8sEntity) ([]K8sEntity, error) {
return c.client.Upsert(ctx, entities, time.Minute)
}

func (c clientTestFixture) addObject(obj runtime.Object) {
err := c.tracker.Add(obj)
if err != nil {
c.t.Fatal(err)
}
}

func (c clientTestFixture) getPod(id PodID) *v1.Pod {
c.t.Helper()

pod, err := c.client.core.Pods(DefaultNamespace.String()).Get(c.ctx, id.String(), metav1.GetOptions{})
if err != nil {
c.t.Fatal(err)
}

return pod
}

func (c clientTestFixture) updatePod(pod *v1.Pod) {
gvks, _, err := scheme.Scheme.ObjectKinds(pod)
if err != nil {
c.t.Fatalf("updatePod: %v", err)
} else if len(gvks) == 0 {
c.t.Fatal("Could not parse pod into k8s schema")
}
for _, gvk := range gvks {
gvr, _ := meta.UnsafeGuessKindToResource(gvk)
err = c.tracker.Update(gvr, pod, NamespaceFromPod(pod).String())
if err != nil {
c.t.Fatal(err)
}
}
}
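The deleted fixture helpers are the standard fake-clientset pattern: seed or mutate the object tracker, then read back through the typed API. A self-contained sketch of that round trip, assuming nothing from the fixture (test and pod names are hypothetical):

package sketch

import (
	"context"
	"testing"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes/fake"
)

func TestFakeTrackerRoundTrip(t *testing.T) {
	// Seeding NewSimpleClientset populates the object tracker, which the
	// deleted addObject helper did after construction via Tracker().Add.
	pod := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "blorg", Namespace: "default"}}
	client := fake.NewSimpleClientset(pod)

	// Reads go through the normal typed API, like the deleted getPod helper.
	got, err := client.CoreV1().Pods("default").Get(context.Background(), "blorg", metav1.GetOptions{})
	if err != nil {
		t.Fatal(err)
	}
	if got.Name != "blorg" {
		t.Fatalf("unexpected pod: %s", got.Name)
	}
}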
89 changes: 0 additions & 89 deletions internal/k8s/container.go
@@ -1,106 +1,17 @@
package k8s

import (
"context"
"fmt"
"strings"

"github.com/pkg/errors"
v1 "k8s.io/api/core/v1"

"github.com/tilt-dev/tilt/internal/container"
"github.com/tilt-dev/tilt/pkg/logger"
)

const ContainerIDPrefix = "docker://"

func WaitForContainerReady(ctx context.Context, client Client, pod *v1.Pod, ref container.RefSelector) (v1.ContainerStatus, error) {
cStatus, err := waitForContainerReadyHelper(pod, ref)
if err != nil {
return v1.ContainerStatus{}, err
} else if cStatus != (v1.ContainerStatus{}) {
return cStatus, nil
}

watch, err := client.WatchPod(ctx, pod)
if err != nil {
return v1.ContainerStatus{}, errors.Wrap(err, "WaitForContainerReady")
}
defer watch.Stop()

for {
select {
case <-ctx.Done():
return v1.ContainerStatus{}, errors.Wrap(ctx.Err(), "WaitForContainerReady")
case event, ok := <-watch.ResultChan():
if !ok {
return v1.ContainerStatus{}, fmt.Errorf("Container watch closed: %s", ref)
}

obj := event.Object
pod, ok := obj.(*v1.Pod)
if !ok {
logger.Get(ctx).Debugf("Unexpected watch notification: %T", obj)
continue
}

FixContainerStatusImages(pod)

cStatus, err := waitForContainerReadyHelper(pod, ref)
if err != nil {
return v1.ContainerStatus{}, err
} else if cStatus != (v1.ContainerStatus{}) {
return cStatus, nil
}
}
}
}

func waitForContainerReadyHelper(pod *v1.Pod, ref container.RefSelector) (v1.ContainerStatus, error) {
cStatus, err := ContainerMatching(pod, ref)
if err != nil {
return v1.ContainerStatus{}, errors.Wrap(err, "WaitForContainerReadyHelper")
}

unschedulable, msg := IsUnschedulable(pod.Status)
if unschedulable {
return v1.ContainerStatus{}, fmt.Errorf("Container will never be ready: %s", msg)
}

if IsContainerExited(pod.Status, cStatus) {
return v1.ContainerStatus{}, fmt.Errorf("Container will never be ready: %s", ref)
}

if !cStatus.Ready {
return v1.ContainerStatus{}, nil
}

return cStatus, nil
}

// If true, this means the container is gone and will never recover.
func IsContainerExited(pod v1.PodStatus, container v1.ContainerStatus) bool {
if pod.Phase == v1.PodSucceeded || pod.Phase == v1.PodFailed {
return true
}

if container.State.Terminated != nil {
return true
}

return false
}

// Returns the error message if the pod is unschedulable
func IsUnschedulable(pod v1.PodStatus) (bool, string) {
for _, cond := range pod.Conditions {
if cond.Reason == v1.PodReasonUnschedulable {
return true, cond.Message
}
}
return false, ""
}

// Kubernetes has a bug where the image ref in the container status
// can be wrong (though this does not mean the container is running
// unexpected code)
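The WaitForContainerReady loop removed above is a hand-rolled version of what client-go ships as watch tooling. A hedged sketch of an equivalent readiness wait using k8s.io/client-go/tools/watch; this is the generic pattern, not the code Tilt replaced it with (function and variable names are assumptions):

package sketch

import (
	"context"

	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/fields"
	"k8s.io/apimachinery/pkg/watch"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/cache"
	watchtools "k8s.io/client-go/tools/watch"
)

// waitForPodReady blocks until the named pod reports Ready, using the same
// list+watch semantics the removed loop implemented by hand.
func waitForPodReady(ctx context.Context, client kubernetes.Interface, ns, name string) error {
	lw := cache.NewListWatchFromClient(
		client.CoreV1().RESTClient(), "pods", ns,
		fields.OneTermEqualSelector("metadata.name", name))
	_, err := watchtools.UntilWithSync(ctx, lw, &v1.Pod{}, nil,
		func(ev watch.Event) (bool, error) {
			pod, ok := ev.Object.(*v1.Pod)
			if !ok {
				// Ignore non-pod notifications, as the old loop did.
				return false, nil
			}
			for _, cond := range pod.Status.Conditions {
				if cond.Type == v1.PodReady && cond.Status == v1.ConditionTrue {
					return true, nil
				}
			}
			return false, nil
		})
	return err
}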
149 changes: 0 additions & 149 deletions internal/k8s/container_test.go
@@ -1,15 +1,10 @@
package k8s

import (
"context"
"strings"
"testing"
"time"

"github.com/stretchr/testify/assert"
v1 "k8s.io/api/core/v1"

"github.com/tilt-dev/tilt/internal/container"
)

func TestFixContainerStatusImages(t *testing.T) {
@@ -67,147 +62,3 @@ func TestFixContainerStatusImagesNoMutation(t *testing.T) {
newPod.Spec.Containers[0].Image,
newPod.Status.ContainerStatuses[0].Image)
}

func TestWaitForContainerAlreadyAlive(t *testing.T) {
f := newClientTestFixture(t)

nt := container.MustParseSelector(blorgDevImgStr)
podData := fakePod(expectedPod, blorgDevImgStr)
podData.Status = v1.PodStatus{
ContainerStatuses: []v1.ContainerStatus{
{
ContainerID: "docker://container-id",
Image: nt.String(),
Ready: true,
},
},
}
f.addObject(podData)

ctx, cancel := context.WithTimeout(f.ctx, time.Second)
defer cancel()

pod := f.getPod(expectedPod)
cStatus, err := WaitForContainerReady(ctx, f.client, pod, nt)
if err != nil {
t.Fatal(err)
}

cID, err := ContainerIDFromContainerStatus(cStatus)
if err != nil {
t.Fatal(err)
}

assert.Equal(t, "container-id", cID.String())
}

func TestWaitForContainerSuccess(t *testing.T) {
f := newClientTestFixture(t)
f.addObject(&fakePodList)

nt := container.MustParseTaggedSelector(blorgDevImgStr)
pod := f.getPod(expectedPod)

ctx, cancel := context.WithTimeout(f.ctx, time.Second)
defer cancel()

result := make(chan error)
go func() {
_, err := WaitForContainerReady(ctx, f.client, pod, nt)
result <- err
}()

newPod := fakePod(expectedPod, blorgDevImgStr)
newPod.Status = v1.PodStatus{
ContainerStatuses: []v1.ContainerStatus{
{
ContainerID: "docker://container-id",
Image: nt.String(),
Ready: true,
},
},
}

<-f.watchNotify
f.updatePod(newPod)
err := <-result
if err != nil {
t.Fatal(err)
}
}

func TestWaitForContainerFailure(t *testing.T) {
f := newClientTestFixture(t)
f.addObject(&fakePodList)

nt := container.MustParseTaggedSelector(blorgDevImgStr)
pod := f.getPod(expectedPod)

ctx, cancel := context.WithTimeout(f.ctx, time.Second)
defer cancel()

result := make(chan error)
go func() {
_, err := WaitForContainerReady(ctx, f.client, pod, nt)
result <- err
}()

newPod := fakePod(expectedPod, blorgDevImgStr)
newPod.Status = v1.PodStatus{
ContainerStatuses: []v1.ContainerStatus{
{
Image: nt.String(),
State: v1.ContainerState{
Terminated: &v1.ContainerStateTerminated{},
},
},
},
}

<-f.watchNotify
f.updatePod(newPod)
err := <-result

expected := "Container will never be ready"
if err == nil || !strings.Contains(err.Error(), expected) {
t.Fatalf("Expected error %q, actual: %v", expected, err)
}
}

func TestWaitForContainerUnschedulable(t *testing.T) {
f := newClientTestFixture(t)
f.addObject(&fakePodList)

nt := container.MustParseTaggedSelector(blorgDevImgStr)
pod := f.getPod(expectedPod)

ctx, cancel := context.WithTimeout(f.ctx, time.Second)
defer cancel()

result := make(chan error)
go func() {
_, err := WaitForContainerReady(ctx, f.client, pod, nt)
result <- err
}()

newPod := fakePod(expectedPod, blorgDevImgStr)
newPod.Status = v1.PodStatus{
Conditions: []v1.PodCondition{
{
Reason: v1.PodReasonUnschedulable,
Message: "0/4 nodes are available: 4 Insufficient cpu.",
Status: "False",
Type: v1.PodScheduled,
},
},
}

<-f.watchNotify
f.updatePod(newPod)
err := <-result

expected := "Container will never be ready: 0/4 nodes are available: 4 Insufficient cpu."
if err == nil || !strings.Contains(err.Error(), expected) {
t.Fatalf("Expected error %q, actual: %v", expected, err)
}
}
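These tests hinge on a fake-clientset detail: mutating the object tracker pushes an event to every open watch, which is how updatePod above unblocked the waiting goroutine (watchNotify is fixture plumbing outside this diff). A minimal standalone sketch of that behavior, with hypothetical names:

package sketch

import (
	"context"
	"testing"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/watch"
	"k8s.io/client-go/kubernetes/fake"
)

func TestFakeWatchDelivery(t *testing.T) {
	client := fake.NewSimpleClientset()
	w, err := client.CoreV1().Pods("default").Watch(context.Background(), metav1.ListOptions{})
	if err != nil {
		t.Fatal(err)
	}
	defer w.Stop()

	// Adding an object to the tracker delivers an event to the open watch;
	// this is the mechanism the deleted tests relied on.
	pod := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "blorg", Namespace: "default"}}
	if err := client.Tracker().Add(pod); err != nil {
		t.Fatal(err)
	}

	ev := <-w.ResultChan()
	if ev.Type != watch.Added {
		t.Fatalf("expected Added, got %v", ev.Type)
	}
}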
9 changes: 0 additions & 9 deletions internal/k8s/exploding_client.go
@@ -9,7 +9,6 @@ import (
"github.com/pkg/errors"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/watch"

"github.com/tilt-dev/tilt/internal/container"
"github.com/tilt-dev/tilt/pkg/model"
@@ -45,14 +44,6 @@ func (ec *explodingClient) PollForPodsWithImage(ctx context.Context, image refer
return nil, errors.Wrap(ec.err, "could not set up k8s client")
}

func (ec *explodingClient) PodByID(ctx context.Context, podID PodID, n Namespace) (*v1.Pod, error) {
return nil, errors.Wrap(ec.err, "could not set up k8s client")
}

func (ec *explodingClient) WatchPod(ctx context.Context, pod *v1.Pod) (watch.Interface, error) {
return nil, errors.Wrap(ec.err, "could not set up k8s client")
}

func (ec *explodingClient) ContainerLogs(ctx context.Context, podID PodID, cName container.Name, n Namespace, startTime time.Time) (io.ReadCloser, error) {
return nil, errors.Wrap(ec.err, "could not set up k8s client")
}
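The exploding client is a null-object pattern: it satisfies the full Client interface but returns the stored construction error from every method, so callers never have to branch on setup failure. The same idea in miniature, with hypothetical names:

package main

import (
	"errors"
	"fmt"
)

// explodingStore captures a construction error once and surfaces it,
// wrapped with context, from every method call.
type explodingStore struct{ err error }

func (s *explodingStore) Get(key string) (string, error) {
	return "", fmt.Errorf("could not set up store: %w", s.err)
}

func main() {
	s := &explodingStore{err: errors.New("no kubeconfig found")}
	_, err := s.Get("anything")
	fmt.Println(err) // could not set up store: no kubeconfig found
}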
9 changes: 0 additions & 9 deletions internal/k8s/fake_client.go
@@ -16,7 +16,6 @@ import (
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/watch"

"github.com/tilt-dev/tilt/internal/container"
"github.com/tilt-dev/tilt/pkg/logger"
@@ -324,10 +323,6 @@ func (c *FakeK8sClient) ListMeta(ctx context.Context, gvk schema.GroupVersionKin
return result, nil
}

func (c *FakeK8sClient) WatchPod(ctx context.Context, pod *v1.Pod) (watch.Interface, error) {
return watch.NewEmptyWatch(), nil
}

func (c *FakeK8sClient) SetLogsForPodContainer(pID PodID, cName container.Name, logs string) {
c.SetLogReaderForPodContainer(pID, cName, strings.NewReader(logs))
}
@@ -351,10 +346,6 @@ func (c *FakeK8sClient) ContainerLogs(ctx context.Context, pID PodID, cName cont
return ReaderCloser{Reader: bytes.NewBuffer(nil)}, nil
}

func (c *FakeK8sClient) PodByID(ctx context.Context, pID PodID, n Namespace) (*v1.Pod, error) {
return nil, nil
}

func FakePodStatus(image reference.NamedTagged, phase string) v1.PodStatus {
return v1.PodStatus{
Phase: v1.PodPhase(phase),
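One detail worth noting about the deleted fake method: watch.NewEmptyWatch returns a watch whose result channel is already closed, so any stale caller of the fake's WatchPod saw an immediate close rather than a hang. A standalone check (not from the repo):

package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/watch"
)

func main() {
	w := watch.NewEmptyWatch()
	_, ok := <-w.ResultChan() // receives immediately: the channel is closed
	fmt.Println(ok)           // false
}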
