Skip to content

Commit

Permalink
Misc todo cleanup in fluxv2 plugin (#3030)
Browse files Browse the repository at this point in the history
* step 1

* step 2

* step 3

* step 4

* Update cmd/kubeapps-apis/plugins/fluxv2/packages/v1alpha1/server.go

Co-authored-by: Michael Nelson <absoludity@gmail.com>

* step 5

* step 6

Co-authored-by: Michael Nelson <absoludity@gmail.com>
  • Loading branch information
gfichtenholt and absoludity committed Jun 23, 2021
1 parent 0240c96 commit 4547597
Show file tree
Hide file tree
Showing 11 changed files with 671 additions and 365 deletions.
3 changes: 2 additions & 1 deletion cmd/asset-syncer/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (

"github.com/kubeapps/common/datastore"
"github.com/kubeapps/kubeapps/pkg/chart/models"
httpclient "github.com/kubeapps/kubeapps/pkg/http-client"
"github.com/kubeapps/kubeapps/pkg/kube"
"github.com/sirupsen/logrus"
"github.com/spf13/cobra"
Expand Down Expand Up @@ -55,7 +56,7 @@ var syncCmd = &cobra.Command{
}
defer manager.Close()

netClient, err := initNetClient(additionalCAFile, tlsInsecureSkipVerify)
netClient, err := httpclient.NewWithCertFile(additionalCAFile, tlsInsecureSkipVerify)
if err != nil {
logrus.Fatal(err)
}
Expand Down
1 change: 0 additions & 1 deletion cmd/asset-syncer/testdata/empty-repo-index.yaml

This file was deleted.

41 changes: 2 additions & 39 deletions cmd/asset-syncer/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,13 @@ import (
"bytes"
"compress/gzip"
"crypto/sha256"
"crypto/tls"
"crypto/x509"
"encoding/json"
"fmt"
"image"
"io"
"io/ioutil"
"net/http"
"net/url"
"os"
"path"
"sort"
"strings"
Expand All @@ -56,9 +53,8 @@ import (
)

const (
defaultTimeoutSeconds = 10
additionalCAFile = "/usr/local/share/ca-certificates/ca.crt"
numWorkers = 10
additionalCAFile = "/usr/local/share/ca-certificates/ca.crt"
numWorkers = 10
)

type importChartFilesJob struct {
Expand Down Expand Up @@ -713,39 +709,6 @@ func chartTarballURL(r *models.RepoInternal, cv models.ChartVersion) string {
return source
}

func initNetClient(additionalCA string, skipTLS bool) (*http.Client, error) {
// Get the SystemCertPool, continue with an empty pool on error
caCertPool, _ := x509.SystemCertPool()
if caCertPool == nil {
caCertPool = x509.NewCertPool()
}

// If additionalCA exists, load it
if _, err := os.Stat(additionalCA); !os.IsNotExist(err) {
certs, err := ioutil.ReadFile(additionalCA)
if err != nil {
return nil, fmt.Errorf("Failed to append %s to RootCAs: %v", additionalCA, err)
}

// Append our cert to the system pool
if ok := caCertPool.AppendCertsFromPEM(certs); !ok {
return nil, fmt.Errorf("Failed to append %s to RootCAs", additionalCA)
}
}

// Return Transport for testing purposes
return &http.Client{
Timeout: time.Second * defaultTimeoutSeconds,
Transport: &http.Transport{
TLSClientConfig: &tls.Config{
InsecureSkipVerify: skipTLS,
RootCAs: caCertPool,
},
Proxy: http.ProxyFromEnvironment,
},
}, nil
}

type fileImporter struct {
manager assetManager
netClient httpclient.Client
Expand Down
14 changes: 2 additions & 12 deletions cmd/asset-syncer/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import (
"github.com/kubeapps/kubeapps/pkg/helm"
helmfake "github.com/kubeapps/kubeapps/pkg/helm/fake"
helmtest "github.com/kubeapps/kubeapps/pkg/helm/test"
httpclient "github.com/kubeapps/kubeapps/pkg/http-client"
tartest "github.com/kubeapps/kubeapps/pkg/tarutil/test"
log "github.com/sirupsen/logrus"
"k8s.io/helm/pkg/proto/hapi/chart"
Expand Down Expand Up @@ -337,23 +338,12 @@ h251U/Daz6NiQBM9AxyAw6EHm8XAZBvCuebfzyrT
t.Error(err)
}

_, err = initNetClient(otherCA, false)
_, err = httpclient.NewWithCertFile(otherCA, false)
if err != nil {
t.Error(err)
}
}

var emptyRepoIndexYAMLBytes, _ = ioutil.ReadFile("testdata/empty-repo-index.yaml")
var emptyRepoIndexYAML = string(emptyRepoIndexYAMLBytes)

type emptyChartRepoHTTPClient struct{}

func (h *emptyChartRepoHTTPClient) Do(req *http.Request) (*http.Response, error) {
w := httptest.NewRecorder()
w.Write([]byte(emptyRepoIndexYAML))
return w.Result(), nil
}

func Test_getSha256(t *testing.T) {
sha, err := getSha256([]byte("this is a test"))
assert.Equal(t, err, nil, "Unable to get sha")
Expand Down
166 changes: 166 additions & 0 deletions cmd/kubeapps-apis/plugins/fluxv2/packages/v1alpha1/conditions.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
/*
Copyright © 2021 VMware
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package main

import (
"fmt"

"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/watch"
log "k8s.io/klog/v2"
)

func isRepoReady(obj map[string]interface{}) (bool, error) {
// see docs at https://fluxcd.io/docs/components/source/helmrepositories/
// Confirm the state we are observing is for the current generation
observedGeneration, found, err := unstructured.NestedInt64(obj, "status", "observedGeneration")
if err != nil {
return false, err
} else if !found {
return false, nil
}
generation, found, err := unstructured.NestedInt64(obj, "metadata", "generation")
if err != nil {
return false, err
} else if !found {
return false, nil
}
if generation != observedGeneration {
return false, nil
}

conditions, found, err := unstructured.NestedSlice(obj, "status", "conditions")
if err != nil {
return false, err
} else if !found {
return false, nil
}

for _, conditionUnstructured := range conditions {
if conditionAsMap, ok := conditionUnstructured.(map[string]interface{}); ok {
if typeString, ok := conditionAsMap["type"]; ok && typeString == "Ready" {
if statusString, ok := conditionAsMap["status"]; ok {
if statusString == "True" {
// note that the current doc on https://fluxcd.io/docs/components/source/helmrepositories/
// incorrectly states the example status reason as "IndexationSucceeded".
// The actual string is "IndexationSucceed"
if reasonString, ok := conditionAsMap["reason"]; !ok || reasonString != "IndexationSucceed" {
// should not happen
log.Infof("Unexpected status of HelmRepository: %v", obj)
}
return true, nil
} else if statusString == "False" {
var msg string
if msg, ok = conditionAsMap["message"].(string); !ok {
msg = fmt.Sprintf("No message available in condition: %v", conditionAsMap)
}
return false, status.Errorf(codes.Internal, msg)
}
}
}
}
}
return false, nil
}

// the goal of this fn is to answer whether or not to stop waiting for chart reconciliation
// which is different from answering whether the chart was pulled successfully
// TODO (gfichtenholt): As above, hopefully this fn isn't required if we can only list charts that we know are ready.
func isChartPullComplete(unstructuredChart *unstructured.Unstructured) (bool, error) {
// see docs at https://fluxcd.io/docs/components/source/helmcharts/
// Confirm the state we are observing is for the current generation
observedGeneration, found, err := unstructured.NestedInt64(unstructuredChart.Object, "status", "observedGeneration")
if err != nil {
return false, err
} else if !found {
return false, nil
}
generation, found, err := unstructured.NestedInt64(unstructuredChart.Object, "metadata", "generation")
if err != nil {
return false, err
} else if !found {
return false, nil
}
if generation != observedGeneration {
return false, nil
}

conditions, found, err := unstructured.NestedSlice(unstructuredChart.Object, "status", "conditions")
if err != nil {
return false, err
} else if !found {
return false, nil
}

// check if ready=True
for _, conditionUnstructured := range conditions {
if conditionAsMap, ok := conditionUnstructured.(map[string]interface{}); ok {
if typeString, ok := conditionAsMap["type"]; ok && typeString == "Ready" {
if statusString, ok := conditionAsMap["status"]; ok {
if statusString == "True" {
if reasonString, ok := conditionAsMap["reason"]; !ok || reasonString != "ChartPullSucceeded" {
// should not happen
log.Infof("unexpected status of HelmChart: %v", *unstructuredChart)
}
return true, nil
} else if statusString == "False" {
var msg string
if msg, ok = conditionAsMap["message"].(string); !ok {
msg = fmt.Sprintf("No message available in condition: %v", conditionAsMap)
}
// chart pull is done and it's a failure
return true, status.Errorf(codes.Internal, msg)
}
}
}
}
}
return false, nil
}

// TODO (gfichtenholt):
// see https://github.com/kubeapps/kubeapps/pull/2915 for context
// In the future you might instead want to consider something like
// passing a results channel (of string urls) to pullChartTarball, so it returns
// immediately and you wait on the results channel at the call-site, which would mean
// you could call it for 20 different charts and just wait for the results to come in
// whatever order they happen to take, rather than serially.
func waitUntilChartPullComplete(watcher watch.Interface) (*string, error) {
ch := watcher.ResultChan()
// LISTEN TO CHANNEL
for {
event := <-ch
if event.Type == watch.Modified {
unstructuredChart, ok := event.Object.(*unstructured.Unstructured)
if !ok {
return nil, status.Errorf(codes.Internal, "Could not cast to unstructured.Unstructured")
}

done, err := isChartPullComplete(unstructuredChart)
if err != nil {
return nil, err
} else if done {
url, found, err := unstructured.NestedString(unstructuredChart.Object, "status", "url")
if err != nil || !found {
return nil, status.Errorf(codes.Internal, "expected field status.url not found on HelmChart: %v:\n%v", err, unstructuredChart)
}
return &url, nil
}
} else {
// TODO handle other kinds of events
return nil, status.Errorf(codes.Internal, "got unexpected event: %v", event)
}
}
}

0 comments on commit 4547597

Please sign in to comment.