cluster: monitor K8s livez/readyz after initialization #5623

Merged
4 commits, merged on Mar 31, 2022
6 changes: 3 additions & 3 deletions internal/cli/wire_gen.go

Some generated files are not rendered by default.

44 changes: 34 additions & 10 deletions internal/controllers/core/cluster/cache.go
@@ -1,6 +1,7 @@
package cluster

import (
"context"
"errors"
"fmt"
"sync"
@@ -35,15 +36,30 @@ const (
)

type connection struct {
connType connectionType
spec v1alpha1.ClusterSpec
connType connectionType
spec v1alpha1.ClusterSpec

// createdAt is when the connection object was created.
// If initError is empty, it's effectively the time we connected to the
// cluster. Otherwise, it's when we _attempted_ to initialize the client
// and is used for retry/backoff.
createdAt time.Time

// initError is populated when the client cannot be instantiated.
// For example, if there's no ~/.kube/config, a Kubernetes client
// can't be created.
initError string

dockerClient docker.Client
k8sClient k8s.Client
error string
createdAt time.Time
arch string
registry *container.Registry
connStatus *v1alpha1.ClusterConnectionStatus

// statusError is populated if the client has been successfully initialized
// but is failing a health/readiness check.
statusError string
arch string
registry *container.Registry
connStatus *v1alpha1.ClusterConnectionStatus
cancelMonitor context.CancelFunc
}

func (k *ConnectionManager) GetK8sClient(clusterKey types.NamespacedName) (k8s.Client, metav1.MicroTime, error) {
@@ -75,9 +91,11 @@ func (k *ConnectionManager) validConnOrError(key types.NamespacedName, connType
return connection{}, fmt.Errorf("incorrect cluster client type: got %s, expected %s",
conn.connType, connType)
}
if conn.error != "" {
return connection{}, errors.New(conn.error)
if conn.initError != "" {
return connection{}, errors.New(conn.initError)
}
// N.B. even if there is a statusError, the client is still returned, as it
// might still be functional even though it's in a degraded state
return conn, nil
}

@@ -94,5 +112,11 @@ func (k *ConnectionManager) load(key types.NamespacedName) (connection, bool) {
}

func (k *ConnectionManager) delete(key types.NamespacedName) {
k.connections.Delete(key)
v, ok := k.connections.LoadAndDelete(key)
if ok {
conn := v.(connection)
if conn.cancelMonitor != nil {
conn.cancelMonitor()
}
}
}
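
To make the eviction change in `delete` easier to follow: the monitor's `context.CancelFunc` is stashed next to the cached connection, and `sync.Map.LoadAndDelete` hands the old value back so eviction can stop the monitor goroutine instead of leaking it. Below is a minimal, self-contained sketch of just that pattern; the `entry` type and the `"default"` key are hypothetical stand-ins, only the LoadAndDelete-then-cancel shape comes from the diff above.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// entry pairs a cached value with the cancel func for its monitor goroutine,
// mirroring how connection carries cancelMonitor.
type entry struct {
	name          string
	cancelMonitor context.CancelFunc
}

func main() {
	var conns sync.Map

	ctx, cancel := context.WithCancel(context.Background())
	conns.Store("default", entry{name: "default", cancelMonitor: cancel})

	// A monitor goroutine that exits when its context is cancelled.
	done := make(chan struct{})
	go func() {
		defer close(done)
		<-ctx.Done()
		fmt.Println("monitor stopped:", ctx.Err())
	}()

	// Eviction: LoadAndDelete returns the old value so the caller can
	// cancel the monitor rather than leaking the goroutine.
	if v, ok := conns.LoadAndDelete("default"); ok {
		e := v.(entry)
		if e.cancelMonitor != nil {
			e.cancelMonitor()
		}
	}

	select {
	case <-done:
	case <-time.After(time.Second):
		fmt.Println("monitor did not stop")
	}
}
```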
4 changes: 2 additions & 2 deletions internal/controllers/core/cluster/cache_test.go
@@ -37,7 +37,7 @@ func TestConnectionManager(t *testing.T) {
input: &connection{
connType: connectionTypeK8s,
k8sClient: fakeK8s,
error: "connection error",
initError: "connection error",
},
expectedK8sErr: "connection error",
expectedDockerErr: "incorrect cluster client type: got kubernetes, expected docker",
@@ -58,7 +58,7 @@ func TestConnectionManager(t *testing.T) {
input: &connection{
connType: connectionTypeDocker,
dockerClient: fakeDocker,
error: "some docker error",
initError: "some docker error",
},
expectedK8sErr: "incorrect cluster client type: got docker, expected kubernetes",
expectedDockerErr: "some docker error",
60 changes: 60 additions & 0 deletions internal/controllers/core/cluster/monitor.go
@@ -0,0 +1,60 @@
package cluster

import (
"context"
"errors"

"k8s.io/apimachinery/pkg/types"

"github.com/tilt-dev/tilt/internal/k8s"
)

func (r *Reconciler) monitorConn(ctx context.Context, clusterNN types.NamespacedName, conn connection) {
if conn.connType != connectionTypeK8s {
// live connection monitoring for Docker not yet supported
return
}

ticker := r.clock.NewTicker(clientHealthPollInterval)
defer ticker.Stop()
for {
lastErr := conn.statusError

err := doKubernetesHealthCheck(ctx, conn.k8sClient)
if err != nil {
conn.statusError = err.Error()
} else {
conn.statusError = ""
}

if conn.statusError != lastErr {
r.connManager.store(clusterNN, conn)
r.requeuer.Add(clusterNN)
}

select {
case <-ticker.Chan():
case <-ctx.Done():
return
}
}
}

func doKubernetesHealthCheck(ctx context.Context, client k8s.Client) error {
// TODO(milas): use verbose=true and propagate the info to the Tilt API
// cluster obj to show in the web UI
health, err := client.ClusterHealth(ctx, false)
if err != nil {
return err
}

if !health.Live {
return errors.New("cluster did not pass liveness check")
}

if !health.Ready {
return errors.New("cluster not ready")
}

return nil
}
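
For readers skimming the new monitor, the loop boils down to a tick-and-compare pattern: probe the cluster, keep the last error string, and only act when it changes. Here is a minimal, self-contained sketch of that pattern using only the standard library; `pollHealth`, `checkHealth`, `onChange`, and the intervals are hypothetical stand-ins for `monitorConn`, `doKubernetesHealthCheck`, the store/requeue step, and `clientHealthPollInterval` (the real code uses the injected clockwork clock rather than `time.NewTicker`).

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// checkHealth is a stand-in for doKubernetesHealthCheck: liveness is checked
// before readiness, and the first failure is returned as an error.
func checkHealth(live, ready bool) error {
	if !live {
		return errors.New("cluster did not pass liveness check")
	}
	if !ready {
		return errors.New("cluster not ready")
	}
	return nil
}

// pollHealth mirrors the shape of monitorConn: probe immediately, then on
// every tick, and report only when the error string transitions.
func pollHealth(ctx context.Context, interval time.Duration, probe func() error, onChange func(string)) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	last := ""
	for {
		cur := ""
		if err := probe(); err != nil {
			cur = err.Error()
		}
		if cur != last {
			onChange(cur)
			last = cur
		}

		select {
		case <-ticker.C:
		case <-ctx.Done():
			return
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 300*time.Millisecond)
	defer cancel()

	// The cluster reports not-ready for the first two polls, then recovers.
	polls := 0
	probe := func() error {
		polls++
		return checkHealth(true, polls > 2)
	}

	pollHealth(ctx, 50*time.Millisecond, probe, func(status string) {
		if status == "" {
			fmt.Println("cluster healthy")
		} else {
			fmt.Println("cluster degraded:", status)
		}
	})
}
```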
93 changes: 71 additions & 22 deletions internal/controllers/core/cluster/reconciler.go
@@ -5,16 +5,19 @@ import (
"fmt"
"time"

"github.com/jonboulle/clockwork"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
ctrlclient "sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

"github.com/tilt-dev/tilt/internal/analytics"
"github.com/tilt-dev/tilt/internal/controllers/apicmp"
"github.com/tilt-dev/tilt/internal/controllers/indexer"
"github.com/tilt-dev/tilt/internal/docker"
"github.com/tilt-dev/tilt/internal/k8s"
"github.com/tilt-dev/tilt/internal/store"
@@ -27,10 +30,17 @@ import (

const ArchUnknown string = "unknown"

const (
clientInitBackoff = 30 * time.Second
clientHealthPollInterval = 15 * time.Second
)

type Reconciler struct {
globalCtx context.Context
ctrlClient ctrlclient.Client
store store.RStore
requeuer *indexer.Requeuer
clock clockwork.Clock
connManager *ConnectionManager

localDockerEnv docker.LocalEnv
@@ -41,14 +51,16 @@ type Reconciler struct {

func (r *Reconciler) CreateBuilder(mgr ctrl.Manager) (*builder.Builder, error) {
b := ctrl.NewControllerManagedBy(mgr).
For(&v1alpha1.Cluster{})
For(&v1alpha1.Cluster{}).
Watches(r.requeuer, handler.Funcs{})
return b, nil
}

func NewReconciler(
globalCtx context.Context,
ctrlClient ctrlclient.Client,
store store.RStore,
clock clockwork.Clock,
connManager *ConnectionManager,
localDockerEnv docker.LocalEnv,
dockerClientFactory DockerClientFactory,
@@ -58,6 +70,8 @@ func NewReconciler(
globalCtx: globalCtx,
ctrlClient: ctrlClient,
store: store,
clock: clock,
requeuer: indexer.NewRequeuer(),
connManager: connManager,
localDockerEnv: localDockerEnv,
dockerClientFactory: dockerClientFactory,
@@ -85,40 +99,71 @@ func (r *Reconciler) Reconcile(ctx context.Context, request reconcile.Request) (
r.store.Dispatch(clusters.NewClusterUpsertAction(&obj))

conn, hasConnection := r.connManager.load(nn)
if hasConnection {
// If this is not the first time we've tried to connect to the cluster,
// only attempt to refresh the connection if the feature is enabled. Not
// all parts of Tilt use a dynamically-obtained client currently, which
// can result in erratic behavior if the cluster is not in a usable state
// at startup but then becomes usable, for example, as some parts of the
// system will still have k8s.explodingClient.
if hasConnection && obj.Annotations["features.tilt.dev/cluster-refresh"] == "true" {
// If the spec changed, delete the connection and recreate it.
if !apicmp.DeepEqual(conn.spec, obj.Spec) {
r.connManager.delete(nn)
conn = connection{}
hasConnection = false
} else if conn.initError != "" && r.clock.Now().After(conn.createdAt.Add(clientInitBackoff)) {
hasConnection = false
}
}

var requeueAfter time.Duration
if !hasConnection {
// Create the initial connection to the cluster.
conn = connection{spec: *obj.Spec.DeepCopy(), createdAt: r.clock.Now()}
if obj.Spec.Connection != nil && obj.Spec.Connection.Kubernetes != nil {
conn = r.createKubernetesConnection(obj.Spec.Connection.Kubernetes)
conn.connType = connectionTypeK8s
client, err := r.createKubernetesClient(obj.DeepCopy())
if err != nil {
conn.initError = err.Error()
} else {
conn.k8sClient = client
}
} else if obj.Spec.Connection != nil && obj.Spec.Connection.Docker != nil {
conn = r.createDockerConnection(obj.Spec.Connection.Docker)
client, err := r.createDockerClient(obj.Spec.Connection.Docker)
if err != nil {
conn.initError = err.Error()
} else {
conn.dockerClient = client
}
}

if conn.initError != "" {
// requeue the cluster Obj so that we can attempt to re-initialize
requeueAfter = clientInitBackoff
} else {
// start monitoring the connection and requeue the Cluster obj
// for reconciliation if its runtime status changes
monitorCtx, monitorCancel := context.WithCancel(r.globalCtx)
conn.cancelMonitor = monitorCancel
go r.monitorConn(monitorCtx, nn, conn)
}
conn.createdAt = time.Now()
conn.spec = obj.Spec
}

if conn.error == "" && conn.arch == "" {
// once cluster connection is established, try to populate arch
if conn.initError == "" && conn.arch == "" {
if conn.k8sClient != nil {
conn.arch = r.readKubernetesArch(ctx, conn.k8sClient)
} else if conn.dockerClient != nil {
conn.arch = r.readDockerArch(ctx, conn.dockerClient)
}
}

if conn.error == "" && conn.connType == connectionTypeK8s && conn.registry == nil {
if conn.initError == "" && conn.connType == connectionTypeK8s && conn.registry == nil {
reg := conn.k8sClient.LocalRegistry(ctx)
conn.registry = &reg
}

if conn.error == "" && conn.connType == connectionTypeK8s {
if conn.initError == "" && conn.connType == connectionTypeK8s {
connStatus := conn.k8sClient.ConnectionConfig()
conn.connStatus = &v1alpha1.ClusterConnectionStatus{
Kubernetes: connStatus,
@@ -133,11 +178,11 @@ func (r *Reconciler) Reconcile(ctx context.Context, request reconcile.Request) (
return ctrl.Result{}, err
}

return ctrl.Result{}, nil
return ctrl.Result{RequeueAfter: requeueAfter}, nil
}

// Creates a docker connection from the spec.
func (r *Reconciler) createDockerConnection(obj *v1alpha1.DockerClusterConnection) connection {
func (r *Reconciler) createDockerClient(obj *v1alpha1.DockerClusterConnection) (docker.Client, error) {
// If no Host is specified, use the default Env from environment variables.
env := docker.Env(r.localDockerEnv)
if obj.Host != "" {
@@ -146,21 +191,20 @@ func (r *Reconciler) createDockerConnectio

client, err := r.dockerClientFactory.New(r.globalCtx, env)
if err != nil {
return connection{connType: connectionTypeDocker, error: err.Error()}
return nil, err
}

return connection{connType: connectionTypeDocker, dockerClient: client}
return client, nil
}

// Creates a Kubernetes connection from the spec.
func (r *Reconciler) createKubernetesConnection(obj *v1alpha1.KubernetesClusterConnection) connection {
k8sKubeContextOverride := k8s.KubeContextOverride(obj.Context)
k8sNamespaceOverride := k8s.NamespaceOverride(obj.Namespace)
// Creates a Kubernetes client from the spec.
func (r *Reconciler) createKubernetesClient(cluster *v1alpha1.Cluster) (k8s.Client, error) {
k8sKubeContextOverride := k8s.KubeContextOverride(cluster.Spec.Connection.Kubernetes.Context)
k8sNamespaceOverride := k8s.NamespaceOverride(cluster.Spec.Connection.Kubernetes.Namespace)
client, err := r.k8sClientFactory.New(r.globalCtx, k8sKubeContextOverride, k8sNamespaceOverride)
if err != nil {
return connection{connType: connectionTypeK8s, error: err.Error()}
return nil, err
}
return connection{connType: connectionTypeK8s, k8sClient: client}
return client, nil
}

// Reads the arch from a kubernetes cluster, or "unknown" if we can't
@@ -246,11 +290,16 @@ func (r *Reconciler) reportConnectionEvent(ctx context.Context, cluster *v1alpha

func (c *connection) toStatus() v1alpha1.ClusterStatus {
var connectedAt *metav1.MicroTime
if c.error == "" && !c.createdAt.IsZero() {
if c.initError == "" && !c.createdAt.IsZero() {
t := apis.NewMicroTime(c.createdAt)
connectedAt = &t
}

clusterError := c.initError
if clusterError == "" {
clusterError = c.statusError
}

var reg *v1alpha1.RegistryHosting
if c.registry != nil {
reg = &v1alpha1.RegistryHosting{
@@ -262,7 +311,7 @@ func (c *connection) toStatus() v1alpha1.ClusterStatus {
}

return v1alpha1.ClusterStatus{
Error: c.error,
Error: clusterError,
Arch: c.arch,
ConnectedAt: connectedAt,
Registry: reg,
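
The retry flow in Reconcile is easy to lose in the diff: when client initialization fails, the reconciler records `initError` and `createdAt`, returns `RequeueAfter: clientInitBackoff`, and only rebuilds the client once the backoff window has passed (and, for an existing connection, only when the cluster-refresh annotation is set). A tiny sketch of just that timing check, where `conn` and `shouldRetryInit` are hypothetical simplifications of the real `connection` struct and the inline condition:

```go
package main

import (
	"fmt"
	"time"
)

const clientInitBackoff = 30 * time.Second

// conn is a pared-down stand-in for the reconciler's connection struct:
// only the fields involved in the retry decision.
type conn struct {
	initError string
	createdAt time.Time
}

// shouldRetryInit mirrors the check in Reconcile: a connection whose client
// failed to initialize is only rebuilt after clientInitBackoff has elapsed.
func shouldRetryInit(c conn, now time.Time) bool {
	return c.initError != "" && now.After(c.createdAt.Add(clientInitBackoff))
}

func main() {
	now := time.Now()

	stale := conn{initError: "no kubeconfig", createdAt: now.Add(-time.Minute)}
	fmt.Println(shouldRetryInit(stale, now)) // true: the backoff window has passed

	fresh := conn{initError: "no kubeconfig", createdAt: now}
	fmt.Println(shouldRetryInit(fresh, now)) // false: still inside the backoff window

	healthy := conn{createdAt: now.Add(-time.Minute)}
	fmt.Println(shouldRetryInit(healthy, now)) // false: the client initialized fine
}
```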