Skip to content

Commit

Permalink
Flux v2 GetAvailablePackages() implementation (#2852)
Browse files Browse the repository at this point in the history
* #759

Allow adding an optional description for app repositories

* updates based on Michael Nelson's feedback

* missed a test

* incremental

* incremental

* incremental
  • Loading branch information
gfichtenholt committed May 24, 2021
1 parent 7c9fb1d commit 6559bec
Show file tree
Hide file tree
Showing 2 changed files with 153 additions and 18 deletions.
8 changes: 6 additions & 2 deletions cmd/kubeapps-apis/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -71,10 +71,14 @@ IMAGE_TAG=dev1 make kubeapps/kubeapps-apis
and make that image available on your cluster somehow. If using kind, you can simply do:

```bash
kind load docker-image docker.io/kubeapps/kubeapps-apis:dev1 --name kubeapps
kind load docker-image kubeapps/kubeapps-apis:dev1 --name kubeapps
```

When you deploy or upgrade Kubeapps, be sure to include the values file at `docs/developer/manifests/values.kubeappsapis.yaml` which provides the configuration to include the kubeapps-apis deployment and service etc. You can edit that file to change the `kubeappsapis.image.tag` field to match the tag above, or edit the deployment once deployed to match.
When you deploy or upgrade Kubeapps, be sure to include the values file at `docs/developer/manifests/values.kubeappsapis.yaml` which provides the configuration to include the kubeapps-apis deployment and service etc. You can edit that file to change the `kubeappsapis.image.tag` field to match the tag above, or edit the deployment once deployed to match, such as:

```bash
kubectl set image deployment/kubeapps-internal-kubeappsapis -n kubeapps kubeappsapis=kubeapps/kubeapps-apis:dev1 --record
```

With the kubeapps-apis service running, you can then test the packages endpoints in cluster by port-forwarding the service in one terminal:

Expand Down
163 changes: 147 additions & 16 deletions cmd/kubeapps-apis/plugins/fluxv2/packages/v1alpha1/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,41 +15,40 @@ package main
import (
"context"
"fmt"
"io/ioutil"
"net/http"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/dynamic"

"github.com/ghodss/yaml"
corev1 "github.com/kubeapps/kubeapps/cmd/kubeapps-apis/gen/core/packages/v1alpha1"
"github.com/kubeapps/kubeapps/cmd/kubeapps-apis/gen/plugins/fluxv2/packages/v1alpha1"
"k8s.io/client-go/rest"
helmrepo "k8s.io/helm/pkg/repo"
log "k8s.io/klog/v2"
)

// Server implements the kapp-controller packages v1alpha1 interface.
const (
// see docs at https://fluxcd.io/docs/components/source/
fluxGroup = "source.toolkit.fluxcd.io"
fluxVersion = "v1beta1"
fluxHelmRepoResource = "helmrepositories"
)

// Server implements the fluxv2 packages v1alpha1 interface.
type Server struct {
v1alpha1.UnimplementedPackagesServiceServer
}

// GetPackageRepositories returns the package repositories based on the request.
func (s *Server) GetPackageRepositories(ctx context.Context, request *corev1.GetPackageRepositoriesRequest) (*corev1.GetPackageRepositoriesResponse, error) {
// TODO: replace incluster config with the user config using token from request meta.
config, err := rest.InClusterConfig()
if err != nil {
return nil, fmt.Errorf("unable to create incluster config: %w", err)
}

client, err := dynamic.NewForConfig(config)
if err != nil {
return nil, fmt.Errorf("unable to create dynamic client: %w", err)
}

repositoryResource := schema.GroupVersionResource{Group: "source.toolkit.fluxcd.io", Version: "v1beta1", Resource: "helmrepositories"}

// Currently checks globally. Update to handle namespaced requests (?)
repos, err := client.Resource(repositoryResource).List(ctx, metav1.ListOptions{})
repos, err := getHelmRepos(ctx)
if err != nil {
return nil, fmt.Errorf("unable to list helm-fluxv2 repositories: %w", err)
return nil, err
}

responseRepos := []*corev1.PackageRepository{}
Expand Down Expand Up @@ -81,3 +80,135 @@ func (s *Server) GetPackageRepositories(ctx context.Context, request *corev1.Get
Repositories: responseRepos,
}, nil
}

// GetAvailablePackages streams the available packages based on the request.
func (s *Server) GetAvailablePackages(ctx context.Context, request *corev1.GetAvailablePackagesRequest) (*corev1.GetAvailablePackagesResponse, error) {
log.Infof("+GetAvailablePackages(namespace=[%s])", request.Namespace)
repos, err := getHelmRepos(ctx)
if err != nil {
return nil, err
}

responsePackages := []*corev1.AvailablePackage{}
for _, repoUnstructured := range repos.Items {
name, found, err := unstructured.NestedString(repoUnstructured.Object, "metadata", "name")
if err != nil || !found {
log.Errorf("required field metadata.name not found on HelmRepository: %w:\n%v", err, repoUnstructured.Object)
continue
}

// see docs at https://fluxcd.io/docs/components/source/helmrepositories/
conditions, found, err := unstructured.NestedSlice(repoUnstructured.Object, "status", "conditions")
if err != nil || !found {
log.Infof("Skipping packages for repository [%s] because it has not reached 'Ready' state:%w\n%v", name, err, repoUnstructured.Object)
continue
}

ready := false
for _, conditionUnstructured := range conditions {
if conditionAsMap, ok := conditionUnstructured.(map[string]interface{}); ok {
if typeString, ok := conditionAsMap["type"]; ok && typeString == "Ready" {
if statusString, ok := conditionAsMap["status"]; ok && statusString == "True" {
if reasonString, ok := conditionAsMap["reason"]; ok && reasonString == "IndexationSucceed" {
ready = true
break
}
}
}
}
}

if !ready {
log.Infof("Skipping packages for repository [%s] because it is not in Ready state:n%v", name, repoUnstructured.Object)
continue
}

url, found, err := unstructured.NestedString(repoUnstructured.Object, "status", "url")
if err != nil || !found {
log.Infof("expected field status.url not found on HelmRepository: %w:\n%v", err, repoUnstructured.Object)
continue
}

log.Infof("Found repository: [%s], index URL: [%s]", name, url)
repoPackages, err := readPackagesFromRepoIndex(url)
if err != nil {
// just skip this repo
log.Errorf("Failed to read packages for repository [%s] due to %v", name, err)
} else {
responsePackages = append(responsePackages, repoPackages...)
}
}
return &corev1.GetAvailablePackagesResponse{
Packages: responsePackages,
}, nil
}

func getHelmRepos(ctx context.Context) (*unstructured.UnstructuredList, error) {
// TODO: replace incluster config with the user config using token from request meta.
config, err := rest.InClusterConfig()
if err != nil {
return nil, fmt.Errorf("unable to create incluster config: %w", err)
}

client, err := dynamic.NewForConfig(config)
if err != nil {
return nil, fmt.Errorf("unable to create dynamic client: %w", err)
}

repositoryResource := schema.GroupVersionResource{
Group: fluxGroup,
Version: fluxVersion,
Resource: fluxHelmRepoResource}

// Currently checks globally. Update to handle namespaced requests (?)
repos, err := client.Resource(repositoryResource).List(ctx, metav1.ListOptions{})
if err != nil {
return nil, fmt.Errorf("unable to list fluxv2 helmrepositories: %w", err)
} else {
// TODO: should we filter out those repos that don't have .status.condition.Ready == True?
// like we do in GetAvailablePackages()?
// i.e. should GetAvailableRepos() call semantics be such that only "Ready" repos are returned
return repos, nil
}
}

func readPackagesFromRepoIndex(indexURL string) ([]*corev1.AvailablePackage, error) {
//Get the response bytes from the url
response, err := http.Get(indexURL)
if err != nil {
return nil, err
}
defer response.Body.Close()

if response.StatusCode != 200 {
return nil, fmt.Errorf("received non 200 response code: [%d]", response.StatusCode)
}

contents, err := ioutil.ReadAll(response.Body)
if err != nil {
return nil, err
}

var index helmrepo.IndexFile
err = yaml.Unmarshal(contents, &index)
if err != nil {
return nil, err
}

index.SortEntries()
responsePackages := []*corev1.AvailablePackage{}
for _, entry := range index.Entries {
if entry[0].GetDeprecated() {
log.Infof("skipping deprecated chart: [%s]", entry[0].Name)
continue
}
pkg := &corev1.AvailablePackage{
Name: entry[0].Name,
Version: entry[0].Version,
// TODO icon URL
// TODD repo ref
}
responsePackages = append(responsePackages, pkg)
}
return responsePackages, nil
}

0 comments on commit 6559bec

Please sign in to comment.