Skip to content

Commit

Permalink
Further fluxv2 plugin features: misc fixes and add an integration tes…
Browse files Browse the repository at this point in the history
…t for CreateInstalledPackage (#3390)

* introduce a new kind of test: one that runs on a local kind cluster

* step 2

* step 3: TestKindClusterCreateInstalledPackage passing

* step 4

* step 5

* step 6: get rid of dependency on flux CLI

* step 7: get rid of dependency on kubectl CLI

* step 8: added back unit tests for CreateInstalledPackage

* step 9

* step 10: support values

* step 11: eliminate dependency on github.io in integration test (1 of 2)

* Michael's feedback #1

* Michael's feedback #2
  • Loading branch information
gfichtenholt committed Sep 13, 2021
1 parent 55dacdc commit 0e9ee36
Show file tree
Hide file tree
Showing 11 changed files with 1,366 additions and 147 deletions.
8 changes: 6 additions & 2 deletions cmd/kubeapps-apis/plugins/fluxv2/packages/v1alpha1/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,11 +247,15 @@ func (c NamespacedResourceWatcherCache) resync() (string, error) {
}

// TODO: (gfichtenholt) RBAC check whether I list and watch specified GVR?
// Currently you'll need to run with the unsafe dev-only service account, since the plugin sets
// up background jobs that are running outside of requests from the user (ie. we're not using the
// users' token for those). Longer term, the plan is to create a separate RBAC yaml specific to
// the plugin that will need to be applied for using the plugin (nice and explicit),
// granting additional RBAC privs to the service account used by kubeapps-apis

// this will list resources from all namespaces.
// Notice, we are not setting resourceVersion in ListOptions, which means
// per https://kubernetes.io/docs/reference/using-api/api-concepts/
// For get and list, the semantics of resource version unset are to get the most recent
// For Get() and List(), the semantics of resource version unset are to get the most recent
// version
listItems, err := dynamicClient.Resource(c.config.gvr).Namespace(apiv1.NamespaceAll).List(ctx, metav1.ListOptions{})
if err != nil {
Expand Down
24 changes: 11 additions & 13 deletions cmd/kubeapps-apis/plugins/fluxv2/packages/v1alpha1/chart.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,17 +192,15 @@ func (s *Server) fetchChartFromCache(repo types.NamespacedName, chartName string
return nil, nil
}

// the main goal of this func is to answer whether or not to stop waiting for chart reconciliation
// which is different from answering whether the chart was pulled successfully
// TODO (gfichtenholt): As above, hopefully this func isn't required if we can only list charts that we know are ready.
func isChartPullComplete(unstructuredChart map[string]interface{}) (complete, success bool, reason string) {
// see docs at https://fluxcd.io/docs/components/source/helmcharts/
// Confirm the state we are observing is for the current generation
if !checkGeneration(unstructuredChart) {
return false, false, ""
} else {
return checkStatusReady(unstructuredChart)
}
// returns 3 things:
// - complete whether the operation was completed
// - success (only applicable when complete == true) whether the operation was successful or failed
// - reason, if present
// docs:
// 1. https://fluxcd.io/docs/components/source/helmcharts/#status-examples
func isHelmChartReady(unstructuredObj map[string]interface{}) (complete bool, success bool, reason string) {
// same format and logic, so just re-use the code
return isHelmRepositoryReady(unstructuredObj)
}

// TODO (gfichtenholt):
Expand Down Expand Up @@ -235,7 +233,7 @@ func waitUntilChartPullComplete(ctx context.Context, watcher watch.Interface) (s
return "", status.Errorf(codes.Internal, "could not cast to unstructured.Unstructured")
}

done, success, reason := isChartPullComplete(unstructuredChart.Object)
done, success, reason := isHelmChartReady(unstructuredChart.Object)
if done {
if success {
url, found, err := unstructured.NestedString(unstructuredChart.Object, "status", "url")
Expand Down Expand Up @@ -300,7 +298,7 @@ func findUrlForChartInList(chartList *unstructured.UnstructuredList, repoName, c
thisRepoName, found2, err2 := unstructured.NestedString(unstructuredChart.Object, "spec", "sourceRef", "name")

if err == nil && err2 == nil && found && found2 && repoName == thisRepoName && chartName == thisChartName {
if done, success, reason := isChartPullComplete(unstructuredChart.Object); done {
if done, success, reason := isHelmChartReady(unstructuredChart.Object); done {
if success {
if url, found, err := unstructured.NestedString(unstructuredChart.Object, "status", "url"); err != nil || !found {
return "", status.Errorf(codes.Internal, "expected field status.url not found on HelmChart: %v:\n%v", err, unstructuredChart)
Expand Down
168 changes: 129 additions & 39 deletions cmd/kubeapps-apis/plugins/fluxv2/packages/v1alpha1/release.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,14 @@ import (
"helm.sh/helm/v3/pkg/release"
"helm.sh/helm/v3/pkg/storage/driver"
apiv1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/dynamic"
log "k8s.io/klog/v2"
"sigs.k8s.io/yaml"
)

const (
Expand Down Expand Up @@ -212,16 +215,19 @@ func (s *Server) installedPackageDetail(ctx context.Context, name types.Namespac
return nil, status.Errorf(codes.NotFound, "Unable to find Helm release %q due to: %v", name, err)
}

log.V(4).Infof("installedPackageDetail:\n[%s]", prettyPrintMap(unstructuredRelease.Object))

obj := unstructuredRelease.Object
var pkgVersionRef *corev1.VersionReference
version, found, err := unstructured.NestedString(unstructuredRelease.Object, "spec", "chart", "spec", "version")
version, found, err := unstructured.NestedString(obj, "spec", "chart", "spec", "version")
if found && err == nil && version != "" {
pkgVersionRef = &corev1.VersionReference{
Version: version,
}
}

valuesApplied := ""
valuesMap, found, err := unstructured.NestedMap(unstructuredRelease.Object, "spec", "values")
valuesMap, found, err := unstructured.NestedMap(obj, "spec", "values")
if found && err == nil && len(valuesMap) != 0 {
bytes, err := json.Marshal(valuesMap)
if err != nil {
Expand All @@ -233,30 +239,31 @@ func (s *Server) installedPackageDetail(ctx context.Context, name types.Namespac
// ValuesReference maybe a config map or a secret

// this will only be present if install/upgrade succeeded
pkgVersion, found, err := unstructured.NestedString(unstructuredRelease.Object, "status", "lastAppliedRevision")
pkgVersion, found, err := unstructured.NestedString(obj, "status", "lastAppliedRevision")
if !found || err != nil || pkgVersion == "" {
// this is the back-up option: will be there if the reconciliation is in progress or has failed
pkgVersion, _, _ = unstructured.NestedString(unstructuredRelease.Object, "status", "lastAttemptedRevision")
pkgVersion, _, _ = unstructured.NestedString(obj, "status", "lastAttemptedRevision")
}

availablePackageRef, err := installedPackageAvailablePackageRefFromUnstructured(unstructuredRelease.Object)
availablePackageRef, err := installedPackageAvailablePackageRefFromUnstructured(obj)
if err != nil {
return nil, err
}

release, err := s.helmReleaseFromUnstructured(ctx, name, unstructuredRelease.Object)
if err != nil {
return nil, err
}

// a couple of fields only available via helm API
appVersion := ""
postInstallNotes := ""
if release != nil {
appVersion = release.Chart.AppVersion()
appVersion, postInstallNotes := "", ""
release, err := s.helmReleaseFromUnstructured(ctx, name, obj)
// err maybe NotFound if this object has just been created and flux hasn't had time
// to invoke helm layer yet
if err == nil && release != nil {
// a couple of fields currrently only available via helm API
if release.Chart != nil {
appVersion = release.Chart.AppVersion()
}
if release.Info != nil {
postInstallNotes = release.Info.Notes
}
} else if err != nil && !errors.IsNotFound(err) {
log.Warningf("Failed to get helm release due to %v", err)
}

return &corev1.InstalledPackageDetail{
Expand All @@ -274,10 +281,10 @@ func (s *Server) installedPackageDetail(ctx context.Context, name types.Namespac
AppVersion: appVersion,
},
ValuesApplied: valuesApplied,
ReconciliationOptions: installedPackageReconciliationOptionsFromUnstructured(unstructuredRelease.Object),
ReconciliationOptions: installedPackageReconciliationOptionsFromUnstructured(obj),
AvailablePackageRef: availablePackageRef,
PostInstallationNotes: postInstallNotes,
Status: installedPackageStatusFromUnstructured(unstructuredRelease.Object),
Status: installedPackageStatusFromUnstructured(obj),
}, nil
}

Expand All @@ -302,20 +309,20 @@ func (s *Server) helmReleaseFromUnstructured(ctx context.Context, name types.Nam

actionConfig, err := s.actionConfigGetter(ctx, name.Namespace)
if err != nil || actionConfig == nil {
return nil, status.Errorf(codes.Internal, "Unable to create Helm action config: %v", err)
return nil, status.Errorf(codes.Internal, "Unable to create Helm action config in namespace [%s] due to: %v", name.Namespace, err)
}
cmd := action.NewGet(actionConfig)
release, err := cmd.Run(helmReleaseName)
if err != nil {
if err == driver.ErrReleaseNotFound {
return nil, status.Errorf(codes.NotFound, "Unable to find Helm release [%s]", helmReleaseName)
return nil, status.Errorf(codes.NotFound, "Unable to find Helm release [%s] in namespace [%s]", helmReleaseName, name.Namespace)
}
return nil, status.Errorf(codes.NotFound, "Unable to run Helm Get action for release [%s]: %v", helmReleaseName, err)
return nil, status.Errorf(codes.NotFound, "Unable to run Helm Get action for release [%s] in namespace [%s]: %v", helmReleaseName, name.Namespace, err)
}
return release, nil
}

func (s *Server) newRelease(ctx context.Context, packageRef *corev1.AvailablePackageReference, targetName types.NamespacedName) (*corev1.InstalledPackageReference, error) {
func (s *Server) newRelease(ctx context.Context, packageRef *corev1.AvailablePackageReference, targetName types.NamespacedName, versionRef *corev1.VersionReference, reconcile *corev1.ReconciliationOptions, valuesString string) (*corev1.InstalledPackageReference, error) {
// HACK: just for now assume HelmRelease CRD will live in the kubeapps namespace
kubeappsNamespace := os.Getenv("POD_NAMESPACE")
resourceIfc, err := s.getReleasesResourceInterface(ctx, kubeappsNamespace)
Expand All @@ -340,7 +347,16 @@ func (s *Server) newRelease(ctx context.Context, packageRef *corev1.AvailablePac
return nil, err
}

fluxHelmRelease := newFluxHelmRelease(chart, kubeappsNamespace, targetName)
var values map[string]interface{}
if valuesString != "" {
values = make(map[string]interface{})
err = yaml.Unmarshal([]byte(valuesString), &values)
if err != nil {
return nil, err
}
}

fluxHelmRelease := newFluxHelmRelease(chart, kubeappsNamespace, targetName, versionRef, reconcile, values)
newRelease, err := resourceIfc.Create(ctx, fluxHelmRelease, metav1.CreateOptions{})
if err != nil {
return nil, err
Expand All @@ -353,23 +369,81 @@ func (s *Server) newRelease(ctx context.Context, packageRef *corev1.AvailablePac
return &corev1.InstalledPackageReference{
Context: &corev1.Context{Namespace: name.Namespace},
Identifier: name.Name,
Plugin: GetPluginDetail(),
}, nil
}

// returns 3 things:
// - ready: whether the HelmRelease object is in a ready state
// - reason: one of SUCCESS/FAILURE/PENDING/UNSPECIFIED,
// - userReason: textual description of why the object is in current state, if present
// docs:
// 1. https://fluxcd.io/docs/components/helm/helmreleases/#examples
// 2. discussion on private slack channel. Summary:
// - "ready" field: - it's not indicating that the resource has completed (i.e. whether the task
// completed with install or failure), but rather just whether the resource is ready or not.
// So it can be false because of either a final state (failure) or a pending state
// (reconciliation in progress or whatever). That means the ready flag will only be set to true
// when install completes with success
// - "reason" field: failure only when flux returns "InstallFailed" reason
// otherwise pending or unspecified when there are no status conditions to go by
//
func isHelmReleaseReady(unstructuredObj map[string]interface{}) (ready bool, status corev1.InstalledPackageStatus_StatusReason, userReason string) {
if !checkGeneration(unstructuredObj) {
return false, corev1.InstalledPackageStatus_STATUS_REASON_UNSPECIFIED, ""
}

conditions, found, err := unstructured.NestedSlice(unstructuredObj, "status", "conditions")
if err != nil || !found {
return false, corev1.InstalledPackageStatus_STATUS_REASON_UNSPECIFIED, ""
}

isInstallFailed := false

for _, conditionUnstructured := range conditions {
if conditionAsMap, ok := conditionUnstructured.(map[string]interface{}); ok {
if typeString, ok := conditionAsMap["type"]; ok && typeString == "Ready" {
// this could be something like
// "reason": "InstallFailed"
// i.e. not super-useful
if reasonString, ok := conditionAsMap["reason"]; ok {
userReason = fmt.Sprintf("%v", reasonString)
if reasonString == "InstallFailed" {
isInstallFailed = true
}
}
// whereas this could be something like:
// "message": 'Helm install failed: unable to build kubernetes objects from
// release manifest: error validating "": error validating data:
// ValidationError(Deployment.spec.replicas): invalid type for
// io.k8s.api.apps.v1.DeploymentSpec.replicas: got "string", expected "integer"'
// i.e. a little more useful, so we'll just return them both
if messageString, ok := conditionAsMap["message"]; ok {
userReason += fmt.Sprintf(": %v", messageString)
}
if statusString, ok := conditionAsMap["status"]; ok {
if statusString == "True" {
return true, corev1.InstalledPackageStatus_STATUS_REASON_INSTALLED, userReason
} else if isInstallFailed {
return false, corev1.InstalledPackageStatus_STATUS_REASON_FAILED, userReason
} else {
return false, corev1.InstalledPackageStatus_STATUS_REASON_PENDING, userReason
}
}
}
}
}
// catch all: unless we know something else, install is pending
return false, corev1.InstalledPackageStatus_STATUS_REASON_PENDING, userReason
}

func installedPackageStatusFromUnstructured(unstructuredRelease map[string]interface{}) *corev1.InstalledPackageStatus {
complete, success, reason := checkStatusReady(unstructuredRelease)
status := &corev1.InstalledPackageStatus{
Ready: complete && success,
UserReason: reason,
}
if complete && success {
status.Reason = corev1.InstalledPackageStatus_STATUS_REASON_INSTALLED
} else if complete && !success {
status.Reason = corev1.InstalledPackageStatus_STATUS_REASON_FAILED
} else {
status.Reason = corev1.InstalledPackageStatus_STATUS_REASON_PENDING
ready, reason, userReason := isHelmReleaseReady(unstructuredRelease)
return &corev1.InstalledPackageStatus{
Ready: ready,
Reason: reason,
UserReason: userReason,
}
return status
}

func installedPackageReconciliationOptionsFromUnstructured(unstructuredRelease map[string]interface{}) *corev1.ReconciliationOptions {
Expand Down Expand Up @@ -417,7 +491,7 @@ func installedPackageAvailablePackageRefFromUnstructured(unstructuredRelease map
// 1. spec.chart.spec.sourceRef.namespace, where HelmRepository CRD object referenced exists
// 2. metadata.namespace, where this HelmRelease CRD will exist
// 3. spec.targetNamespace, where flux will install any artifacts from the release
func newFluxHelmRelease(chart *models.Chart, releaseNamespace string, targetName types.NamespacedName) *unstructured.Unstructured {
func newFluxHelmRelease(chart *models.Chart, releaseNamespace string, targetName types.NamespacedName, versionRef *corev1.VersionReference, reconcile *corev1.ReconciliationOptions, values map[string]interface{}) *unstructured.Unstructured {
unstructuredRel := unstructured.Unstructured{
Object: map[string]interface{}{
"apiVersion": fmt.Sprintf("%s/%s", fluxHelmReleaseGroup, fluxHelmReleaseVersion),
Expand All @@ -429,23 +503,39 @@ func newFluxHelmRelease(chart *models.Chart, releaseNamespace string, targetName
"spec": map[string]interface{}{
"chart": map[string]interface{}{
"spec": map[string]interface{}{
"chart": chart.Name,
"version": "*",
"chart": chart.Name,
"sourceRef": map[string]interface{}{
"name": chart.Repo.Name,
"kind": fluxHelmRepository,
"namespace": chart.Repo.Namespace,
},
},
},
"interval": "1m",
"install": map[string]interface{}{
"createNamespace": true,
},
"targetNamespace": targetName.Namespace,
// TODO: values
},
},
}
if versionRef != nil && versionRef.Version != "" {
unstructured.SetNestedField(unstructuredRel.Object, versionRef.Version, "spec", "chart", "spec", "version")
}
reconcileInterval := "1m" // unless explictly specified
if reconcile != nil {
if reconcile.Interval > 0 {
reconcileInterval = (time.Duration(reconcile.Interval) * time.Second).String()
}
unstructured.SetNestedField(unstructuredRel.Object, reconcile.Suspend, "spec", "suspend")
if reconcile.ServiceAccountName != "" {
unstructured.SetNestedField(unstructuredRel.Object, reconcile.ServiceAccountName, "spec", "serviceAccountName")
}
}
if values != nil {
unstructured.SetNestedMap(unstructuredRel.Object, values, "spec", "values")
}

// required fields, without which flux controller will fail to create the CRD
unstructured.SetNestedField(unstructuredRel.Object, reconcileInterval, "spec", "interval")
return &unstructuredRel
}
Loading

0 comments on commit 0e9ee36

Please sign in to comment.