Skip to content
This repository has been archived by the owner on Sep 9, 2020. It is now read-only.

Commit

Permalink
Merge 5d31fb4 into befbc54
Browse files Browse the repository at this point in the history
  • Loading branch information
bryanl committed Jun 25, 2018
2 parents befbc54 + 5d31fb4 commit 0cb3ca9
Show file tree
Hide file tree
Showing 32 changed files with 2,088 additions and 151 deletions.
15 changes: 15 additions & 0 deletions pkg/app/mocks/App.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 0 additions & 3 deletions pkg/clicmd/apply.go
Expand Up @@ -16,8 +16,6 @@
package clicmd

import (
"fmt"

"github.com/ksonnet/ksonnet/pkg/actions"
"github.com/ksonnet/ksonnet/pkg/app"
"github.com/ksonnet/ksonnet/pkg/client"
Expand Down Expand Up @@ -107,7 +105,6 @@ func newApplyCmd(a app.App) *cobra.Command {
actions.OptionSkipGc: viper.GetBool(vApplySkipGc),
}

fmt.Println("extract jsonnet flag")
if err := extractJsonnetFlags(a, "apply"); err != nil {
return errors.Wrap(err, "handle jsonnet flags")
}
Expand Down
210 changes: 86 additions & 124 deletions pkg/cluster/apply.go
Expand Up @@ -16,12 +16,9 @@
package cluster

import (
"bytes"
"compress/gzip"
"encoding/base64"
"encoding/json"
"fmt"
"sort"
"time"

"github.com/ksonnet/ksonnet/pkg/app"
"github.com/ksonnet/ksonnet/pkg/client"
Expand All @@ -34,58 +31,24 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
kdiff "k8s.io/apimachinery/pkg/util/diff"
"k8s.io/apimachinery/pkg/util/sets"
cmdutil "k8s.io/kubernetes/pkg/kubectl/cmd/util"
)

const (
appKsonnet = "ksonnet"
)

type managedMetadata struct {
Pristine string `json:"pristine,omitempty"`
}

func (mm *managedMetadata) EncodePristine(m map[string]interface{}) error {
var buf bytes.Buffer
gz := gzip.NewWriter(&buf)
if err := json.NewEncoder(gz).Encode(m); err != nil {
return err
}
if err := gz.Flush(); err != nil {
return err
}
if err := gz.Close(); err != nil {
return err
}

mm.Pristine = base64.StdEncoding.EncodeToString(buf.Bytes())
return nil
}

func (mm *managedMetadata) DecodePristine() (map[string]interface{}, error) {
b, err := base64.StdEncoding.DecodeString(mm.Pristine)
if err != nil {
return nil, err
}

r := bytes.NewReader(b)
// applyConflictRetryCount sets how many times an apply is retried before giving up
// after a conflict error is detected.
applyConflictRetryCount = 5

zr, err := gzip.NewReader(r)
if err != nil {
return nil, err
}
defer zr.Close()
// defaultConflictTimeout sets the wait time before retrying after a conflict is detected.
defaultConflictTimeout = 1 * time.Second

var m map[string]interface{}
if err := json.NewDecoder(zr).Decode(&m); err != nil {
return nil, err
}
appKsonnet = "ksonnet"
)

return m, nil
}
var (
errApplyConflict = errors.Errorf("apply conflict detected; retried %d times", applyConflictRetryCount)
)

