Skip to content

Commit

Permalink
[Backward Incompatible] Add env variables for Flink managers and fix …
Browse files Browse the repository at this point in the history
…for JM HA (#45)

* Add env variables for Flink managers and fix for JM HA
* Inject customized env from the operator
  • Loading branch information
anandswaminathan committed Jul 17, 2019
1 parent 9053e4f commit 373cb21
Show file tree
Hide file tree
Showing 14 changed files with 205 additions and 54 deletions.
2 changes: 1 addition & 1 deletion examples/wordcount/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ ENV PATH=$FLINK_HOME/bin:$HADOOP_HOME/bin:$MAVEN_HOME/bin:$PATH
COPY . /code

# Configure Flink version
ENV FLINK_VERSION=1.8.0 \
ENV FLINK_VERSION=1.8.1 \
HADOOP_SCALA_VARIANT=scala_2.12

# Install dependencies
Expand Down
18 changes: 5 additions & 13 deletions examples/wordcount/docker-entrypoint.sh
Original file line number Diff line number Diff line change
Expand Up @@ -13,21 +13,13 @@ drop_privs_cmd() {
fi
}

envsubst < /usr/local/flink-conf.yaml > $FLINK_HOME/conf/flink-conf.yaml

# As the taskmanager pods are accessible only by (cluster) ip address,
# we must manually configure this based on the podIp kubernetes
# variable, which is assigned to TASKMANAGER_HOSTNAME env var by the
# operator.
if [ -n "$TASKMANAGER_HOSTNAME" ]; then
echo "taskmanager.host: $TASKMANAGER_HOSTNAME" >> "$FLINK_HOME/conf/flink-conf.yaml"
fi

# Add in extra configs set by the operator
if [ -n "$OPERATOR_FLINK_CONFIG" ]; then
echo "$OPERATOR_FLINK_CONFIG" >> "$FLINK_HOME/conf/flink-conf.yaml"
echo "$OPERATOR_FLINK_CONFIG" >> "/usr/local/flink-conf.yaml"
fi

envsubst < /usr/local/flink-conf.yaml > $FLINK_HOME/conf/flink-conf.yaml

COMMAND=$@

if [ $# -lt 1 ]; then
Expand All @@ -37,11 +29,11 @@ fi
if [ "$COMMAND" = "help" ]; then
echo "Usage: $(basename "$0") (jobmanager|taskmanager|local|help)"
exit 0
elif [ "$COMMAND" = "jobmanager" ]; then
elif [ "$FLINK_DEPLOYMENT_TYPE" = "jobmanager" ]; then
echo "Starting Job Manager"
echo "config file: " && grep '^[^\n#]' "$FLINK_HOME/conf/flink-conf.yaml"
exec $(drop_privs_cmd) "$FLINK_HOME/bin/jobmanager.sh" start-foreground
elif [ "$COMMAND" = "taskmanager" ]; then
elif [ "$FLINK_DEPLOYMENT_TYPE" = "taskmanager" ]; then
echo "Starting Task Manager"
echo "config file: " && grep '^[^\n#]' "$FLINK_HOME/conf/flink-conf.yaml"
exec $(drop_privs_cmd) "$FLINK_HOME/bin/taskmanager.sh" start-foreground
Expand Down
4 changes: 2 additions & 2 deletions integ/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ By default the tests create, use, and clean up the namespace
These tests use a sample Flink job [operator-test-app](/integ/operator-test-app/). The
tests currently use two images built from here:

* `lyft/operator-test-app:6c45caca225489895cb1353dae25069b5d43746f.1`
* `lyft/operator-test-app:6c45caca225489895cb1353dae25069b5d43746f.2`
* `lyft/operator-test-app:b1b3cb8e8f98bd41f44f9c89f8462ce255e0d13f.1`
* `lyft/operator-test-app:b1b3cb8e8f98bd41f44f9c89f8462ce255e0d13f.2`

Those images are available on our private Dockerhub registry, and you
will either need to pull them locally or give Kubernetes access to the
Expand Down
2 changes: 1 addition & 1 deletion integ/operator-test-app/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ ENV PATH=$FLINK_HOME/bin:$HADOOP_HOME/bin:$MAVEN_HOME/bin:$PATH
COPY . /code

# Configure Flink version
ENV FLINK_VERSION=1.8.0 \
ENV FLINK_VERSION=1.8.1 \
HADOOP_SCALA_VARIANT=scala_2.12

# Install dependencies
Expand Down
18 changes: 5 additions & 13 deletions integ/operator-test-app/docker-entrypoint.sh
Original file line number Diff line number Diff line change
Expand Up @@ -13,21 +13,13 @@ drop_privs_cmd() {
fi
}

envsubst < /usr/local/flink-conf.yaml > $FLINK_HOME/conf/flink-conf.yaml

# As the taskmanager pods are accessible only by (cluster) ip address,
# we must manually configure this based on the podIp kubernetes
# variable, which is assigned to TASKMANAGER_HOSTNAME env var by the
# operator.
if [ -n "$TASKMANAGER_HOSTNAME" ]; then
echo "taskmanager.host: $TASKMANAGER_HOSTNAME" >> "$FLINK_HOME/conf/flink-conf.yaml"
fi

# Add in extra configs set by the operator
if [ -n "$OPERATOR_FLINK_CONFIG" ]; then
echo "$OPERATOR_FLINK_CONFIG" >> "$FLINK_HOME/conf/flink-conf.yaml"
echo "$OPERATOR_FLINK_CONFIG" >> "/usr/local/flink-conf.yaml"
fi

envsubst < /usr/local/flink-conf.yaml > $FLINK_HOME/conf/flink-conf.yaml

COMMAND=$@

if [ $# -lt 1 ]; then
Expand All @@ -37,11 +29,11 @@ fi
if [ "$COMMAND" = "help" ]; then
echo "Usage: $(basename "$0") (jobmanager|taskmanager|local|help)"
exit 0
elif [ "$COMMAND" = "jobmanager" ]; then
elif [ "$FLINK_DEPLOYMENT_TYPE" = "jobmanager" ]; then
echo "Starting Job Manager"
echo "config file: " && grep '^[^\n#]' "$FLINK_HOME/conf/flink-conf.yaml"
exec $(drop_privs_cmd) "$FLINK_HOME/bin/jobmanager.sh" start-foreground
elif [ "$COMMAND" = "taskmanager" ]; then
elif [ "$FLINK_DEPLOYMENT_TYPE" = "taskmanager" ]; then
echo "Starting Task Manager"
echo "config file: " && grep '^[^\n#]' "$FLINK_HOME/conf/flink-conf.yaml"
exec $(drop_privs_cmd) "$FLINK_HOME/bin/taskmanager.sh" start-foreground
Expand Down
2 changes: 1 addition & 1 deletion integ/simple_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

const NewImage = "lyft/operator-test-app:6c45caca225489895cb1353dae25069b5d43746f.2"
const NewImage = "lyft/operator-test-app:b1b3cb8e8f98bd41f44f9c89f8462ce255e0d13f.2"

func updateAndValidate(c *C, s *IntegSuite, name string, updateFn func(app *v1alpha1.FlinkApplication), failurePhase v1alpha1.FlinkApplicationPhase) *v1alpha1.FlinkApplication {
app, err := s.Util.GetFlinkApplication(name)
Expand Down
2 changes: 1 addition & 1 deletion integ/test_app.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ metadata:
labels:
environment: development
spec:
image: lyft/operator-test-app:6c45caca225489895cb1353dae25069b5d43746f.1
image: lyft/operator-test-app:b1b3cb8e8f98bd41f44f9c89f8462ce255e0d13f.1
imagePullSecrets:
- name: dockerhub
flinkConfig:
Expand Down
13 changes: 13 additions & 0 deletions pkg/controller/flink/config.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package flink

import (
"strings"

"github.com/lyft/flinkk8soperator/pkg/apis/app/v1alpha1"
"gopkg.in/yaml.v2"
)
Expand All @@ -14,6 +16,7 @@ const (
UIDefaultPort = 8081
MetricsQueryDefaultPort = 50101
OffHeapMemoryDefaultFraction = 0.5
HighAvailabilityKey = "high-availability"
)

func firstNonNil(x *int32, y int32) int32 {
Expand Down Expand Up @@ -118,3 +121,13 @@ func renderFlinkConfig(app *v1alpha1.FlinkApplication) (string, error) {
}
return string(b), nil
}

func isHAEnabled(flinkConfig v1alpha1.FlinkConfig) bool {
if val, ok := flinkConfig[HighAvailabilityKey]; ok {
value := val.(string)
if strings.ToLower(strings.TrimSpace(value)) != "none" {
return true
}
}
return false
}
34 changes: 31 additions & 3 deletions pkg/controller/flink/container_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ const (
AwsMetadataServiceTimeout = "5"
AwsMetadataServiceNumAttempts = "20"
OperatorFlinkConfig = "OPERATOR_FLINK_CONFIG"
HostName = "HOST_NAME"
HostIP = "HOST_IP"
FlinkDeploymentTypeEnv = "FLINK_DEPLOYMENT_TYPE"
FlinkDeploymentType = "flink-deployment-type"
FlinkDeploymentTypeJobmanager = "jobmanager"
FlinkDeploymentTypeTaskmanager = "taskmanager"
Expand Down Expand Up @@ -87,6 +90,22 @@ func getFlinkEnv(app *v1alpha1.FlinkApplication) ([]v1.EnvVar, error) {
Name: OperatorFlinkConfig,
Value: flinkConfig,
},
{
Name: HostName,
ValueFrom: &v1.EnvVarSource{
FieldRef: &v1.ObjectFieldSelector{
FieldPath: "metadata.name",
},
},
},
{
Name: HostIP,
ValueFrom: &v1.EnvVarSource{
FieldRef: &v1.ObjectFieldSelector{
FieldPath: "status.podIP",
},
},
},
}...)
return env, nil
}
Expand Down Expand Up @@ -155,14 +174,23 @@ func HashForApplication(app *v1alpha1.FlinkApplication) string {
return fmt.Sprintf("%08x", hasher.Sum32())
}

func InjectHashesIntoConfig(deployment *appsv1.Deployment, app *v1alpha1.FlinkApplication, hash string) {
func InjectOperatorCustomizedConfig(deployment *appsv1.Deployment, app *v1alpha1.FlinkApplication, hash string, deploymentType string) {
var newContainers []v1.Container
for _, container := range deployment.Spec.Template.Spec.Containers {
var newEnv []v1.EnvVar
for _, env := range container.Env {
if env.Name == OperatorFlinkConfig {
env.Value = fmt.Sprintf("%s\nhigh-availability.cluster-id: %s-%s\n", env.Value, app.Name, hash)
env.Value = fmt.Sprintf("%sjobmanager.rpc.address: %s\n", env.Value, VersionedJobManagerServiceName(app, hash))
if isHAEnabled(app.Spec.FlinkConfig) {
env.Value = fmt.Sprintf("%s\nhigh-availability.cluster-id: %s-%s\n", env.Value, app.Name, hash)
if deploymentType == FlinkDeploymentTypeJobmanager {
env.Value = fmt.Sprintf("%sjobmanager.rpc.address: $HOST_IP\n", env.Value)
}
} else {
env.Value = fmt.Sprintf("%s\njobmanager.rpc.address: %s\n", env.Value, VersionedJobManagerServiceName(app, hash))
}
if deploymentType == FlinkDeploymentTypeTaskmanager {
env.Value = fmt.Sprintf("%staskmanager.host: $HOST_IP\n", env.Value)
}
}
newEnv = append(newEnv, env)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/flink/flink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import (
const testImage = "123.xyz.com/xx:11ae1218924428faabd9b64423fa0c332efba6b2"

// Note: if you find yourself changing this to fix a test, that should be treated as a breaking API change
const testAppHash = "cb56c9a1"
const testAppHash = "de844839"
const testAppName = "app-name"
const testNamespace = "ns"
const testJobID = "j1"
Expand Down
8 changes: 6 additions & 2 deletions pkg/controller/flink/job_manager_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ const (
JobManagerPodNameFormat = "%s-%s-jm-pod"
JobManagerContainerName = "jobmanager"
JobManagerArg = "jobmanager"
JobManagerReadinessPath = "/config"
JobManagerReadinessPath = "/overview"
JobManagerReadinessInitialDelaySec = 10
JobManagerReadinessTimeoutSec = 1
JobManagerReadinessSuccessThreshold = 1
Expand Down Expand Up @@ -247,6 +247,10 @@ func FetchJobManagerContainerObj(application *v1alpha1.FlinkApplication) *coreV1

ports := getJobManagerPorts(application)
operatorEnv := GetFlinkContainerEnv(application)
operatorEnv = append(operatorEnv, coreV1.EnvVar{
Name: FlinkDeploymentTypeEnv,
Value: FlinkDeploymentTypeJobmanager,
})
operatorEnv = append(operatorEnv, jmConfig.Environment.Env...)

return &coreV1.Container{
Expand Down Expand Up @@ -342,7 +346,7 @@ func FetchJobMangerDeploymentCreateObj(app *v1alpha1.FlinkApplication, hash stri
template.Spec.Selector.MatchLabels[FlinkAppHash] = hash
template.Spec.Template.Name = getJobManagerPodName(app, hash)

InjectHashesIntoConfig(template, app, hash)
InjectOperatorCustomizedConfig(template, app, hash, FlinkDeploymentTypeJobmanager)

return template
}
Expand Down
90 changes: 84 additions & 6 deletions pkg/controller/flink/job_manager_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@ import (
"k8s.io/apimachinery/pkg/runtime/schema"
)

var testJarName = "test.jar"
var testEntryClass = "com.test.MainClass"
var testProgramArgs = "--test"

func getJMControllerForTest() JobManagerController {
testScope := mockScope.NewTestScope()
labeled.SetMetricKeys(common.GetValidLabelNames()...)
Expand All @@ -43,15 +47,15 @@ func TestGetJobManagerPodName(t *testing.T) {
func TestJobManagerCreateSuccess(t *testing.T) {
testController := getJMControllerForTest()
app := getFlinkTestApp()
app.Spec.JarName = "test.jar"
app.Spec.EntryClass = "com.test.MainClass"
app.Spec.ProgramArgs = "--test"
app.Spec.JarName = testJarName
app.Spec.EntryClass = testEntryClass
app.Spec.ProgramArgs = testProgramArgs
annotations := map[string]string{
"key": "annotation",
"flink-job-properties": "jarName: test.jar\nparallelism: 8\nentryClass:com.test.MainClass\nprogramArgs:\"--test\"",
"flink-job-properties": "jarName: " + testJarName + "\nparallelism: 8\nentryClass:" + testEntryClass + "\nprogramArgs:\"" + testProgramArgs + "\"",
}
app.Annotations = annotations
hash := "334c7c5d"
hash := "390e4c6d"
expectedLabels := map[string]string{
"flink-app": "app-name",
"flink-app-hash": hash,
Expand Down Expand Up @@ -81,7 +85,6 @@ func TestJobManagerCreateSuccess(t *testing.T) {
"jobmanager.web.port: 8081\nmetrics.internal.query-service.port: 50101\n"+
"query.server.port: 6124\ntaskmanager.heap.size: 512\n"+
"taskmanager.numberOfTaskSlots: 16\n\n"+
"high-availability.cluster-id: app-name-"+hash+"\n"+
"jobmanager.rpc.address: app-name-"+hash+"\n",
common.GetEnvVar(deployment.Spec.Template.Spec.Containers[0].Env,
"OPERATOR_FLINK_CONFIG").Value)
Expand Down Expand Up @@ -112,6 +115,81 @@ func TestJobManagerCreateSuccess(t *testing.T) {
assert.True(t, newlyCreated)
}

func TestJobManagerHACreateSuccess(t *testing.T) {
testController := getJMControllerForTest()
app := getFlinkTestApp()
app.Spec.JarName = testJarName
app.Spec.EntryClass = testEntryClass
app.Spec.ProgramArgs = testProgramArgs
annotations := map[string]string{
"key": "annotation",
"flink-job-properties": "jarName: " + testJarName + "\nparallelism: 8\nentryClass:" + testEntryClass + "\nprogramArgs:\"" + testProgramArgs + "\"",
}
app.Annotations = annotations
app.Spec.FlinkConfig = map[string]interface{}{
"high-availability": "zookeeper",
}
hash := "fda698ef"
expectedLabels := map[string]string{
"flink-app": "app-name",
"flink-app-hash": hash,
"flink-deployment-type": "jobmanager",
}
ctr := 0
mockK8Cluster := testController.k8Cluster.(*k8mock.K8Cluster)
mockK8Cluster.CreateK8ObjectFunc = func(ctx context.Context, object runtime.Object) error {
ctr++
switch ctr {
case 1:
deployment := object.(*v1.Deployment)
assert.Equal(t, getJobManagerName(&app, hash), deployment.Name)
assert.Equal(t, app.Namespace, deployment.Namespace)
assert.Equal(t, getJobManagerPodName(&app, hash), deployment.Spec.Template.Name)
assert.Equal(t, annotations, deployment.Annotations)
assert.Equal(t, annotations, deployment.Spec.Template.Annotations)
assert.Equal(t, app.Namespace, deployment.Spec.Template.Namespace)
assert.Equal(t, expectedLabels, deployment.Labels)
assert.Equal(t, int32(1), *deployment.Spec.Replicas)
assert.Equal(t, "app-name", deployment.OwnerReferences[0].Name)
assert.Equal(t, "flink.k8s.io/v1alpha1", deployment.OwnerReferences[0].APIVersion)
assert.Equal(t, "FlinkApplication", deployment.OwnerReferences[0].Kind)

assert.Equal(t, "blob.server.port: 6125\nhigh-availability: zookeeper\njobmanager.heap.size: 1536\n"+
"jobmanager.rpc.port: 6123\n"+
"jobmanager.web.port: 8081\nmetrics.internal.query-service.port: 50101\n"+
"query.server.port: 6124\ntaskmanager.heap.size: 512\n"+
"taskmanager.numberOfTaskSlots: 16\n\n"+
"high-availability.cluster-id: app-name-"+hash+"\n"+
"jobmanager.rpc.address: $HOST_IP\n",
common.GetEnvVar(deployment.Spec.Template.Spec.Containers[0].Env,
"OPERATOR_FLINK_CONFIG").Value)
case 2:
service := object.(*coreV1.Service)
assert.Equal(t, app.Name, service.Name)
assert.Equal(t, app.Namespace, service.Namespace)
assert.Equal(t, map[string]string{"flink-app": "app-name", "flink-app-hash": hash, "flink-deployment-type": "jobmanager"}, service.Spec.Selector)
case 3:
service := object.(*coreV1.Service)
assert.Equal(t, app.Name+"-"+hash, service.Name)
assert.Equal(t, "app-name", service.OwnerReferences[0].Name)
assert.Equal(t, app.Namespace, service.Namespace)
assert.Equal(t, map[string]string{"flink-app": "app-name", "flink-app-hash": hash, "flink-deployment-type": "jobmanager"}, service.Spec.Selector)
case 4:
labels := map[string]string{
"flink-app": "app-name",
}
ingress := object.(*v1beta1.Ingress)
assert.Equal(t, app.Name, ingress.Name)
assert.Equal(t, app.Namespace, ingress.Namespace)
assert.Equal(t, labels, ingress.Labels)
}
return nil
}
newlyCreated, err := testController.CreateIfNotExist(context.Background(), &app)
assert.Nil(t, err)
assert.True(t, newlyCreated)
}

func TestJobManagerCreateErr(t *testing.T) {
testController := getJMControllerForTest()
app := getFlinkTestApp()
Expand Down

0 comments on commit 373cb21

Please sign in to comment.