Skip to content

Commit

Permalink
Ensure charts imported per namespace when using MongoDB. (#1551)
Browse files Browse the repository at this point in the history
* Ensure charts imported per namespace when using MongoDB.

* Include removal of old charts

* Update other calls in asset-syncer and start assetsvc.

* Ensure IRL works. Fix mock tests.

* Add index to files collection (takes import of stable from 14m -> 2m)
  • Loading branch information
absoludity committed Mar 5, 2020
1 parent 8b2861b commit 3a53925
Show file tree
Hide file tree
Showing 12 changed files with 228 additions and 67 deletions.
3 changes: 2 additions & 1 deletion chart/kubeapps/templates/apprepositories.yaml
Expand Up @@ -4,7 +4,8 @@ kind: AppRepository
metadata:
name: {{ .name }}
annotations:
"helm.sh/hook": pre-install
"helm.sh/hook": post-install
"helm.sh/hook-weight": "10"
labels:
app: {{ template "kubeapps.apprepository.fullname" $ }}
chart: {{ template "kubeapps.chart" $ }}
Expand Down
7 changes: 3 additions & 4 deletions chart/kubeapps/templates/apprepository-jobs-preupgrade.yaml
@@ -1,11 +1,11 @@
{{- if .Values.featureFlags.reposPerNamespace -}}
# Invalidate the chart cache during upgrade.
# Ensure db indexes are set and invalidate the chart cache during both install and upgrade.
apiVersion: batch/v1
kind: Job
metadata:
name: {{ template "kubeapps.apprepository-jobs-preupgrade.fullname" . }}
annotations:
helm.sh/hook: pre-upgrade
helm.sh/hook: pre-upgrade,post-install
helm.sh/hook-weight: "0"
helm.sh/hook-delete-policy: hook-succeeded
labels:
app: {{ template "kubeapps.apprepository-jobs-preupgrade.fullname" . }}
Expand Down Expand Up @@ -66,4 +66,3 @@ spec:
key: postgresql-password
name: {{ .Values.postgresql.existingSecret }}
{{- end }}
{{- end -}}
120 changes: 114 additions & 6 deletions cmd/asset-syncer/mongodb_db_test.go
Expand Up @@ -22,9 +22,13 @@ limitations under the License.
package main

import (
"errors"
"testing"

"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"github.com/kubeapps/kubeapps/pkg/chart/models"
"github.com/kubeapps/kubeapps/pkg/dbutils"
"github.com/kubeapps/kubeapps/pkg/dbutils/dbutilstest/mgtest"
)

Expand All @@ -40,31 +44,135 @@ func TestMongoImportCharts(t *testing.T) {
Name: "repo-name",
Namespace: "repo-namespace",
}
repoSameNameOtherNamespace := models.Repo{
Name: "repo-name",
Namespace: "other-namespace",
}

testCases := []struct {
name string
charts []models.Chart
name string
existingCharts []models.Chart
charts []models.Chart
expectedCharts []models.Chart
expectedError error
}{
{
name: "it inserts the charts",
charts: []models.Chart{
models.Chart{Name: "my-chart1", Repo: &repo, ID: "foo/bar:123"},
models.Chart{Name: "my-chart2", Repo: &repo, ID: "foo/bar:456"},
},
expectedCharts: []models.Chart{
models.Chart{Name: "my-chart1", Repo: &repo, ID: "foo/bar:123"},
models.Chart{Name: "my-chart2", Repo: &repo, ID: "foo/bar:456"},
},
},
{
name: "it errors if asked to insert a chart in a different namespace",
charts: []models.Chart{
models.Chart{Name: "my-chart1", Repo: &repo, ID: "foo/bar:123"},
models.Chart{Name: "my-chart2", Repo: &repo, ID: "foo/bar:456"},
models.Chart{Name: "my-chart1", Repo: &repoSameNameOtherNamespace, ID: "foo/bar:123"},
},
expectedError: ErrRepoMismatch,
},
{
name: "it updates existing charts in the chart namespace",
existingCharts: []models.Chart{
models.Chart{Name: "my-chart1", Repo: &repo, ID: "foo/bar:123", Description: "Old description"},
},
charts: []models.Chart{
models.Chart{Name: "my-chart1", Repo: &repo, ID: "foo/bar:123", Description: "New description"},
models.Chart{Name: "my-chart2", Repo: &repo, ID: "foo/bar:456"},
},
expectedCharts: []models.Chart{
models.Chart{Name: "my-chart1", Repo: &repo, ID: "foo/bar:123", Description: "New description"},
models.Chart{Name: "my-chart2", Repo: &repo, ID: "foo/bar:456"},
},
},
{
name: "it removes charts that are not included in the import",
existingCharts: []models.Chart{
models.Chart{Name: "my-chart-old", Repo: &repo, ID: "foo/old:123"},
},
charts: []models.Chart{
models.Chart{Name: "my-chart1", Repo: &repo, ID: "foo/bar:123", Description: "New description"},
models.Chart{Name: "my-chart2", Repo: &repo, ID: "foo/bar:456"},
},
expectedCharts: []models.Chart{
models.Chart{Name: "my-chart1", Repo: &repo, ID: "foo/bar:123", Description: "New description"},
models.Chart{Name: "my-chart2", Repo: &repo, ID: "foo/bar:456"},
},
},
{
name: "it does not remove charts from other namespaces",
existingCharts: []models.Chart{
models.Chart{Name: "my-chart-old", Repo: &repoSameNameOtherNamespace, ID: "foo/other:123"},
},
charts: []models.Chart{
models.Chart{Name: "my-chart1", Repo: &repo, ID: "foo/bar:123", Description: "New description"},
models.Chart{Name: "my-chart2", Repo: &repo, ID: "foo/bar:456"},
},
expectedCharts: []models.Chart{
models.Chart{Name: "my-chart-old", Repo: &repoSameNameOtherNamespace, ID: "foo/other:123"},
models.Chart{Name: "my-chart1", Repo: &repo, ID: "foo/bar:123", Description: "New description"},
models.Chart{Name: "my-chart2", Repo: &repo, ID: "foo/bar:456"},
},
},
{
name: "it does not remove charts from other namespaces even if they have the same repo name",
existingCharts: []models.Chart{
models.Chart{Name: "my-chart-old", Repo: &repoSameNameOtherNamespace, ID: "foo/bar:123"},
},
charts: []models.Chart{
models.Chart{Name: "my-chart1", Repo: &repo, ID: "foo/bar:123", Description: "New description"},
models.Chart{Name: "my-chart2", Repo: &repo, ID: "foo/bar:456"},
},
expectedCharts: []models.Chart{
models.Chart{Name: "my-chart-old", Repo: &repoSameNameOtherNamespace, ID: "foo/bar:123"},
models.Chart{Name: "my-chart1", Repo: &repo, ID: "foo/bar:123", Description: "New description"},
models.Chart{Name: "my-chart2", Repo: &repo, ID: "foo/bar:456"},
},
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
manager, cleanup := getInitializedMongoManager(t)
defer cleanup()
if len(tc.existingCharts) > 0 {
err := manager.importCharts(tc.existingCharts, *tc.existingCharts[0].Repo)
if err != nil {
t.Fatalf("%+v", err)
}
}

err := manager.importCharts(tc.charts)
if err != nil {
t.Errorf("%+v", err)
err := manager.importCharts(tc.charts, repo)
if tc.expectedError != nil {
if got, want := err, tc.expectedError; !errors.Is(got, want) {
t.Fatalf("got: %+v, want: %+v", got, want)
}
} else if err != nil {
t.Fatalf("%+v", err)
}

// TODO: Add actual assertions on remaining charts etc.
opts := cmpopts.EquateEmpty()
if got, want := getAllCharts(t, manager), tc.expectedCharts; !cmp.Equal(want, got, opts) {
t.Errorf("mismatch (-want +got):\n%s", cmp.Diff(want, got, opts))
}
})
}
}

