Skip to content

Commit

Permalink
Reduce Carvel response times for GetAvailablePackageSummaries (#4378)
Browse files Browse the repository at this point in the history
* Refactor to use slice with correctly paginated data.

Signed-off-by: Michael Nelson <minelson@vmware.com>

* Refactor to fetch all package versions at once.

Signed-off-by: Michael Nelson <minelson@vmware.com>

* Add buffering to channel. No effect for TCE repo.

Signed-off-by: Michael Nelson <minelson@vmware.com>

* Removed assumption of version order for packages.

Signed-off-by: Michael Nelson <minelson@vmware.com>
  • Loading branch information
absoludity committed Mar 7, 2022
1 parent 056926d commit 2769a2c
Show file tree
Hide file tree
Showing 5 changed files with 239 additions and 85 deletions.
3 changes: 0 additions & 3 deletions cmd/kubeapps-apis/core/packages/v1alpha1/packages.go
Expand Up @@ -77,10 +77,7 @@ func (s packagesServer) GetAvailablePackageSummaries(ctx context.Context, reques

// TODO: We can do these in parallel in separate go routines.
for _, p := range s.pluginsWithServers {
log.Infof("Items now: %d/%d", len(pkgs), (pageOffset*int(pageSize) + int(pageSize)))
if pageSize == 0 || len(pkgs) <= (pageOffset*int(pageSize)+int(pageSize)) {
log.Infof("Should enter")

response, err := p.server.GetAvailablePackageSummaries(ctx, requestN)
if err != nil {
return nil, status.Errorf(status.Convert(err).Code(), "Invalid GetAvailablePackageSummaries response from the plugin %v: %v", p.plugin.Name, err)
Expand Down
Expand Up @@ -26,6 +26,8 @@ import (
log "k8s.io/klog/v2"
)

const PACKAGES_CHANNEL_BUFFER_SIZE = 20

// GetAvailablePackageSummaries returns the available packages managed by the 'kapp_controller' plugin
func (s *Server) GetAvailablePackageSummaries(ctx context.Context, request *corev1.GetAvailablePackageSummariesRequest) (*corev1.GetAvailablePackageSummariesResponse, error) {
// Retrieve parameters from the request
Expand All @@ -45,61 +47,90 @@ func (s *Server) GetAvailablePackageSummaries(ctx context.Context, request *core
cluster = s.globalPackagingCluster
}
// fetch all the package metadatas
// TODO(minelson): We should be grabbing only the requested page
// of results here.
pkgMetadatas, err := s.getPkgMetadatas(ctx, cluster, namespace)
if err != nil {
return nil, statuserror.FromK8sError("get", "PackageMetadata", "", err)
}

// Create a channel to receive any results. The channel is also used as a
// natural waitgroup to synchronize the results.
type fetchResult struct {
index int
availablePackageSummary *corev1.AvailablePackageSummary
err error
// Until the above request uses the pagination, update the slice
// to be the correct page of results.
startAt := 0
if pageSize > 0 {
startAt = int(pageSize) * pageOffset
if startAt > len(pkgMetadatas) {
return nil, status.Errorf(codes.InvalidArgument, "invalid pagination arguments %v", request.GetPaginationOptions())
}
pkgMetadatas = pkgMetadatas[startAt:]
if len(pkgMetadatas) > int(pageSize) {
pkgMetadatas = pkgMetadatas[:pageSize]
}
}
fetchResults := make(chan fetchResult)
numFetched := 0

// TODO(agamez): DRY up this logic (cf GetInstalledPackageSummaries)
if len(pkgMetadatas) > 0 {
startAt := 0
if pageSize > 0 {
startAt = int(pageSize) * pageOffset
}
for i, pkgMetadata := range pkgMetadatas {
if startAt <= i {
numFetched++
go func(i int, pkgMetadata *datapackagingv1alpha1.PackageMetadata) {
availablePackageSummary, err := s.fetchPackageSummaryForMeta(ctx, cluster, namespace, pkgMetadata)
// Create a channel to receive all packages available in the namespace.
// Using a buffered channel so that we don't block the network request if we
// can't process fast enough.
getPkgsChannel := make(chan *datapackagingv1alpha1.Package, PACKAGES_CHANNEL_BUFFER_SIZE)
var getPkgsError error
go func() {
getPkgsError = s.getPkgs(ctx, cluster, namespace, getPkgsChannel)
}()

// The index of this result is relative to the page.
fetchResults <- fetchResult{i - startAt, availablePackageSummary, err}
}(i, pkgMetadata)
}
// if we've reached the end of the page, stop iterating
if pageSize > 0 && numFetched == int(pageSize) {
break
}
}
// Skip through the packages until we get to the first item in our
// paginated results.
currentPkg := <-getPkgsChannel
for currentPkg != nil && currentPkg.Spec.RefName != pkgMetadatas[0].Name {
currentPkg = <-getPkgsChannel
}
// Return an error if any is found. We continue only if there were no
// errors.
availablePackageSummaries := make([]*corev1.AvailablePackageSummary, numFetched)

availablePackageSummaries := make([]*corev1.AvailablePackageSummary, len(pkgMetadatas))
categories := []string{}
for i := 0; i < numFetched; i++ {
fetchResult := <-fetchResults
if fetchResult.err != nil {
return nil, status.Errorf(codes.Internal, fmt.Sprintf("unexpected error while gathering available packages: %v", err))
pkgsForMeta := []*datapackagingv1alpha1.Package{}
for i, pkgMetadata := range pkgMetadatas {
// currentPkg will be nil if the channel is closed and there's no
// more items to consume.
if currentPkg == nil {
return nil, statuserror.FromK8sError("get", "Package", pkgMetadata.Name, fmt.Errorf("no package versions for the package %q", pkgMetadata.Name))
}
// append the availablePackageSummary to the slice
availablePackageSummaries[fetchResult.index] = fetchResult.availablePackageSummary
categories = append(categories, fetchResult.availablePackageSummary.Categories...)
// The kapp-controller returns both packages and package metadata
// in order.
if currentPkg.Spec.RefName != pkgMetadata.Name {
return nil, status.Errorf(codes.Internal, fmt.Sprintf("unexpected order for kapp-controller packages, expected %q, found %q", pkgMetadata.Name, currentPkg.Spec.RefName))
}
// Collect the packages for a particular refName to be able to send the
// latest semver version. For the moment, kapp-controller just returns
// CRs with the default alpha sorting of the CR name.
// Ref https://kubernetes.slack.com/archives/CH8KCCKA5/p1646285201181119
pkgsForMeta = append(pkgsForMeta, currentPkg)
currentPkg = <-getPkgsChannel
for currentPkg != nil && currentPkg.Spec.RefName == pkgMetadata.Name {
pkgsForMeta = append(pkgsForMeta, currentPkg)
currentPkg = <-getPkgsChannel
}
// At this point, we have all the packages collected that match
// this ref name, and currentPkg is for the next meta name.
pkgVersionMap, err := getPkgVersionsMap(pkgsForMeta)
if err != nil || len(pkgVersionMap[pkgMetadata.Name]) == 0 {
return nil, status.Errorf(codes.Internal, fmt.Sprintf("unable to calculate package versions map for packages: %v, err: %v", pkgsForMeta, err))
}
latestVersion := pkgVersionMap[pkgMetadata.Name][0].version.String()
availablePackageSummary := s.buildAvailablePackageSummary(pkgMetadata, latestVersion, cluster)
availablePackageSummaries[i] = availablePackageSummary
categories = append(categories, availablePackageSummary.Categories...)

// Reset the packages for the current meta name.
pkgsForMeta = pkgsForMeta[:0]
}

// Verify no error during go routine.
if getPkgsError != nil {
return nil, statuserror.FromK8sError("get", "Package", "", err)
}

// Only return a next page token if the request was for pagination and
// the results are a full page.
nextPageToken := ""
if pageSize > 0 && numFetched == int(pageSize) {
if pageSize > 0 && len(availablePackageSummaries) == int(pageSize) {
nextPageToken = fmt.Sprintf("%d", pageOffset+1)
}
response := &corev1.GetAvailablePackageSummariesResponse{
Expand All @@ -110,29 +141,6 @@ func (s *Server) GetAvailablePackageSummaries(ctx context.Context, request *core
return response, nil
}

func (s *Server) fetchPackageSummaryForMeta(ctx context.Context, cluster, namespace string, pkgMetadata *datapackagingv1alpha1.PackageMetadata) (*corev1.AvailablePackageSummary, error) {
// fetch the associated packages
// Use the field selector to return only Package CRs that match on the spec.refName.
// TODO(agamez): perhaps we better fetch all the packages and filter ourselves to reduce the k8s calls
fieldSelector := fmt.Sprintf("spec.refName=%s", pkgMetadata.Name)
pkgs, err := s.getPkgsWithFieldSelector(ctx, cluster, namespace, fieldSelector)
if err != nil {
return nil, statuserror.FromK8sError("get", "Package", pkgMetadata.Name, err)
}
pkgVersionsMap, err := getPkgVersionsMap(pkgs)
if err != nil {
return nil, err
}

// generate the availablePackageSummary from the fetched information
availablePackageSummary, err := s.buildAvailablePackageSummary(pkgMetadata, pkgVersionsMap, cluster)
if err != nil {
return nil, statuserror.FromK8sError("create", "AvailablePackageSummary", pkgMetadata.Name, err)
}

return availablePackageSummary, nil
}

// GetAvailablePackageVersions returns the package versions managed by the 'kapp_controller' plugin
func (s *Server) GetAvailablePackageVersions(ctx context.Context, request *corev1.GetAvailablePackageVersionsRequest) (*corev1.GetAvailablePackageVersionsResponse, error) {
// Retrieve parameters from the request
Expand Down
Expand Up @@ -25,15 +25,9 @@ import (
log "k8s.io/klog/v2"
)

func (s *Server) buildAvailablePackageSummary(pkgMetadata *datapackagingv1alpha1.PackageMetadata, pkgVersionsMap map[string][]pkgSemver, cluster string) (*corev1.AvailablePackageSummary, error) {
func (s *Server) buildAvailablePackageSummary(pkgMetadata *datapackagingv1alpha1.PackageMetadata, latestVersion string, cluster string) *corev1.AvailablePackageSummary {
var iconStringBuilder strings.Builder

// get the versions associated with the package
versions := pkgVersionsMap[pkgMetadata.Name]
if len(versions) == 0 {
return nil, fmt.Errorf("no package versions for the package %q", pkgMetadata.Name)
}

// Carvel uses base64-encoded SVG data for IconSVGBase64, whereas we need
// a url, so convert to a data-url.

Expand All @@ -58,16 +52,16 @@ func (s *Server) buildAvailablePackageSummary(pkgMetadata *datapackagingv1alpha1
// Currently, PkgVersion and AppVersion are the same
// https://kubernetes.slack.com/archives/CH8KCCKA5/p1636386358322000?thread_ts=1636371493.320900&cid=CH8KCCKA5
LatestVersion: &corev1.PackageAppVersion{
PkgVersion: versions[0].version.String(),
AppVersion: versions[0].version.String(),
PkgVersion: latestVersion,
AppVersion: latestVersion,
},
IconUrl: iconStringBuilder.String(),
DisplayName: pkgMetadata.Spec.DisplayName,
ShortDescription: pkgMetadata.Spec.ShortDescription,
Categories: pkgMetadata.Spec.Categories,
}

return availablePackageSummary, nil
return availablePackageSummary
}

func (s *Server) buildAvailablePackageDetail(pkgMetadata *datapackagingv1alpha1.PackageMetadata, requestedPkgVersion string, foundPkgSemver *pkgSemver, cluster string) (*corev1.AvailablePackageDetail, error) {
Expand Down
Expand Up @@ -203,9 +203,31 @@ func (s *Server) getApp(ctx context.Context, cluster, namespace, identifier stri

// List of resources getters

// getPkgs returns the list of packages for the given cluster and namespace
func (s *Server) getPkgs(ctx context.Context, cluster, namespace string) ([]*datapackagingv1alpha1.Package, error) {
return s.getPkgsWithFieldSelector(ctx, cluster, namespace, "")
// getPkgs requests the packages for the given cluster and namespace and sends
// them to the channel to be processed immediately, closing the channel
// when finished or when an error is returned.
func (s *Server) getPkgs(ctx context.Context, cluster, namespace string, ch chan<- *datapackagingv1alpha1.Package) error {
defer close(ch)
resource, err := s.getPkgResource(ctx, cluster, namespace)
if err != nil {
return err
}

unstructured, err := resource.List(ctx, metav1.ListOptions{})
if err != nil {
return err
}

for _, unstructured := range unstructured.Items {
pkg := &datapackagingv1alpha1.Package{}
err := runtime.DefaultUnstructuredConverter.FromUnstructured(unstructured.Object, pkg)
if err != nil {
return err
}

ch <- pkg
}
return nil
}

// getPkgs returns the list of packages for the given cluster and namespace
Expand Down

0 comments on commit 2769a2c

Please sign in to comment.