Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kubectl wait: wire generic context #114574

Merged
merged 3 commits into from Dec 19, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
38 changes: 21 additions & 17 deletions staging/src/k8s.io/kubectl/pkg/cmd/wait/wait.go
Expand Up @@ -280,18 +280,21 @@ type WaitOptions struct {
}

// ConditionFunc is the interface for providing condition checks
type ConditionFunc func(info *resource.Info, o *WaitOptions) (finalObject runtime.Object, done bool, err error)
type ConditionFunc func(ctx context.Context, info *resource.Info, o *WaitOptions) (finalObject runtime.Object, done bool, err error)

// RunWait runs the waiting logic
func (o *WaitOptions) RunWait() error {
ctx, cancel := watchtools.ContextWithOptionalTimeout(context.Background(), o.Timeout)
defer cancel()

visitCount := 0
visitFunc := func(info *resource.Info, err error) error {
if err != nil {
return err
}

visitCount++
finalObject, success, err := o.ConditionFn(info, o)
finalObject, success, err := o.ConditionFn(ctx, info, o)
if success {
o.Printer.PrintObj(finalObject, o.Out)
return nil
Expand All @@ -318,7 +321,7 @@ func (o *WaitOptions) RunWait() error {
}

// IsDeleted is a condition func for waiting for something to be deleted
func IsDeleted(info *resource.Info, o *WaitOptions) (runtime.Object, bool, error) {
func IsDeleted(ctx context.Context, info *resource.Info, o *WaitOptions) (runtime.Object, bool, error) {
if len(info.Name) == 0 {
return info.Object, false, fmt.Errorf("resource name must be provided")
}
Expand Down Expand Up @@ -357,9 +360,6 @@ func IsDeleted(info *resource.Info, o *WaitOptions) (runtime.Object, bool, error
return info.Object, false, errWaitTimeoutWithName
}

ctx, cancel := watchtools.ContextWithOptionalTimeout(context.Background(), o.Timeout)
defer cancel()

fieldSelector := fields.OneTermEqualSelector("metadata.name", info.Name).String()
lw := &cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
Expand All @@ -386,9 +386,14 @@ func IsDeleted(info *resource.Info, o *WaitOptions) (runtime.Object, bool, error
return false, nil
}

intrCtx, cancel := context.WithCancel(ctx)
defer cancel()
intr := interrupt.New(nil, cancel)
err := intr.Run(func() error {
_, err := watchtools.UntilWithSync(ctx, lw, &unstructured.Unstructured{}, preconditionFunc, Wait{errOut: o.ErrOut}.IsDeleted)
_, err := watchtools.UntilWithSync(intrCtx, lw, &unstructured.Unstructured{}, preconditionFunc, Wait{errOut: o.ErrOut}.IsDeleted)
if errors.Is(err, context.DeadlineExceeded) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I assume this will only work when #114578 lands?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

unfortunately, yes, it is correct

return errWaitTimeoutWithName
}
return err
})
if err != nil {
Expand Down Expand Up @@ -427,7 +432,7 @@ type checkCondFunc func(obj *unstructured.Unstructured) (bool, error)

// getObjAndCheckCondition will make a List query to the API server to get the object and check if the condition is met using check function.
// If the condition is not met, it will make a Watch query to the server and pass in the condMet function
func getObjAndCheckCondition(info *resource.Info, o *WaitOptions, condMet isCondMetFunc, check checkCondFunc) (runtime.Object, bool, error) {
func getObjAndCheckCondition(ctx context.Context, info *resource.Info, o *WaitOptions, condMet isCondMetFunc, check checkCondFunc) (runtime.Object, bool, error) {
if len(info.Name) == 0 {
return info.Object, false, fmt.Errorf("resource name must be provided")
}
Expand Down Expand Up @@ -458,9 +463,6 @@ func getObjAndCheckCondition(info *resource.Info, o *WaitOptions, condMet isCond
return info.Object, false, errWaitTimeoutWithName
}

ctx, cancel := watchtools.ContextWithOptionalTimeout(context.Background(), o.Timeout)
defer cancel()

mapping := info.ResourceMapping() // used to pass back meaningful errors if object disappears
fieldSelector := fields.OneTermEqualSelector("metadata.name", info.Name).String()
lw := &cache.ListWatch{
Expand All @@ -487,14 +489,16 @@ func getObjAndCheckCondition(info *resource.Info, o *WaitOptions, condMet isCond
return false, nil
}

intrCtx, cancel := context.WithCancel(ctx)
defer cancel()
var result runtime.Object
intr := interrupt.New(nil, cancel)
err := intr.Run(func() error {
ev, err := watchtools.UntilWithSync(ctx, lw, &unstructured.Unstructured{}, preconditionFunc, watchtools.ConditionFunc(condMet))
ev, err := watchtools.UntilWithSync(intrCtx, lw, &unstructured.Unstructured{}, preconditionFunc, watchtools.ConditionFunc(condMet))
if ev != nil {
result = ev.Object
}
if err == context.DeadlineExceeded {
if errors.Is(err, context.DeadlineExceeded) {
return errWaitTimeoutWithName
}
return err
Expand All @@ -518,8 +522,8 @@ type ConditionalWait struct {
}

// IsConditionMet is a conditionfunc for waiting on an API condition to be met
func (w ConditionalWait) IsConditionMet(info *resource.Info, o *WaitOptions) (runtime.Object, bool, error) {
return getObjAndCheckCondition(info, o, w.isConditionMet, w.checkCondition)
func (w ConditionalWait) IsConditionMet(ctx context.Context, info *resource.Info, o *WaitOptions) (runtime.Object, bool, error) {
return getObjAndCheckCondition(ctx, info, o, w.isConditionMet, w.checkCondition)
}

func (w ConditionalWait) checkCondition(obj *unstructured.Unstructured) (bool, error) {
Expand Down Expand Up @@ -592,8 +596,8 @@ type JSONPathWait struct {
}

// IsJSONPathConditionMet fulfills the requirements of the interface ConditionFunc which provides condition check
func (j JSONPathWait) IsJSONPathConditionMet(info *resource.Info, o *WaitOptions) (runtime.Object, bool, error) {
return getObjAndCheckCondition(info, o, j.isJSONPathConditionMet, j.checkCondition)
func (j JSONPathWait) IsJSONPathConditionMet(ctx context.Context, info *resource.Info, o *WaitOptions) (runtime.Object, bool, error) {
return getObjAndCheckCondition(ctx, info, o, j.isJSONPathConditionMet, j.checkCondition)
}

// isJSONPathConditionMet is a helper function of IsJSONPathConditionMet
Expand Down
45 changes: 40 additions & 5 deletions test/cmd/wait.sh
Expand Up @@ -37,15 +37,13 @@ run_wait_tests() {

# wait with jsonpath will timout for busybox deployment
set +o errexit

# Command: Wait with jsonpath support fields not exist in the first place
output_message=$(kubectl wait --for=jsonpath=.status.readyReplicas=1 deploy/test-1)

output_message=$(kubectl wait --for=jsonpath=.status.readyReplicas=1 deploy/test-1 2>&1)
set -o errexit

# Post-Condition: Wait failed
kube::test::if_has_string "${output_message}" 'timed out'

set -o errexit

# Delete all deployments async to kubectl wait
( sleep 2 && kubectl delete deployment --all ) &

Expand All @@ -56,6 +54,43 @@ run_wait_tests() {
kube::test::if_has_string "${output_message}" 'test-1 condition met'
kube::test::if_has_string "${output_message}" 'test-2 condition met'

# create test data to test timeout error is occurred in correct time
kubectl apply -f - <<EOF
apiVersion: apps/v1
kind: Deployment
metadata:
labels:
app: dtest
name: dtest
spec:
replicas: 3
selector:
matchLabels:
app: dtest
template:
metadata:
labels:
app: dtest
spec:
containers:
- name: bb
image: busybox
command: ["/bin/sh", "-c", "sleep infinity"]
EOF

set +o errexit
# wait timeout error because condition is invalid
start_sec=$(date +"%s")
output_message=$(time kubectl wait pod --selector=app=dtest --for=condition=InvalidCondition --timeout=1s 2>&1)
end_sec=$(date +"%s")
len_sec=$((end_sec-start_sec))
set -o errexit
kube::test::if_has_string "${output_message}" 'timed out waiting for the condition '
kube::test::if_has_string "${len_sec}" '1'

# Clean deployment
kubectl delete deployment dtest

set +o nounset
set +o errexit
}