Skip to content

Commit

Permalink
feat: expose cpu/mem info to sidecar containers (#678)
Browse files Browse the repository at this point in the history
Signed-off-by: Derek Wang <whynowy@gmail.com>
  • Loading branch information
whynowy authored Apr 14, 2023
1 parent 666ebf2 commit a6e8174
Show file tree
Hide file tree
Showing 4 changed files with 44 additions and 2 deletions.
6 changes: 5 additions & 1 deletion docs/user-guide/environment-variables.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,17 @@

For the `numa` container of vertex pods, environment variable `NUMAFLOW_DEBUG` can be set to `true` for [debugging](../development/debugging.md).

In [`udf`](user-defined-functions/map/map.md) and [`udsink`](./sinks/user-defined-sinks.md) containers, there are some preset environment variables that can be used directly.
In [`udf`](user-defined-functions/map/map.md), [`udsink`](./sinks/user-defined-sinks.md) and [`transformer`](./sources/transformer/overview.md) containers, there are some preset environment variables that can be used directly.

- `NUMAFLOW_NAMESPACE` - Namespace.
- `NUMAFLOW_POD` - Pod name.
- `NUMAFLOW_REPLICA` - Replica index.
- `NUMAFLOW_PIPELINE_NAME` - Name of the pipeline.
- `NUMAFLOW_VERTEX_NAME` - Name of the vertex.
- `NUMAFLOW_CPU_REQUEST` - `resources.requests.cpu`, roundup to N cores, `0` if missing.
- `NUMAFLOW_CPU_LIMIT` - `resources.limits.cpu`, roundup to N cores, use host cpu cores if missing.
- `NUMAFLOW_MEMORY_REQUEST` - `resources.requests.memory` in bytes, `0` if missing.
- `NUMAFLOW_MEMORY_LIMIT` - `resources.limits.memory` in bytes, use host memory if missing.

For setting environment variables on pods not owned by a vertex, see [Pipeline Customization](./pipeline-customization.md).

Expand Down
4 changes: 4 additions & 0 deletions pkg/apis/numaflow/v1alpha1/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,10 @@ const (
EnvPPROF = "NUMAFLOW_PPROF"
EnvHealthCheckDisabled = "NUMAFLOW_HEALTH_CHECK_DISABLED"
EnvGRPCMaxMessageSize = "NUMAFLOW_GRPC_MAX_MESSAGE_SIZE"
EnvCPURequest = "NUMAFLOW_CPU_REQUEST"
EnvCPULimit = "NUMAFLOW_CPU_LIMIT"
EnvMemoryRequest = "NUMAFLOW_MEMORY_REQUEST"
EnvMemoryLimit = "NUMAFLOW_MEMORY_LIMIT"

PathVarRun = "/var/run/numaflow"
VertexMetricsPort = 2469
Expand Down
20 changes: 19 additions & 1 deletion pkg/apis/numaflow/v1alpha1/vertex_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ func (v Vertex) getServiceObj(name string, headless bool, port int, servicePortN
return svc
}

// CommonEnvs returns the common envs for all vertex pod containers.
func (v Vertex) commonEnvs() []corev1.EnvVar {
return []corev1.EnvVar{
{Name: EnvNamespace, ValueFrom: &corev1.EnvVarSource{FieldRef: &corev1.ObjectFieldSelector{FieldPath: "metadata.namespace"}}},
Expand All @@ -162,6 +163,20 @@ func (v Vertex) commonEnvs() []corev1.EnvVar {
}
}

// SidecarEnvs returns the envs for sidecar containers.
func (v Vertex) sidecarEnvs() []corev1.EnvVar {
return []corev1.EnvVar{
{Name: EnvCPULimit, ValueFrom: &corev1.EnvVarSource{
ResourceFieldRef: &corev1.ResourceFieldSelector{Resource: "limits.cpu"}}},
{Name: EnvCPURequest, ValueFrom: &corev1.EnvVarSource{
ResourceFieldRef: &corev1.ResourceFieldSelector{Resource: "requests.cpu"}}},
{Name: EnvMemoryLimit, ValueFrom: &corev1.EnvVarSource{
ResourceFieldRef: &corev1.ResourceFieldSelector{Resource: "limits.memory"}}},
{Name: EnvMemoryRequest, ValueFrom: &corev1.EnvVarSource{
ResourceFieldRef: &corev1.ResourceFieldSelector{Resource: "requests.memory"}}},
}
}

func (v Vertex) GetPodSpec(req GetVertexPodSpecReq) (*corev1.PodSpec, error) {
vertexCopy := &Vertex{
ObjectMeta: metav1.ObjectMeta{
Expand Down Expand Up @@ -229,7 +244,10 @@ func (v Vertex) GetPodSpec(req GetVertexPodSpecReq) (*corev1.PodSpec, error) {
}

if len(containers) > 1 { // udf, udsink or source vertex specifies a udtransformer
containers[1].Env = append(containers[1].Env, v.commonEnvs()...)
for i := 1; i < len(containers); i++ {
containers[i].Env = append(containers[i].Env, v.commonEnvs()...)
containers[i].Env = append(containers[i].Env, v.sidecarEnvs()...)
}
}

spec := &corev1.PodSpec{
Expand Down
16 changes: 16 additions & 0 deletions pkg/apis/numaflow/v1alpha1/vertex_types_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,14 @@ func TestGetPodSpec(t *testing.T) {
assert.Equal(t, "cmd", s.Containers[1].Command[0])
assert.Equal(t, 1, len(s.Containers[1].Args))
assert.Equal(t, "arg0", s.Containers[1].Args[0])
sidecarEnvNames := []string{}
for _, env := range s.Containers[1].Env {
sidecarEnvNames = append(sidecarEnvNames, env.Name)
}
assert.Contains(t, sidecarEnvNames, EnvCPULimit)
assert.Contains(t, sidecarEnvNames, EnvMemoryLimit)
assert.Contains(t, sidecarEnvNames, EnvCPURequest)
assert.Contains(t, sidecarEnvNames, EnvMemoryRequest)
})

t.Run("test udf", func(t *testing.T) {
Expand Down Expand Up @@ -326,6 +334,14 @@ func TestGetPodSpec(t *testing.T) {
assert.Contains(t, s.Containers[0].Args, "--type="+string(VertexTypeMapUDF))
assert.Equal(t, 1, len(s.InitContainers))
assert.Equal(t, CtrInit, s.InitContainers[0].Name)
sidecarEnvNames := []string{}
for _, env := range s.Containers[1].Env {
sidecarEnvNames = append(sidecarEnvNames, env.Name)
}
assert.Contains(t, sidecarEnvNames, EnvCPULimit)
assert.Contains(t, sidecarEnvNames, EnvMemoryLimit)
assert.Contains(t, sidecarEnvNames, EnvCPURequest)
assert.Contains(t, sidecarEnvNames, EnvMemoryRequest)
})
}

Expand Down

0 comments on commit a6e8174

Please sign in to comment.