func getAllCharts(t *testing.T, manager *mongodbAssetManager) []models.Chart {
var result []models.Chart
db, closer := manager.DBSession.DB()
defer closer()

coll := db.C(dbutils.ChartCollection)
err := coll.Find(nil).Sort("repo.name", "repo.namespace", "id").All(&result)
if err != nil {
t.Fatalf("%+v", err)
}
return result
}
57 changes: 29 additions & 28 deletions cmd/asset-syncer/mongodb_utils.go
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package main

import (
"fmt"
"time"

"github.com/globalsign/mgo/bson"
Expand All @@ -25,11 +26,7 @@ import (
"github.com/kubeapps/kubeapps/pkg/dbutils"
)

const (
chartCollection = "charts"
repositoryCollection = "repos"
chartFilesCollection = "files"
)
var ErrRepoMismatch = fmt.Errorf("chart repository did not match import repository")

type mongodbAssetManager struct {
*dbutils.MongodbAssetManager
Expand All @@ -50,69 +47,77 @@ func newMongoDBManager(config datastore.Config) assetManager {
// imported into the database as fast as possible. E.g. we want all icons for
// charts before fetching readmes for each chart and version pair.
func (m *mongodbAssetManager) Sync(repo models.Repo, charts []models.Chart) error {
return m.importCharts(charts)
return m.importCharts(charts, repo)
}

func (m *mongodbAssetManager) RepoAlreadyProcessed(repo models.Repo, checksum string) bool {
db, closer := m.DBSession.DB()
defer closer()
lastCheck := &models.RepoCheck{}
err := db.C(repositoryCollection).Find(bson.M{"_id": repo.Name}).One(lastCheck)
err := db.C(dbutils.RepositoryCollection).Find(bson.M{"name": repo.Name, "namespace": repo.Namespace}).One(lastCheck)
return err == nil && checksum == lastCheck.Checksum
}

func (m *mongodbAssetManager) UpdateLastCheck(repoNamespace, repoName, checksum string, now time.Time) error {
db, closer := m.DBSession.DB()
defer closer()
_, err := db.C(repositoryCollection).UpsertId(repoName, bson.M{"$set": bson.M{"last_update": now, "checksum": checksum}})
_, err := db.C(dbutils.RepositoryCollection).Upsert(bson.M{"name": repoName, "namespace": repoNamespace}, bson.M{"$set": bson.M{"last_update": now, "checksum": checksum}})
return err
}

func (m *mongodbAssetManager) Delete(repo models.Repo) error {
db, closer := m.DBSession.DB()
defer closer()
_, err := db.C(chartCollection).RemoveAll(bson.M{
"repo.name": repo.Name,
_, err := db.C(dbutils.ChartCollection).RemoveAll(bson.M{
"repo.name": repo.Name,
"repo.namespace": repo.Namespace,
})
if err != nil {
return err
}

_, err = db.C(chartFilesCollection).RemoveAll(bson.M{
"repo.name": repo.Name,
_, err = db.C(dbutils.ChartFilesCollection).RemoveAll(bson.M{
"repo.name": repo.Name,
"repo.namespace": repo.Namespace,
})
if err != nil {
return err
}

_, err = db.C(repositoryCollection).RemoveAll(bson.M{
"_id": repo.Name,
_, err = db.C(dbutils.RepositoryCollection).RemoveAll(bson.M{
"name": repo.Name,
"namespace": repo.Namespace,
})
return err
}

func (m *mongodbAssetManager) importCharts(charts []models.Chart) error {
func (m *mongodbAssetManager) importCharts(charts []models.Chart, repo models.Repo) error {
var pairs []interface{}
var chartIDs []string
for _, c := range charts {
if c.Repo == nil || c.Repo.Namespace != repo.Namespace || c.Repo.Name != repo.Name {
return fmt.Errorf("%w: chart repo: %+v, import repo: %+v", ErrRepoMismatch, c.Repo, repo)
}
chartIDs = append(chartIDs, c.ID)
// charts to upsert - pair of selector, chart
pairs = append(pairs, bson.M{"_id": c.ID}, c)
// Mongodb generates the unique _id, we rely on the compound unique index on chart_id and repo.
pairs = append(pairs, bson.M{"chart_id": c.ID, "repo.name": repo.Name, "repo.namespace": repo.Namespace}, bson.M{"$set": c})
}

db, closer := m.DBSession.DB()
defer closer()
bulk := db.C(chartCollection).Bulk()
bulk := db.C(dbutils.ChartCollection).Bulk()

// Upsert pairs of selectors, charts
bulk.Upsert(pairs...)

// Remove charts no longer existing in index
bulk.RemoveAll(bson.M{
"_id": bson.M{
"chart_id": bson.M{
"$nin": chartIDs,
},
"repo.name": charts[0].Repo.Name,
"repo.name": repo.Name,
"repo.namespace": repo.Namespace,
})

_, err := bulk.Run()
Expand All @@ -122,25 +127,21 @@ func (m *mongodbAssetManager) importCharts(charts []models.Chart) error {
func (m *mongodbAssetManager) updateIcon(repo models.Repo, data []byte, contentType, ID string) error {
db, closer := m.DBSession.DB()
defer closer()
return db.C(chartCollection).UpdateId(ID, bson.M{"$set": bson.M{"raw_icon": data, "icon_content_type": contentType}})
_, err := db.C(dbutils.ChartCollection).Upsert(bson.M{"chart_id": ID, "repo.name": repo.Name, "repo.namespace": repo.Namespace}, bson.M{"$set": bson.M{"raw_icon": data, "icon_content_type": contentType}})
return err
}

func (m *mongodbAssetManager) filesExist(repo models.Repo, chartFilesID, digest string) bool {
db, closer := m.DBSession.DB()
defer closer()
err := db.C(chartFilesCollection).Find(bson.M{"_id": chartFilesID, "digest": digest}).One(&models.ChartFiles{})
err := db.C(dbutils.ChartFilesCollection).Find(bson.M{"file_id": chartFilesID, "repo.name": repo.Name, "repo.namespace": repo.Namespace, "digest": digest}).One(&models.ChartFiles{})
return err == nil
}

func (m *mongodbAssetManager) insertFiles(chartId string, files models.ChartFiles) error {
db, closer := m.DBSession.DB()
defer closer()
_, err := db.C(chartFilesCollection).UpsertId(files.ID, files)
return err
}

// InvalidateCache for mongodb currently is a noop to fulfil the interface.
func (m *mongodbAssetManager) InvalidateCache() error {
// TODO: implement a cache invalidation
return nil
_, err := db.C(dbutils.ChartFilesCollection).Upsert(bson.M{"file_id": files.ID, "repo.name": files.Repo.Name, "repo.namespace": files.Repo.Namespace}, files)
return err
}
26 changes: 18 additions & 8 deletions cmd/asset-syncer/mongodb_utils_test.go
Expand Up @@ -42,9 +42,14 @@ func Test_importCharts(t *testing.T) {
m.On("Upsert", mock.Anything)
m.On("RemoveAll", mock.Anything)
index, _ := parseRepoIndex([]byte(validRepoIndexYAML))
charts := chartsFromIndex(index, &models.Repo{Name: "test", URL: "http://testrepo.com"})
repo := models.Repo{
Name: "repo-name",
Namespace: "repo-namespace",
URL: "http://testrepo.example.com",
}
charts := chartsFromIndex(index, &repo)
manager := getMockManager(m)
manager.importCharts(charts)
manager.importCharts(charts, repo)

m.AssertExpectations(t)
// The Bulk Upsert method takes an array that consists of a selector followed by an interface to upsert.
Expand All @@ -53,21 +58,25 @@ func Test_importCharts(t *testing.T) {
args := m.Calls[0].Arguments.Get(0).([]interface{})
assert.Equal(t, len(args), len(charts)*2, "number of selector, chart pairs to upsert")
for i := 0; i < len(args); i += 2 {
c := args[i+1].(models.Chart)
assert.Equal(t, args[i], bson.M{"_id": "test/" + c.Name}, "selector")
m := args[i+1].(bson.M)
c := m["$set"].(models.Chart)
assert.Equal(t, args[i], bson.M{"chart_id": "repo-name/" + c.Name, "repo.name": "repo-name", "repo.namespace": "repo-namespace"}, "selector")
}
}

func Test_DeleteRepo(t *testing.T) {
m := &mock.Mock{}
repo := models.Repo{Name: "repo-name", Namespace: "repo-namespace"}
m.On("RemoveAll", bson.M{
"repo.name": "test",
"repo.name": repo.Name,
"repo.namespace": repo.Namespace,
})
m.On("RemoveAll", bson.M{
"_id": "test",
"name": repo.Name,
"namespace": repo.Namespace,
})
manager := getMockManager(m)
err := manager.Delete(models.Repo{Name: "test"})
err := manager.Delete(repo)
if err != nil {
t.Errorf("failed to delete chart repo test: %v", err)
}
Expand Down Expand Up @@ -117,9 +126,10 @@ func Test_updateLastCheck(t *testing.T) {
checksum = "bar"
)
now := time.Now()
m.On("UpsertId", repoName, bson.M{"$set": bson.M{"last_update": now, "checksum": checksum}}).Return(nil)
m.On("Upsert", bson.M{"name": repoName, "namespace": repoNamespace}, bson.M{"$set": bson.M{"last_update": now, "checksum": checksum}}).Return(nil)
manager := getMockManager(m)
err := manager.UpdateLastCheck(repoNamespace, repoName, checksum, now)
m.AssertExpectations(t)
if err != nil {
t.Errorf("Unexpected error %v", err)
}
Expand Down

0 comments on commit 3a53925

Please sign in to comment.