Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reduce Carvel response times for GetAvailablePackageSummaries #4378

Merged
merged 4 commits into from Mar 7, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 0 additions & 3 deletions cmd/kubeapps-apis/core/packages/v1alpha1/packages.go
Expand Up @@ -77,10 +77,7 @@ func (s packagesServer) GetAvailablePackageSummaries(ctx context.Context, reques

// TODO: We can do these in parallel in separate go routines.
for _, p := range s.pluginsWithServers {
log.Infof("Items now: %d/%d", len(pkgs), (pageOffset*int(pageSize) + int(pageSize)))
if pageSize == 0 || len(pkgs) <= (pageOffset*int(pageSize)+int(pageSize)) {
log.Infof("Should enter")

response, err := p.server.GetAvailablePackageSummaries(ctx, requestN)
if err != nil {
return nil, status.Errorf(status.Convert(err).Code(), "Invalid GetAvailablePackageSummaries response from the plugin %v: %v", p.plugin.Name, err)
Expand Down
Expand Up @@ -26,6 +26,8 @@ import (
log "k8s.io/klog/v2"
)

const PACKAGES_CHANNEL_BUFFER_SIZE = 20

// GetAvailablePackageSummaries returns the available packages managed by the 'kapp_controller' plugin
func (s *Server) GetAvailablePackageSummaries(ctx context.Context, request *corev1.GetAvailablePackageSummariesRequest) (*corev1.GetAvailablePackageSummariesResponse, error) {
// Retrieve parameters from the request
Expand All @@ -45,61 +47,90 @@ func (s *Server) GetAvailablePackageSummaries(ctx context.Context, request *core
cluster = s.globalPackagingCluster
}
// fetch all the package metadatas
// TODO(minelson): We should be grabbing only the requested page
// of results here.
pkgMetadatas, err := s.getPkgMetadatas(ctx, cluster, namespace)
if err != nil {
return nil, statuserror.FromK8sError("get", "PackageMetadata", "", err)
}

// Create a channel to receive any results. The channel is also used as a
// natural waitgroup to synchronize the results.
type fetchResult struct {
index int
availablePackageSummary *corev1.AvailablePackageSummary
err error
// Until the above request uses the pagination, update the slice
// to be the correct page of results.
startAt := 0
if pageSize > 0 {
startAt = int(pageSize) * pageOffset
if startAt > len(pkgMetadatas) {
return nil, status.Errorf(codes.InvalidArgument, "invalid pagination arguments %v", request.GetPaginationOptions())
}
pkgMetadatas = pkgMetadatas[startAt:]
if len(pkgMetadatas) > int(pageSize) {
pkgMetadatas = pkgMetadatas[:pageSize]
}
}
fetchResults := make(chan fetchResult)
numFetched := 0

// TODO(agamez): DRY up this logic (cf GetInstalledPackageSummaries)
if len(pkgMetadatas) > 0 {
startAt := 0
if pageSize > 0 {
startAt = int(pageSize) * pageOffset
}
for i, pkgMetadata := range pkgMetadatas {
if startAt <= i {
numFetched++
go func(i int, pkgMetadata *datapackagingv1alpha1.PackageMetadata) {
availablePackageSummary, err := s.fetchPackageSummaryForMeta(ctx, cluster, namespace, pkgMetadata)
// Create a channel to receive all packages available in the namespace.
// Using a buffered channel so that we don't block the network request if we
// can't process fast enough.
getPkgsChannel := make(chan *datapackagingv1alpha1.Package, PACKAGES_CHANNEL_BUFFER_SIZE)
var getPkgsError error
go func() {
getPkgsError = s.getPkgs(ctx, cluster, namespace, getPkgsChannel)
}()

// The index of this result is relative to the page.
fetchResults <- fetchResult{i - startAt, availablePackageSummary, err}
}(i, pkgMetadata)
}
// if we've reached the end of the page, stop iterating
if pageSize > 0 && numFetched == int(pageSize) {
break
}
}
// Skip through the packages until we get to the first item in our
// paginated results.
currentPkg := <-getPkgsChannel
for currentPkg != nil && currentPkg.Spec.RefName != pkgMetadatas[0].Name {
currentPkg = <-getPkgsChannel
}
// Return an error if any is found. We continue only if there were no
// errors.
availablePackageSummaries := make([]*corev1.AvailablePackageSummary, numFetched)

availablePackageSummaries := make([]*corev1.AvailablePackageSummary, len(pkgMetadatas))
categories := []string{}
for i := 0; i < numFetched; i++ {
fetchResult := <-fetchResults
if fetchResult.err != nil {
return nil, status.Errorf(codes.Internal, fmt.Sprintf("unexpected error while gathering available packages: %v", err))
pkgsForMeta := []*datapackagingv1alpha1.Package{}
for i, pkgMetadata := range pkgMetadatas {
// currentPkg will be nil if the channel is closed and there are no
// more items to consume.
if currentPkg == nil {
return nil, statuserror.FromK8sError("get", "Package", pkgMetadata.Name, fmt.Errorf("no package versions for the package %q", pkgMetadata.Name))
}
// append the availablePackageSummary to the slice
availablePackageSummaries[fetchResult.index] = fetchResult.availablePackageSummary
categories = append(categories, fetchResult.availablePackageSummary.Categories...)
// The kapp-controller returns both packages and package metadata
// in order.
if currentPkg.Spec.RefName != pkgMetadata.Name {
return nil, status.Errorf(codes.Internal, fmt.Sprintf("unexpected order for kapp-controller packages, expected %q, found %q", pkgMetadata.Name, currentPkg.Spec.RefName))
}
// Collect the packages for a particular refName to be able to send the
// latest semver version. For the moment, kapp-controller just returns
// CRs with the default alpha sorting of the CR name.
// Ref https://kubernetes.slack.com/archives/CH8KCCKA5/p1646285201181119
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great, thanks for mentioning it. Interesting discussion!

pkgsForMeta = append(pkgsForMeta, currentPkg)
currentPkg = <-getPkgsChannel
for currentPkg != nil && currentPkg.Spec.RefName == pkgMetadata.Name {
pkgsForMeta = append(pkgsForMeta, currentPkg)
currentPkg = <-getPkgsChannel
}
// At this point, we have all the packages collected that match
// this ref name, and currentPkg is for the next meta name.
pkgVersionMap, err := getPkgVersionsMap(pkgsForMeta)
if err != nil || len(pkgVersionMap[pkgMetadata.Name]) == 0 {
return nil, status.Errorf(codes.Internal, fmt.Sprintf("unable to calculate package versions map for packages: %v, err: %v", pkgsForMeta, err))
}
latestVersion := pkgVersionMap[pkgMetadata.Name][0].version.String()
availablePackageSummary := s.buildAvailablePackageSummary(pkgMetadata, latestVersion, cluster)
availablePackageSummaries[i] = availablePackageSummary
categories = append(categories, availablePackageSummary.Categories...)

// Reset the packages for the current meta name.
pkgsForMeta = pkgsForMeta[:0]
}

// Verify no error during go routine.
if getPkgsError != nil {
return nil, statuserror.FromK8sError("get", "Package", "", err)
}

// Only return a next page token if the request was for pagination and
// the results are a full page.
nextPageToken := ""
if pageSize > 0 && numFetched == int(pageSize) {
if pageSize > 0 && len(availablePackageSummaries) == int(pageSize) {
nextPageToken = fmt.Sprintf("%d", pageOffset+1)
}
response := &corev1.GetAvailablePackageSummariesResponse{
Expand All @@ -110,29 +141,6 @@ func (s *Server) GetAvailablePackageSummaries(ctx context.Context, request *core
return response, nil
}

// fetchPackageSummaryForMeta lists the Package CRs whose spec.refName matches
// the given PackageMetadata, computes their versions map and builds the
// AvailablePackageSummary from it. Errors from listing the packages and from
// building the summary are converted with statuserror.FromK8sError; an error
// from getPkgVersionsMap is returned unchanged.
func (s *Server) fetchPackageSummaryForMeta(ctx context.Context, cluster, namespace string, pkgMetadata *datapackagingv1alpha1.PackageMetadata) (*corev1.AvailablePackageSummary, error) {
	// Restrict the listing to the Package CRs that reference this metadata
	// by selecting on spec.refName.
	// TODO(agamez): perhaps we better fetch all the packages and filter ourselves to reduce the k8s calls
	matchingPkgs, listErr := s.getPkgsWithFieldSelector(
		ctx,
		cluster,
		namespace,
		fmt.Sprintf("spec.refName=%s", pkgMetadata.Name),
	)
	if listErr != nil {
		return nil, statuserror.FromK8sError("get", "Package", pkgMetadata.Name, listErr)
	}

	versionsByName, versionsErr := getPkgVersionsMap(matchingPkgs)
	if versionsErr != nil {
		return nil, versionsErr
	}

	// Assemble the summary from the metadata and the collected versions.
	summary, buildErr := s.buildAvailablePackageSummary(pkgMetadata, versionsByName, cluster)
	if buildErr != nil {
		return nil, statuserror.FromK8sError("create", "AvailablePackageSummary", pkgMetadata.Name, buildErr)
	}

	return summary, nil
}

// GetAvailablePackageVersions returns the package versions managed by the 'kapp_controller' plugin
func (s *Server) GetAvailablePackageVersions(ctx context.Context, request *corev1.GetAvailablePackageVersionsRequest) (*corev1.GetAvailablePackageVersionsResponse, error) {
// Retrieve parameters from the request
Expand Down
Expand Up @@ -25,15 +25,9 @@ import (
log "k8s.io/klog/v2"
)

func (s *Server) buildAvailablePackageSummary(pkgMetadata *datapackagingv1alpha1.PackageMetadata, pkgVersionsMap map[string][]pkgSemver, cluster string) (*corev1.AvailablePackageSummary, error) {
func (s *Server) buildAvailablePackageSummary(pkgMetadata *datapackagingv1alpha1.PackageMetadata, latestVersion string, cluster string) *corev1.AvailablePackageSummary {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There was no requirement for having the package map, just the latest version, which also means we don't need the ability to return an error here.

var iconStringBuilder strings.Builder

// get the versions associated with the package
versions := pkgVersionsMap[pkgMetadata.Name]
if len(versions) == 0 {
return nil, fmt.Errorf("no package versions for the package %q", pkgMetadata.Name)
}

// Carvel uses base64-encoded SVG data for IconSVGBase64, whereas we need
// a url, so convert to a data-url.

Expand All @@ -58,16 +52,16 @@ func (s *Server) buildAvailablePackageSummary(pkgMetadata *datapackagingv1alpha1
// Currently, PkgVersion and AppVersion are the same
// https://kubernetes.slack.com/archives/CH8KCCKA5/p1636386358322000?thread_ts=1636371493.320900&cid=CH8KCCKA5
LatestVersion: &corev1.PackageAppVersion{
PkgVersion: versions[0].version.String(),
AppVersion: versions[0].version.String(),
PkgVersion: latestVersion,
AppVersion: latestVersion,
},
IconUrl: iconStringBuilder.String(),
DisplayName: pkgMetadata.Spec.DisplayName,
ShortDescription: pkgMetadata.Spec.ShortDescription,
Categories: pkgMetadata.Spec.Categories,
}

return availablePackageSummary, nil
return availablePackageSummary
}

func (s *Server) buildAvailablePackageDetail(pkgMetadata *datapackagingv1alpha1.PackageMetadata, requestedPkgVersion string, foundPkgSemver *pkgSemver, cluster string) (*corev1.AvailablePackageDetail, error) {
Expand Down
Expand Up @@ -203,9 +203,31 @@ func (s *Server) getApp(ctx context.Context, cluster, namespace, identifier stri

// List of resources getters

// getPkgs returns the list of packages for the given cluster and namespace
func (s *Server) getPkgs(ctx context.Context, cluster, namespace string) ([]*datapackagingv1alpha1.Package, error) {
return s.getPkgsWithFieldSelector(ctx, cluster, namespace, "")
// getPkgs requests the packages for the given cluster and namespace and sends
// them to the channel to be processed immediately, closing the channel
// when finished or when an error is returned.
func (s *Server) getPkgs(ctx context.Context, cluster, namespace string, ch chan<- *datapackagingv1alpha1.Package) error {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This function wasn't actually used at all, so I updated it to do what I want. Importantly, it does not iterate all the results into an in-memory slice, but rather sends each package down the return channel as it iterates (so we can process them while it continues iterating).

The other important point here is that, as mentioned in the comment, this function is guaranteed to close the channel, so we can be sure when it has completed (or returned with an error).

defer close(ch)
resource, err := s.getPkgResource(ctx, cluster, namespace)
if err != nil {
return err
}

unstructured, err := resource.List(ctx, metav1.ListOptions{})
if err != nil {
return err
}

for _, unstructured := range unstructured.Items {
pkg := &datapackagingv1alpha1.Package{}
err := runtime.DefaultUnstructuredConverter.FromUnstructured(unstructured.Object, pkg)
if err != nil {
return err
}

ch <- pkg
}
return nil
}

// getPkgs returns the list of packages for the given cluster and namespace
Expand Down