Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add wait json #105776

Merged
merged 1 commit into from
Nov 16, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
169 changes: 156 additions & 13 deletions staging/src/k8s.io/kubectl/pkg/cmd/wait/wait.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"errors"
"fmt"
"io"
"reflect"
"strings"
"time"

Expand All @@ -40,6 +41,8 @@ import (
"k8s.io/cli-runtime/pkg/resource"
"k8s.io/client-go/dynamic"
watchtools "k8s.io/client-go/tools/watch"
"k8s.io/client-go/util/jsonpath"
cmdget "k8s.io/kubectl/pkg/cmd/get"
cmdutil "k8s.io/kubectl/pkg/cmd/util"
"k8s.io/kubectl/pkg/util/i18n"
"k8s.io/kubectl/pkg/util/templates"
Expand All @@ -65,6 +68,9 @@ var (
# The default value of status condition is true; you can set it to false
kubectl wait --for=condition=Ready=false pod/busybox1

# Wait for the pod "busybox1" to contain the status phase to be "Running".
kubectl wait --for=jsonpath='{.status.phase}'=Running pod/busybox1

# Wait for the pod "busybox1" to be deleted, with a timeout of 60s, after having issued the "delete" command
kubectl delete pod/busybox1
kubectl wait --for=delete pod/busybox1 --timeout=60s`))
Expand Down Expand Up @@ -111,7 +117,7 @@ func NewCmdWait(restClientGetter genericclioptions.RESTClientGetter, streams gen
flags := NewWaitFlags(restClientGetter, streams)

cmd := &cobra.Command{
Use: "wait ([-f FILENAME] | resource.group/resource.name | resource.group [(-l label | --all)]) [--for=delete|--for condition=available]",
Use: "wait ([-f FILENAME] | resource.group/resource.name | resource.group [(-l label | --all)]) [--for=delete|--for condition=available|--for=jsonpath='{}'=value]",
Short: i18n.T("Experimental: Wait for a specific condition on one or many resources"),
Long: waitLong,
Example: waitExample,
Expand All @@ -136,7 +142,7 @@ func (flags *WaitFlags) AddFlags(cmd *cobra.Command) {
flags.ResourceBuilderFlags.AddFlags(cmd.Flags())

cmd.Flags().DurationVar(&flags.Timeout, "timeout", flags.Timeout, "The length of time to wait before giving up. Zero means check once and don't wait, negative means wait for a week.")
cmd.Flags().StringVar(&flags.ForCondition, "for", flags.ForCondition, "The condition to wait on: [delete|condition=condition-name]. The default status value of condition-name is true, you can set false with condition=condition-name=false")
cmd.Flags().StringVar(&flags.ForCondition, "for", flags.ForCondition, "The condition to wait on: [delete|condition=condition-name|jsonpath='{JSONPath expression}'=JSONPath Condition]. The default status value of condition-name is true, you can set false with condition=condition-name=false.")
}

// ToOptions converts from CLI inputs to runtime inputs
Expand Down Expand Up @@ -196,10 +202,55 @@ func conditionFuncFor(condition string, errOut io.Writer) (ConditionFunc, error)
errOut: errOut,
}.IsConditionMet, nil
}
if strings.HasPrefix(condition, "jsonpath=") {
splitStr := strings.Split(condition, "=")
if len(splitStr) != 3 {
return nil, fmt.Errorf("jsonpath wait format must be --for=jsonpath='{.status.readyReplicas}'=3")
}
jsonPathExp, jsonPathCond, err := processJSONPathInput(splitStr[1], splitStr[2])
if err != nil {
return nil, err
}
j, err := newJSONPathParser(jsonPathExp)
if err != nil {
return nil, err
}
return JSONPathWait{
jsonPathCondition: jsonPathCond,
jsonPathParser: j,
errOut: errOut,
}.IsJSONPathConditionMet, nil
}

return nil, fmt.Errorf("unrecognized condition: %q", condition)
}

// newJSONPathParser will create a new JSONPath parser based on the jsonPathExpression
func newJSONPathParser(jsonPathExpression string) (*jsonpath.JSONPath, error) {
j := jsonpath.New("wait")
if jsonPathExpression == "" {
return nil, errors.New("jsonpath expression cannot be empty")
}
if err := j.Parse(jsonPathExpression); err != nil {
return nil, err
}
return j, nil
}

// processJSONPathInput will parses the user's JSONPath input and process the string
func processJSONPathInput(jsonPathExpression, jsonPathCond string) (string, string, error) {
relaxedJSONPathExp, err := cmdget.RelaxedJSONPathExpression(jsonPathExpression)
if err != nil {
return "", "", err
}
if jsonPathCond == "" {
return "", "", errors.New("jsonpath wait condition cannot be empty")
}
jsonPathCond = strings.Trim(jsonPathCond, `'"`)

return relaxedJSONPathExp, jsonPathCond, nil
}

// ResourceLocation holds the location of a resource
type ResourceLocation struct {
GroupResource schema.GroupResource
Expand Down Expand Up @@ -353,16 +404,12 @@ func (w Wait) IsDeleted(event watch.Event) (bool, error) {
}
}

// ConditionalWait hold information to check an API status condition
type ConditionalWait struct {
conditionName string
conditionStatus string
// errOut is written to if an error occurs
errOut io.Writer
}
type isCondMetFunc func(event watch.Event) (bool, error)
type checkCondFunc func(obj *unstructured.Unstructured) (bool, error)

// IsConditionMet is a conditionfunc for waiting on an API condition to be met
func (w ConditionalWait) IsConditionMet(info *resource.Info, o *WaitOptions) (runtime.Object, bool, error) {
// getObjAndCheckCondition will make a List query to the API server to get the object and check if the condition is met using check function.
// If the condition is not met, it will make a Watch query to the server and pass in the condMet function
func getObjAndCheckCondition(info *resource.Info, o *WaitOptions, condMet isCondMetFunc, check checkCondFunc) (runtime.Object, bool, error) {
endTime := time.Now().Add(o.Timeout)
for {
if len(info.Name) == 0 {
Expand All @@ -383,7 +430,7 @@ func (w ConditionalWait) IsConditionMet(info *resource.Info, o *WaitOptions) (ru
resourceVersion = gottenObjList.GetResourceVersion()
default:
gottenObj = &gottenObjList.Items[0]
conditionMet, err := w.checkCondition(gottenObj)
conditionMet, err := check(gottenObj)
if conditionMet {
return gottenObj, true, nil
}
Expand All @@ -409,7 +456,7 @@ func (w ConditionalWait) IsConditionMet(info *resource.Info, o *WaitOptions) (ru
}

ctx, cancel := watchtools.ContextWithOptionalTimeout(context.Background(), o.Timeout)
watchEvent, err := watchtools.UntilWithoutRetry(ctx, objWatch, w.isConditionMet)
watchEvent, err := watchtools.UntilWithoutRetry(ctx, objWatch, watchtools.ConditionFunc(condMet))
cancel()
switch {
case err == nil:
Expand All @@ -427,6 +474,19 @@ func (w ConditionalWait) IsConditionMet(info *resource.Info, o *WaitOptions) (ru
}
}

// ConditionalWait hold information to check an API status condition
type ConditionalWait struct {
conditionName string
conditionStatus string
// errOut is written to if an error occurs
errOut io.Writer
}

// IsConditionMet is a conditionfunc for waiting on an API condition to be met
func (w ConditionalWait) IsConditionMet(info *resource.Info, o *WaitOptions) (runtime.Object, bool, error) {
return getObjAndCheckCondition(info, o, w.isConditionMet, w.checkCondition)
}

func (w ConditionalWait) checkCondition(obj *unstructured.Unstructured) (bool, error) {
conditions, found, err := unstructured.NestedSlice(obj.Object, "status", "conditions")
if err != nil {
Expand Down Expand Up @@ -486,3 +546,86 @@ func getObservedGeneration(obj *unstructured.Unstructured, condition map[string]
statusObservedGeneration, found, _ := unstructured.NestedInt64(obj.Object, "status", "observedGeneration")
return statusObservedGeneration, found
}

// JSONPathWait holds a JSONPath Parser which has the ability
// to check for the JSONPath condition and compare with the API server provided JSON output.
type JSONPathWait struct {
jsonPathCondition string
jsonPathParser *jsonpath.JSONPath
// errOut is written to if an error occurs
errOut io.Writer
}

// IsJSONPathConditionMet fulfills the requirements of the interface ConditionFunc which provides condition check
func (j JSONPathWait) IsJSONPathConditionMet(info *resource.Info, o *WaitOptions) (runtime.Object, bool, error) {
return getObjAndCheckCondition(info, o, j.isJSONPathConditionMet, j.checkCondition)
}

// isJSONPathConditionMet is a helper function of IsJSONPathConditionMet
// which check the watch event and check if a JSONPathWait condition is met
func (j JSONPathWait) isJSONPathConditionMet(event watch.Event) (bool, error) {
if event.Type == watch.Error {
// keep waiting in the event we see an error - we expect the watch to be closed by
// the server
err := apierrors.FromObject(event.Object)
fmt.Fprintf(j.errOut, "error: An error occurred while waiting for the condition to be satisfied: %v", err)
return false, nil
}
if event.Type == watch.Deleted {
// this will chain back out, result in another get and an return false back up the chain
return false, nil
}
// event runtime Object can be safely asserted to Unstructed
// because we are working with dynamic client
obj := event.Object.(*unstructured.Unstructured)
lauchokyip marked this conversation as resolved.
Show resolved Hide resolved
return j.checkCondition(obj)
}

// checkCondition uses JSONPath parser to parse the JSON received from the API server
// and check if it matches the desired condition
func (j JSONPathWait) checkCondition(obj *unstructured.Unstructured) (bool, error) {
queryObj := obj.UnstructuredContent()
parseResults, err := j.jsonPathParser.FindResults(queryObj)
if err != nil {
return false, err
}
if err := verifyParsedJSONPath(parseResults); err != nil {
return false, err
}
isConditionMet, err := compareResults(parseResults[0][0], j.jsonPathCondition)
lauchokyip marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return false, err
}
return isConditionMet, nil
}

// verifyParsedJSONPath verifies the JSON received from the API server is valid.
// It will only accept a single JSON
func verifyParsedJSONPath(results [][]reflect.Value) error {
if len(results) == 0 {
return errors.New("given jsonpath expression does not match any value")
}
if len(results) > 1 {
return errors.New("given jsonpath expression matches more than one list")
}
if len(results[0]) > 1 {
return errors.New("given jsonpath expression matches more than one value")
lauchokyip marked this conversation as resolved.
Show resolved Hide resolved
}
return nil
}

// compareResults will compare the reflect.Value from the result parsed by the
// JSONPath parser with the expected value given by the value
//
// Since this is coming from an unstructured this can only ever be a primitive,
// map[string]interface{}, or []interface{}.
// We do not support the last two and rely on fmt to handle conversion to string
// and compare the result with user input
func compareResults(r reflect.Value, expectedVal string) (bool, error) {
lauchokyip marked this conversation as resolved.
Show resolved Hide resolved
switch r.Interface().(type) {
case map[string]interface{}, []interface{}:
return false, errors.New("jsonpath leads to a nested object or list which is not supported")
}
s := fmt.Sprintf("%v", r.Interface())
return strings.TrimSpace(s) == strings.TrimSpace(expectedVal), nil
}