Skip to content

Commit

Permalink
Add support for setting pod tolerations (#181)
Browse files Browse the repository at this point in the history
  • Loading branch information
mwylde committed Mar 3, 2020
1 parent c3c2663 commit e191024
Show file tree
Hide file tree
Showing 6 changed files with 41 additions and 2 deletions.
2 changes: 2 additions & 0 deletions pkg/apis/app/v1beta1/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ type JobManagerConfig struct {
Replicas *int32 `json:"replicas,omitempty"`
OffHeapMemoryFraction *float64 `json:"offHeapMemoryFraction,omitempty"`
NodeSelector map[string]string `json:"nodeSelector,omitempty"`
Tolerations []apiv1.Toleration `json:"tolerations,omitempty"`
}

type TaskManagerConfig struct {
Expand All @@ -117,6 +118,7 @@ type TaskManagerConfig struct {
TaskSlots *int32 `json:"taskSlots,omitempty"`
OffHeapMemoryFraction *float64 `json:"offHeapMemoryFraction,omitempty"`
NodeSelector map[string]string `json:"nodeSelector,omitempty"`
Tolerations []apiv1.Toleration `json:"tolerations,omitempty"`
}

type EnvironmentConfig struct {
Expand Down
19 changes: 19 additions & 0 deletions pkg/apis/app/v1beta1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions pkg/controller/flink/job_manager_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -333,6 +333,7 @@ func jobmanagerTemplate(app *v1beta1.FlinkApplication) *v1.Deployment {
Volumes: app.Spec.Volumes,
ImagePullSecrets: app.Spec.ImagePullSecrets,
NodeSelector: app.Spec.JobManagerConfig.NodeSelector,
Tolerations: app.Spec.JobManagerConfig.Tolerations,
},
},
},
Expand Down
10 changes: 9 additions & 1 deletion pkg/controller/flink/job_manager_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,12 +50,19 @@ func TestJobManagerCreateSuccess(t *testing.T) {
app.Spec.JarName = testJarName
app.Spec.EntryClass = testEntryClass
app.Spec.ProgramArgs = testProgramArgs
app.Spec.JobManagerConfig.Tolerations = []coreV1.Toleration{{
Key: "key",
Operator: "Equal",
Value: "Value",
Effect: "NoSchedule",
}}

annotations := map[string]string{
"key": "annotation",
"flink-job-properties": "jarName: " + testJarName + "\nparallelism: 8\nentryClass:" + testEntryClass + "\nprogramArgs:\"" + testProgramArgs + "\"",
}
app.Annotations = annotations
hash := "c3c0af0b"
hash := "5e7c7283"
expectedLabels := map[string]string{
"flink-app": "app-name",
"flink-app-hash": hash,
Expand All @@ -75,6 +82,7 @@ func TestJobManagerCreateSuccess(t *testing.T) {
assert.Equal(t, annotations, deployment.Spec.Template.Annotations)
assert.Equal(t, app.Namespace, deployment.Spec.Template.Namespace)
assert.Equal(t, expectedLabels, deployment.Labels)
assert.Equal(t, app.Spec.JobManagerConfig.Tolerations, deployment.Spec.Template.Spec.Tolerations)
assert.Equal(t, int32(1), *deployment.Spec.Replicas)
assert.Equal(t, "app-name", deployment.OwnerReferences[0].Name)
assert.Equal(t, "flink.k8s.io/v1beta1", deployment.OwnerReferences[0].APIVersion)
Expand Down
1 change: 1 addition & 0 deletions pkg/controller/flink/task_manager_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,7 @@ func taskmanagerTemplate(app *v1beta1.FlinkApplication) *v1.Deployment {
Volumes: app.Spec.Volumes,
ImagePullSecrets: app.Spec.ImagePullSecrets,
NodeSelector: app.Spec.TaskManagerConfig.NodeSelector,
Tolerations: app.Spec.TaskManagerConfig.Tolerations,
},
},
},
Expand Down
10 changes: 9 additions & 1 deletion pkg/controller/flink/task_manager_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,12 +56,19 @@ func TestTaskManagerCreateSuccess(t *testing.T) {
app.Spec.JarName = testJarName
app.Spec.EntryClass = testEntryClass
app.Spec.ProgramArgs = testProgramArgs
app.Spec.TaskManagerConfig.Tolerations = []coreV1.Toleration{{
Key: "key",
Operator: "Equal",
Value: "Value",
Effect: "NoSchedule",
}}

annotations := map[string]string{
"key": "annotation",
"flink-job-properties": "jarName: test.jar\nparallelism: 8\nentryClass:com.test.MainClass\nprogramArgs:\"--test\"",
}

hash := "c3c0af0b"
hash := "6b5e9b61"

app.Annotations = annotations
expectedLabels := map[string]string{
Expand All @@ -79,6 +86,7 @@ func TestTaskManagerCreateSuccess(t *testing.T) {
assert.Equal(t, annotations, deployment.Spec.Template.Annotations)
assert.Equal(t, app.Namespace, deployment.Spec.Template.Namespace)
assert.Equal(t, expectedLabels, deployment.Labels)
assert.Equal(t, app.Spec.TaskManagerConfig.Tolerations, deployment.Spec.Template.Spec.Tolerations)

assert.Equal(t, "blob.server.port: 6125\njobmanager.heap.size: 1572864k\n"+
"jobmanager.rpc.port: 6123\n"+
Expand Down

0 comments on commit e191024

Please sign in to comment.