Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reduce Carvel response times for GetAvailablePackageSummaries #4378

Merged
merged 4 commits into from Mar 7, 2022
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 0 additions & 3 deletions cmd/kubeapps-apis/core/packages/v1alpha1/packages.go
Expand Up @@ -77,10 +77,7 @@ func (s packagesServer) GetAvailablePackageSummaries(ctx context.Context, reques

// TODO: We can do these in parallel in separate go routines.
for _, p := range s.pluginsWithServers {
log.Infof("Items now: %d/%d", len(pkgs), (pageOffset*int(pageSize) + int(pageSize)))
if pageSize == 0 || len(pkgs) <= (pageOffset*int(pageSize)+int(pageSize)) {
log.Infof("Should enter")

response, err := p.server.GetAvailablePackageSummaries(ctx, requestN)
if err != nil {
return nil, status.Errorf(status.Convert(err).Code(), "Invalid GetAvailablePackageSummaries response from the plugin %v: %v", p.plugin.Name, err)
Expand Down
Expand Up @@ -45,61 +45,80 @@ func (s *Server) GetAvailablePackageSummaries(ctx context.Context, request *core
cluster = s.globalPackagingCluster
}
// fetch all the package metadatas
// TODO(minelson): We should be grabbing only the requested page
// of results here.
pkgMetadatas, err := s.getPkgMetadatas(ctx, cluster, namespace)
if err != nil {
return nil, statuserror.FromK8sError("get", "PackageMetadata", "", err)
}

// Create a channel to receive any results. The channel is also used as a
// natural waitgroup to synchronize the results.
type fetchResult struct {
index int
availablePackageSummary *corev1.AvailablePackageSummary
err error
// Until the above request uses the pagination, update the slice
// to be the correct page of results.
startAt := 0
if pageSize > 0 {
startAt = int(pageSize) * pageOffset
if startAt > len(pkgMetadatas) {
return nil, status.Errorf(codes.InvalidArgument, "invalid pagination arguments %v", request.GetPaginationOptions())
}
pkgMetadatas = pkgMetadatas[startAt:]
if len(pkgMetadatas) > int(pageSize) {
pkgMetadatas = pkgMetadatas[:pageSize]
}
}
fetchResults := make(chan fetchResult)
numFetched := 0

// TODO(agamez): DRY up this logic (cf GetInstalledPackageSummaries)
if len(pkgMetadatas) > 0 {
startAt := 0
if pageSize > 0 {
startAt = int(pageSize) * pageOffset
}
for i, pkgMetadata := range pkgMetadatas {
if startAt <= i {
numFetched++
go func(i int, pkgMetadata *datapackagingv1alpha1.PackageMetadata) {
availablePackageSummary, err := s.fetchPackageSummaryForMeta(ctx, cluster, namespace, pkgMetadata)
// Create a channel to receive all packages available in the namespace
getPkgsChannel := make(chan *datapackagingv1alpha1.Package)
var getPkgsError error
go func() {
getPkgsError = s.getPkgs(ctx, cluster, namespace, getPkgsChannel)
}()

// The index of this result is relative to the page.
fetchResults <- fetchResult{i - startAt, availablePackageSummary, err}
}(i, pkgMetadata)
}
// if we've reached the end of the page, stop iterating
if pageSize > 0 && numFetched == int(pageSize) {
break
}
}
// Skip through the packages until we get to the first item in our
// paginated results.
currentPkg := <-getPkgsChannel
for currentPkg != nil && currentPkg.Spec.RefName != pkgMetadatas[0].Name {
currentPkg = <-getPkgsChannel
}
// Return an error if any is found. We continue only if there were no
// errors.
availablePackageSummaries := make([]*corev1.AvailablePackageSummary, numFetched)

availablePackageSummaries := make([]*corev1.AvailablePackageSummary, len(pkgMetadatas))
categories := []string{}
for i := 0; i < numFetched; i++ {
fetchResult := <-fetchResults
if fetchResult.err != nil {
return nil, status.Errorf(codes.Internal, fmt.Sprintf("unexpected error while gathering available packages: %v", err))
for i, pkgMetadata := range pkgMetadatas {
// currentPkg will be nil if the channel is closed and there's no
// more items to consume.
if currentPkg == nil {
return nil, statuserror.FromK8sError("get", "Package", pkgMetadata.Name, fmt.Errorf("no package versions for the package %q", pkgMetadata.Name))
}
// append the availablePackageSummary to the slice
availablePackageSummaries[fetchResult.index] = fetchResult.availablePackageSummary
categories = append(categories, fetchResult.availablePackageSummary.Categories...)
// The kapp-controller returns both packages and package metadata
// in order.
if currentPkg.Spec.RefName != pkgMetadata.Name {
return nil, status.Errorf(codes.Internal, fmt.Sprintf("unexpected order for kapp-controller packages, expected %q, found %q", pkgMetadata.Name, currentPkg.Spec.RefName))
}
// Skip ahead until we get the last version for this package name.
// TODO(minelson): check if kapp-controller also returns packages
// in version order (not just alphanumeric). If so, a small change
// will be required here.
// Asked at https://kubernetes.slack.com/archives/CH8KCCKA5/p1646285201181119
nextPkg := <-getPkgsChannel
for nextPkg != nil && nextPkg.Spec.RefName == pkgMetadata.Name {
currentPkg = nextPkg
nextPkg = <-getPkgsChannel
}
// At this point, currentPkg is the last package with the matching
// ref name.
availablePackageSummary := s.buildAvailablePackageSummary(pkgMetadata, currentPkg.Spec.Version, cluster)
availablePackageSummaries[i] = availablePackageSummary
categories = append(categories, availablePackageSummary.Categories...)
currentPkg = nextPkg
}

// Verify no error during go routine.
if getPkgsError != nil {
return nil, statuserror.FromK8sError("get", "Package", "", err)
}

// Only return a next page token if the request was for pagination and
// the results are a full page.
nextPageToken := ""
if pageSize > 0 && numFetched == int(pageSize) {
if pageSize > 0 && len(availablePackageSummaries) == int(pageSize) {
nextPageToken = fmt.Sprintf("%d", pageOffset+1)
}
response := &corev1.GetAvailablePackageSummariesResponse{
Expand All @@ -110,29 +129,6 @@ func (s *Server) GetAvailablePackageSummaries(ctx context.Context, request *core
return response, nil
}

func (s *Server) fetchPackageSummaryForMeta(ctx context.Context, cluster, namespace string, pkgMetadata *datapackagingv1alpha1.PackageMetadata) (*corev1.AvailablePackageSummary, error) {
// fetch the associated packages
// Use the field selector to return only Package CRs that match on the spec.refName.
// TODO(agamez): perhaps we better fetch all the packages and filter ourselves to reduce the k8s calls
fieldSelector := fmt.Sprintf("spec.refName=%s", pkgMetadata.Name)
pkgs, err := s.getPkgsWithFieldSelector(ctx, cluster, namespace, fieldSelector)
if err != nil {
return nil, statuserror.FromK8sError("get", "Package", pkgMetadata.Name, err)
}
pkgVersionsMap, err := getPkgVersionsMap(pkgs)
if err != nil {
return nil, err
}

// generate the availablePackageSummary from the fetched information
availablePackageSummary, err := s.buildAvailablePackageSummary(pkgMetadata, pkgVersionsMap, cluster)
if err != nil {
return nil, statuserror.FromK8sError("create", "AvailablePackageSummary", pkgMetadata.Name, err)
}

return availablePackageSummary, nil
}

// GetAvailablePackageVersions returns the package versions managed by the 'kapp_controller' plugin
func (s *Server) GetAvailablePackageVersions(ctx context.Context, request *corev1.GetAvailablePackageVersionsRequest) (*corev1.GetAvailablePackageVersionsResponse, error) {
// Retrieve parameters from the request
Expand Down
Expand Up @@ -25,15 +25,9 @@ import (
log "k8s.io/klog/v2"
)

func (s *Server) buildAvailablePackageSummary(pkgMetadata *datapackagingv1alpha1.PackageMetadata, pkgVersionsMap map[string][]pkgSemver, cluster string) (*corev1.AvailablePackageSummary, error) {
func (s *Server) buildAvailablePackageSummary(pkgMetadata *datapackagingv1alpha1.PackageMetadata, latestVersion string, cluster string) *corev1.AvailablePackageSummary {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There was no requirement for having the package map, just the latest version, which also means we don't need the ability to return an error here.

var iconStringBuilder strings.Builder

// get the versions associated with the package
versions := pkgVersionsMap[pkgMetadata.Name]
if len(versions) == 0 {
return nil, fmt.Errorf("no package versions for the package %q", pkgMetadata.Name)
}

// Carvel uses base64-encoded SVG data for IconSVGBase64, whereas we need
// a url, so convert to a data-url.

Expand All @@ -58,16 +52,16 @@ func (s *Server) buildAvailablePackageSummary(pkgMetadata *datapackagingv1alpha1
// Currently, PkgVersion and AppVersion are the same
// https://kubernetes.slack.com/archives/CH8KCCKA5/p1636386358322000?thread_ts=1636371493.320900&cid=CH8KCCKA5
LatestVersion: &corev1.PackageAppVersion{
PkgVersion: versions[0].version.String(),
AppVersion: versions[0].version.String(),
PkgVersion: latestVersion,
AppVersion: latestVersion,
},
IconUrl: iconStringBuilder.String(),
DisplayName: pkgMetadata.Spec.DisplayName,
ShortDescription: pkgMetadata.Spec.ShortDescription,
Categories: pkgMetadata.Spec.Categories,
}

return availablePackageSummary, nil
return availablePackageSummary
}

func (s *Server) buildAvailablePackageDetail(pkgMetadata *datapackagingv1alpha1.PackageMetadata, requestedPkgVersion string, foundPkgSemver *pkgSemver, cluster string) (*corev1.AvailablePackageDetail, error) {
Expand Down
Expand Up @@ -203,9 +203,31 @@ func (s *Server) getApp(ctx context.Context, cluster, namespace, identifier stri

// List of resources getters

// getPkgs returns the list of packages for the given cluster and namespace
func (s *Server) getPkgs(ctx context.Context, cluster, namespace string) ([]*datapackagingv1alpha1.Package, error) {
return s.getPkgsWithFieldSelector(ctx, cluster, namespace, "")
// getPkgs requests the packages for the given cluster and namespace and sends
// them to the channel to be processed immediately, closing the channel
// when finished or when an error is returned.
func (s *Server) getPkgs(ctx context.Context, cluster, namespace string, ch chan<- *datapackagingv1alpha1.Package) error {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This function wasn't actually used at all, so I updated it to do what I want. Importantly, it does not iterate all the results into an in-memory slice, but rather sends each package down the return channel as it iterates (so we can process them while it continues iterating).

The other important point here is, as mentioned in the comment, that this function guarantees to close the channel, so we can be sure when it has completed (or returned with an error).

defer close(ch)
resource, err := s.getPkgResource(ctx, cluster, namespace)
if err != nil {
return err
}

unstructured, err := resource.List(ctx, metav1.ListOptions{})
if err != nil {
return err
}

for _, unstructured := range unstructured.Items {
pkg := &datapackagingv1alpha1.Package{}
err := runtime.DefaultUnstructuredConverter.FromUnstructured(unstructured.Object, pkg)
if err != nil {
return err
}

ch <- pkg
}
return nil
}

// getPkgs returns the list of packages for the given cluster and namespace
Expand Down
Expand Up @@ -182,6 +182,36 @@ func TestGetAvailablePackageSummaries(t *testing.T) {
},
expectedStatusCode: codes.Internal,
},
{
name: "it returns an invalid argument error status if a page is requested that doesn't exist",
existingObjects: []runtime.Object{
&datapackagingv1alpha1.PackageMetadata{
TypeMeta: metav1.TypeMeta{
Kind: pkgMetadataResource,
APIVersion: datapackagingAPIVersion,
},
ObjectMeta: metav1.ObjectMeta{
Namespace: "default",
Name: "tetris.foo.example.com",
},
Spec: datapackagingv1alpha1.PackageMetadataSpec{
DisplayName: "Classic Tetris",
IconSVGBase64: "Tm90IHJlYWxseSBTVkcK",
ShortDescription: "A great game for arcade gamers",
LongDescription: "A few sentences but not really a readme",
Categories: []string{"logging", "daemon-set"},
Maintainers: []datapackagingv1alpha1.Maintainer{{Name: "person1"}, {Name: "person2"}},
SupportDescription: "Some support information",
ProviderName: "Tetris inc.",
},
},
},
paginationOptions: corev1.PaginationOptions{
PageToken: "2",
PageSize: 1,
},
expectedStatusCode: codes.InvalidArgument,
},
{
name: "it returns carvel package summaries with basic info from the cluster",
existingObjects: []runtime.Object{
Expand Down Expand Up @@ -456,7 +486,7 @@ func TestGetAvailablePackageSummaries(t *testing.T) {
},
},
{
name: "it returns paginated carvel package summaries",
name: "it returns paginated carvel package summaries with an offset",
existingObjects: []runtime.Object{
&datapackagingv1alpha1.PackageMetadata{
TypeMeta: metav1.TypeMeta{
Expand Down Expand Up @@ -558,6 +588,109 @@ func TestGetAvailablePackageSummaries(t *testing.T) {
},
},
},
{
name: "it returns paginated carvel package summaries limited to the page size",
existingObjects: []runtime.Object{
&datapackagingv1alpha1.PackageMetadata{
TypeMeta: metav1.TypeMeta{
Kind: pkgMetadataResource,
APIVersion: datapackagingAPIVersion,
},
ObjectMeta: metav1.ObjectMeta{
Namespace: "default",
Name: "tetris.foo.example.com",
},
Spec: datapackagingv1alpha1.PackageMetadataSpec{
DisplayName: "Classic Tetris",
IconSVGBase64: "Tm90IHJlYWxseSBTVkcK",
ShortDescription: "A great game for arcade gamers",
LongDescription: "A few sentences but not really a readme",
Categories: []string{"logging", "daemon-set"},
Maintainers: []datapackagingv1alpha1.Maintainer{{Name: "person1"}, {Name: "person2"}},
SupportDescription: "Some support information",
ProviderName: "Tetris inc.",
},
},
&datapackagingv1alpha1.PackageMetadata{
TypeMeta: metav1.TypeMeta{
Kind: pkgMetadataResource,
APIVersion: datapackagingAPIVersion,
},
ObjectMeta: metav1.ObjectMeta{
Namespace: "default",
Name: "tombi.foo.example.com",
},
Spec: datapackagingv1alpha1.PackageMetadataSpec{
DisplayName: "Tombi!",
IconSVGBase64: "Tm90IHJlYWxseSBTVkcK",
ShortDescription: "An awesome game from the 90's",
LongDescription: "Tombi! is an open world platform-adventure game with RPG elements.",
Categories: []string{"platforms", "rpg"},
Maintainers: []datapackagingv1alpha1.Maintainer{{Name: "person1"}, {Name: "person2"}},
SupportDescription: "Some support information",
ProviderName: "Tombi!",
},
},
&datapackagingv1alpha1.Package{
TypeMeta: metav1.TypeMeta{
Kind: pkgResource,
APIVersion: datapackagingAPIVersion,
},
ObjectMeta: metav1.ObjectMeta{
Namespace: "default",
Name: "tetris.foo.example.com.1.2.3",
},
Spec: datapackagingv1alpha1.PackageSpec{
RefName: "tetris.foo.example.com",
Version: "1.2.3",
Licenses: []string{"my-license"},
ReleaseNotes: "release notes",
CapactiyRequirementsDescription: "capacity description",
ReleasedAt: metav1.Time{time.Date(1984, time.June, 6, 0, 0, 0, 0, time.UTC)},
},
},
&datapackagingv1alpha1.Package{
TypeMeta: metav1.TypeMeta{
Kind: pkgResource,
APIVersion: datapackagingAPIVersion,
},
ObjectMeta: metav1.ObjectMeta{
Namespace: "default",
Name: "tombi.foo.example.com.1.2.5",
},
Spec: datapackagingv1alpha1.PackageSpec{
RefName: "tombi.foo.example.com",
Version: "1.2.5",
Licenses: []string{"my-license"},
ReleaseNotes: "release notes",
CapactiyRequirementsDescription: "capacity description",
ReleasedAt: metav1.Time{time.Date(1997, time.December, 25, 0, 0, 0, 0, time.UTC)},
},
},
},
paginationOptions: corev1.PaginationOptions{
PageToken: "0",
PageSize: 1,
},
expectedPackages: []*corev1.AvailablePackageSummary{
{
AvailablePackageRef: &corev1.AvailablePackageReference{
Context: defaultContext,
Plugin: &pluginDetail,
Identifier: "tetris.foo.example.com",
},
Name: "tetris.foo.example.com",
DisplayName: "Classic Tetris",
LatestVersion: &corev1.PackageAppVersion{
PkgVersion: "1.2.3",
AppVersion: "1.2.3",
},
IconUrl: "",
ShortDescription: "A great game for arcade gamers",
Categories: []string{"logging", "daemon-set"},
},
},
},
}

for _, tc := range testCases {
Expand Down