Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Avoid writing to shared memory in go-routine. #4360

Merged
merged 1 commit into from Mar 1, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -50,97 +50,89 @@ func (s *Server) GetAvailablePackageSummaries(ctx context.Context, request *core
return nil, statuserror.FromK8sError("get", "PackageMetadata", "", err)
}

// paginate the list of results
availablePackageSummaries := make([]*corev1.AvailablePackageSummary, len(pkgMetadatas))

// Create a channel to receive any errors. The channel is also used as a
// Create a channel to receive any results. The channel is also used as a
// natural waitgroup to synchronize the results.
errs := make(chan error)
numRoutines := 0
type fetchResult struct {
index int
availablePackageSummary *corev1.AvailablePackageSummary
err error
}
fetchResults := make(chan fetchResult)
numFetched := 0

// TODO(agamez): DRY up this logic (cf GetInstalledPackageSummaries)
if len(pkgMetadatas) > 0 {
startAt := -1
startAt := 0
if pageSize > 0 {
startAt = int(pageSize) * pageOffset
}
for i, pkgMetadata := range pkgMetadatas {
if startAt <= i {
numRoutines++
numFetched++
go func(i int, pkgMetadata *datapackagingv1alpha1.PackageMetadata) {
// fetch the associated packages
// Use the field selector to return only Package CRs that match on the spec.refName.
// TODO(agamez): perhaps we better fetch all the packages and filter ourselves to reduce the k8s calls
fieldSelector := fmt.Sprintf("spec.refName=%s", pkgMetadata.Name)
pkgs, err := s.getPkgsWithFieldSelector(ctx, cluster, namespace, fieldSelector)
if err != nil {
errs <- statuserror.FromK8sError("get", "Package", pkgMetadata.Name, err)
return
}
pkgVersionsMap, err := getPkgVersionsMap(pkgs)
if err != nil {
errs <- err
return
}
availablePackageSummary, err := s.fetchPackageSummaryForMeta(ctx, cluster, namespace, pkgMetadata)

// generate the availablePackageSummary from the fetched information
availablePackageSummary, err := s.buildAvailablePackageSummary(pkgMetadata, pkgVersionsMap, cluster)
if err != nil {
errs <- statuserror.FromK8sError("create", "AvailablePackageSummary", pkgMetadata.Name, err)
return
}

// append the availablePackageSummary to the slice
availablePackageSummaries[i] = availablePackageSummary
// Ensure we signal a completion.
errs <- nil
// The index of this result is relative to the page.
fetchResults <- fetchResult{i - startAt, availablePackageSummary, err}
}(i, pkgMetadata)
}
// if we've reached the end of the page, stop iterating
if pageSize > 0 && len(availablePackageSummaries) == int(pageSize) {
if pageSize > 0 && numFetched == int(pageSize) {
break
}
}
}
// Return an error if any is found. We continue only if there were no
// errors.
for i := 0; i < numRoutines; i++ {
err := <-errs
if err != nil {
return nil, status.Errorf(codes.Internal, fmt.Sprintf("unexpected error while gathering available packages: %v", err))
}
}

// TODO(agamez): the slice with make is filled with <nil>, in case of an error in the
// i goroutine, the i-th <nil> stub will remain. Check if 'errgroup' works here, but I haven't
// been able so far.
// An alternative is using channels to perform a fine-grained control... but not sure if it worths
// However, should we just return an error if so? See https://github.com/kubeapps/kubeapps/pull/3784#discussion_r754836475
// filter out <nil> values
availablePackageSummariesNilSafe := []*corev1.AvailablePackageSummary{}
availablePackageSummaries := make([]*corev1.AvailablePackageSummary, numFetched)
categories := []string{}
for _, availablePackageSummary := range availablePackageSummaries {
if availablePackageSummary != nil {
availablePackageSummariesNilSafe = append(availablePackageSummariesNilSafe, availablePackageSummary)
categories = append(categories, availablePackageSummary.Categories...)

for i := 0; i < numFetched; i++ {
fetchResult := <-fetchResults
if fetchResult.err != nil {
return nil, status.Errorf(codes.Internal, fmt.Sprintf("unexpected error while gathering available packages: %v", err))
}
// append the availablePackageSummary to the slice
availablePackageSummaries[fetchResult.index] = fetchResult.availablePackageSummary
categories = append(categories, fetchResult.availablePackageSummary.Categories...)
}

// Only return a next page token if the request was for pagination and
// the results are a full page.
nextPageToken := ""
if pageSize > 0 && len(availablePackageSummariesNilSafe) == int(pageSize) {
if pageSize > 0 && numFetched == int(pageSize) {
nextPageToken = fmt.Sprintf("%d", pageOffset+1)
}
response := &corev1.GetAvailablePackageSummariesResponse{
AvailablePackageSummaries: availablePackageSummariesNilSafe,
AvailablePackageSummaries: availablePackageSummaries,
Categories: categories,
NextPageToken: nextPageToken,
}
return response, nil
}

func (s *Server) fetchPackageSummaryForMeta(ctx context.Context, cluster, namespace string, pkgMetadata *datapackagingv1alpha1.PackageMetadata) (*corev1.AvailablePackageSummary, error) {
// fetch the associated packages
// Use the field selector to return only Package CRs that match on the spec.refName.
// TODO(agamez): perhaps we better fetch all the packages and filter ourselves to reduce the k8s calls
fieldSelector := fmt.Sprintf("spec.refName=%s", pkgMetadata.Name)
pkgs, err := s.getPkgsWithFieldSelector(ctx, cluster, namespace, fieldSelector)
if err != nil {
return nil, statuserror.FromK8sError("get", "Package", pkgMetadata.Name, err)
}
pkgVersionsMap, err := getPkgVersionsMap(pkgs)
if err != nil {
return nil, err
}

// generate the availablePackageSummary from the fetched information
availablePackageSummary, err := s.buildAvailablePackageSummary(pkgMetadata, pkgVersionsMap, cluster)
if err != nil {
return nil, statuserror.FromK8sError("create", "AvailablePackageSummary", pkgMetadata.Name, err)
}

return availablePackageSummary, nil
}

// GetAvailablePackageVersions returns the package versions managed by the 'kapp_controller' plugin
func (s *Server) GetAvailablePackageVersions(ctx context.Context, request *corev1.GetAvailablePackageVersionsRequest) (*corev1.GetAvailablePackageVersionsResponse, error) {
// Retrieve parameters from the request
Expand Down
Expand Up @@ -148,6 +148,7 @@ func TestGetAvailablePackageSummaries(t *testing.T) {
name string
existingObjects []runtime.Object
expectedPackages []*corev1.AvailablePackageSummary
paginationOptions corev1.PaginationOptions
expectedStatusCode codes.Code
}{
{
Expand Down Expand Up @@ -454,6 +455,109 @@ func TestGetAvailablePackageSummaries(t *testing.T) {
},
},
},
{
name: "it returns paginated carvel package summaries",
existingObjects: []runtime.Object{
&datapackagingv1alpha1.PackageMetadata{
TypeMeta: metav1.TypeMeta{
Kind: pkgMetadataResource,
APIVersion: datapackagingAPIVersion,
},
ObjectMeta: metav1.ObjectMeta{
Namespace: "default",
Name: "tetris.foo.example.com",
},
Spec: datapackagingv1alpha1.PackageMetadataSpec{
DisplayName: "Classic Tetris",
IconSVGBase64: "Tm90IHJlYWxseSBTVkcK",
ShortDescription: "A great game for arcade gamers",
LongDescription: "A few sentences but not really a readme",
Categories: []string{"logging", "daemon-set"},
Maintainers: []datapackagingv1alpha1.Maintainer{{Name: "person1"}, {Name: "person2"}},
SupportDescription: "Some support information",
ProviderName: "Tetris inc.",
},
},
&datapackagingv1alpha1.PackageMetadata{
TypeMeta: metav1.TypeMeta{
Kind: pkgMetadataResource,
APIVersion: datapackagingAPIVersion,
},
ObjectMeta: metav1.ObjectMeta{
Namespace: "default",
Name: "tombi.foo.example.com",
},
Spec: datapackagingv1alpha1.PackageMetadataSpec{
DisplayName: "Tombi!",
IconSVGBase64: "Tm90IHJlYWxseSBTVkcK",
ShortDescription: "An awesome game from the 90's",
LongDescription: "Tombi! is an open world platform-adventure game with RPG elements.",
Categories: []string{"platforms", "rpg"},
Maintainers: []datapackagingv1alpha1.Maintainer{{Name: "person1"}, {Name: "person2"}},
SupportDescription: "Some support information",
ProviderName: "Tombi!",
},
},
&datapackagingv1alpha1.Package{
TypeMeta: metav1.TypeMeta{
Kind: pkgResource,
APIVersion: datapackagingAPIVersion,
},
ObjectMeta: metav1.ObjectMeta{
Namespace: "default",
Name: "tetris.foo.example.com.1.2.3",
},
Spec: datapackagingv1alpha1.PackageSpec{
RefName: "tetris.foo.example.com",
Version: "1.2.3",
Licenses: []string{"my-license"},
ReleaseNotes: "release notes",
CapactiyRequirementsDescription: "capacity description",
ReleasedAt: metav1.Time{time.Date(1984, time.June, 6, 0, 0, 0, 0, time.UTC)},
},
},
&datapackagingv1alpha1.Package{
TypeMeta: metav1.TypeMeta{
Kind: pkgResource,
APIVersion: datapackagingAPIVersion,
},
ObjectMeta: metav1.ObjectMeta{
Namespace: "default",
Name: "tombi.foo.example.com.1.2.5",
},
Spec: datapackagingv1alpha1.PackageSpec{
RefName: "tombi.foo.example.com",
Version: "1.2.5",
Licenses: []string{"my-license"},
ReleaseNotes: "release notes",
CapactiyRequirementsDescription: "capacity description",
ReleasedAt: metav1.Time{time.Date(1997, time.December, 25, 0, 0, 0, 0, time.UTC)},
},
},
},
paginationOptions: corev1.PaginationOptions{
PageToken: "1",
PageSize: 1,
},
expectedPackages: []*corev1.AvailablePackageSummary{
{
AvailablePackageRef: &corev1.AvailablePackageReference{
Context: defaultContext,
Plugin: &pluginDetail,
Identifier: "tombi.foo.example.com",
},
Name: "tombi.foo.example.com",
DisplayName: "Tombi!",
LatestVersion: &corev1.PackageAppVersion{
PkgVersion: "1.2.5",
AppVersion: "1.2.5",
},
IconUrl: "data:image/svg+xml;base64,Tm90IHJlYWxseSBTVkcK",
ShortDescription: "An awesome game from the 90's",
Categories: []string{"platforms", "rpg"},
},
},
},
}

for _, tc := range testCases {
Expand All @@ -479,7 +583,10 @@ func TestGetAvailablePackageSummaries(t *testing.T) {
},
}

response, err := s.GetAvailablePackageSummaries(context.Background(), &corev1.GetAvailablePackageSummariesRequest{Context: defaultContext})
response, err := s.GetAvailablePackageSummaries(context.Background(), &corev1.GetAvailablePackageSummariesRequest{
Context: defaultContext,
PaginationOptions: &tc.paginationOptions,
})

if got, want := status.Code(err), tc.expectedStatusCode; got != want {
t.Fatalf("got: %d, want: %d, err: %+v", got, want, err)
Expand Down