Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions docs/scaling/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ There are three main components to the autoscaling system: autoscaler, activator

## Queue-proxy
Whenever a user deploys an application, it is injected with a queue-proxy sidecar container. This queue-proxy does the following:
- Any request coming to the application user-container will go to the queue-proxy and then be routed to the user-container
- Any readiness and liveness probes defined by the user will be replaced with queue-proxy probes externally to the pod while the user-containers probe endpoints will only be accessed by the proxy.
- Any request coming to the application serving container will go to the queue-proxy and then be routed to the serving container.
- Any readiness and liveness probes defined by the user will be replaced with queue-proxy probes externally to the pod while the serving container probe endpoints will only be accessed by the proxy.
- Makes sure that no more than the ‘defined container concurrency’ requests reach the application's instance at once by queueing other requests. For example if a revision defines a concurrency limit of 5, the queue-proxy makes sure that no more than 5 requests reach the application's instance at once. If there are more requests being sent to it than that, it will queue them locally.
- Collects metrics about the load of requests the application container receives and reports the `average concurrency` and `requests per second` on a separate port.

Expand Down
26 changes: 21 additions & 5 deletions pkg/apis/config/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,15 @@ const (
// specified by the user, if `name:` is omitted.
DefaultInitContainerName = "init-container"

// DefaultServingContainerName is the default name we give to the container
// that serves traffic, if `name:` is omitted.
DefaultServingContainerName = "user-container"

// DefaultUserContainerName is the default name we give to the container
// specified by the user, if `name:` is omitted.
DefaultUserContainerName = "user-container"
// Kept for compatibility with existing callers; new code should use
// DefaultServingContainerName.
DefaultUserContainerName = DefaultServingContainerName

// DefaultContainerConcurrency is the default container concurrency. It will be set if ContainerConcurrency is not specified.
DefaultContainerConcurrency = 0
Expand All @@ -73,8 +79,11 @@ const (
)

var (
DefaultInitContainerNameTemplate = mustParseTemplate(DefaultInitContainerName)
DefaultUserContainerNameTemplate = mustParseTemplate(DefaultUserContainerName)
DefaultInitContainerNameTemplate = mustParseTemplate(DefaultInitContainerName)
DefaultServingContainerNameTemplate = mustParseTemplate(DefaultServingContainerName)
// Kept for compatibility with existing callers; new code should use
// DefaultServingContainerNameTemplate.
DefaultUserContainerNameTemplate = DefaultServingContainerNameTemplate
)

func defaultDefaultsConfig() *Defaults {
Expand All @@ -84,7 +93,7 @@ func defaultDefaultsConfig() *Defaults {
RevisionResponseStartTimeoutSeconds: DefaultRevisionResponseStartTimeoutSeconds,
RevisionIdleTimeoutSeconds: DefaultRevisionIdleTimeoutSeconds,
InitContainerNameTemplate: DefaultInitContainerNameTemplate,
UserContainerNameTemplate: DefaultUserContainerNameTemplate,
UserContainerNameTemplate: DefaultServingContainerNameTemplate,
ContainerConcurrency: DefaultContainerConcurrency,
ContainerConcurrencyMaxLimit: DefaultMaxRevisionContainerConcurrency,
AllowContainerConcurrencyZero: DefaultAllowContainerConcurrencyZero,
Expand Down Expand Up @@ -226,9 +235,16 @@ func containerNameFromTemplate(ctx context.Context, tmpl *ObjectMetaTemplate) st
return buf.String()
}

// ServingContainerName returns the name of the serving container based on the context.
func (d Defaults) ServingContainerName(ctx context.Context) string {
return containerNameFromTemplate(ctx, d.UserContainerNameTemplate)
}

// UserContainerName returns the name of the user container based on the context.
// Kept for compatibility with existing callers; new code should use
// ServingContainerName.
func (d Defaults) UserContainerName(ctx context.Context) string {
return containerNameFromTemplate(ctx, d.UserContainerNameTemplate)
return d.ServingContainerName(ctx)
}

// InitContainerName returns the name of the init container based on the context.
Expand Down
4 changes: 2 additions & 2 deletions pkg/apis/config/defaults_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,8 +308,8 @@ func TestTemplating(t *testing.T) {
Namespace: "guardians",
})

if got, want := def.UserContainerName(ctx), test.want; got != want {
t.Errorf("UserContainerName() = %v, wanted %v", got, want)
if got, want := def.ServingContainerName(ctx), test.want; got != want {
t.Errorf("ServingContainerName() = %v, wanted %v", got, want)
}

if got, want := def.InitContainerName(ctx), test.want; got != want {
Expand Down
29 changes: 22 additions & 7 deletions pkg/apis/serving/k8s_validation.go
Original file line number Diff line number Diff line change
Expand Up @@ -422,7 +422,7 @@ func ValidatePodSpec(ctx context.Context, ps corev1.PodSpec) *apis.FieldError {
case 0:
errs = errs.Also(apis.ErrMissingField("containers"))
case 1:
errs = errs.Also(ValidateUserContainer(ctx, ps.Containers[0], volumes, port).
errs = errs.Also(ValidateServingContainer(ctx, ps.Containers[0], volumes, port).
ViaFieldIndex("containers", 0))
default:
errs = errs.Also(validateContainers(ctx, ps.Containers, volumes, port))
Expand Down Expand Up @@ -479,7 +479,7 @@ func validateContainers(ctx context.Context, containers []corev1.Container, volu
if len(containers[i].Ports) == 0 {
errs = errs.Also(validateSidecarContainer(WithinSidecarContainer(ctx), containers[i], volumes).ViaFieldIndex("containers", i))
} else {
errs = errs.Also(ValidateUserContainer(WithinUserContainer(ctx), containers[i], volumes, port).ViaFieldIndex("containers", i))
errs = errs.Also(ValidateServingContainer(WithinServingContainer(ctx), containers[i], volumes, port).ViaFieldIndex("containers", i))
}
}
return errs
Expand Down Expand Up @@ -585,15 +585,22 @@ func validateInitContainer(ctx context.Context, container corev1.Container, volu
return errs.Also(validate(WithinInitContainer(ctx), container, volumes))
}

// ValidateUserContainer validate fields for serving containers
func ValidateUserContainer(ctx context.Context, container corev1.Container, volumes map[string]corev1.Volume, port corev1.ContainerPort) (errs *apis.FieldError) {
// ValidateServingContainer validates fields for containers that serve traffic.
func ValidateServingContainer(ctx context.Context, container corev1.Container, volumes map[string]corev1.Volume, port corev1.ContainerPort) (errs *apis.FieldError) {
// Liveness Probes
errs = errs.Also(validateProbe(container.LivenessProbe, &port, true).ViaField("livenessProbe"))
// Readiness Probes
errs = errs.Also(validateReadinessProbe(container.ReadinessProbe, &port, true).ViaField("readinessProbe"))
return errs.Also(validate(ctx, container, volumes))
}

// ValidateUserContainer validate fields for serving containers
// Kept for compatibility with existing callers; new code should use
// ValidateServingContainer.
func ValidateUserContainer(ctx context.Context, container corev1.Container, volumes map[string]corev1.Volume, port corev1.ContainerPort) (errs *apis.FieldError) {
return ValidateServingContainer(ctx, container, volumes, port)
}

func validate(ctx context.Context, container corev1.Container, volumes map[string]corev1.Volume) *apis.FieldError {
if equality.Semantic.DeepEqual(container, corev1.Container{}) {
return apis.ErrMissingField(apis.CurrentField)
Expand Down Expand Up @@ -1035,14 +1042,22 @@ func warnDefaultContainerSecurityContext(ctx context.Context, psc *corev1.PodSec
return errs
}

// This is attached to contexts as they are passed down through a user container
// This is attached to contexts as they are passed down through a serving container
// being validated.
type userContainer struct{}
type servingContainer struct{}

// WithinServingContainer notes on the context that further validation or defaulting
// is within the context of a serving container in the revision.
func WithinServingContainer(ctx context.Context) context.Context {
return context.WithValue(ctx, servingContainer{}, struct{}{})
}

// WithinUserContainer notes on the context that further validation or defaulting
// is within the context of a user container in the revision.
// Kept for compatibility with existing callers; new code should use
// WithinServingContainer.
func WithinUserContainer(ctx context.Context) context.Context {
return context.WithValue(ctx, userContainer{}, struct{}{})
return WithinServingContainer(ctx)
}

// This is attached to contexts as they are passed down through a sidecar container
Expand Down
4 changes: 2 additions & 2 deletions pkg/apis/serving/k8s_validation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2146,10 +2146,10 @@ func TestUserContainerValidation(t *testing.T) {
}
port, err := validateContainersPorts([]corev1.Container{test.c})

got := err.Also(ValidateUserContainer(ctx, test.c, test.volumes, port))
got := err.Also(ValidateServingContainer(ctx, test.c, test.volumes, port))
got = got.Filter(apis.ErrorLevel)
if diff := cmp.Diff(test.want.Error(), got.Error()); diff != "" {
t.Errorf("ValidateUserContainer (-want, +got): \n%s", diff)
t.Errorf("ValidateServingContainer (-want, +got): \n%s", diff)
}
})
}
Expand Down
8 changes: 4 additions & 4 deletions pkg/apis/serving/v1/revision_defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,8 @@ func (rs *RevisionSpec) SetDefaults(ctx context.Context) {
for idx := range rs.PodSpec.InitContainers {
containerNames.Insert(rs.PodSpec.InitContainers[idx].Name)
}
defaultUserContainerName := cfg.Defaults.UserContainerName(ctx)
applyDefaultContainerNames(rs.PodSpec.Containers, containerNames, defaultUserContainerName)
defaultServingContainerName := cfg.Defaults.ServingContainerName(ctx)
applyDefaultContainerNames(rs.PodSpec.Containers, containerNames, defaultServingContainerName)
defaultInitContainerName := cfg.Defaults.InitContainerName(ctx)
applyDefaultContainerNames(rs.PodSpec.InitContainers, containerNames, defaultInitContainerName)
for idx := range rs.PodSpec.Containers {
Expand Down Expand Up @@ -134,7 +134,7 @@ func (rs *RevisionSpec) applyDefault(ctx context.Context, container *corev1.Cont
// If there are multiple containers then default probes will be applied to the container where user specified PORT
// default probes will not be applied for non serving containers
if len(rs.PodSpec.Containers) == 1 || len(container.Ports) != 0 {
rs.applyUserContainerDefaultReadinessProbe(container)
rs.applyServingContainerDefaultReadinessProbe(container)
}
rs.applyReadinessProbeDefaults(container)
rs.applyGRPCProbeDefaults(container)
Expand All @@ -157,7 +157,7 @@ func (rs *RevisionSpec) applyDefault(ctx context.Context, container *corev1.Cont
}
}

func (*RevisionSpec) applyUserContainerDefaultReadinessProbe(container *corev1.Container) {
func (*RevisionSpec) applyServingContainerDefaultReadinessProbe(container *corev1.Container) {
if container.ReadinessProbe == nil {
container.ReadinessProbe = &corev1.Probe{}
}
Expand Down
24 changes: 17 additions & 7 deletions pkg/reconciler/revision/resources/deploy.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,16 +100,19 @@ var (

// This PreStop hook is actually calling an endpoint on the queue-proxy
// because of the way PreStop hooks are called by kubelet. We use this
// to block the user-container from exiting before the queue-proxy is ready
// to block the serving container from exiting before the queue-proxy is ready
// to exit so we can guarantee that there are no more requests in flight.
userLifecycle = &corev1.Lifecycle{
servingLifecycle = &corev1.Lifecycle{
PreStop: &corev1.LifecycleHandler{
HTTPGet: &corev1.HTTPGetAction{
Port: intstr.FromInt(networking.QueueAdminPort),
Path: queue.RequestQueueDrainPath,
},
},
}
// Kept for compatibility with existing callers; new code should use
// servingLifecycle.
userLifecycle = servingLifecycle
)

func addToken(tokenVolume *corev1.Volume, filename string, audience string, expiry *int64) {
Expand Down Expand Up @@ -205,7 +208,7 @@ func makePodSpec(rev *v1.Revision, cfg *config.Config) (*corev1.PodSpec, error)
extraVolumes = append(extraVolumes, certVolume(networking.ServingCertName))
}

podSpec := BuildPodSpec(rev, append(BuildUserContainers(rev), *queueContainer), cfg)
podSpec := BuildPodSpec(rev, append(BuildServingContainers(rev), *queueContainer), cfg)
podSpec.Volumes = append(podSpec.Volumes, extraVolumes...)

if val := cfg.Deployment.PodRuntimeClassName(rev.ObjectMeta.Labels); podSpec.RuntimeClassName == nil {
Expand Down Expand Up @@ -235,8 +238,8 @@ func makePodSpec(rev *v1.Revision, cfg *config.Config) (*corev1.PodSpec, error)
return podSpec, nil
}

// BuildUserContainers makes an array of containers from the Revision template.
func BuildUserContainers(rev *v1.Revision) []corev1.Container {
// BuildServingContainers makes an array of containers from the Revision template.
func BuildServingContainers(rev *v1.Revision) []corev1.Container {
containers := make([]corev1.Container, 0, len(rev.Spec.PodSpec.Containers))
for i := range rev.Spec.PodSpec.Containers {
var container corev1.Container
Expand All @@ -258,10 +261,17 @@ func BuildUserContainers(rev *v1.Revision) []corev1.Container {
return containers
}

// BuildUserContainers makes an array of containers from the Revision template.
// Kept for compatibility with existing callers; new code should use
// BuildServingContainers.
func BuildUserContainers(rev *v1.Revision) []corev1.Container {
return BuildServingContainers(rev)
}

func makeContainer(container corev1.Container, rev *v1.Revision) corev1.Container {
// Adding or removing an overwritten corev1.Container field here? Don't forget to
// update the fieldmasks / validations in pkg/apis/serving
container.Lifecycle = userLifecycle
container.Lifecycle = servingLifecycle
container.Env = append(container.Env, getKnativeEnvVar(rev)...)

// Explicitly disable stdin and tty allocation
Expand Down Expand Up @@ -289,7 +299,7 @@ func makeServingContainer(servingContainer corev1.Container, rev *v1.Revision) c
servingContainer.Ports = buildContainerPorts(userPort)
servingContainer.Env = append(servingContainer.Env, buildUserPortEnv(userPortStr))
container := makeContainer(servingContainer, rev)
// If the user provides a liveness probe, we should rewrite in the port on the user-container for them.
// If the user provides a liveness probe, we should rewrite in the port on the serving container for them.
rewriteUserLivenessProbe(container.LivenessProbe, int(userPort))
return container
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/webhook/podspec_dryrun.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func validatePodSpec(ctx context.Context, ps v1.RevisionSpec, namespace string)
Spec: ps,
}
rev.SetDefaults(ctx)
podSpec := resources.BuildPodSpec(rev, resources.BuildUserContainers(rev), nil /*configs*/)
podSpec := resources.BuildPodSpec(rev, resources.BuildServingContainers(rev), nil /*configs*/)

// Make a sample pod with the template Revisions & PodSpec and dryrun call to API-server
pod := &corev1.Pod{
Expand Down
12 changes: 6 additions & 6 deletions test/conformance/runtime/liveness_probe_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,16 +89,16 @@ func TestLivenessWithFail(t *testing.T) {
t.Fatalf("The endpoint for Route %s at %s didn't return success: %v", names.Route, url, err)
}

// Check that user-container hasn't been restarted yet.
// Check that the serving container hasn't been restarted yet.
deploymentName := resourcenames.Deployment(resources.Revision)
podList, err := clients.KubeClient.CoreV1().Pods(test.ServingFlags.TestNamespace).List(context.Background(), metav1.ListOptions{})
if err != nil {
t.Fatal("Unable to get pod list: ", err)
}
for i := range podList.Items {
pod := &podList.Items[i]
if strings.Contains(pod.Name, deploymentName) && test.UserContainerRestarted(pod) {
t.Fatal("User container unexpectedly restarted")
if strings.Contains(pod.Name, deploymentName) && test.ServingContainerRestarted(pod) {
t.Fatal("Serving container unexpectedly restarted")
}
}

Expand All @@ -122,21 +122,21 @@ func TestLivenessWithFail(t *testing.T) {
t.Fatalf("POST to /start-failing failed: %v", err)
}

// Wait for the user-container to be restarted.
// Wait for the serving container to be restarted.
if err := pkgtest.WaitForPodListState(
context.Background(),
clients.KubeClient,
func(p *corev1.PodList) (bool, error) {
for i := range p.Items {
pod := &p.Items[i]
if strings.Contains(pod.Name, deploymentName) && test.UserContainerRestarted(pod) {
if strings.Contains(pod.Name, deploymentName) && test.ServingContainerRestarted(pod) {
return true, nil
}
}
return false, nil
},
"WaitForContainerRestart", test.ServingFlags.TestNamespace); err != nil {
t.Fatalf("Failed waiting for user-container to be restarted: %v", err)
t.Fatalf("Failed waiting for serving container to be restarted: %v", err)
}

// After restart, verify the liveness probe passes a few times again.
Expand Down
6 changes: 3 additions & 3 deletions test/e2e/readiness_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,16 +238,16 @@ func TestLivenessProbeAwareOfStartupProbe(t *testing.T) {
t.Fatalf("The endpoint %s for Route %s didn't serve the expected text %q: %v", url, names.Route, test.HelloWorldText, err)
}

// Check that user-container hasn't been restarted.
// Check that the serving container hasn't been restarted.
deploymentName := resourcenames.Deployment(resources.Revision)
podList, err := clients.KubeClient.CoreV1().Pods(test.ServingFlags.TestNamespace).List(context.Background(), metav1.ListOptions{})
if err != nil {
t.Fatal("Unable to get pod list: ", err)
}
for i := range podList.Items {
pod := &podList.Items[i]
if strings.Contains(pod.Name, deploymentName) && test.UserContainerRestarted(pod) {
t.Fatal("User container unexpectedly restarted")
if strings.Contains(pod.Name, deploymentName) && test.ServingContainerRestarted(pod) {
t.Fatal("Serving container unexpectedly restarted")
}
}
}
13 changes: 10 additions & 3 deletions test/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,12 +108,19 @@ func AddTestAnnotation(t testing.TB, m metav1.ObjectMeta) {
})
}

// UserContainerRestarted checks if the container was restarted.
func UserContainerRestarted(pod *corev1.Pod) bool {
// ServingContainerRestarted checks if the serving container was restarted.
func ServingContainerRestarted(pod *corev1.Pod) bool {
for _, status := range pod.Status.ContainerStatuses {
if status.Name == config.DefaultUserContainerName && status.RestartCount > 0 {
if status.Name == config.DefaultServingContainerName && status.RestartCount > 0 {
return true
}
}
return false
}

// UserContainerRestarted checks if the container was restarted.
// Kept for compatibility with existing callers; new code should use
// ServingContainerRestarted.
func UserContainerRestarted(pod *corev1.Pod) bool {
return ServingContainerRestarted(pod)
}
Loading