// ApplyConfig is configuration for Apply.
type ApplyConfig struct {
Expand All @@ -109,24 +72,54 @@ type Apply struct {
// these make it easier to test Apply.
findObjectsFn findObjectsFn
resourceClientFactory resourceClientFactoryFn
genClientOptsFn genClientOptsFn
clientOpts *clientOpts
objectInfo ObjectInfo
ksonnetObjectFactory func() ksonnetObject
upserterFactory func() Upserter
conflictTimeout time.Duration
}

// RunApply runs apply against a cluster given a configuration.
func RunApply(config ApplyConfig, opts ...ApplyOpts) error {
if config.ClientConfig == nil {
return errors.New("ksonnet client config is required")
}

a := &Apply{
ApplyConfig: config,
findObjectsFn: findObjects,
resourceClientFactory: resourceClientFactory,
genClientOptsFn: genClientOpts,
objectInfo: &objectInfo{},
ksonnetObjectFactory: func() ksonnetObject {
factory := cmdutil.NewFactory(config.ClientConfig.Config)
return newDefaultKsonnetObject(factory)
},
conflictTimeout: 1 * time.Second,
}

for _, opt := range opts {
opt(a)
}

if a.clientOpts == nil {
co, err := genClientOpts(a.App, a.ClientConfig, a.EnvName)
if err != nil {
return err
}

a.clientOpts = &co
}

if a.upserterFactory == nil {
u, err := newDefaultUpserter(a.ApplyConfig, a.objectInfo, *a.clientOpts, a.resourceClientFactory)
if err != nil {
return errors.Wrap(err, "creating upserter")
}
a.upserterFactory = func() Upserter {
return u
}
}

return a.Apply()
}

Expand All @@ -141,14 +134,9 @@ func (a *Apply) Apply() error {

seenUids := sets.NewString()

co, err := a.genClientOptsFn(a.App, a.ClientConfig, a.EnvName)
if err != nil {
return err
}

for _, obj := range apiObjects {
var uid string
uid, err = a.handleObject(co, obj)
uid, err = a.handleObject(obj)
if err != nil {
return errors.Wrap(err, "handle object")
}
Expand All @@ -162,103 +150,77 @@ func (a *Apply) Apply() error {
}

if a.GcTag != "" && !a.SkipGc {
if err = a.runGc(co, seenUids); err != nil {
if err = a.runGc(seenUids); err != nil {
return errors.Wrap(err, "run gc")
}
}

return nil
}

func (a *Apply) handleObject(co clientOpts, obj *unstructured.Unstructured) (string, error) {
if err := tagManaged(obj); err != nil {
return "", errors.Wrap(err, "tagging ksonnet managed object")
func (a *Apply) handleObject(obj *unstructured.Unstructured) (string, error) {
if err := a.preprocessObject(obj); err != nil {
return "", errors.Wrap(err, "preprocessing object before apply")
}

factory := cmdutil.NewFactory(a.ClientConfig.Config)
m := newObjectMerger(factory)
mergedObject, err := m.merge(co.namespace, obj)
mergedObject, err := a.patchFromCluster(obj)
if err != nil {
cause := errors.Cause(err)
if !kerrors.IsNotFound(cause) {
return "", errors.Wrap(cause, "merging object with existing state")
}
mergedObject = obj
}

if a.GcTag != "" {
SetMetaDataAnnotation(mergedObject, metadata.AnnotationGcTag, a.GcTag)
return "", errors.Wrap(err, "patching object from cluster")
}

desc := fmt.Sprintf("%s %s", a.objectInfo.ResourceName(co.discovery, mergedObject), utils.FqName(obj))
log.Info("Updating ", desc, a.dryRunText())
a.setupGC(mergedObject)

rc, err := a.resourceClientFactory(co, mergedObject)
if err != nil {
return "", err
}
return a.upsert(mergedObject)
}

asPatch, err := json.Marshal(mergedObject)
if err != nil {
return "", err
}
// preprocessObject preprocesses an object for it is applied to the cluster.
func (a *Apply) preprocessObject(obj *unstructured.Unstructured) error {
dm := newDefaultManaged()
return errors.Wrap(dm.Tag(obj), "tagging ksonnet managed object")
}

var newobj metav1.Object
if !a.DryRun {
newobj, err = rc.Patch(types.MergePatchType, asPatch)
log.Debugf("Patch(%s) returned (%v, %v)", obj.GetName(), newobj, err)
} else {
newobj, err = rc.Get(metav1.GetOptions{})
}
// patchFromCluster patches an object with values that may exist in the cluster.
func (a *Apply) patchFromCluster(obj *unstructured.Unstructured) (*unstructured.Unstructured, error) {
return a.ksonnetObjectFactory().MergeFromCluster(*a.clientOpts, obj)
}

if a.Create && kerrors.IsNotFound(err) {
log.Info(" Creating non-existent ", desc, a.dryRunText())
if !a.DryRun {
newobj, err = rc.Create()
log.Debugf("Create(%s) returned (%v, %v)", obj.GetName(), newobj, err)
} else {
newobj = mergedObject
err = nil
}
}
if err != nil {
// TODO: retry
return "", errors.Wrapf(err, "can't update %s", desc)
}
func (a *Apply) upsert(obj *unstructured.Unstructured) (string, error) {
u := a.upserterFactory()

log.Debug("Updated object: ", kdiff.ObjectDiff(obj, newobj))
for i := applyConflictRetryCount; i > 0; i-- {
uid, err := u.Upsert(obj)
if err != nil {
cause := errors.Cause(err)
if !kerrors.IsConflict(cause) {
return "", err
}

return string(newobj.GetUID()), nil
}
time.Sleep(a.conflictTimeout)
continue
}

func tagManaged(obj *unstructured.Unstructured) error {
if obj == nil {
return errors.New("object is nil")
return uid, nil
}

mm := &managedMetadata{}
if err := mm.EncodePristine(obj.Object); err != nil {
return err
}
return "", errApplyConflict
}

mmEncoded, err := json.Marshal(mm)
if err != nil {
return err
// setupGC setups ksonnet's garbage collection process for objects.
func (a *Apply) setupGC(obj *unstructured.Unstructured) {
if a.GcTag != "" {
SetMetaDataAnnotation(obj, metadata.AnnotationGcTag, a.GcTag)
}

SetMetaDataLabel(obj, metadata.LabelDeployManager, appKsonnet)
SetMetaDataAnnotation(obj, metadata.AnnotationManaged, string(mmEncoded))

return nil
}

func (a *Apply) runGc(co clientOpts, seenUids sets.String) error {
func (a *Apply) runGc(seenUids sets.String) error {
co := a.clientOpts

version, err := utils.FetchVersion(co.discovery)
if err != nil {
return err
}

err = walkObjects(co, metav1.ListOptions{}, func(o runtime.Object) error {
err = walkObjects(*co, metav1.ListOptions{}, func(o runtime.Object) error {
var metav1Object metav1.Object
metav1Object, err = meta.Accessor(o)
if err != nil {
Expand All @@ -271,7 +233,7 @@ func (a *Apply) runGc(co clientOpts, seenUids sets.String) error {
if eligibleForGc(metav1Object, a.GcTag) && !seenUids.Has(string(metav1Object.GetUID())) {
log.Info("Garbage collecting ", desc, a.dryRunText())
if !a.DryRun {
err = gcDelete(co, a.resourceClientFactory, &version, o)
err = gcDelete(*co, a.resourceClientFactory, &version, o)
if err != nil {
return err
}
Expand Down

0 comments on commit 0cb3ca9

Please sign in to comment.