[STRMCMP-509] Make old cluster clean-up idempotent (#27)
Micah Wylde committed Jun 17, 2019
1 parent 93f0e01 commit 1456182
Showing 10 changed files with 222 additions and 251 deletions.
2 changes: 1 addition & 1 deletion pkg/controller/flink/container_utils.go
@@ -172,7 +172,7 @@ func InjectHashesIntoConfig(deployment *appsv1.Deployment, app *v1alpha1.FlinkAp
for _, env := range container.Env {
if env.Name == OperatorFlinkConfig {
env.Value = fmt.Sprintf("%s\nhigh-availability.cluster-id: %s-%s\n", env.Value, app.Name, hash)
env.Value = fmt.Sprintf("%sjobmanager.rpc.address: %s\n", env.Value, VersionedJobManagerService(app, hash))
env.Value = fmt.Sprintf("%sjobmanager.rpc.address: %s\n", env.Value, VersionedJobManagerServiceName(app, hash))
}
newEnv = append(newEnv, env)
}
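For context, here is a small, self-contained sketch (not part of this commit) of the flink-conf fragment those two Sprintf calls build up. The application name, hash, and base config value are invented for illustration, and the "<name>-<hash>" service name is only an assumption about what VersionedJobManagerServiceName returns.

package main

import "fmt"

func main() {
	// Invented example values; in the operator they come from the FlinkApplication and its hash.
	base := "taskmanager.numberOfTaskSlots: 2\n"
	name, hash := "wordcount", "1e5fa11a"
	versionedService := fmt.Sprintf("%s-%s", name, hash) // stand-in for VersionedJobManagerServiceName

	// Mirrors the two Sprintf calls in InjectHashesIntoConfig above: first the HA
	// cluster id, then the jobmanager address pointing at the hash-versioned service.
	cfg := fmt.Sprintf("%s\nhigh-availability.cluster-id: %s-%s\n", base, name, hash)
	cfg = fmt.Sprintf("%sjobmanager.rpc.address: %s\n", cfg, versionedService)

	fmt.Print(cfg)
	// taskmanager.numberOfTaskSlots: 2
	//
	// high-availability.cluster-id: wordcount-1e5fa11a
	// jobmanager.rpc.address: wordcount-1e5fa11a
}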
161 changes: 84 additions & 77 deletions pkg/controller/flink/flink.go
@@ -7,6 +7,8 @@ import (
"fmt"
"time"

"k8s.io/apimachinery/pkg/runtime"

"github.com/lyft/flinkk8soperator/pkg/controller/common"

"github.com/lyft/flinkk8soperator/pkg/controller/config"
@@ -45,9 +47,6 @@ type ControllerInterface interface {
// Creates a Flink cluster with necessary Job Manager, Task Managers and services for UI
CreateCluster(ctx context.Context, application *v1alpha1.FlinkApplication) error

- // Deletes a Flink cluster based on the hash
- DeleteCluster(ctx context.Context, application *v1alpha1.FlinkApplication, hash string) error
-
// Cancels the running/active jobs in the Cluster for the Application after savepoint is created
CancelWithSavepoint(ctx context.Context, application *v1alpha1.FlinkApplication, hash string) (string, error)

@@ -72,8 +71,11 @@ type ControllerInterface interface {
// Returns the list of Jobs running on the Flink Cluster for the Application
GetJobsForApplication(ctx context.Context, application *v1alpha1.FlinkApplication, hash string) ([]client.FlinkJob, error)

- // For the application, a deployment corresponds to an image. This returns the current and older deployments for the app.
- GetCurrentAndOldDeploymentsForApp(ctx context.Context, application *v1alpha1.FlinkApplication) (*common.FlinkDeployment, []common.FlinkDeployment, error)
+ // Returns the pair of deployments (tm/jm) for the current version of the application
+ GetCurrentDeploymentsForApp(ctx context.Context, application *v1alpha1.FlinkApplication) (*common.FlinkDeployment, error)
+
+ // Deletes all old resources (deployments and services) for the app
+ DeleteOldResourcesForApp(ctx context.Context, app *v1alpha1.FlinkApplication) error

// Attempts to find an externalized checkpoint for the job. This can be used to recover an application that is not
// able to savepoint for some reason.
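To illustrate how the two methods added above fit together, here is a rough, hypothetical caller, assumed to live alongside the controller in this package; it is not part of this commit. Once the deployments for the current hash exist, everything left over from older hashes can be removed, and the call can safely be repeated.

// Hypothetical sketch: clean up after a deploy using the new interface methods.
func cleanupAfterDeploy(ctx context.Context, flink ControllerInterface, app *v1alpha1.FlinkApplication) error {
	cur, err := flink.GetCurrentDeploymentsForApp(ctx, app)
	if err != nil {
		return err
	}
	if cur == nil {
		// the jm/tm pair for the current hash is not up yet; keep the old cluster around
		return nil
	}

	// Safe to retry: if an earlier attempt already deleted some resources,
	// the next call simply finds nothing left carrying an old hash.
	return flink.DeleteOldResourcesForApp(ctx, app)
}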
@@ -105,18 +107,18 @@ func NewController(k8sCluster k8.ClusterInterface, config config.RuntimeConfig)
func newControllerMetrics(scope promutils.Scope) *controllerMetrics {
flinkControllerScope := scope.NewSubScope("flink_controller")
return &controllerMetrics{
- scope: scope,
- deleteClusterSuccessCounter: labeled.NewCounter("delete_cluster_success", "Flink cluster deleted successfully", flinkControllerScope),
- deleteClusterFailedCounter: labeled.NewCounter("delete_cluster_failure", "Flink cluster deletion failed", flinkControllerScope),
- applicationChangedCounter: labeled.NewCounter("app_changed_counter", "Flink application has changed", flinkControllerScope),
+ scope: scope,
+ deleteResourceSuccessCounter: labeled.NewCounter("delete_resource_success", "Flink resource deleted successfully", flinkControllerScope),
+ deleteResourceFailedCounter: labeled.NewCounter("delete_resource_failure", "Flink resource deletion failed", flinkControllerScope),
+ applicationChangedCounter: labeled.NewCounter("app_changed_counter", "Flink application has changed", flinkControllerScope),
}
}

type controllerMetrics struct {
- scope promutils.Scope
- deleteClusterSuccessCounter labeled.Counter
- deleteClusterFailedCounter labeled.Counter
- applicationChangedCounter labeled.Counter
+ scope promutils.Scope
+ deleteResourceSuccessCounter labeled.Counter
+ deleteResourceFailedCounter labeled.Counter
+ applicationChangedCounter labeled.Counter
}

type Controller struct {
@@ -128,7 +130,7 @@ type Controller struct {
}

func getURLFromApp(application *v1alpha1.FlinkApplication, hash string) string {
- service := VersionedJobManagerService(application, hash)
+ service := VersionedJobManagerServiceName(application, hash)
cfg := config.GetConfig()
if cfg.UseProxy {
return fmt.Sprintf(proxyURL, cfg.ProxyPort.Port, application.Namespace, service)
@@ -251,39 +253,6 @@ func (f *Controller) GetSavepointStatus(ctx context.Context, application *v1alph
return f.flinkClient.CheckSavepointStatus(ctx, getURLFromApp(application, hash), jobID, application.Spec.SavepointInfo.TriggerID)
}

- func (f *Controller) DeleteCluster(ctx context.Context, application *v1alpha1.FlinkApplication, hash string) error {
- if hash == "" {
- return errors.New("invalid hash: must not be empty")
- }
-
- jmDeployment := FetchJobMangerDeploymentDeleteObj(application, hash)
- err := f.k8Cluster.DeleteK8Object(ctx, jmDeployment)
- if err != nil {
- f.metrics.deleteClusterFailedCounter.Inc(ctx)
- logger.Warnf(ctx, "Failed to delete jobmanager deployment")
- return err
- }
-
- tmDeployment := FetchTaskMangerDeploymentDeleteObj(application, hash)
- err = f.k8Cluster.DeleteK8Object(ctx, tmDeployment)
- if err != nil {
- f.metrics.deleteClusterFailedCounter.Inc(ctx)
- logger.Warnf(ctx, "Failed to delete taskmanager deployment")
- return err
- }
-
- versionedJobService := FetchVersionedJobManagerServiceDeleteObj(application, hash)
- err = f.k8Cluster.DeleteK8Object(ctx, versionedJobService)
- if err != nil {
- f.metrics.deleteClusterFailedCounter.Inc(ctx)
- logger.Warnf(ctx, "Failed to delete versioned service")
- return err
- }
-
- f.metrics.deleteClusterSuccessCounter.Inc(ctx)
- return nil
- }
-
func (f *Controller) IsClusterReady(ctx context.Context, application *v1alpha1.FlinkApplication) (bool, error) {
labelMap := GetAppHashSelector(application)

@@ -348,52 +317,90 @@ func listToFlinkDeployment(ds []v1.Deployment, hash string) *common.FlinkDeploym
return &fd
}

+ func getCurrentHash(app *v1alpha1.FlinkApplication) string {
+ appHash := HashForApplication(app)
+
+ if appHash == app.Status.FailedDeployHash {
+ return app.Status.DeployHash
+ }
+ return appHash
+ }
+
// Gets the current deployment and any other deployments for the application. The current deployment will be the one
// that matches the FlinkApplication, unless the FailedDeployHash is set, in which case it will be the one with that
// hash.
- func (f *Controller) GetCurrentAndOldDeploymentsForApp(ctx context.Context,
- application *v1alpha1.FlinkApplication) (*common.FlinkDeployment, []common.FlinkDeployment, error) {
- appLabels := k8.GetAppLabel(application.Name)
- deployments, err := f.k8Cluster.GetDeploymentsWithLabel(ctx, application.Namespace, appLabels)
+ func (f *Controller) GetCurrentDeploymentsForApp(ctx context.Context, application *v1alpha1.FlinkApplication) (*common.FlinkDeployment, error) {
+ labels := k8.GetAppLabel(application.Name)
+ curHash := getCurrentHash(application)
+ labels[FlinkAppHash] = curHash
+
+ deployments, err := f.k8Cluster.GetDeploymentsWithLabel(ctx, application.Namespace, labels)
if err != nil {
- return nil, nil, err
+ return nil, err
}

- byHash := map[string][]v1.Deployment{}
- for _, deployment := range deployments.Items {
- byHash[deployment.Labels[FlinkAppHash]] = append(byHash[deployment.Labels[FlinkAppHash]], deployment)
+ cur := listToFlinkDeployment(deployments.Items, curHash)
+ if cur != nil && application.Status.FailedDeployHash == "" &&
+ (!f.deploymentMatches(ctx, cur.Jobmanager, application) || !f.deploymentMatches(ctx, cur.Taskmanager, application)) {
+ // we had a hash collision (i.e., the previous application has the same hash as the new one)
+ // this is *very* unlikely to occur (1/2^32)
+ return nil, errors.New("found hash collision for deployment, you must do a clean deploy")
}

- appHash := HashForApplication(application)
- var curHash string
+ return cur, nil
+ }

- if appHash == application.Status.FailedDeployHash {
- curHash = application.Status.DeployHash
- } else {
- curHash = appHash
+ func (f *Controller) DeleteOldResourcesForApp(ctx context.Context, app *v1alpha1.FlinkApplication) error {
+ curHash := getCurrentHash(app)
+
+ appLabel := k8.GetAppLabel(app.Name)
+ deployments, err := f.k8Cluster.GetDeploymentsWithLabel(ctx, app.Namespace, appLabel)
+ if err != nil {
+ return err
}

- cur := listToFlinkDeployment(byHash[curHash], curHash)
- if cur != nil && application.Status.FailedDeployHash == "" &&
- (!f.deploymentMatches(ctx, cur.Jobmanager, application) || !f.deploymentMatches(ctx, cur.Taskmanager, application)) {
- // we had a hash collision (i.e., the previous application has the same hash as the new one)
- // this is *very* unlikely to occur (1/2^32)
- return nil, nil, errors.New("found hash collision for deployment, you must do a clean deploy")
+ oldObjects := make([]metav1.Object, 0)
+
+ for _, d := range deployments.Items {
+ if d.Labels[FlinkAppHash] != "" &&
+ d.Labels[FlinkAppHash] != curHash &&
+ // verify that this deployment matches the jobmanager or taskmanager naming format
+ (d.Name == fmt.Sprintf(JobManagerNameFormat, app.Name, d.Labels[FlinkAppHash]) ||
+ d.Name == fmt.Sprintf(TaskManagerNameFormat, app.Name, d.Labels[FlinkAppHash])) {
+ oldObjects = append(oldObjects, d.DeepCopy())
+ }
}

- old := make([]common.FlinkDeployment, 0)
- for hash, ds := range byHash {
- if hash != curHash {
- fd := listToFlinkDeployment(ds, hash)
- if fd != nil {
- old = append(old, *fd)
- } else {
- logger.Warn(ctx, "Found deployments that do not have one JM and TM: %v", ds)
- }
+ services, err := f.k8Cluster.GetServicesWithLabel(ctx, app.Namespace, appLabel)
+ if err != nil {
+ return err
+ }
+
+ for _, d := range services.Items {
+ if d.Labels[FlinkAppHash] != "" &&
+ d.Labels[FlinkAppHash] != curHash &&
+ d.Name == VersionedJobManagerServiceName(app, d.Labels[FlinkAppHash]) {
+ oldObjects = append(oldObjects, d.DeepCopy())
}
}

- return cur, old, nil
+ deletedHashes := make(map[string]bool)
+
+ for _, resource := range oldObjects {
+ err := f.k8Cluster.DeleteK8Object(ctx, resource.(runtime.Object))
+ if err != nil {
+ f.metrics.deleteResourceFailedCounter.Inc(ctx)
+ return err
+ }
+ f.metrics.deleteResourceSuccessCounter.Inc(ctx)
+ deletedHashes[resource.GetLabels()[FlinkAppHash]] = true
+ }
+
+ for k := range deletedHashes {
+ f.LogEvent(ctx, app, "", corev1.EventTypeNormal, fmt.Sprintf("Deleted old cluster with hash %s", k))
+ }
+
+ return nil
}

func (f *Controller) FindExternalizedCheckpoint(ctx context.Context, application *v1alpha1.FlinkApplication, hash string) (string, error) {
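The idempotency promised in the commit title falls out of the shape of DeleteOldResourcesForApp above: every call recomputes the set of label-matched deployments and services, skips anything carrying the current hash, and deletes only what is still present. A hypothetical test sketch (the fake cluster and helper constructors are assumptions, not part of this diff) makes the retry behaviour explicit:

// Hypothetical test: running the cleanup twice succeeds both times, because the
// second pass simply finds no resources left with an old hash. getTestController,
// testFlinkApplication, newStaleDeployment and newStaleService are assumed helpers.
func TestDeleteOldResourcesForAppIsIdempotent(t *testing.T) {
	flinkController, fakeK8s := getTestController(t)
	app := testFlinkApplication("wordcount")

	// Seed a jobmanager, a taskmanager and a versioned service left over from an old hash.
	fakeK8s.Add(newStaleDeployment(app, "oldhash", "jobmanager"))
	fakeK8s.Add(newStaleDeployment(app, "oldhash", "taskmanager"))
	fakeK8s.Add(newStaleService(app, "oldhash"))

	// First pass deletes the three stale objects.
	assert.NoError(t, flinkController.DeleteOldResourcesForApp(context.Background(), app))

	// Second pass is a no-op rather than an error, whereas a retry of the removed
	// hash-targeted DeleteCluster could fail once the objects were already gone.
	assert.NoError(t, flinkController.DeleteOldResourcesForApp(context.Background(), app))
}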
