From 2769a2c9a00ac8469d91129da8a95d0fc394d8a6 Mon Sep 17 00:00:00 2001 From: Michael Nelson Date: Mon, 7 Mar 2022 13:10:22 +1100 Subject: [PATCH] Reduce Carvel response times for GetAvailablePackageSummaries (#4378) * Refactor to use slice with correctly paginated data. Signed-off-by: Michael Nelson * Refactor to fetch all package versions at once. Signed-off-by: Michael Nelson * Add buffering to channel. No effect for TCE repo. Signed-off-by: Michael Nelson * Removed assumption of version order for packages. Signed-off-by: Michael Nelson --- .../core/packages/v1alpha1/packages.go | 3 - .../packages/v1alpha1/server_ctrl_packages.go | 134 ++++++++-------- .../packages/v1alpha1/server_data_adapters.go | 14 +- .../v1alpha1/server_data_resources.go | 28 +++- .../packages/v1alpha1/server_test.go | 145 +++++++++++++++++- 5 files changed, 239 insertions(+), 85 deletions(-) diff --git a/cmd/kubeapps-apis/core/packages/v1alpha1/packages.go b/cmd/kubeapps-apis/core/packages/v1alpha1/packages.go index a12ff855146..6f6bb976253 100644 --- a/cmd/kubeapps-apis/core/packages/v1alpha1/packages.go +++ b/cmd/kubeapps-apis/core/packages/v1alpha1/packages.go @@ -77,10 +77,7 @@ func (s packagesServer) GetAvailablePackageSummaries(ctx context.Context, reques // TODO: We can do these in parallel in separate go routines. for _, p := range s.pluginsWithServers { - log.Infof("Items now: %d/%d", len(pkgs), (pageOffset*int(pageSize) + int(pageSize))) if pageSize == 0 || len(pkgs) <= (pageOffset*int(pageSize)+int(pageSize)) { - log.Infof("Should enter") - response, err := p.server.GetAvailablePackageSummaries(ctx, requestN) if err != nil { return nil, status.Errorf(status.Convert(err).Code(), "Invalid GetAvailablePackageSummaries response from the plugin %v: %v", p.plugin.Name, err) diff --git a/cmd/kubeapps-apis/plugins/kapp_controller/packages/v1alpha1/server_ctrl_packages.go b/cmd/kubeapps-apis/plugins/kapp_controller/packages/v1alpha1/server_ctrl_packages.go index 5071f53fc29..de35cf815ad 100644 --- a/cmd/kubeapps-apis/plugins/kapp_controller/packages/v1alpha1/server_ctrl_packages.go +++ b/cmd/kubeapps-apis/plugins/kapp_controller/packages/v1alpha1/server_ctrl_packages.go @@ -26,6 +26,8 @@ import ( log "k8s.io/klog/v2" ) +const PACKAGES_CHANNEL_BUFFER_SIZE = 20 + // GetAvailablePackageSummaries returns the available packages managed by the 'kapp_controller' plugin func (s *Server) GetAvailablePackageSummaries(ctx context.Context, request *corev1.GetAvailablePackageSummariesRequest) (*corev1.GetAvailablePackageSummariesResponse, error) { // Retrieve parameters from the request @@ -45,61 +47,90 @@ func (s *Server) GetAvailablePackageSummaries(ctx context.Context, request *core cluster = s.globalPackagingCluster } // fetch all the package metadatas + // TODO(minelson): We should be grabbing only the requested page + // of results here. pkgMetadatas, err := s.getPkgMetadatas(ctx, cluster, namespace) if err != nil { return nil, statuserror.FromK8sError("get", "PackageMetadata", "", err) } - - // Create a channel to receive any results. The channel is also used as a - // natural waitgroup to synchronize the results. - type fetchResult struct { - index int - availablePackageSummary *corev1.AvailablePackageSummary - err error + // Until the above request uses the pagination, update the slice + // to be the correct page of results. + startAt := 0 + if pageSize > 0 { + startAt = int(pageSize) * pageOffset + if startAt > len(pkgMetadatas) { + return nil, status.Errorf(codes.InvalidArgument, "invalid pagination arguments %v", request.GetPaginationOptions()) + } + pkgMetadatas = pkgMetadatas[startAt:] + if len(pkgMetadatas) > int(pageSize) { + pkgMetadatas = pkgMetadatas[:pageSize] + } } - fetchResults := make(chan fetchResult) - numFetched := 0 - // TODO(agamez): DRY up this logic (cf GetInstalledPackageSummaries) - if len(pkgMetadatas) > 0 { - startAt := 0 - if pageSize > 0 { - startAt = int(pageSize) * pageOffset - } - for i, pkgMetadata := range pkgMetadatas { - if startAt <= i { - numFetched++ - go func(i int, pkgMetadata *datapackagingv1alpha1.PackageMetadata) { - availablePackageSummary, err := s.fetchPackageSummaryForMeta(ctx, cluster, namespace, pkgMetadata) + // Create a channel to receive all packages available in the namespace. + // Using a buffered channel so that we don't block the network request if we + // can't process fast enough. + getPkgsChannel := make(chan *datapackagingv1alpha1.Package, PACKAGES_CHANNEL_BUFFER_SIZE) + var getPkgsError error + go func() { + getPkgsError = s.getPkgs(ctx, cluster, namespace, getPkgsChannel) + }() - // The index of this result is relative to the page. - fetchResults <- fetchResult{i - startAt, availablePackageSummary, err} - }(i, pkgMetadata) - } - // if we've reached the end of the page, stop iterating - if pageSize > 0 && numFetched == int(pageSize) { - break - } - } + // Skip through the packages until we get to the first item in our + // paginated results. + currentPkg := <-getPkgsChannel + for currentPkg != nil && currentPkg.Spec.RefName != pkgMetadatas[0].Name { + currentPkg = <-getPkgsChannel } - // Return an error if any is found. We continue only if there were no - // errors. - availablePackageSummaries := make([]*corev1.AvailablePackageSummary, numFetched) + + availablePackageSummaries := make([]*corev1.AvailablePackageSummary, len(pkgMetadatas)) categories := []string{} - for i := 0; i < numFetched; i++ { - fetchResult := <-fetchResults - if fetchResult.err != nil { - return nil, status.Errorf(codes.Internal, fmt.Sprintf("unexpected error while gathering available packages: %v", err)) + pkgsForMeta := []*datapackagingv1alpha1.Package{} + for i, pkgMetadata := range pkgMetadatas { + // currentPkg will be nil if the channel is closed and there's no + // more items to consume. + if currentPkg == nil { + return nil, statuserror.FromK8sError("get", "Package", pkgMetadata.Name, fmt.Errorf("no package versions for the package %q", pkgMetadata.Name)) } - // append the availablePackageSummary to the slice - availablePackageSummaries[fetchResult.index] = fetchResult.availablePackageSummary - categories = append(categories, fetchResult.availablePackageSummary.Categories...) + // The kapp-controller returns both packages and package metadata + // in order. + if currentPkg.Spec.RefName != pkgMetadata.Name { + return nil, status.Errorf(codes.Internal, fmt.Sprintf("unexpected order for kapp-controller packages, expected %q, found %q", pkgMetadata.Name, currentPkg.Spec.RefName)) + } + // Collect the packages for a particular refName to be able to send the + // latest semver version. For the moment, kapp-controller just returns + // CRs with the default alpha sorting of the CR name. + // Ref https://kubernetes.slack.com/archives/CH8KCCKA5/p1646285201181119 + pkgsForMeta = append(pkgsForMeta, currentPkg) + currentPkg = <-getPkgsChannel + for currentPkg != nil && currentPkg.Spec.RefName == pkgMetadata.Name { + pkgsForMeta = append(pkgsForMeta, currentPkg) + currentPkg = <-getPkgsChannel + } + // At this point, we have all the packages collected that match + // this ref name, and currentPkg is for the next meta name. + pkgVersionMap, err := getPkgVersionsMap(pkgsForMeta) + if err != nil || len(pkgVersionMap[pkgMetadata.Name]) == 0 { + return nil, status.Errorf(codes.Internal, fmt.Sprintf("unable to calculate package versions map for packages: %v, err: %v", pkgsForMeta, err)) + } + latestVersion := pkgVersionMap[pkgMetadata.Name][0].version.String() + availablePackageSummary := s.buildAvailablePackageSummary(pkgMetadata, latestVersion, cluster) + availablePackageSummaries[i] = availablePackageSummary + categories = append(categories, availablePackageSummary.Categories...) + + // Reset the packages for the current meta name. + pkgsForMeta = pkgsForMeta[:0] + } + + // Verify no error during go routine. + if getPkgsError != nil { + return nil, statuserror.FromK8sError("get", "Package", "", err) } // Only return a next page token if the request was for pagination and // the results are a full page. nextPageToken := "" - if pageSize > 0 && numFetched == int(pageSize) { + if pageSize > 0 && len(availablePackageSummaries) == int(pageSize) { nextPageToken = fmt.Sprintf("%d", pageOffset+1) } response := &corev1.GetAvailablePackageSummariesResponse{ @@ -110,29 +141,6 @@ func (s *Server) GetAvailablePackageSummaries(ctx context.Context, request *core return response, nil } -func (s *Server) fetchPackageSummaryForMeta(ctx context.Context, cluster, namespace string, pkgMetadata *datapackagingv1alpha1.PackageMetadata) (*corev1.AvailablePackageSummary, error) { - // fetch the associated packages - // Use the field selector to return only Package CRs that match on the spec.refName. - // TODO(agamez): perhaps we better fetch all the packages and filter ourselves to reduce the k8s calls - fieldSelector := fmt.Sprintf("spec.refName=%s", pkgMetadata.Name) - pkgs, err := s.getPkgsWithFieldSelector(ctx, cluster, namespace, fieldSelector) - if err != nil { - return nil, statuserror.FromK8sError("get", "Package", pkgMetadata.Name, err) - } - pkgVersionsMap, err := getPkgVersionsMap(pkgs) - if err != nil { - return nil, err - } - - // generate the availablePackageSummary from the fetched information - availablePackageSummary, err := s.buildAvailablePackageSummary(pkgMetadata, pkgVersionsMap, cluster) - if err != nil { - return nil, statuserror.FromK8sError("create", "AvailablePackageSummary", pkgMetadata.Name, err) - } - - return availablePackageSummary, nil -} - // GetAvailablePackageVersions returns the package versions managed by the 'kapp_controller' plugin func (s *Server) GetAvailablePackageVersions(ctx context.Context, request *corev1.GetAvailablePackageVersionsRequest) (*corev1.GetAvailablePackageVersionsResponse, error) { // Retrieve parameters from the request diff --git a/cmd/kubeapps-apis/plugins/kapp_controller/packages/v1alpha1/server_data_adapters.go b/cmd/kubeapps-apis/plugins/kapp_controller/packages/v1alpha1/server_data_adapters.go index b30017701d1..536569fcbb0 100644 --- a/cmd/kubeapps-apis/plugins/kapp_controller/packages/v1alpha1/server_data_adapters.go +++ b/cmd/kubeapps-apis/plugins/kapp_controller/packages/v1alpha1/server_data_adapters.go @@ -25,15 +25,9 @@ import ( log "k8s.io/klog/v2" ) -func (s *Server) buildAvailablePackageSummary(pkgMetadata *datapackagingv1alpha1.PackageMetadata, pkgVersionsMap map[string][]pkgSemver, cluster string) (*corev1.AvailablePackageSummary, error) { +func (s *Server) buildAvailablePackageSummary(pkgMetadata *datapackagingv1alpha1.PackageMetadata, latestVersion string, cluster string) *corev1.AvailablePackageSummary { var iconStringBuilder strings.Builder - // get the versions associated with the package - versions := pkgVersionsMap[pkgMetadata.Name] - if len(versions) == 0 { - return nil, fmt.Errorf("no package versions for the package %q", pkgMetadata.Name) - } - // Carvel uses base64-encoded SVG data for IconSVGBase64, whereas we need // a url, so convert to a data-url. @@ -58,8 +52,8 @@ func (s *Server) buildAvailablePackageSummary(pkgMetadata *datapackagingv1alpha1 // Currently, PkgVersion and AppVersion are the same // https://kubernetes.slack.com/archives/CH8KCCKA5/p1636386358322000?thread_ts=1636371493.320900&cid=CH8KCCKA5 LatestVersion: &corev1.PackageAppVersion{ - PkgVersion: versions[0].version.String(), - AppVersion: versions[0].version.String(), + PkgVersion: latestVersion, + AppVersion: latestVersion, }, IconUrl: iconStringBuilder.String(), DisplayName: pkgMetadata.Spec.DisplayName, @@ -67,7 +61,7 @@ func (s *Server) buildAvailablePackageSummary(pkgMetadata *datapackagingv1alpha1 Categories: pkgMetadata.Spec.Categories, } - return availablePackageSummary, nil + return availablePackageSummary } func (s *Server) buildAvailablePackageDetail(pkgMetadata *datapackagingv1alpha1.PackageMetadata, requestedPkgVersion string, foundPkgSemver *pkgSemver, cluster string) (*corev1.AvailablePackageDetail, error) { diff --git a/cmd/kubeapps-apis/plugins/kapp_controller/packages/v1alpha1/server_data_resources.go b/cmd/kubeapps-apis/plugins/kapp_controller/packages/v1alpha1/server_data_resources.go index d26fa81dbb1..b429142c87a 100644 --- a/cmd/kubeapps-apis/plugins/kapp_controller/packages/v1alpha1/server_data_resources.go +++ b/cmd/kubeapps-apis/plugins/kapp_controller/packages/v1alpha1/server_data_resources.go @@ -203,9 +203,31 @@ func (s *Server) getApp(ctx context.Context, cluster, namespace, identifier stri // List of resources getters -// getPkgs returns the list of packages for the given cluster and namespace -func (s *Server) getPkgs(ctx context.Context, cluster, namespace string) ([]*datapackagingv1alpha1.Package, error) { - return s.getPkgsWithFieldSelector(ctx, cluster, namespace, "") +// getPkgs requests the packages for the given cluster and namespace and sends +// them to the channel to be processed immediately, closing the channel +// when finished or when an error is returned. +func (s *Server) getPkgs(ctx context.Context, cluster, namespace string, ch chan<- *datapackagingv1alpha1.Package) error { + defer close(ch) + resource, err := s.getPkgResource(ctx, cluster, namespace) + if err != nil { + return err + } + + unstructured, err := resource.List(ctx, metav1.ListOptions{}) + if err != nil { + return err + } + + for _, unstructured := range unstructured.Items { + pkg := &datapackagingv1alpha1.Package{} + err := runtime.DefaultUnstructuredConverter.FromUnstructured(unstructured.Object, pkg) + if err != nil { + return err + } + + ch <- pkg + } + return nil } // getPkgs returns the list of packages for the given cluster and namespace diff --git a/cmd/kubeapps-apis/plugins/kapp_controller/packages/v1alpha1/server_test.go b/cmd/kubeapps-apis/plugins/kapp_controller/packages/v1alpha1/server_test.go index 361f38c35bc..9c9e35e583a 100644 --- a/cmd/kubeapps-apis/plugins/kapp_controller/packages/v1alpha1/server_test.go +++ b/cmd/kubeapps-apis/plugins/kapp_controller/packages/v1alpha1/server_test.go @@ -182,6 +182,36 @@ func TestGetAvailablePackageSummaries(t *testing.T) { }, expectedStatusCode: codes.Internal, }, + { + name: "it returns an invalid argument error status if a page is requested that doesn't exist", + existingObjects: []runtime.Object{ + &datapackagingv1alpha1.PackageMetadata{ + TypeMeta: metav1.TypeMeta{ + Kind: pkgMetadataResource, + APIVersion: datapackagingAPIVersion, + }, + ObjectMeta: metav1.ObjectMeta{ + Namespace: "default", + Name: "tetris.foo.example.com", + }, + Spec: datapackagingv1alpha1.PackageMetadataSpec{ + DisplayName: "Classic Tetris", + IconSVGBase64: "Tm90IHJlYWxseSBTVkcK", + ShortDescription: "A great game for arcade gamers", + LongDescription: "A few sentences but not really a readme", + Categories: []string{"logging", "daemon-set"}, + Maintainers: []datapackagingv1alpha1.Maintainer{{Name: "person1"}, {Name: "person2"}}, + SupportDescription: "Some support information", + ProviderName: "Tetris inc.", + }, + }, + }, + paginationOptions: corev1.PaginationOptions{ + PageToken: "2", + PageSize: 1, + }, + expectedStatusCode: codes.InvalidArgument, + }, { name: "it returns carvel package summaries with basic info from the cluster", existingObjects: []runtime.Object{ @@ -359,7 +389,7 @@ func TestGetAvailablePackageSummaries(t *testing.T) { }, }, { - name: "it returns the latest semver version in the latest version field", + name: "it returns the latest semver version in the latest version field without relying on default alpha sorting", existingObjects: []runtime.Object{ &datapackagingv1alpha1.PackageMetadata{ TypeMeta: metav1.TypeMeta{ @@ -406,11 +436,11 @@ func TestGetAvailablePackageSummaries(t *testing.T) { }, ObjectMeta: metav1.ObjectMeta{ Namespace: "default", - Name: "tetris.foo.example.com.1.2.7", + Name: "tetris.foo.example.com.1.2.10", }, Spec: datapackagingv1alpha1.PackageSpec{ RefName: "tetris.foo.example.com", - Version: "1.2.7", + Version: "1.2.10", Licenses: []string{"my-license"}, ReleaseNotes: "release notes", CapactiyRequirementsDescription: "capacity description", @@ -446,8 +476,8 @@ func TestGetAvailablePackageSummaries(t *testing.T) { Name: "tetris.foo.example.com", DisplayName: "Classic Tetris", LatestVersion: &corev1.PackageAppVersion{ - PkgVersion: "1.2.7", - AppVersion: "1.2.7", + PkgVersion: "1.2.10", + AppVersion: "1.2.10", }, IconUrl: "", ShortDescription: "A great game for arcade gamers", @@ -456,7 +486,7 @@ func TestGetAvailablePackageSummaries(t *testing.T) { }, }, { - name: "it returns paginated carvel package summaries", + name: "it returns paginated carvel package summaries with an offset", existingObjects: []runtime.Object{ &datapackagingv1alpha1.PackageMetadata{ TypeMeta: metav1.TypeMeta{ @@ -558,6 +588,109 @@ func TestGetAvailablePackageSummaries(t *testing.T) { }, }, }, + { + name: "it returns paginated carvel package summaries limited to the page size", + existingObjects: []runtime.Object{ + &datapackagingv1alpha1.PackageMetadata{ + TypeMeta: metav1.TypeMeta{ + Kind: pkgMetadataResource, + APIVersion: datapackagingAPIVersion, + }, + ObjectMeta: metav1.ObjectMeta{ + Namespace: "default", + Name: "tetris.foo.example.com", + }, + Spec: datapackagingv1alpha1.PackageMetadataSpec{ + DisplayName: "Classic Tetris", + IconSVGBase64: "Tm90IHJlYWxseSBTVkcK", + ShortDescription: "A great game for arcade gamers", + LongDescription: "A few sentences but not really a readme", + Categories: []string{"logging", "daemon-set"}, + Maintainers: []datapackagingv1alpha1.Maintainer{{Name: "person1"}, {Name: "person2"}}, + SupportDescription: "Some support information", + ProviderName: "Tetris inc.", + }, + }, + &datapackagingv1alpha1.PackageMetadata{ + TypeMeta: metav1.TypeMeta{ + Kind: pkgMetadataResource, + APIVersion: datapackagingAPIVersion, + }, + ObjectMeta: metav1.ObjectMeta{ + Namespace: "default", + Name: "tombi.foo.example.com", + }, + Spec: datapackagingv1alpha1.PackageMetadataSpec{ + DisplayName: "Tombi!", + IconSVGBase64: "Tm90IHJlYWxseSBTVkcK", + ShortDescription: "An awesome game from the 90's", + LongDescription: "Tombi! is an open world platform-adventure game with RPG elements.", + Categories: []string{"platforms", "rpg"}, + Maintainers: []datapackagingv1alpha1.Maintainer{{Name: "person1"}, {Name: "person2"}}, + SupportDescription: "Some support information", + ProviderName: "Tombi!", + }, + }, + &datapackagingv1alpha1.Package{ + TypeMeta: metav1.TypeMeta{ + Kind: pkgResource, + APIVersion: datapackagingAPIVersion, + }, + ObjectMeta: metav1.ObjectMeta{ + Namespace: "default", + Name: "tetris.foo.example.com.1.2.3", + }, + Spec: datapackagingv1alpha1.PackageSpec{ + RefName: "tetris.foo.example.com", + Version: "1.2.3", + Licenses: []string{"my-license"}, + ReleaseNotes: "release notes", + CapactiyRequirementsDescription: "capacity description", + ReleasedAt: metav1.Time{time.Date(1984, time.June, 6, 0, 0, 0, 0, time.UTC)}, + }, + }, + &datapackagingv1alpha1.Package{ + TypeMeta: metav1.TypeMeta{ + Kind: pkgResource, + APIVersion: datapackagingAPIVersion, + }, + ObjectMeta: metav1.ObjectMeta{ + Namespace: "default", + Name: "tombi.foo.example.com.1.2.5", + }, + Spec: datapackagingv1alpha1.PackageSpec{ + RefName: "tombi.foo.example.com", + Version: "1.2.5", + Licenses: []string{"my-license"}, + ReleaseNotes: "release notes", + CapactiyRequirementsDescription: "capacity description", + ReleasedAt: metav1.Time{time.Date(1997, time.December, 25, 0, 0, 0, 0, time.UTC)}, + }, + }, + }, + paginationOptions: corev1.PaginationOptions{ + PageToken: "0", + PageSize: 1, + }, + expectedPackages: []*corev1.AvailablePackageSummary{ + { + AvailablePackageRef: &corev1.AvailablePackageReference{ + Context: defaultContext, + Plugin: &pluginDetail, + Identifier: "tetris.foo.example.com", + }, + Name: "tetris.foo.example.com", + DisplayName: "Classic Tetris", + LatestVersion: &corev1.PackageAppVersion{ + PkgVersion: "1.2.3", + AppVersion: "1.2.3", + }, + IconUrl: "", + ShortDescription: "A great game for arcade gamers", + Categories: []string{"logging", "daemon-set"}, + }, + }, + }, } for _, tc := range testCases {