From f60dfafa1b072c0504349e77f8a9abf89f195f09 Mon Sep 17 00:00:00 2001 From: Ashutosh Narkar Date: Wed, 4 May 2022 15:58:32 -0700 Subject: [PATCH] Initial support for large bundle deployments Currently bundles are loaded into memory entirely even when disk storage is used. Then the parsed content is written to the store. Deserializing data into Go structs is memory consuming and even if user has configured disk storage, OPA is still bound by the amount of memory assigned to it. This change adds a new lazy loading mode wherein the entire data is not deserialized while bundle reading and hence if the bundle contains large data files and the user has enabled disk storage, OPA should be able to handle this scenario w/o running OOM. Fixes: #4539 Signed-off-by: Ashutosh Narkar --- bundle/bundle.go | 70 +- bundle/bundle_test.go | 28 + bundle/file.go | 75 ++ bundle/file_test.go | 132 +++- bundle/store.go | 193 ++++- bundle/store_test.go | 1346 ++++++++++++++++++++++++++++++--- docs/content/misc-disk.md | 11 - download/download.go | 26 +- download/download_test.go | 41 + internal/storage/mock/mock.go | 4 + plugins/bundle/plugin.go | 3 +- plugins/bundle/plugin_test.go | 291 ++++++- server/server_test.go | 4 + storage/disk/config.go | 17 +- storage/disk/config_test.go | 2 +- storage/disk/disk.go | 255 ++++++- storage/disk/disk_test.go | 203 +++++ storage/disk/txn.go | 79 +- storage/inmem/inmem.go | 68 ++ storage/inmem/inmem_test.go | 134 ++++ storage/interface.go | 18 + topdown/topdown_test.go | 8 + 22 files changed, 2834 insertions(+), 174 deletions(-) diff --git a/bundle/bundle.go b/bundle/bundle.go index cfa342df77..4eb2a519c4 100644 --- a/bundle/bundle.go +++ b/bundle/bundle.go @@ -54,6 +54,16 @@ type Bundle struct { PlanModules []PlanModuleFile Patch Patch Etag string + Raw []Raw + + lazyLoadingMode bool + sizeLimitBytes int64 +} + +// Raw contains raw bytes representing the bundle's content +type Raw struct { + Path string + Value []byte } // Patch contains an array of 
objects wherein each object represents the patch operation to be @@ -286,6 +296,10 @@ func (m *Manifest) validateAndInjectDefaults(b Bundle) error { } } + if b.lazyLoadingMode { + return nil + } + // Validate data in bundle. return dfs(b.Data, "", func(path string, node interface{}) (bool, error) { path = strings.Trim(path, "/") @@ -343,6 +357,8 @@ type Reader struct { files map[string]FileInfo // files in the bundle signature payload sizeLimitBytes int64 etag string + lazyLoadingMode bool + name string } // NewReader is deprecated. Use NewCustomReader instead. @@ -413,18 +429,37 @@ func (r *Reader) WithBundleEtag(etag string) *Reader { return r } +// WithBundleName specifies the bundle name +func (r *Reader) WithBundleName(name string) *Reader { + r.name = name + return r +} + +// WithLazyLoadingMode sets the bundle loading mode. If true, +// bundles will be read in lazy mode. In this mode, data files in the bundle will not be +// deserialized and the check to validate that the bundle data does not contain paths +// outside the bundle's roots will not be performed while reading the bundle. +func (r *Reader) WithLazyLoadingMode(yes bool) *Reader { + r.lazyLoadingMode = yes + return r +} + // Read returns a new Bundle loaded from the reader. 
func (r *Reader) Read() (Bundle, error) { var bundle Bundle var descriptors []*Descriptor var err error + var raw []Raw bundle.Signatures, bundle.Patch, descriptors, err = preProcessBundle(r.loader, r.skipVerify, r.sizeLimitBytes) if err != nil { return bundle, err } + bundle.lazyLoadingMode = r.lazyLoadingMode + bundle.sizeLimitBytes = r.sizeLimitBytes + if bundle.Type() == SnapshotBundleType { err = r.checkSignaturesAndDescriptors(bundle.Signatures) if err != nil { @@ -463,6 +498,17 @@ func (r *Reader) Read() (Bundle, error) { if strings.HasSuffix(path, RegoExt) { fullPath := r.fullPath(path) + bs := buf.Bytes() + + if r.lazyLoadingMode { + p := fullPath + if r.name != "" { + p = modulePathWithPrefix(r.name, fullPath) + } + + raw = append(raw, Raw{Path: p, Value: bs}) + } + r.metrics.Timer(metrics.RegoModuleParse).Start() module, err := ast.ParseModuleWithOpts(fullPath, buf.String(), ast.ParserOptions{ProcessAnnotation: r.processAnnotations}) r.metrics.Timer(metrics.RegoModuleParse).Stop() @@ -473,11 +519,10 @@ func (r *Reader) Read() (Bundle, error) { mf := ModuleFile{ URL: f.URL(), Path: fullPath, - Raw: buf.Bytes(), + Raw: bs, Parsed: module, } bundle.Modules = append(bundle.Modules, mf) - } else if filepath.Base(path) == WasmFile { bundle.WasmModules = append(bundle.WasmModules, WasmModuleFile{ URL: f.URL(), @@ -491,6 +536,11 @@ func (r *Reader) Read() (Bundle, error) { Raw: buf.Bytes(), }) } else if filepath.Base(path) == dataFile { + if r.lazyLoadingMode { + raw = append(raw, Raw{Path: path, Value: buf.Bytes()}) + continue + } + var value interface{} r.metrics.Timer(metrics.RegoDataParse).Start() @@ -506,6 +556,10 @@ func (r *Reader) Read() (Bundle, error) { } } else if filepath.Base(path) == yamlDataFile { + if r.lazyLoadingMode { + raw = append(raw, Raw{Path: path, Value: buf.Bytes()}) + continue + } var value interface{} @@ -592,6 +646,7 @@ func (r *Reader) Read() (Bundle, error) { } bundle.Etag = r.etag + bundle.Raw = raw return bundle, nil } @@ -1200,7 
+1255,13 @@ func rootContains(root []string, other []string) bool { } func insertValue(b *Bundle, path string, value interface{}) error { + if err := b.insertData(getNormalizedPath(path), value); err != nil { + return fmt.Errorf("bundle load failed on %v: %w", path, err) + } + return nil +} +func getNormalizedPath(path string) []string { // Remove leading / and . characters from the directory path. If the bundle // was written with OPA then the paths will contain a leading slash. On the // other hand, if the path is empty, filepath.Dir will return '.'. @@ -1211,10 +1272,7 @@ func insertValue(b *Bundle, path string, value interface{}) error { if dirpath != "" { key = strings.Split(dirpath, "/") } - if err := b.insertData(key, value); err != nil { - return fmt.Errorf("bundle load failed on %v: %w", path, err) - } - return nil + return key } func dfs(value interface{}, path string, fn func(string, interface{}) (bool, error)) error { diff --git a/bundle/bundle_test.go b/bundle/bundle_test.go index 8282bd94fa..e92ab33a8f 100644 --- a/bundle/bundle_test.go +++ b/bundle/bundle_test.go @@ -118,6 +118,34 @@ func TestReadWithSizeLimit(t *testing.T) { } } +func TestReadBundleInLazyMode(t *testing.T) { + files := [][2]string{ + {"/a/b/c/data.json", "[1,2,3]"}, + {"/a/b/d/data.json", "true"}, + {"/a/b/y/data.yaml", `foo: 1`}, + {"/example/example.rego", `package example`}, + {"/data.json", `{"x": {"y": true}, "a": {"b": {"z": true}}}}`}, + {"/.manifest", `{"revision": "foo", "roots": ["example"]}`}, // data is outside roots but validation skipped in lazy mode + } + + buf := archive.MustWriteTarGz(files) + loader := NewTarballLoaderWithBaseURL(buf, "") + br := NewCustomReader(loader).WithLazyLoadingMode(true) + + bundle, err := br.Read() + if err != nil { + t.Fatal(err) + } + + if len(bundle.Data) != 0 { + t.Fatal("expected the bundle object to contain no data") + } + + if len(bundle.Raw) == 0 { + t.Fatal("raw bundle bytes not set on bundle object") + } +} + func 
TestReadWithBundleEtag(t *testing.T) { files := [][2]string{ diff --git a/bundle/file.go b/bundle/file.go index d5d26c408d..06af178826 100644 --- a/bundle/file.go +++ b/bundle/file.go @@ -9,8 +9,11 @@ import ( "os" "path" "path/filepath" + "sort" "strings" "sync" + + "github.com/open-policy-agent/opa/storage" ) // Descriptor contains information about a file and @@ -192,6 +195,8 @@ type tarballLoader struct { type file struct { name string reader io.Reader + path storage.Path + raw []byte } // NewTarballLoader is deprecated. Use NewTarballLoaderWithBaseURL instead. @@ -265,3 +270,73 @@ func (t *tarballLoader) NextFile() (*Descriptor, error) { return newDescriptor(path.Join(t.baseURL, f.name), f.name, f.reader), nil } + +// Next implements the storage.Iterator interface. +// It iterates to the next policy or data file in the directory tree +// and returns a storage.Update for the file. +func (it *iterator) Next() (*storage.Update, error) { + + if it.files == nil { + it.files = []file{} + + for _, item := range it.raw { + f := file{name: item.Path} + + fpath := strings.TrimLeft(filepath.ToSlash(filepath.Dir(f.name)), "/.") + if strings.HasSuffix(f.name, RegoExt) { + fpath = strings.Trim(f.name, "/") + } + + p, ok := storage.ParsePathEscaped("/" + fpath) + if !ok { + return nil, fmt.Errorf("storage path invalid: %v", f.name) + } + f.path = p + + f.raw = item.Value + + it.files = append(it.files, f) + } + + sortFilePathAscend(it.files) + } + + // If done reading files then just return io.EOF + // errors for each NextFile() call + if it.idx >= len(it.files) { + return nil, io.EOF + } + + f := it.files[it.idx] + it.idx++ + + isPolicy := false + if strings.HasSuffix(f.name, RegoExt) { + isPolicy = true + } + + return &storage.Update{ + Path: f.path, + Value: f.raw, + IsPolicy: isPolicy, + }, nil +} + +type iterator struct { + raw []Raw + files []file + idx int +} + +func NewIterator(raw []Raw) storage.Iterator { + it := iterator{ + raw: raw, + } + return &it +} + +func 
sortFilePathAscend(files []file) { + sort.Slice(files, func(i, j int) bool { + return len(files[i].path) < len(files[j].path) + }) +} diff --git a/bundle/file_test.go b/bundle/file_test.go index e170faae4e..c462e19371 100644 --- a/bundle/file_test.go +++ b/bundle/file_test.go @@ -5,6 +5,7 @@ import ( "io" "os" "path/filepath" + "reflect" "strings" "testing" @@ -19,8 +20,11 @@ var archiveFiles = map[string]string{ "/a.json": "a", "/a/b.json": "b", "/a/b/c.json": "c", + "/a/b/d/data.json": "hello", + "/a/c/data.yaml": "12", "/some.txt": "text", "/policy.rego": "package foo\n p = 1", + "/roles/policy.rego": "package bar\n p = 1", "/deeper/dir/path/than/others/foo": "bar", } @@ -32,34 +36,100 @@ func TestTarballLoader(t *testing.T) { test.WithTempFS(files, func(rootDir string) { tarballFile := filepath.Join(rootDir, "archive.tar.gz") - f, err := os.Create(tarballFile) - if err != nil { - t.Fatalf("Unexpected error: %s", err) - } + f := testGetTarballFile(t, rootDir) - var gzFiles [][2]string - for name, content := range archiveFiles { - gzFiles = append(gzFiles, [2]string{name, content}) - } + loader := NewTarballLoaderWithBaseURL(f, tarballFile) + + defer f.Close() + + testLoader(t, loader, tarballFile, archiveFiles) + }) +} + +func TestIterator(t *testing.T) { + + var files [][2]string + for name, content := range archiveFiles { + files = append(files, [2]string{name, content}) + } + + buf := archive.MustWriteTarGz(files) + bundle, err := NewReader(buf).WithLazyLoadingMode(true).Read() + if err != nil { + t.Fatal(err) + } - _, err = f.Write(archive.MustWriteTarGz(gzFiles).Bytes()) - if err != nil { + iterator := NewIterator(bundle.Raw) + fileCount := 0 + for { + _, err := iterator.Next() + if err != nil && err != io.EOF { t.Fatalf("Unexpected error: %s", err) + } else if err == io.EOF { + break } - f.Close() + fileCount++ + } + + expCount := 4 + if fileCount != expCount { + t.Fatalf("Expected to read %d files, read %d", expCount, fileCount) + } +} + +func 
TestIteratorOrder(t *testing.T) { + + var archFiles = map[string]string{ + "/a/b/c/data.json": "[1,2,3]", + "/a/b/d/e/data.json": `e: true`, + "/data.json": `{"x": {"y": true}, "a": {"b": {"z": true}}}}`, + "/a/b/y/x/z/data.yaml": `foo: 1`, + "/a/b/data.json": "[4,5,6]", + "/a/data.json": "hello", + "/policy.rego": "package foo\n p = 1", + "/roles/policy.rego": "package bar\n p = 1", + } + + var files [][2]string + for name, content := range archFiles { + files = append(files, [2]string{name, content}) + } - f, err = os.Open(tarballFile) - if err != nil { + buf := archive.MustWriteTarGz(files) + bundle, err := NewReader(buf).WithLazyLoadingMode(true).Read() + if err != nil { + t.Fatal(err) + } + + iterator := NewIterator(bundle.Raw) + + fileCount := 0 + actualDataFiles := []string{} + + for { + i, err := iterator.Next() + if err != nil && err != io.EOF { t.Fatalf("Unexpected error: %s", err) + } else if err == io.EOF { + break } + fileCount++ - loader := NewTarballLoaderWithBaseURL(f, tarballFile) - defer f.Close() + if !strings.HasSuffix(i.Path.String(), RegoExt) { + actualDataFiles = append(actualDataFiles, i.Path.String()) + } + } - testLoader(t, loader, tarballFile, archiveFiles) + expCount := 8 + if fileCount != expCount { + t.Fatalf("Expected to read %d files, read %d", expCount, fileCount) + } - }) + expDataFiles := []string{"/", "/a", "/a/b", "/a/b/c", "/a/b/d/e", "/a/b/y/x/z"} + if !reflect.DeepEqual(expDataFiles, actualDataFiles) { + t.Fatalf("Expected data files %v but got %v", expDataFiles, actualDataFiles) + } } func TestDirectoryLoader(t *testing.T) { @@ -70,6 +140,34 @@ func TestDirectoryLoader(t *testing.T) { }) } +func testGetTarballFile(t *testing.T, root string) *os.File { + t.Helper() + + tarballFile := filepath.Join(root, "archive.tar.gz") + f, err := os.Create(tarballFile) + if err != nil { + t.Fatalf("Unexpected error: %s", err) + } + + var gzFiles [][2]string + for name, content := range archiveFiles { + gzFiles = append(gzFiles, 
[2]string{name, content}) + } + + _, err = f.Write(archive.MustWriteTarGz(gzFiles).Bytes()) + if err != nil { + t.Fatalf("Unexpected error: %s", err) + } + f.Close() + + f, err = os.Open(tarballFile) + if err != nil { + t.Fatalf("Unexpected error: %s", err) + } + + return f +} + func testLoader(t *testing.T, loader DirectoryLoader, baseURL string, expectedFiles map[string]string) { t.Helper() diff --git a/bundle/store.go b/bundle/store.go index 2a2361d494..247617df57 100644 --- a/bundle/store.go +++ b/bundle/store.go @@ -9,6 +9,7 @@ import ( "encoding/base64" "encoding/json" "fmt" + "path/filepath" "strings" "github.com/open-policy-agent/opa/ast" @@ -384,16 +385,67 @@ func activateBundles(opts *ActivateOpts) error { return err } + // Validate data in bundle does not contain paths outside the bundle's roots. for _, b := range snapshotBundles { - // Write data from each new bundle into the store. Only write under the - // roots contained in their manifest. This should be done *before* the - // policies so that path conflict checks can occur. 
- if err := writeData(opts.Ctx, opts.Store, opts.Txn, *b.Manifest.Roots, b.Data); err != nil { - return err + + if b.lazyLoadingMode { + + if len(b.Raw) == 0 { + return fmt.Errorf("raw bundle bytes not set on bundle object") + } + + for _, item := range b.Raw { + path := filepath.ToSlash(item.Path) + + if filepath.Base(path) == dataFile || filepath.Base(path) == yamlDataFile { + var val map[string]json.RawMessage + err = util.Unmarshal(item.Value, &val) + if err == nil { + err = doDFS(val, filepath.Dir(strings.Trim(path, "/")), *b.Manifest.Roots) + if err != nil { + return err + } + } else { + // Build an object for the value + p := getNormalizedPath(path) + + if len(p) == 0 { + return fmt.Errorf("root value must be object") + } + + // verify valid YAML or JSON value + var x interface{} + err := util.Unmarshal(item.Value, &x) + if err != nil { + return err + } + + value := item.Value + dir := map[string]json.RawMessage{} + for i := len(p) - 1; i > 0; i-- { + dir[p[i]] = value + + bs, err := json.Marshal(dir) + if err != nil { + return err + } + + value = bs + dir = map[string]json.RawMessage{} + } + dir[p[0]] = value + + err = doDFS(dir, filepath.Dir(strings.Trim(path, "/")), *b.Manifest.Roots) + if err != nil { + return err + } + } + } + } } } - // Write and compile the modules all at once to avoid having to re-do work. + // Compile the modules all at once to avoid having to re-do work. 
remainingAndExtra := make(map[string]*ast.Module) for name, mod := range remaining { remainingAndExtra[name] = mod @@ -402,11 +454,19 @@ func activateBundles(opts *ActivateOpts) error { remainingAndExtra[name] = mod } - err = writeModules(opts.Ctx, opts.Store, opts.Txn, opts.Compiler, opts.Metrics, snapshotBundles, remainingAndExtra, opts.legacy) + err = compileModules(opts.Compiler, opts.Metrics, snapshotBundles, remainingAndExtra, opts.legacy) if err != nil { return err } + if err := writeDataAndModules(opts.Ctx, opts.Store, opts.Txn, opts.TxnCtx, snapshotBundles, opts.legacy); err != nil { + return err + } + + if err := ast.CheckPathConflicts(opts.Compiler, storage.NonEmpty(opts.Ctx, opts.Store, opts.Txn)); len(err) > 0 { + return err + } + for name, b := range snapshotBundles { if err := writeManifestToStore(opts, name, b.Manifest); err != nil { return err @@ -424,6 +484,49 @@ func activateBundles(opts *ActivateOpts) error { return nil } +func doDFS(obj map[string]json.RawMessage, path string, roots []string) error { + if len(roots) == 1 && roots[0] == "" { + return nil + } + + for key := range obj { + + newPath := filepath.Join(strings.Trim(path, "/"), key) + + contains := false + prefix := false + if RootPathsContain(roots, newPath) { + contains = true + } else { + for i := range roots { + if strings.HasPrefix(strings.Trim(roots[i], "/"), newPath) { + prefix = true + break + } + } + } + + if !contains && !prefix { + return fmt.Errorf("manifest roots %v do not permit data at path '/%s' (hint: check bundle directory structure)", roots, newPath) + } + + if contains { + continue + } + + var next map[string]json.RawMessage + err := util.Unmarshal(obj[key], &next) + if err != nil { + return fmt.Errorf("manifest roots %v do not permit data at path '/%s' (hint: check bundle directory structure)", roots, newPath) + } + + if err := doDFS(next, newPath, roots); err != nil { + return err + } + } + return nil +} + func activateDeltaBundles(opts *ActivateOpts, bundles 
map[string]*Bundle) error { // Check that the manifest roots and wasm resolvers in the delta bundle @@ -518,6 +621,7 @@ func eraseData(ctx context.Context, store storage.Store, txn storage.Transaction if !ok { return fmt.Errorf("manifest root path invalid: %v", root) } + if len(path) > 0 { if err := store.Write(ctx, txn, storage.RemoveOp, path, nil); suppressNotFound(err) != nil { return err @@ -591,6 +695,44 @@ func writeEtagToStore(opts *ActivateOpts, name, etag string) error { return nil } +func writeDataAndModules(ctx context.Context, store storage.Store, txn storage.Transaction, txnCtx *storage.Context, bundles map[string]*Bundle, legacy bool) error { + params := storage.WriteParams + params.Context = txnCtx + + for name, b := range bundles { + if len(b.Raw) == 0 { + // Write data from each new bundle into the store. Only write under the + // roots contained in their manifest. + if err := writeData(ctx, store, txn, *b.Manifest.Roots, b.Data); err != nil { + return err + } + + for _, mf := range b.Modules { + var path string + + // For backwards compatibility, in legacy mode, upsert policies to + // the unprefixed path. 
+ if legacy { + path = mf.Path + } else { + path = modulePathWithPrefix(name, mf.Path) + } + + if err := store.UpsertPolicy(ctx, txn, path, mf.Raw); err != nil { + return err + } + } + } else { + err := store.Truncate(ctx, txn, params, NewIterator(b.Raw)) + if err != nil { + return fmt.Errorf("store truncate failed for bundle '%s': %v", name, err) + } + } + } + + return nil +} + func writeData(ctx context.Context, store storage.Store, txn storage.Transaction, roots []string, data map[string]interface{}) error { for _, root := range roots { path, ok := storage.ParsePathEscaped("/" + root) @@ -611,6 +753,43 @@ func writeData(ctx context.Context, store storage.Store, txn storage.Transaction return nil } +func compileModules(compiler *ast.Compiler, m metrics.Metrics, bundles map[string]*Bundle, extraModules map[string]*ast.Module, legacy bool) error { + + m.Timer(metrics.RegoModuleCompile).Start() + defer m.Timer(metrics.RegoModuleCompile).Stop() + + modules := map[string]*ast.Module{} + + // preserve any modules already on the compiler + for name, module := range compiler.Modules { + modules[name] = module + } + + // preserve any modules passed in from the store + for name, module := range extraModules { + modules[name] = module + } + + // include all the new bundle modules + for bundleName, b := range bundles { + if legacy { + for _, mf := range b.Modules { + modules[mf.Path] = mf.Parsed + } + } else { + for name, module := range b.ParsedModules(bundleName) { + modules[name] = module + } + } + } + + if compiler.Compile(modules); compiler.Failed() { + return compiler.Errors + } + + return nil +} + func writeModules(ctx context.Context, store storage.Store, txn storage.Transaction, compiler *ast.Compiler, m metrics.Metrics, bundles map[string]*Bundle, extraModules map[string]*ast.Module, legacy bool) error { m.Timer(metrics.RegoModuleCompile).Start() diff --git a/bundle/store_test.go b/bundle/store_test.go index 39a07adf3a..dce93641d8 100644 --- a/bundle/store_test.go 
+++ b/bundle/store_test.go @@ -1,12 +1,19 @@ package bundle import ( + "bytes" "context" + "encoding/json" + "fmt" "path/filepath" "reflect" "strings" "testing" + "github.com/open-policy-agent/opa/internal/file/archive" + "github.com/open-policy-agent/opa/logging" + "github.com/open-policy-agent/opa/util/test" + "github.com/open-policy-agent/opa/util" "github.com/open-policy-agent/opa/ast" @@ -15,6 +22,7 @@ import ( "github.com/open-policy-agent/opa/internal/storage/mock" "github.com/open-policy-agent/opa/storage" + "github.com/open-policy-agent/opa/storage/disk" "github.com/open-policy-agent/opa/storage/inmem" ) @@ -75,141 +83,1142 @@ func TestLegacyManifestStoreLifecycle(t *testing.T) { t.Fatalf("Unexpected error finishing transaction: %s", err) } - // make sure it can be retrieved - verifyReadLegacyRevision(ctx, t, store, tb.Revision) + // make sure it can be retrieved + verifyReadLegacyRevision(ctx, t, store, tb.Revision) + + // delete it + err = storage.Txn(ctx, store, storage.WriteParams, func(txn storage.Transaction) error { + if err := LegacyEraseManifestFromStore(ctx, store, txn); err != nil { + t.Fatalf("Failed to erase manifest from store: %s", err) + return err + } + return nil + }) + if err != nil { + t.Fatalf("Unexpected error finishing transaction: %s", err) + } + + verifyReadLegacyRevision(ctx, t, store, "") +} + +func TestMixedManifestStoreLifecycle(t *testing.T) { + store := inmem.New() + ctx := context.Background() + bundles := map[string]Manifest{ + "bundle1": { + Revision: "abc123", + Roots: &[]string{"/a/b", "/a/c"}, + }, + "bundle2": { + Revision: "def123", + Roots: &[]string{"/x/y", "/z"}, + }, + } + + // Write the legacy one first + err := storage.Txn(ctx, store, storage.WriteParams, func(txn storage.Transaction) error { + if err := LegacyWriteManifestToStore(ctx, store, txn, bundles["bundle1"]); err != nil { + t.Fatalf("Failed to write manifest to store: %s", err) + return err + } + return nil + }) + if err != nil { + t.Fatalf("Unexpected 
error finishing transaction: %s", err) + } + + verifyReadBundleNames(ctx, t, store, []string{}) + + // Write both new ones + verifyWriteManifests(ctx, t, store, bundles) + verifyReadBundleNames(ctx, t, store, []string{"bundle1", "bundle2"}) + + // Ensure the original legacy one is still there + verifyReadLegacyRevision(ctx, t, store, bundles["bundle1"].Revision) +} + +func verifyDeleteManifest(ctx context.Context, t *testing.T, store storage.Store, name string) { + err := storage.Txn(ctx, store, storage.WriteParams, func(txn storage.Transaction) error { + err := EraseManifestFromStore(ctx, store, txn, name) + if err != nil { + t.Fatalf("Failed to delete manifest from store: %s", err) + } + return err + }) + if err != nil { + t.Fatalf("Unexpected error finishing transaction: %s", err) + } +} + +func verifyWriteManifests(ctx context.Context, t *testing.T, store storage.Store, bundles map[string]Manifest) { + t.Helper() + for name, manifest := range bundles { + err := storage.Txn(ctx, store, storage.WriteParams, func(txn storage.Transaction) error { + err := WriteManifestToStore(ctx, store, txn, name, manifest) + if err != nil { + t.Fatalf("Failed to write manifest to store: %s", err) + } + return err + }) + if err != nil { + t.Fatalf("Unexpected error finishing transaction: %s", err) + } + } +} + +func verifyReadBundleNames(ctx context.Context, t *testing.T, store storage.Store, expected []string) { + t.Helper() + var actualNames []string + err := storage.Txn(ctx, store, storage.WriteParams, func(txn storage.Transaction) error { + var err error + actualNames, err = ReadBundleNamesFromStore(ctx, store, txn) + if err != nil && !storage.IsNotFound(err) { + t.Fatalf("Failed to read manifest names from store: %s", err) + return err + } + return nil + }) + if err != nil { + t.Fatalf("Unexpected error finishing transaction: %s", err) + } + + if len(actualNames) != len(expected) { + t.Fatalf("Expected %d name, found %d \n\t\tActual: %v\n", len(expected), len(actualNames), 
actualNames) + } + + for _, actualName := range actualNames { + found := false + for _, expectedName := range expected { + if actualName == expectedName { + found = true + break + } + } + if !found { + t.Errorf("Found unexpecxted bundle name %s, expected names: %+v", actualName, expected) + } + } +} + +func verifyReadLegacyRevision(ctx context.Context, t *testing.T, store storage.Store, expected string) { + t.Helper() + var actual string + err := storage.Txn(ctx, store, storage.WriteParams, func(txn storage.Transaction) error { + var err error + if actual, err = LegacyReadRevisionFromStore(ctx, store, txn); err != nil && !storage.IsNotFound(err) { + t.Fatalf("Failed to read manifest revision from store: %s", err) + return err + } + return nil + }) + if err != nil { + t.Fatalf("Unexpected error finishing transaction: %s", err) + } + + if actual != expected { + t.Fatalf("Expected revision %s, got %s", expected, actual) + } +} + +func TestBundleLazyModeNoRaw(t *testing.T) { + ctx := context.Background() + mockStore := mock.New() + + compiler := ast.NewCompiler() + m := metrics.New() + + bundles := map[string]*Bundle{ + "bundle1": { + Manifest: Manifest{ + Roots: &[]string{"a"}, + }, + Data: map[string]interface{}{ + "a": map[string]interface{}{ + "b": "foo", + }, + }, + Etag: "foo", + lazyLoadingMode: true, + }, + } + + txn := storage.NewTransactionOrDie(ctx, mockStore, storage.WriteParams) + + err := Activate(&ActivateOpts{ + Ctx: ctx, + Store: mockStore, + Txn: txn, + Compiler: compiler, + Metrics: m, + Bundles: bundles, + }) + + if err == nil { + t.Fatal("expected error but got nil") + } + + errMsg := "raw bundle bytes not set on bundle object" + if err.Error() != errMsg { + t.Fatalf("expected error %v but got %v", errMsg, err.Error()) + } +} + +func TestBundleLazyModeLifecycleRaw(t *testing.T) { + files := [][2]string{ + {"/a/b/c/data.json", "[1,2,3]"}, + {"/a/b/d/data.json", "true"}, + {"/a/b/y/data.yaml", `foo: 1`}, + {"/example/example.rego", `package 
example`}, + {"/authz/allow/policy.wasm", `wasm-module`}, + {"/data.json", `{"x": {"y": true}, "a": {"b": {"z": true}}}}`}, + {"/.manifest", `{"revision": "foo", "roots": ["a", "example", "x", "authz"],"wasm":[{"entrypoint": "authz/allow", "module": "/authz/allow/policy.wasm"}]}`}, + } + + buf := archive.MustWriteTarGz(files) + loader := NewTarballLoaderWithBaseURL(buf, "") + br := NewCustomReader(loader).WithBundleEtag("bar").WithLazyLoadingMode(true) + + bundle, err := br.Read() + if err != nil { + t.Fatal(err) + } + + ctx := context.Background() + mockStore := mock.New() + + compiler := ast.NewCompiler() + m := metrics.New() + + extraMods := map[string]*ast.Module{ + "mod1": ast.MustParseModule("package x\np = true"), + } + + bundles := map[string]*Bundle{ + "bundle1": &bundle, + } + + txn := storage.NewTransactionOrDie(ctx, mockStore, storage.WriteParams) + + err = Activate(&ActivateOpts{ + Ctx: ctx, + Store: mockStore, + Txn: txn, + Compiler: compiler, + Metrics: m, + Bundles: bundles, + ExtraModules: extraMods, + }) + if err != nil { + t.Fatalf("unexpected error: %s", err) + } + + err = mockStore.Commit(ctx, txn) + if err != nil { + t.Fatalf("unexpected error: %s", err) + } + + // Ensure the bundle was activated + txn = storage.NewTransactionOrDie(ctx, mockStore) + names, err := ReadBundleNamesFromStore(ctx, mockStore, txn) + if err != nil { + t.Fatal(err) + } + + if len(names) != len(bundles) { + t.Fatalf("expected %d bundles in store, found %d", len(bundles), len(names)) + } + for _, name := range names { + if _, ok := bundles[name]; !ok { + t.Fatalf("unexpected bundle name found in store: %s", name) + } + } + + for bundleName, bundle := range bundles { + for modName := range bundle.ParsedModules(bundleName) { + if _, ok := compiler.Modules[modName]; !ok { + t.Fatalf("expected module %s from bundle %s to have been compiled", modName, bundleName) + } + } + } + + actual, err := mockStore.Read(ctx, txn, storage.MustParsePath("/")) + if err != nil { + 
t.Fatalf("unexpected error: %s", err) + } + expectedRaw := ` +{ + "a": { + "b": { + "c": [1,2,3], + "d": true, + "y": { + "foo": 1 + }, + "z": true + } + }, + "x": { + "y": true + }, + "system": { + "bundles": { + "bundle1": { + "manifest": { + "revision": "foo", + "roots": ["a", "example", "x", "authz"], + "wasm": [ + { + "entrypoint": "authz/allow", + "module": "/authz/allow/policy.wasm" + } + ] + }, + "etag": "bar", + "wasm": { + "/authz/allow/policy.wasm": "d2FzbS1tb2R1bGU=" + } + } + } + } +} +` + expected := loadExpectedSortedResult(expectedRaw) + if !reflect.DeepEqual(expected, actual) { + t.Errorf("expected %v, got %v", expectedRaw, string(util.MustMarshalJSON(actual))) + } + + // Ensure that the extra module was included + if _, ok := compiler.Modules["mod1"]; !ok { + t.Fatalf("expected extra module to be compiled") + } + + // Stop the "read" transaction + mockStore.Abort(ctx, txn) + + txn = storage.NewTransactionOrDie(ctx, mockStore, storage.WriteParams) + + err = Deactivate(&DeactivateOpts{ + Ctx: ctx, + Store: mockStore, + Txn: txn, + BundleNames: map[string]struct{}{"bundle1": {}}, + }) + if err != nil { + t.Fatalf("unexpected error: %s", err) + } + + err = mockStore.Commit(ctx, txn) + if err != nil { + t.Fatalf("unexpected error: %s", err) + } + + // Expect the store to have been cleared out after deactivating the bundle + txn = storage.NewTransactionOrDie(ctx, mockStore) + names, err = ReadBundleNamesFromStore(ctx, mockStore, txn) + if err != nil { + t.Fatal(err) + } + + if len(names) != 0 { + t.Fatalf("expected 0 bundles in store, found %d", len(names)) + } + + actual, err = mockStore.Read(ctx, txn, storage.MustParsePath("/")) + if err != nil { + t.Fatalf("unexpected error: %s", err) + } + expectedRaw = `{"system": {"bundles": {}}}` + expected = loadExpectedSortedResult(expectedRaw) + if !reflect.DeepEqual(expected, actual) { + t.Errorf("expected %v, got %v", expectedRaw, string(util.MustMarshalJSON(actual))) + } + + mockStore.AssertValid(t) +} + 
+func TestBundleLazyModeLifecycleRawInvalidData(t *testing.T) { + + tests := map[string]struct { + files [][2]string + err error + }{ + "non-object root": {[][2]string{{"/data.json", `[1,2,3]`}}, fmt.Errorf("root value must be object")}, + "invalid yaml": {[][2]string{{"/a/b/data.yaml", `"foo`}}, fmt.Errorf("yaml: found unexpected end of stream")}, + } + + for name, tc := range tests { + t.Run(name, func(t *testing.T) { + + buf := archive.MustWriteTarGz(tc.files) + loader := NewTarballLoaderWithBaseURL(buf, "") + br := NewCustomReader(loader).WithBundleEtag("bar").WithLazyLoadingMode(true) + + bundle, err := br.Read() + if err != nil { + t.Fatal(err) + } + + ctx := context.Background() + mockStore := mock.New() + + compiler := ast.NewCompiler() + m := metrics.New() + + bundles := map[string]*Bundle{ + "bundle1": &bundle, + } + + txn := storage.NewTransactionOrDie(ctx, mockStore, storage.WriteParams) + + err = Activate(&ActivateOpts{ + Ctx: ctx, + Store: mockStore, + Txn: txn, + Compiler: compiler, + Metrics: m, + Bundles: bundles, + }) + + if tc.err != nil && tc.err.Error() != err.Error() { + t.Fatalf("Expected error message %v but got %v", tc.err.Error(), err.Error()) + } + }) + } +} + +func TestBundleLazyModeLifecycle(t *testing.T) { + ctx := context.Background() + mockStore := mock.New() + + compiler := ast.NewCompiler() + m := metrics.New() + + extraMods := map[string]*ast.Module{ + "mod1": ast.MustParseModule("package x\np = true"), + } + + mod1 := "package a\np = true" + mod2 := "package b\np = true" + + b1Files := [][2]string{ + {"/.manifest", `{"roots": ["a"]}`}, + {"a/policy.rego", mod1}, + {"/data.json", `{"a": {"b": "foo"}}`}, + } + + buf := archive.MustWriteTarGz(b1Files) + loader := NewTarballLoaderWithBaseURL(buf, "") + br := NewCustomReader(loader).WithBundleEtag("foo").WithLazyLoadingMode(true).WithBundleName("bundle1") + + bundle1, err := br.Read() + if err != nil { + t.Fatal(err) + } + + b2Files := [][2]string{ + {"/.manifest", `{"roots": ["b", 
"c"]}`}, + {"b/policy.rego", mod2}, + {"/data.json", `{}`}, + } + + buf = archive.MustWriteTarGz(b2Files) + loader = NewTarballLoaderWithBaseURL(buf, "") + br = NewCustomReader(loader).WithLazyLoadingMode(true).WithBundleName("bundle2") + + bundle2, err := br.Read() + if err != nil { + t.Fatal(err) + } + + bundles := map[string]*Bundle{ + "bundle1": &bundle1, + "bundle2": &bundle2, + } + + txn := storage.NewTransactionOrDie(ctx, mockStore, storage.WriteParams) + + err = Activate(&ActivateOpts{ + Ctx: ctx, + Store: mockStore, + Txn: txn, + Compiler: compiler, + Metrics: m, + Bundles: bundles, + ExtraModules: extraMods, + }) + if err != nil { + t.Fatalf("unexpected error: %s", err) + } + + err = mockStore.Commit(ctx, txn) + if err != nil { + t.Fatalf("unexpected error: %s", err) + } + + // Ensure the bundle was activated + txn = storage.NewTransactionOrDie(ctx, mockStore) + names, err := ReadBundleNamesFromStore(ctx, mockStore, txn) + if err != nil { + t.Fatal(err) + } + + if len(names) != len(bundles) { + t.Fatalf("expected %d bundles in store, found %d", len(bundles), len(names)) + } + for _, name := range names { + if _, ok := bundles[name]; !ok { + t.Fatalf("unexpected bundle name found in store: %s", name) + } + } + + for bundleName, bundle := range bundles { + for modName := range bundle.ParsedModules(bundleName) { + if _, ok := compiler.Modules[modName]; !ok { + t.Fatalf("expected module %s from bundle %s to have been compiled", modName, bundleName) + } + } + } + + actual, err := mockStore.Read(ctx, txn, storage.MustParsePath("/")) + if err != nil { + t.Fatalf("unexpected error: %s", err) + } + expectedRaw := ` +{ + "a": { + "b": "foo" + }, + "system": { + "bundles": { + "bundle1": { + "manifest": { + "revision": "", + "roots": ["a"] + }, + "etag": "foo" + }, + "bundle2": { + "manifest": { + "revision": "", + "roots": ["b", "c"] + }, + "etag": "" + } + } + } +} +` + expected := loadExpectedSortedResult(expectedRaw) + if !reflect.DeepEqual(expected, actual) { + 
t.Errorf("expected %v, got %v", expectedRaw, string(util.MustMarshalJSON(actual))) + } + + // Ensure that the extra module was included + if _, ok := compiler.Modules["mod1"]; !ok { + t.Fatalf("expected extra module to be compiled") + } + + // Stop the "read" transaction + mockStore.Abort(ctx, txn) + + txn = storage.NewTransactionOrDie(ctx, mockStore, storage.WriteParams) + + err = Deactivate(&DeactivateOpts{ + Ctx: ctx, + Store: mockStore, + Txn: txn, + BundleNames: map[string]struct{}{"bundle1": {}, "bundle2": {}}, + }) + if err != nil { + t.Fatalf("unexpected error: %s", err) + } + + err = mockStore.Commit(ctx, txn) + if err != nil { + t.Fatalf("unexpected error: %s", err) + } + + // Expect the store to have been cleared out after deactivating the bundles + txn = storage.NewTransactionOrDie(ctx, mockStore) + names, err = ReadBundleNamesFromStore(ctx, mockStore, txn) + if err != nil { + t.Fatal(err) + } + + if len(names) != 0 { + t.Fatalf("expected 0 bundles in store, found %d", len(names)) + } + + actual, err = mockStore.Read(ctx, txn, storage.MustParsePath("/")) + if err != nil { + t.Fatalf("unexpected error: %s", err) + } + expectedRaw = `{"system": {"bundles": {}}}` + expected = loadExpectedSortedResult(expectedRaw) + if !reflect.DeepEqual(expected, actual) { + t.Errorf("expected %v, got %v", expectedRaw, string(util.MustMarshalJSON(actual))) + } + + mockStore.AssertValid(t) +} + +func TestDeltaBundleLazyModeLifecycleDiskStorage(t *testing.T) { + ctx := context.Background() + + test.WithTempFS(nil, func(dir string) { + store, err := disk.New(ctx, logging.NewNoOpLogger(), nil, disk.Options{ + Dir: dir, + }) + if err != nil { + t.Fatal(err) + } + + compiler := ast.NewCompiler() + m := metrics.New() + + mod1 := "package a\np = true" + mod2 := "package b\np = true" + + b := Bundle{ + Manifest: Manifest{ + Roots: &[]string{"a"}, + }, + Data: map[string]interface{}{ + "a": map[string]interface{}{ + "b": "foo", + "e": map[string]interface{}{ + "f": "bar", + }, + 
"x": []map[string]string{{"name": "john"}, {"name": "jane"}}, + }, + }, + Modules: []ModuleFile{ + { + Path: "a/policy.rego", + Raw: []byte(mod1), + Parsed: ast.MustParseModule(mod1), + }, + }, + Etag: "foo", + } + + var buf1 bytes.Buffer + if err := NewWriter(&buf1).UseModulePath(true).Write(b); err != nil { + t.Fatal("Unexpected error:", err) + } + loader := NewTarballLoaderWithBaseURL(&buf1, "") + bundle1, err := NewCustomReader(loader).WithLazyLoadingMode(true).WithBundleName("bundle1").Read() + if err != nil { + t.Fatal(err) + } + + b = Bundle{ + Manifest: Manifest{ + Roots: &[]string{"b", "c"}, + }, + Data: nil, + Modules: []ModuleFile{ + { + Path: "b/policy.rego", + Raw: []byte(mod2), + Parsed: ast.MustParseModule(mod2), + }, + }, + Etag: "foo", + } + + var buf2 bytes.Buffer + if err := NewWriter(&buf2).UseModulePath(true).Write(b); err != nil { + t.Fatal("Unexpected error:", err) + } + + loader = NewTarballLoaderWithBaseURL(&buf2, "") + bundle2, err := NewCustomReader(loader).WithLazyLoadingMode(true).WithBundleName("bundle2").Read() + if err != nil { + t.Fatal(err) + } + + bundles := map[string]*Bundle{ + "bundle1": &bundle1, + "bundle2": &bundle2, + } + + txn := storage.NewTransactionOrDie(ctx, store, storage.WriteParams) + + err = Activate(&ActivateOpts{ + Ctx: ctx, + Store: store, + Txn: txn, + Compiler: compiler, + Metrics: m, + Bundles: bundles, + }) + if err != nil { + t.Fatalf("unexpected error: %s", err) + } + + err = store.Commit(ctx, txn) + if err != nil { + t.Fatalf("unexpected error: %s", err) + } + + // Ensure the snapshot bundles were activated + txn = storage.NewTransactionOrDie(ctx, store) + names, err := ReadBundleNamesFromStore(ctx, store, txn) + if err != nil { + t.Fatalf("unexpected error: %s", err) + } + + if len(names) != len(bundles) { + t.Fatalf("expected %d bundles in store, found %d", len(bundles), len(names)) + } + for _, name := range names { + if _, ok := bundles[name]; !ok { + t.Fatalf("unexpected bundle name found in store: 
%s", name) + } + } + + for bundleName, bundle := range bundles { + for modName := range bundle.ParsedModules(bundleName) { + if _, ok := compiler.Modules[modName]; !ok { + t.Fatalf("expected module %s from bundle %s to have been compiled", modName, bundleName) + } + } + } + + // Stop the "read" transaction + store.Abort(ctx, txn) + + // create a delta bundle and activate it + + // add a new object member + p1 := PatchOperation{ + Op: "upsert", + Path: "/a/c/d", + Value: []string{"foo", "bar"}, + } + + // append value to array + p2 := PatchOperation{ + Op: "upsert", + Path: "/a/c/d/-", + Value: "baz", + } + + // replace a value + p3 := PatchOperation{ + Op: "replace", + Path: "a/b", + Value: "bar", + } + + // add a new object root + p4 := PatchOperation{ + Op: "upsert", + Path: "/c/d", + Value: []string{"foo", "bar"}, + } + + deltaBundles := map[string]*Bundle{ + "bundle1": { + Manifest: Manifest{ + Revision: "delta-1", + Roots: &[]string{"a"}, + }, + Patch: Patch{Data: []PatchOperation{p1, p2, p3}}, + Etag: "bar", + }, + "bundle2": { + Manifest: Manifest{ + Revision: "delta-2", + Roots: &[]string{"b", "c"}, + }, + Patch: Patch{Data: []PatchOperation{p4}}, + Etag: "baz", + }, + "bundle3": { + Manifest: Manifest{ + Roots: &[]string{"d"}, + }, + Data: map[string]interface{}{ + "d": map[string]interface{}{ + "e": "foo", + }, + }, + }, + } + + txn = storage.NewTransactionOrDie(ctx, store, storage.WriteParams) + + err = Activate(&ActivateOpts{ + Ctx: ctx, + Store: store, + Txn: txn, + Compiler: compiler, + Metrics: m, + Bundles: deltaBundles, + }) + if err != nil { + t.Fatalf("unexpected error: %s", err) + } + + err = store.Commit(ctx, txn) + if err != nil { + t.Fatalf("unexpected error: %s", err) + } + + // check the modules from the snapshot bundles are on the compiler + for bundleName, bundle := range bundles { + for modName := range bundle.ParsedModules(bundleName) { + if _, ok := compiler.Modules[modName]; !ok { + t.Fatalf("expected module %s from bundle %s to have 
been compiled", modName, bundleName) + } + } + } + + // Ensure the patches were applied + txn = storage.NewTransactionOrDie(ctx, store) + + actual, err := store.Read(ctx, txn, storage.MustParsePath("/")) + if err != nil { + t.Fatalf("unexpected error: %s", err) + } + + expectedRaw := ` + { + "a": { + "b": "bar", + "c": { + "d": ["foo", "bar", "baz"] + }, + "e": { + "f": "bar" + }, + "x": [{"name": "john"}, {"name": "jane"}] + }, + "c": {"d": ["foo", "bar"]}, + "d": {"e": "foo"}, + "system": { + "bundles": { + "bundle1": { + "manifest": { + "revision": "delta-1", + "roots": ["a"] + }, + "etag": "bar" + }, + "bundle2": { + "manifest": { + "revision": "delta-2", + "roots": ["b", "c"] + }, + "etag": "baz" + }, + "bundle3": { + "manifest": { + "revision": "", + "roots": ["d"] + }, + "etag": "" + } + } + } + }` + + expected := loadExpectedSortedResult(expectedRaw) + if !reflect.DeepEqual(expected, actual) { + t.Errorf("expected %v, got %v", expectedRaw, string(util.MustMarshalJSON(actual))) + } + + // Stop the "read" transaction + store.Abort(ctx, txn) + }) +} + +func TestDeltaBundleLazyModeLifecycle(t *testing.T) { + ctx := context.Background() + mockStore := mock.New() + + compiler := ast.NewCompiler() + m := metrics.New() + + mod1 := "package a\np = true" + mod2 := "package b\np = true" + + b := Bundle{ + Manifest: Manifest{ + Roots: &[]string{"a"}, + }, + Data: map[string]interface{}{ + "a": map[string]interface{}{ + "b": "foo", + "e": map[string]interface{}{ + "f": "bar", + }, + "x": []map[string]string{{"name": "john"}, {"name": "jane"}}, + }, + }, + Modules: []ModuleFile{ + { + Path: "policy.rego", + Raw: []byte(mod1), + Parsed: ast.MustParseModule(mod1), + }, + }, + Etag: "foo", + } + + var buf1 bytes.Buffer + if err := NewWriter(&buf1).UseModulePath(true).Write(b); err != nil { + t.Fatal("Unexpected error:", err) + } + loader := NewTarballLoaderWithBaseURL(&buf1, "") + bundle1, err := 
NewCustomReader(loader).WithLazyLoadingMode(true).WithBundleName("bundle1").Read() + if err != nil { + t.Fatal(err) + } + + b = Bundle{ + Manifest: Manifest{ + Roots: &[]string{"b", "c"}, + }, + Data: nil, + Modules: []ModuleFile{ + { + Path: "policy.rego", + Raw: []byte(mod2), + Parsed: ast.MustParseModule(mod2), + }, + }, + Etag: "foo", + lazyLoadingMode: true, + sizeLimitBytes: DefaultSizeLimitBytes + 1, + } + + var buf2 bytes.Buffer + if err := NewWriter(&buf2).UseModulePath(true).Write(b); err != nil { + t.Fatal("Unexpected error:", err) + } + loader = NewTarballLoaderWithBaseURL(&buf2, "") + bundle2, err := NewCustomReader(loader).WithLazyLoadingMode(true).WithBundleName("bundle2").Read() + if err != nil { + t.Fatal(err) + } + + bundles := map[string]*Bundle{ + "bundle1": &bundle1, + "bundle2": &bundle2, + } + + txn := storage.NewTransactionOrDie(ctx, mockStore, storage.WriteParams) + + err = Activate(&ActivateOpts{ + Ctx: ctx, + Store: mockStore, + Txn: txn, + Compiler: compiler, + Metrics: m, + Bundles: bundles, + }) + if err != nil { + t.Fatalf("unexpected error: %s", err) + } + + err = mockStore.Commit(ctx, txn) + if err != nil { + t.Fatalf("unexpected error: %s", err) + } + + // Ensure the snapshot bundles were activated + txn = storage.NewTransactionOrDie(ctx, mockStore) + names, err := ReadBundleNamesFromStore(ctx, mockStore, txn) + if err != nil { + t.Fatalf("unexpected error: %s", err) + } + + if len(names) != len(bundles) { + t.Fatalf("expected %d bundles in store, found %d", len(bundles), len(names)) + } + for _, name := range names { + if _, ok := bundles[name]; !ok { + t.Fatalf("unexpected bundle name found in store: %s", name) + } + } + + for bundleName, bundle := range bundles { + for modName := range bundle.ParsedModules(bundleName) { + if _, ok := compiler.Modules[modName]; !ok { + t.Fatalf("expected module %s from bundle %s to have been compiled", modName, bundleName) + } + } + } + + // Stop the "read" transaction + mockStore.Abort(ctx, txn) 
+ + // create a delta bundle and activate it + + // add a new object member + p1 := PatchOperation{ + Op: "upsert", + Path: "/a/c/d", + Value: []string{"foo", "bar"}, + } + + // append value to array + p2 := PatchOperation{ + Op: "upsert", + Path: "/a/c/d/-", + Value: "baz", + } + + // insert value in array + p3 := PatchOperation{ + Op: "upsert", + Path: "/a/x/1", + Value: map[string]string{"name": "alice"}, + } + + // replace a value + p4 := PatchOperation{ + Op: "replace", + Path: "a/b", + Value: "bar", + } + + // remove a value + p5 := PatchOperation{ + Op: "remove", + Path: "a/e", + } - // delete it - err = storage.Txn(ctx, store, storage.WriteParams, func(txn storage.Transaction) error { - if err := LegacyEraseManifestFromStore(ctx, store, txn); err != nil { - t.Fatalf("Failed to erase manifest from store: %s", err) - return err - } - return nil - }) - if err != nil { - t.Fatalf("Unexpected error finishing transaction: %s", err) + // add a new object with an escaped character in the path + p6 := PatchOperation{ + Op: "upsert", + Path: "a/y/~0z", + Value: []int{1, 2, 3}, } - verifyReadLegacyRevision(ctx, t, store, "") -} + // add a new object root + p7 := PatchOperation{ + Op: "upsert", + Path: "/c/d", + Value: []string{"foo", "bar"}, + } -func TestMixedManifestStoreLifecycle(t *testing.T) { - store := inmem.New() - ctx := context.Background() - bundles := map[string]Manifest{ + deltaBundles := map[string]*Bundle{ "bundle1": { - Revision: "abc123", - Roots: &[]string{"/a/b", "/a/c"}, + Manifest: Manifest{ + Revision: "delta-1", + Roots: &[]string{"a"}, + }, + Patch: Patch{Data: []PatchOperation{p1, p2, p3, p4, p5, p6}}, + Etag: "bar", }, "bundle2": { - Revision: "def123", - Roots: &[]string{"/x/y", "/z"}, + Manifest: Manifest{ + Revision: "delta-2", + Roots: &[]string{"b", "c"}, + }, + Patch: Patch{Data: []PatchOperation{p7}}, + Etag: "baz", + }, + "bundle3": { + Manifest: Manifest{ + Roots: &[]string{"d"}, + }, + Data: map[string]interface{}{ + "d": 
map[string]interface{}{ + "e": "foo", + }, + }, }, } - // Write the legacy one first - err := storage.Txn(ctx, store, storage.WriteParams, func(txn storage.Transaction) error { - if err := LegacyWriteManifestToStore(ctx, store, txn, bundles["bundle1"]); err != nil { - t.Fatalf("Failed to write manifest to store: %s", err) - return err - } - return nil + txn = storage.NewTransactionOrDie(ctx, mockStore, storage.WriteParams) + + err = Activate(&ActivateOpts{ + Ctx: ctx, + Store: mockStore, + Txn: txn, + Compiler: compiler, + Metrics: m, + Bundles: deltaBundles, }) if err != nil { - t.Fatalf("Unexpected error finishing transaction: %s", err) + t.Fatalf("unexpected error: %s", err) } - verifyReadBundleNames(ctx, t, store, []string{}) - - // Write both new ones - verifyWriteManifests(ctx, t, store, bundles) - verifyReadBundleNames(ctx, t, store, []string{"bundle1", "bundle2"}) - - // Ensure the original legacy one is still there - verifyReadLegacyRevision(ctx, t, store, bundles["bundle1"].Revision) -} - -func verifyDeleteManifest(ctx context.Context, t *testing.T, store storage.Store, name string) { - err := storage.Txn(ctx, store, storage.WriteParams, func(txn storage.Transaction) error { - err := EraseManifestFromStore(ctx, store, txn, name) - if err != nil { - t.Fatalf("Failed to delete manifest from store: %s", err) - } - return err - }) + err = mockStore.Commit(ctx, txn) if err != nil { - t.Fatalf("Unexpected error finishing transaction: %s", err) + t.Fatalf("unexpected error: %s", err) } -} -func verifyWriteManifests(ctx context.Context, t *testing.T, store storage.Store, bundles map[string]Manifest) { - t.Helper() - for name, manifest := range bundles { - err := storage.Txn(ctx, store, storage.WriteParams, func(txn storage.Transaction) error { - err := WriteManifestToStore(ctx, store, txn, name, manifest) - if err != nil { - t.Fatalf("Failed to write manifest to store: %s", err) + // check the modules from the snapshot bundles are on the compiler + for 
bundleName, bundle := range bundles { + for modName := range bundle.ParsedModules(bundleName) { + if _, ok := compiler.Modules[modName]; !ok { + t.Fatalf("expected module %s from bundle %s to have been compiled", modName, bundleName) } - return err - }) - if err != nil { - t.Fatalf("Unexpected error finishing transaction: %s", err) } } -} -func verifyReadBundleNames(ctx context.Context, t *testing.T, store storage.Store, expected []string) { - t.Helper() - var actualNames []string - err := storage.Txn(ctx, store, storage.WriteParams, func(txn storage.Transaction) error { - var err error - actualNames, err = ReadBundleNamesFromStore(ctx, store, txn) - if err != nil && !storage.IsNotFound(err) { - t.Fatalf("Failed to read manifest names from store: %s", err) - return err - } - return nil - }) - if err != nil { - t.Fatalf("Unexpected error finishing transaction: %s", err) - } + // Ensure the patches were applied + txn = storage.NewTransactionOrDie(ctx, mockStore) - if len(actualNames) != len(expected) { - t.Fatalf("Expected %d name, found %d \n\t\tActual: %v\n", len(expected), len(actualNames), actualNames) + actual, err := mockStore.Read(ctx, txn, storage.MustParsePath("/")) + if err != nil { + t.Fatalf("unexpected error: %s", err) } - for _, actualName := range actualNames { - found := false - for _, expectedName := range expected { - if actualName == expectedName { - found = true - break + expectedRaw := ` + { + "a": { + "b": "bar", + "c": { + "d": ["foo", "bar", "baz"] + }, + "x": [{"name": "john"}, {"name": "alice"}, {"name": "jane"}], + "y": {"~z": [1, 2, 3]} + }, + "c": {"d": ["foo", "bar"]}, + "d": {"e": "foo"}, + "system": { + "bundles": { + "bundle1": { + "manifest": { + "revision": "delta-1", + "roots": ["a"] + }, + "etag": "bar" + }, + "bundle2": { + "manifest": { + "revision": "delta-2", + "roots": ["b", "c"] + }, + "etag": "baz" + }, + "bundle3": { + "manifest": { + "revision": "", + "roots": ["d"] + }, + "etag": "" + } } } - if !found { - 
t.Errorf("Found unexpecxted bundle name %s, expected names: %+v", actualName, expected) - } - } -} + }` -func verifyReadLegacyRevision(ctx context.Context, t *testing.T, store storage.Store, expected string) { - t.Helper() - var actual string - err := storage.Txn(ctx, store, storage.WriteParams, func(txn storage.Transaction) error { - var err error - if actual, err = LegacyReadRevisionFromStore(ctx, store, txn); err != nil && !storage.IsNotFound(err) { - t.Fatalf("Failed to read manifest revision from store: %s", err) - return err - } - return nil - }) - if err != nil { - t.Fatalf("Unexpected error finishing transaction: %s", err) + expected := loadExpectedSortedResult(expectedRaw) + if !reflect.DeepEqual(expected, actual) { + t.Errorf("expected %v, got %v", expectedRaw, string(util.MustMarshalJSON(actual))) } - if actual != expected { - t.Fatalf("Expected revision %s, got %s", expected, actual) - } + // Stop the "read" transaction + mockStore.Abort(ctx, txn) + + mockStore.AssertValid(t) } func TestBundleLifecycle(t *testing.T) { @@ -243,8 +1252,7 @@ func TestBundleLifecycle(t *testing.T) { Parsed: ast.MustParseModule(mod2), }, }, - Etag: "foo", - }, + Etag: "foo"}, "bundle2": { Manifest: Manifest{ Roots: &[]string{"b", "c"}, @@ -1452,6 +2460,128 @@ func testWriteData(t *testing.T, tc testWriteModuleCase, legacy bool) { }) } +func TestDoDFS(t *testing.T) { + + cases := []struct { + note string + input map[string]json.RawMessage + path string + roots []string + wantErr bool + err error + }{ + { + note: "bundle owns all", + input: nil, + path: "/", + roots: []string{""}, + wantErr: false, + }, + { + note: "data within roots root case", + input: map[string]json.RawMessage{"a": json.RawMessage(`true`)}, + path: "", + roots: []string{"a"}, + wantErr: false, + }, + { + note: "data within roots nested 1", + input: map[string]json.RawMessage{"d": json.RawMessage(`true`)}, + path: filepath.Dir(strings.Trim("a/b/c/data.json", "/")), + roots: []string{"a/b/c"}, + wantErr: 
false, + }, + { + note: "data within roots nested 2", + input: map[string]json.RawMessage{"d": json.RawMessage(`{"hello": "world"}`)}, + path: filepath.Dir(strings.Trim("a/b/c/data.json", "/")), + roots: []string{"a/b/c"}, + wantErr: false, + }, + { + note: "data within roots nested 3", + input: map[string]json.RawMessage{"d": json.RawMessage(`{"hello": "world"}`)}, + path: filepath.Dir(strings.Trim("a/data.json", "/")), + roots: []string{"a/d"}, + wantErr: false, + }, + { + note: "data within multiple roots 1", + input: map[string]json.RawMessage{"a": json.RawMessage(`{"b": "c"}`), "c": json.RawMessage(`true`)}, + path: filepath.Dir(strings.Trim("/data.json", "/")), + roots: []string{"a/b", "c"}, + wantErr: false, + }, + { + note: "data within multiple roots 2", + input: map[string]json.RawMessage{"a": json.RawMessage(`{"b": "c"}`), "c": []byte(`{"d": {"e": {"f": true}}}`)}, + path: filepath.Dir(strings.Trim("/data.json", "/")), + roots: []string{"a/b", "c/d/e"}, + wantErr: false, + }, + { + note: "data outside roots 1", + input: map[string]json.RawMessage{"d": json.RawMessage(`{"hello": "world"}`)}, + path: filepath.Dir(strings.Trim("/data.json", "/")), + roots: []string{"a/d"}, + wantErr: true, + err: fmt.Errorf("manifest roots [a/d] do not permit data at path '/d' (hint: check bundle directory structure)"), + }, + { + note: "data outside roots 2", + input: map[string]json.RawMessage{"a": []byte(`{"b": {"c": {"e": true}}}`)}, + path: filepath.Dir(strings.Trim("/x/data.json", "/")), + roots: []string{"x/a/b/c/d"}, + wantErr: true, + err: fmt.Errorf("manifest roots [x/a/b/c/d] do not permit data at path '/x/a/b/c/e' (hint: check bundle directory structure)"), + }, + { + note: "data outside roots 3", + input: map[string]json.RawMessage{"a": []byte(`{"b": {"c": true}}`)}, + path: filepath.Dir(strings.Trim("/data.json", "/")), + roots: []string{"a/b/c/d"}, + wantErr: true, + err: fmt.Errorf("manifest roots [a/b/c/d] do not permit data at path '/a/b/c' (hint: check 
bundle directory structure)"), + }, + { + note: "data outside multiple roots", + input: map[string]json.RawMessage{"a": json.RawMessage(`{"b": "c"}`), "e": []byte(`{"b": {"c": true}}`)}, + path: filepath.Dir(strings.Trim("/data.json", "/")), + roots: []string{"a/b", "c"}, + wantErr: true, + err: fmt.Errorf("manifest roots [a/b c] do not permit data at path '/e' (hint: check bundle directory structure)"), + }, + { + note: "data outside multiple roots 2", + input: map[string]json.RawMessage{"a": json.RawMessage(`{"b": "c"}`), "c": []byte(`{"d": true}`)}, + path: filepath.Dir(strings.Trim("/data.json", "/")), + roots: []string{"a/b", "c/d/e"}, + wantErr: true, + err: fmt.Errorf("manifest roots [a/b c/d/e] do not permit data at path '/c/d' (hint: check bundle directory structure)"), + }, + } + + for _, tc := range cases { + t.Run(tc.note, func(t *testing.T) { + + err := doDFS(tc.input, tc.path, tc.roots) + if tc.wantErr { + if err == nil { + t.Fatal("Expected error but got nil") + } + + if tc.err != nil && tc.err.Error() != err.Error() { + t.Fatalf("Expected error message %v but got %v", tc.err.Error(), err.Error()) + } + } else { + if err != nil { + t.Fatalf("Unexpected error %v", err) + } + } + }) + } +} + func TestHasRootsOverlap(t *testing.T) { ctx := context.Background() diff --git a/docs/content/misc-disk.md b/docs/content/misc-disk.md index fbc0ba7dcd..a7ae73ba57 100644 --- a/docs/content/misc-disk.md +++ b/docs/content/misc-disk.md @@ -19,17 +19,6 @@ Backup and restore, or repair procedures for data corruption are not provided at this time. {{< /info >}} -While it's possible to load data larger than the allotted memory resources into OPA -using disk storage, there are limitations to be aware of: - -{{< danger >}} -[Bundles](../management-bundles/) are loaded into memory **entirely** even when -disk storage is used: the decompressed, parsed bundle content is then inserted -into the disk store. - -It is planned to fix this limitation in the future. 
-{{< /danger >}} - ## Partitions Partitions determine how the JSON data is split up when stored in the diff --git a/download/download.go b/download/download.go index 353ccdeb91..3dc3ba74ce 100644 --- a/download/download.go +++ b/download/download.go @@ -62,6 +62,8 @@ type Downloader struct { stopped bool persist bool longPollingEnabled bool + lazyLoadingMode bool + bundleName string } type downloaderResponse struct { @@ -115,6 +117,21 @@ func (d *Downloader) WithBundlePersistence(persist bool) *Downloader { return d } +// WithLazyLoadingMode specifies how the downloaded bundle should be read. +// If true, data files in the bundle will not be deserialized +// and the check to validate that the bundle data does not contain paths +// outside the bundle's roots will not be performed while reading the bundle. +func (d *Downloader) WithLazyLoadingMode(yes bool) *Downloader { + d.lazyLoadingMode = yes + return d +} + +// WithBundleName specifies the name of the downloaded bundle. +func (d *Downloader) WithBundleName(bundleName string) *Downloader { + d.bundleName = bundleName + return d +} + // ClearCache is deprecated. Use SetCache instead. func (d *Downloader) ClearCache() { d.etag = "" @@ -305,8 +322,13 @@ func (d *Downloader) download(ctx context.Context, m metrics.Metrics) (*download } etag := resp.Header.Get("ETag") - reader := bundle.NewCustomReader(loader).WithMetrics(m).WithBundleVerificationConfig(d.bvc). - WithBundleEtag(etag) + + reader := bundle.NewCustomReader(loader). + WithMetrics(m). + WithBundleVerificationConfig(d.bvc). + WithBundleEtag(etag). + WithLazyLoadingMode(d.lazyLoadingMode). 
+ WithBundleName(d.bundleName) if d.sizeLimitBytes != nil { reader = reader.WithSizeLimitBytes(*d.sizeLimitBytes) } diff --git a/download/download_test.go b/download/download_test.go index 8d2b6e8e4b..26b1899f8d 100644 --- a/download/download_test.go +++ b/download/download_test.go @@ -148,6 +148,47 @@ func TestStopWithMultipleCalls(t *testing.T) { } } +func TestStartStopWithLazyLoadingMode(t *testing.T) { + ctx := context.Background() + fixture := newTestFixture(t) + + updates := make(chan *Update) + + config := Config{} + if err := config.ValidateAndInjectDefaults(); err != nil { + t.Fatal(err) + } + + d := New(config, fixture.client, "/bundles/test/bundle1").WithCallback(func(_ context.Context, u Update) { + updates <- &u + }).WithLazyLoadingMode(true) + + d.Start(ctx) + + // Give time for some download events to occur + time.Sleep(1 * time.Second) + + u1 := <-updates + + if u1.Bundle == nil || len(u1.Bundle.Modules) == 0 { + t.Fatal("expected bundle with at least one module but got:", u1) + } + + if !strings.HasSuffix(u1.Bundle.Modules[0].URL, u1.Bundle.Modules[0].Path) { + t.Fatalf("expected URL to have path as suffix but got %v and %v", u1.Bundle.Modules[0].URL, u1.Bundle.Modules[0].Path) + } + + if len(u1.Bundle.Raw) == 0 { + t.Fatal("expected bundle to contain raw bytes") + } + + if len(u1.Bundle.Data) != 0 { + t.Fatal("expected the bundle object to contain no data") + } + + d.Stop(ctx) +} + func TestStartStopWithDeltaBundleMode(t *testing.T) { ctx := context.Background() diff --git a/internal/storage/mock/mock.go b/internal/storage/mock/mock.go index c05cdaf81a..5ecdb61c58 100644 --- a/internal/storage/mock/mock.go +++ b/internal/storage/mock/mock.go @@ -247,6 +247,10 @@ func (s *Store) Abort(ctx context.Context, txn storage.Transaction) { mockTxn.Aborted++ } +func (s *Store) Truncate(ctx context.Context, txn storage.Transaction, params storage.TransactionParams, it storage.Iterator) error { + return s.inmem.Truncate(ctx, getRealTxn(txn), params, it) +} + 
func getRealTxn(txn storage.Transaction) storage.Transaction { return txn.(*Transaction).txn } diff --git a/plugins/bundle/plugin.go b/plugins/bundle/plugin.go index 6e279cdeac..60f33b3c06 100644 --- a/plugins/bundle/plugin.go +++ b/plugins/bundle/plugin.go @@ -437,7 +437,8 @@ func (p *Plugin) newDownloader(name string, source *Source) Loader { WithCallback(callback). WithBundleVerificationConfig(source.Signing). WithSizeLimitBytes(source.SizeLimitBytes). - WithBundlePersistence(p.persistBundle(name)) + WithBundlePersistence(p.persistBundle(name)). + WithLazyLoadingMode(true).WithBundleName(name) } func (p *Plugin) oneShot(ctx context.Context, name string, u download.Update) { diff --git a/plugins/bundle/plugin_test.go b/plugins/bundle/plugin_test.go index ae36ab5df6..c0e705b8d1 100644 --- a/plugins/bundle/plugin_test.go +++ b/plugins/bundle/plugin_test.go @@ -104,6 +104,146 @@ func TestPluginOneShot(t *testing.T) { } } +func TestPluginStartLazyLoadInMem(t *testing.T) { + ctx := context.Background() + + module := "package authz\n\ncorge=1" + + // setup fake http server with mock bundle + mockBundle1 := bundle.Bundle{ + Data: map[string]interface{}{"p": "x1"}, + Modules: []bundle.ModuleFile{ + { + URL: "/bar/policy.rego", + Path: "/bar/policy.rego", + Parsed: ast.MustParseModule(module), + Raw: []byte(module), + }, + }, + Manifest: bundle.Manifest{ + Roots: &[]string{"p", "authz"}, + }, + } + + s1 := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + err := bundle.NewWriter(w).Write(mockBundle1) + if err != nil { + t.Fatal(err) + } + })) + + mockBundle2 := bundle.Bundle{ + Data: map[string]interface{}{"q": "x2"}, + Modules: []bundle.ModuleFile{}, + Manifest: bundle.Manifest{ + Roots: &[]string{"q"}, + }, + } + + s2 := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + err := bundle.NewWriter(w).Write(mockBundle2) + if err != nil { + t.Fatal(err) + } + })) + + config := []byte(fmt.Sprintf(`{ + 
"services": { + "default": { + "url": %q + }, + "acmecorp": { + "url": %q + } + } + }`, s1.URL, s2.URL)) + + manager := getTestManagerWithOpts(config) + defer manager.Stop(ctx) + + var mode plugins.TriggerMode = "manual" + + plugin := New(&Config{ + Bundles: map[string]*Source{ + "test-1": { + Service: "default", + SizeLimitBytes: int64(bundle.DefaultSizeLimitBytes), + Config: download.Config{Trigger: &mode}, + }, + "test-2": { + Service: "acmecorp", + SizeLimitBytes: int64(bundle.DefaultSizeLimitBytes), + Config: download.Config{Trigger: &mode}, + }, + }, + }, manager) + + statusCh := make(chan map[string]*Status) + + // register for bundle updates to observe changes and start the plugin + plugin.RegisterBulkListener("test-case", func(st map[string]*Status) { + statusCh <- st + }) + + err := plugin.Start(ctx) + if err != nil { + t.Fatal(err) + } + + // manually trigger bundle download on all configured bundles + go func() { + _ = plugin.Trigger(ctx) + }() + + // wait for bundle update and then assert on data content + <-statusCh + <-statusCh + + result, err := storage.ReadOne(ctx, manager.Store, storage.Path{"p"}) + if err != nil { + t.Fatal(err) + } + + if !reflect.DeepEqual(result, mockBundle1.Data["p"]) { + t.Fatalf("expected data to be %v but got %v", mockBundle1.Data, result) + } + + result, err = storage.ReadOne(ctx, manager.Store, storage.Path{"q"}) + if err != nil { + t.Fatal(err) + } + + if !reflect.DeepEqual(result, mockBundle2.Data["q"]) { + t.Fatalf("expected data to be %v but got %v", mockBundle2.Data, result) + } + + txn := storage.NewTransactionOrDie(ctx, manager.Store) + defer manager.Store.Abort(ctx, txn) + + ids, err := manager.Store.ListPolicies(ctx, txn) + if err != nil { + t.Fatal(err) + } else if len(ids) != 1 { + t.Fatal("Expected 1 policy") + } + + bs, err := manager.Store.GetPolicy(ctx, txn, ids[0]) + exp := []byte("package authz\n\ncorge=1") + if err != nil { + t.Fatal(err) + } else if !bytes.Equal(bs, exp) { + t.Fatalf("Bad policy 
content. Exp:\n%v\n\nGot:\n\n%v", string(exp), string(bs)) + } + + data, err := manager.Store.Read(ctx, txn, storage.Path{}) + expData := util.MustUnmarshalJSON([]byte(`{"p": "x1", "q": "x2", "system": {"bundles": {"test-1": {"etag": "", "manifest": {"revision": "", "roots": ["p", "authz"]}}, "test-2": {"etag": "", "manifest": {"revision": "", "roots": ["q"]}}}}}`)) + if err != nil { + t.Fatal(err) + } else if !reflect.DeepEqual(data, expData) { + t.Fatalf("Bad data content. Exp:\n%v\n\nGot:\n\n%v", expData, data) + } +} + func TestPluginOneShotDiskStorageMetrics(t *testing.T) { test.WithTempFS(nil, func(dir string) { @@ -158,7 +298,7 @@ func TestPluginOneShotDiskStorageMetrics(t *testing.T) { t.Errorf("%s: expected %v, got %v", name, exp, act) } name = "disk_read_keys" - if exp, act := 12, met.Counter(name).Value(); act.(uint64) != uint64(exp) { + if exp, act := 13, met.Counter(name).Value(); act.(uint64) != uint64(exp) { t.Errorf("%s: expected %v, got %v", name, exp, act) } name = "disk_read_bytes" @@ -2876,6 +3016,155 @@ func TestPluginManualTrigger(t *testing.T) { } } +func TestPluginManualTriggerMultipleDiskStorage(t *testing.T) { + + ctx := context.Background() + + module := "package authz\n\ncorge=1" + + // setup fake http server with mock bundle + mockBundle1 := bundle.Bundle{ + Data: map[string]interface{}{"p": "x1"}, + Modules: []bundle.ModuleFile{ + { + URL: "/bar/policy.rego", + Path: "/bar/policy.rego", + Parsed: ast.MustParseModule(module), + Raw: []byte(module), + }, + }, + Manifest: bundle.Manifest{ + Roots: &[]string{"p", "authz"}, + }, + } + + s1 := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + err := bundle.NewWriter(w).Write(mockBundle1) + if err != nil { + t.Fatal(err) + } + })) + + mockBundle2 := bundle.Bundle{ + Data: map[string]interface{}{"q": "x2"}, + Modules: []bundle.ModuleFile{}, + Manifest: bundle.Manifest{ + Roots: &[]string{"q"}, + }, + } + + s2 := httptest.NewServer(http.HandlerFunc(func(w 
http.ResponseWriter, r *http.Request) { + err := bundle.NewWriter(w).Write(mockBundle2) + if err != nil { + t.Fatal(err) + } + })) + + test.WithTempFS(nil, func(dir string) { + store, err := disk.New(ctx, logging.NewNoOpLogger(), nil, disk.Options{ + Dir: dir, + }) + if err != nil { + t.Fatal(err) + } + config := []byte(fmt.Sprintf(`{ + "services": { + "default": { + "url": %q + }, + "acmecorp": { + "url": %q + } + } + }`, s1.URL, s2.URL)) + + manager := getTestManagerWithOpts(config, store) + defer manager.Stop(ctx) + + var mode plugins.TriggerMode = "manual" + + plugin := New(&Config{ + Bundles: map[string]*Source{ + "test-1": { + Service: "default", + SizeLimitBytes: int64(bundle.DefaultSizeLimitBytes), + Config: download.Config{Trigger: &mode}, + }, + "test-2": { + Service: "acmecorp", + SizeLimitBytes: int64(bundle.DefaultSizeLimitBytes), + Config: download.Config{Trigger: &mode}, + }, + }, + }, manager) + + statusCh := make(chan map[string]*Status) + + // register for bundle updates to observe changes and start the plugin + plugin.RegisterBulkListener("test-case", func(st map[string]*Status) { + statusCh <- st + }) + + err = plugin.Start(ctx) + if err != nil { + t.Fatal(err) + } + + // manually trigger bundle download on all configured bundles + go func() { + _ = plugin.Trigger(ctx) + }() + + // wait for bundle update and then assert on data content + <-statusCh + <-statusCh + + result, err := storage.ReadOne(ctx, manager.Store, storage.Path{"p"}) + if err != nil { + t.Fatal(err) + } + + if !reflect.DeepEqual(result, mockBundle1.Data["p"]) { + t.Fatalf("expected data to be %v but got %v", mockBundle1.Data, result) + } + + result, err = storage.ReadOne(ctx, manager.Store, storage.Path{"q"}) + if err != nil { + t.Fatal(err) + } + + if !reflect.DeepEqual(result, mockBundle2.Data["q"]) { + t.Fatalf("expected data to be %v but got %v", mockBundle2.Data, result) + } + + txn := storage.NewTransactionOrDie(ctx, manager.Store) + defer manager.Store.Abort(ctx, txn) + + 
ids, err := manager.Store.ListPolicies(ctx, txn) + if err != nil { + t.Fatal(err) + } else if len(ids) != 1 { + t.Fatal("Expected 1 policy") + } + + bs, err := manager.Store.GetPolicy(ctx, txn, ids[0]) + exp := []byte("package authz\n\ncorge=1") + if err != nil { + t.Fatal(err) + } else if !bytes.Equal(bs, exp) { + t.Fatalf("Bad policy content. Exp:\n%v\n\nGot:\n\n%v", string(exp), string(bs)) + } + + data, err := manager.Store.Read(ctx, txn, storage.Path{}) + expData := util.MustUnmarshalJSON([]byte(`{"p": "x1", "q": "x2", "system": {"bundles": {"test-1": {"etag": "", "manifest": {"revision": "", "roots": ["p", "authz"]}}, "test-2": {"etag": "", "manifest": {"revision": "", "roots": ["q"]}}}}}`)) + if err != nil { + t.Fatal(err) + } else if !reflect.DeepEqual(data, expData) { + t.Fatalf("Bad data content. Exp:\n%v\n\nGot:\n\n%v", expData, data) + } + }) +} + func TestPluginManualTriggerMultiple(t *testing.T) { ctx := context.Background() diff --git a/server/server_test.go b/server/server_test.go index d2b82e1cdc..042cdc4562 100644 --- a/server/server_test.go +++ b/server/server_test.go @@ -3699,6 +3699,10 @@ func (queryBindingErrStore) Abort(ctx context.Context, txn storage.Transaction) } +func (queryBindingErrStore) Truncate(context.Context, storage.Transaction, storage.TransactionParams, storage.Iterator) error { + return nil +} + func (queryBindingErrStore) Register(context.Context, storage.Transaction, storage.TriggerConfig) (storage.TriggerHandle, error) { return nil, nil } diff --git a/storage/disk/config.go b/storage/disk/config.go index e1019dbde6..bab976cebc 100644 --- a/storage/disk/config.go +++ b/storage/disk/config.go @@ -65,11 +65,18 @@ func OptionsFromConfig(raw []byte, id string) (*Options, error) { return &opts, nil } -func badgerConfigFromOptions(opts Options) badger.Options { +func badgerConfigFromOptions(opts Options) (badger.Options, error) { // Set some things _after_ FromSuperFlag to prohibit overriding them + + dir, err := dataDir(opts.Dir) 
+ if err != nil { + return badger.DefaultOptions(""), err + } + return badger.DefaultOptions(""). - FromSuperFlag(opts.Badger). - WithDir(dataDir(opts.Dir)). - WithValueDir(dataDir(opts.Dir)). - WithDetectConflicts(false) // We only allow one write txn at a time; so conflicts cannot happen. + FromSuperFlag(opts.Badger). + WithDir(dir). + WithValueDir(dir). + WithDetectConflicts(false), // We only allow one write txn at a time; so conflicts cannot happen. + nil } diff --git a/storage/disk/config_test.go b/storage/disk/config_test.go index 7811c123bb..966df2bfc6 100644 --- a/storage/disk/config_test.go +++ b/storage/disk/config_test.go @@ -229,7 +229,7 @@ func TestBadgerConfigFromOptions(t *testing.T) { for _, tc := range tests { t.Run(tc.note, func(t *testing.T) { - act := badgerConfigFromOptions(tc.opts) + act, _ := badgerConfigFromOptions(tc.opts) for _, check := range tc.checks { check(t, act) } diff --git a/storage/disk/disk.go b/storage/disk/disk.go index 2558b56718..6869e1ce67 100644 --- a/storage/disk/disk.go +++ b/storage/disk/disk.go @@ -60,7 +60,12 @@ import ( "bytes" "context" "encoding/json" + "errors" "fmt" + "io" + "io/ioutil" + "os" + "path" "path/filepath" "strings" "sync" @@ -113,6 +118,9 @@ const ( // basePartitionVersion represents the version of the caller-supplied data // layout (aka partitioning). 
basePartitionVersion int64 = 1 + + // symlink to directory path where badger write its files to + symlinkKey = "active" ) type metadata struct { @@ -147,7 +155,13 @@ func New(ctx context.Context, logger logging.Logger, prom prometheus.Registerer, } } - db, err := badger.Open(badgerConfigFromOptions(opts).WithLogger(&wrap{logger})) + options, err := badgerConfigFromOptions(opts) + if err != nil { + return nil, wrapError(err) + } + + options = options.WithLogger(&wrap{logger}) + db, err := badger.Open(options) if err != nil { return nil, wrapError(err) } @@ -235,6 +249,235 @@ func (db *Store) NewTransaction(ctx context.Context, params ...storage.Transacti return newTransaction(xid, write, underlying, context, db.pm, db.partitions, db), nil } +// Truncate implements the storage.Store interface. This method must be called within a transaction. +func (db *Store) Truncate(ctx context.Context, txn storage.Transaction, params storage.TransactionParams, it storage.Iterator) error { + var err error + + newDB, backupDir, err := db.backupAndLoadDB() + if err != nil { + return wrapError(err) + } + + // write new bundle policy and data into the new DB + underlying := newDB.NewTransaction(true) + xid := atomic.AddUint64(&db.xid, uint64(1)) + underlyingTxn := newTransaction(xid, true, underlying, params.Context, db.pm, db.partitions, nil) + + for { + var update *storage.Update + + update, err = it.Next() + if err == io.EOF { + break + } + + if err != nil { + return wrapError(err) + } + + if update.IsPolicy { + err = underlyingTxn.UpsertPolicy(ctx, update.Path.String(), update.Value) + if err != nil { + if err != badger.ErrTxnTooBig { + return wrapError(err) + } + + _, err = underlyingTxn.Commit(ctx) + if err != nil { + return wrapError(err) + } + + underlying = newDB.NewTransaction(true) + xid = atomic.AddUint64(&db.xid, uint64(1)) + underlyingTxn = newTransaction(xid, true, underlying, params.Context, db.pm, db.partitions, nil) + + if err = underlyingTxn.UpsertPolicy(ctx, 
update.Path.String(), update.Value); err != nil {
+					return wrapError(err)
+				}
+			}
+		} else {
+			if len(update.Path) > 0 {
+				sTxn, err := db.doTruncateData(ctx, underlyingTxn, newDB, params, update.Path, update.Value)
+				if err != nil {
+					return wrapError(err)
+				}
+
+				// a non-nil transaction means the previous one was committed
+				// because it was full; continue on the replacement
+				if sTxn != nil {
+					underlyingTxn = sTxn
+				}
+			} else {
+				// write operation at root path
+
+				var obj map[string]json.RawMessage
+				err := util.Unmarshal(update.Value, &obj)
+				if err != nil {
+					return err
+				}
+
+				// write every top-level key of the root document separately
+				for k := range obj {
+					newPath, ok := storage.ParsePathEscaped("/" + k)
+					if !ok {
+						return fmt.Errorf("storage path invalid: %v", newPath)
+					}
+
+					if err := storage.MakeDir(ctx, db, txn, newPath[:len(newPath)-1]); err != nil {
+						return err
+					}
+
+					sTxn, err := db.doTruncateData(ctx, underlyingTxn, newDB, params, newPath, obj[k])
+					if err != nil {
+						return wrapError(err)
+					}
+
+					if sTxn != nil {
+						underlyingTxn = sTxn
+					}
+				}
+			}
+		}
+	}
+
+	if err != nil && err != io.EOF {
+		return wrapError(err)
+	}
+
+	// commit active transaction on new store
+	_, err = underlyingTxn.Commit(ctx)
+	if err != nil {
+		return wrapError(err)
+	}
+
+	// Resolve the caller's transaction before the lock is taken and before
+	// anything is switched over, so that an invalid transaction cannot leave
+	// the store half-swapped.
+	uTxn, err := db.underlying(txn)
+	if err != nil {
+		return err
+	}
+
+	db.rmu.Lock()
+
+	// update symlink to point to the active db
+	symlink := filepath.Join(path.Dir(newDB.Opts().Dir), symlinkKey)
+	if _, err := os.Lstat(symlink); err == nil {
+		// a symlink from an earlier truncate exists; remove it so that it can
+		// be recreated below
+		if err := os.Remove(symlink); err != nil {
+			db.rmu.Unlock() // every error return below the Lock must release it
+			return wrapError(err)
+		}
+	} else if !errors.Is(err, os.ErrNotExist) {
+		db.rmu.Unlock()
+		return wrapError(err)
+	}
+
+	if err := os.Symlink(newDB.Opts().Dir, symlink); err != nil {
+		db.rmu.Unlock()
+		return wrapError(err)
+	}
+
+	// swap db
+	oldDb := db.db
+	db.db = newDB
+
+	// replace transaction on old badger store with new one
+	uTxn.Abort(ctx)
+
+	uTxn.stale = false
+	uTxn.underlying = newDB.NewTransaction(true)
+
+	db.rmu.Unlock()
+
+	// the old store and the backup are removed outside of the lock
+	return wrapError(db.cleanup(oldDb, backupDir))
+}
+
+func (db *Store)
doTruncateData(ctx context.Context, underlying *transaction, badgerdb *badger.DB,
+	params storage.TransactionParams, path storage.Path, value interface{}) (*transaction, error) {
+
+	// Try the write on the current transaction first. When it fits, nil is
+	// returned for the transaction and the caller keeps using the current one.
+	err := underlying.Write(ctx, storage.AddOp, path, value)
+	if err != nil {
+		if err != badger.ErrTxnTooBig {
+			return nil, wrapError(err)
+		}
+
+		// The transaction is full: commit what has been written so far and
+		// retry the same write on a new transaction opened on badgerdb.
+		_, err = underlying.Commit(ctx)
+		if err != nil {
+			return nil, wrapError(err)
+		}
+
+		txn := badgerdb.NewTransaction(true)
+		xid := atomic.AddUint64(&db.xid, uint64(1))
+		sTxn := newTransaction(xid, true, txn, params.Context, db.pm, db.partitions, nil)
+
+		if err = sTxn.Write(ctx, storage.AddOp, path, value); err != nil {
+			return nil, wrapError(err)
+		}
+
+		return sTxn, nil
+	}
+
+	return nil, nil
+}
+
+// backupAndLoadDB writes a backup of the current badger DB (Backup with
+// since=0) to a file in a temporary directory created next to the current
+// data directory, opens a new badger DB in a second temporary directory and
+// loads the backup into it. It returns the new DB together with the path of
+// the directory that holds the backup file.
+func (db *Store) backupAndLoadDB() (*badger.DB, string, error) {
+	currDir := db.db.Opts().Dir
+
+	// backup db
+	backupDir, err := ioutil.TempDir(path.Dir(currDir), "backup")
+	if err != nil {
+		return nil, "", wrapError(err)
+	}
+
+	bak, err := ioutil.TempFile(backupDir, "badgerbak")
+	if err != nil {
+		return nil, "", wrapError(err)
+	}
+	// The same handle is used to write and then read the backup, so a single
+	// deferred close covers every return path.
+	defer bak.Close()
+
+	_, err = db.db.Backup(bak, 0)
+	if err != nil {
+		return nil, "", wrapError(err)
+	}
+
+	// rewind so that Load below reads the backup from its beginning
+	if _, err = bak.Seek(0, io.SeekStart); err != nil {
+		return nil, "", wrapError(err)
+	}
+
+	// restore db
+	newDBDir, err := ioutil.TempDir(path.Dir(currDir), "backup")
+	if err != nil {
+		return nil, "", wrapError(err)
+	}
+
+	opts := db.db.Opts().WithDir(newDBDir).WithValueDir(newDBDir)
+
+	// open new db
+	newDB, err := badger.Open(opts)
+	if err != nil {
+		return nil, "", wrapError(err)
+	}
+
+	err = newDB.Load(bak, 16)
+	if err != nil {
+		// The new DB is not handed to the caller on failure, so release it
+		// here. The Load error is the one worth reporting; a close error is
+		// deliberately dropped.
+		_ = newDB.Close()
+		return nil, "", wrapError(err)
+	}
+
+	return newDB, backupDir, nil
+}
+
+// cleanup closes oldDB, removes its data directory and then removes
+// backupDir. The first failing step is returned.
+func (db *Store) cleanup(oldDB *badger.DB, backupDir string) error {
+	err := oldDB.Close()
+	if err != nil {
+		return wrapError(err)
+	}
+
+	err = os.RemoveAll(oldDB.Opts().Dir)
+	if err != nil {
+		return wrapError(err)
+	}
+
+	return wrapError(os.RemoveAll(backupDir))
+}
+
 // Commit implements
the storage.Store interface. func (db *Store) Commit(ctx context.Context, txn storage.Transaction) error { underlying, err := db.underlying(txn) @@ -687,6 +930,12 @@ func toString(path storage.Path) string { // what we have badger write its files to. It is done to give us some // wiggle room in the future should we need to put further files on the // file system (like backups): we can then just use the opts.Dir. -func dataDir(dir string) string { - return filepath.Join(dir, "data") +func dataDir(dir string) (string, error) { + + symlink := filepath.Join(dir, symlinkKey) + if _, err := os.Lstat(symlink); err == nil { + return filepath.EvalSymlinks(symlink) + } + + return filepath.Join(dir, "data"), nil } diff --git a/storage/disk/disk_test.go b/storage/disk/disk_test.go index 03ce3ffdcb..17b06db713 100644 --- a/storage/disk/disk_test.go +++ b/storage/disk/disk_test.go @@ -7,10 +7,18 @@ package disk import ( "bytes" "context" + "encoding/json" + "fmt" + "os" + "path/filepath" "reflect" "strings" "testing" + "github.com/open-policy-agent/opa/internal/file/archive" + + "github.com/open-policy-agent/opa/bundle" + badger "github.com/dgraph-io/badger/v3" "github.com/open-policy-agent/opa/logging" @@ -159,6 +167,201 @@ func TestPolicies(t *testing.T) { }) } +func TestTruncate(t *testing.T) { + test.WithTempFS(map[string]string{}, func(dir string) { + ctx := context.Background() + s, err := New(ctx, logging.NewNoOpLogger(), nil, Options{Dir: dir, Partitions: nil}) + if err != nil { + t.Fatal(err) + } + defer s.Close(ctx) + + txn := storage.NewTransactionOrDie(ctx, s, storage.WriteParams) + + var archiveFiles = map[string]string{ + "/a/b/c/data.json": "[1,2,3]", + "/a/b/d/data.json": `e: true`, + "/data.json": `{"x": {"y": true}, "a": {"b": {"z": true}}}}`, + "/a/b/y/data.yaml": `foo: 1`, + "/policy.rego": "package foo\n p = 1", + "/roles/policy.rego": "package bar\n p = 1", + } + + var files [][2]string + for name, content := range archiveFiles { + files = append(files, 
[2]string{name, content}) + } + + buf := archive.MustWriteTarGz(files) + b, err := bundle.NewReader(buf).WithLazyLoadingMode(true).Read() + if err != nil { + t.Fatal(err) + } + + iterator := bundle.NewIterator(b.Raw) + + err = s.Truncate(ctx, txn, storage.WriteParams, iterator) + if err != nil { + t.Fatalf("Unexpected truncate error: %v", err) + } + + // check if symlink is created + symlink := filepath.Join(dir, symlinkKey) + _, err = os.Lstat(symlink) + if err != nil { + t.Fatal(err) + } + + if err := s.Commit(ctx, txn); err != nil { + t.Fatalf("Unexpected commit error: %v", err) + } + + txn = storage.NewTransactionOrDie(ctx, s) + + actual, err := s.Read(ctx, txn, storage.MustParsePath("/")) + if err != nil { + t.Fatal(err) + } + + expected := ` + { + "a": { + "b": { + "c": [1,2,3], + "d": { + "e": true + }, + "y": { + "foo": 1 + }, + "z": true + } + }, + "x": { + "y": true + } + } + ` + jsn := util.MustUnmarshalJSON([]byte(expected)) + + if !reflect.DeepEqual(jsn, actual) { + t.Fatalf("Expected reader's read to be %v but got: %v", jsn, actual) + } + + s.Abort(ctx, txn) + + txn = storage.NewTransactionOrDie(ctx, s) + ids, err := s.ListPolicies(ctx, txn) + if err != nil { + t.Fatal(err) + } + + expectedIds := map[string]struct{}{"/policy.rego": {}, "/roles/policy.rego": {}} + + for _, id := range ids { + if _, ok := expectedIds[id]; !ok { + t.Fatalf("Expected list policies to contain %v but got: %v", expectedIds, id) + } + } + + bs, err := s.GetPolicy(ctx, txn, "/policy.rego") + expectedBytes := []byte("package foo\n p = 1") + if err != nil || !reflect.DeepEqual(expectedBytes, bs) { + t.Fatalf("Expected get policy to return %v but got: %v (err: %v)", expectedBytes, bs, err) + } + + bs, err = s.GetPolicy(ctx, txn, "/roles/policy.rego") + expectedBytes = []byte("package bar\n p = 1") + if err != nil || !reflect.DeepEqual(expectedBytes, bs) { + t.Fatalf("Expected get policy to return %v but got: %v (err: %v)", expectedBytes, bs, err) + } + }) +} + +func 
TestTruncateMultipleTxn(t *testing.T) { + test.WithTempFS(map[string]string{}, func(dir string) { + ctx := context.Background() + s, err := New(ctx, logging.NewNoOpLogger(), nil, Options{Dir: dir, Partitions: nil, Badger: "memtablesize=4000;valuethreshold=600"}) + if err != nil { + t.Fatal(err) + } + defer s.Close(ctx) + + txn := storage.NewTransactionOrDie(ctx, s, storage.WriteParams) + + archiveFiles := map[string]string{} + + for i := 0; i < 20; i++ { + + path := fmt.Sprintf("users/user%d/data.json", i) + + obj := map[string][]byte{} + obj[fmt.Sprintf("key%d", i)] = bytes.Repeat([]byte("a"), 1<<20) // 1 MB. + + bs, err := json.Marshal(obj) + if err != nil { + t.Fatal(err) + } + + archiveFiles[path] = string(bs) + } + + // additional data file at root + archiveFiles["/data.json"] = `{"a": {"b": {"z": true}}}}` + + var files [][2]string + for name, content := range archiveFiles { + files = append(files, [2]string{name, content}) + } + + buf := archive.MustWriteTarGz(files) + b, err := bundle.NewReader(buf).WithLazyLoadingMode(true).Read() + if err != nil { + t.Fatal(err) + } + + iterator := bundle.NewIterator(b.Raw) + + err = s.Truncate(ctx, txn, storage.WriteParams, iterator) + if err != nil { + t.Fatalf("Unexpected truncate error: %v", err) + } + + if err := s.Commit(ctx, txn); err != nil { + t.Fatalf("Unexpected commit error: %v", err) + } + + txn = storage.NewTransactionOrDie(ctx, s) + + _, err = s.Read(ctx, txn, storage.MustParsePath("/users/user19")) + if err != nil { + t.Fatal(err) + } + + s.Abort(ctx, txn) + + txn = storage.NewTransactionOrDie(ctx, s) + + actual, err := s.Read(ctx, txn, storage.MustParsePath("/a")) + if err != nil { + t.Fatal(err) + } + + expected := ` + { + "b": { + "z": true + } + } + ` + jsn := util.MustUnmarshalJSON([]byte(expected)) + + if !reflect.DeepEqual(jsn, actual) { + t.Fatalf("Expected reader's read to be %v but got: %v", jsn, actual) + } + }) +} + func TestDataPartitioningValidation(t *testing.T) { closeFn := func(ctx 
context.Context, s *Store) { diff --git a/storage/disk/txn.go b/storage/disk/txn.go index f51f74763b..68797e040a 100644 --- a/storage/disk/txn.go +++ b/storage/disk/txn.go @@ -231,6 +231,7 @@ func (txn *transaction) Write(_ context.Context, op storage.PatchOp, path storag if err != nil { return err } + for _, u := range updates { if u.delete { if err := txn.underlying.Delete(u.key); err != nil { @@ -318,11 +319,29 @@ func (txn *transaction) partitionWrite(op storage.PatchOp, path storage.Path, va } func (txn *transaction) partitionWriteMultiple(node *partitionTrie, path storage.Path, value interface{}, result []update) ([]update, error) { - // NOTE(tsandall): value must be an object so that it can be partitioned; in // the future, arrays could be supported but that requires investigation. - obj, ok := value.(map[string]interface{}) - if !ok { + + switch v := value.(type) { + case map[string]interface{}: + bs, err := serialize(v) + if err != nil { + return nil, err + } + return txn.doPartitionWriteMultiple(node, path, bs, result) + case json.RawMessage: + return txn.doPartitionWriteMultiple(node, path, v, result) + case []uint8: + return txn.doPartitionWriteMultiple(node, path, v, result) + } + + return nil, &storage.Error{Code: storage.InvalidPatchErr, Message: "value cannot be partitioned"} +} + +func (txn *transaction) doPartitionWriteMultiple(node *partitionTrie, path storage.Path, bs []byte, result []update) ([]update, error) { + var obj map[string]json.RawMessage + err := util.Unmarshal(bs, &obj) + if err != nil { return nil, &storage.Error{Code: storage.InvalidPatchErr, Message: "value cannot be partitioned"} } @@ -437,6 +456,11 @@ func (txn *transaction) DeletePolicy(_ context.Context, id string) error { } func serialize(value interface{}) ([]byte, error) { + val, ok := value.([]byte) + if ok { + return val, nil + } + bs, err := json.Marshal(value) return bs, wrapError(err) } @@ -451,6 +475,36 @@ func patch(data interface{}, op storage.PatchOp, path 
storage.Path, idx int, val panic("unreachable") } + val := value + switch v := value.(type) { + case json.RawMessage: + var obj map[string]json.RawMessage + err := util.Unmarshal(v, &obj) + if err == nil { + val = obj + } else { + var obj interface{} + err := util.Unmarshal(v, &obj) + if err != nil { + return nil, err + } + val = obj + } + case []uint8: + var obj map[string]json.RawMessage + err := util.Unmarshal(v, &obj) + if err == nil { + val = obj + } else { + var obj interface{} + err := util.Unmarshal(v, &obj) + if err != nil { + return nil, err + } + val = obj + } + } + // Base case: mutate the data value in-place. if len(path) == idx+1 { // last element switch x := data.(type) { @@ -467,42 +521,43 @@ func patch(data interface{}, op storage.PatchOp, path storage.Path, idx int, val if _, ok := x[key]; !ok { return nil, errors.NewNotFoundError(path) } - x[key] = value + x[key] = val return x, nil case storage.AddOp: - x[key] = value + x[key] = val return x, nil } case []interface{}: switch op { case storage.AddOp: if path[idx] == "-" || path[idx] == strconv.Itoa(len(x)) { - return append(x, value), nil + return append(x, val), nil } i, err := ptr.ValidateArrayIndexForWrite(x, path[idx], idx, path) if err != nil { return nil, err } // insert at i - return append(x[:i], append([]interface{}{value}, x[i:]...)...), nil + return append(x[:i], append([]interface{}{val}, x[i:]...)...), nil case storage.ReplaceOp: i, err := ptr.ValidateArrayIndexForWrite(x, path[idx], idx, path) if err != nil { return nil, err } - x[i] = value + x[i] = val return x, nil case storage.RemoveOp: i, err := ptr.ValidateArrayIndexForWrite(x, path[idx], idx, path) if err != nil { return nil, err + } return append(x[:i], x[i+1:]...), nil // i is skipped default: panic("unreachable") } case nil: // data wasn't set before - return map[string]interface{}{path[idx]: value}, nil + return map[string]interface{}{path[idx]: val}, nil default: return nil, errors.NewNotFoundError(path) } @@ -513,7 
+568,7 @@ func patch(data interface{}, op storage.PatchOp, path storage.Path, idx int, val switch x := data.(type) { case map[string]interface{}: - modified, err := patch(x[key], op, path, idx+1, value) + modified, err := patch(x[key], op, path, idx+1, val) if err != nil { return nil, err } @@ -524,7 +579,7 @@ func patch(data interface{}, op storage.PatchOp, path storage.Path, idx int, val if err != nil { return nil, err } - modified, err := patch(x[i], op, path, idx+1, value) + modified, err := patch(x[i], op, path, idx+1, val) if err != nil { return nil, err } @@ -532,7 +587,7 @@ func patch(data interface{}, op storage.PatchOp, path storage.Path, idx int, val return x, nil case nil: // data isn't there yet y := make(map[string]interface{}, 1) - modified, err := patch(nil, op, path, idx+1, value) + modified, err := patch(nil, op, path, idx+1, val) if err != nil { return nil, err } diff --git a/storage/inmem/inmem.go b/storage/inmem/inmem.go index 7ea49e0513..de1d65c489 100644 --- a/storage/inmem/inmem.go +++ b/storage/inmem/inmem.go @@ -92,6 +92,74 @@ func (db *store) NewTransaction(_ context.Context, params ...storage.Transaction return newTransaction(xid, write, context, db), nil } +// Truncate implements the storage.Store interface. This method must be called within a transaction. 
+func (db *store) Truncate(ctx context.Context, txn storage.Transaction, _ storage.TransactionParams, it storage.Iterator) error {
+	var update *storage.Update
+	var err error
+
+	underlying, err := db.underlying(txn)
+	if err != nil {
+		return err
+	}
+	// Drain the iterator inside the caller's transaction; the loop ends on the first error from Next.
+	for {
+		update, err = it.Next()
+		if err != nil {
+			break // io.EOF is treated as normal completion after the loop
+		}
+
+		if update.IsPolicy {
+			err = underlying.UpsertPolicy(update.Path.String(), update.Value) // path string is the policy ID
+			if err != nil {
+				return err
+			}
+		} else {
+			if len(update.Path) > 0 {
+				var obj interface{}
+				err = util.Unmarshal(update.Value, &obj)
+				if err != nil {
+					return err
+				}
+
+				err = underlying.Write(storage.AddOp, update.Path, obj)
+				if err != nil {
+					return err
+				}
+			} else {
+				// Root write: the document must decode to an object; each top-level key is added on its own.
+				// Any decode failure (not only a non-object value) is reported as the root-must-be-object error.
+				var val map[string]interface{}
+				err = util.Unmarshal(update.Value, &val)
+				if err != nil {
+					return invalidPatchError(rootMustBeObjectMsg)
+				}
+
+				for k := range val {
+					newPath, ok := storage.ParsePathEscaped("/" + k)
+					if !ok || len(newPath) == 0 { // an empty path would make the slice expression below panic
+						return fmt.Errorf("storage path invalid: %v", newPath)
+					}
+
+					if err := storage.MakeDir(ctx, db, txn, newPath[:len(newPath)-1]); err != nil {
+						return err
+					}
+
+					err = underlying.Write(storage.AddOp, newPath, val[k])
+					if err != nil {
+						return err
+					}
+				}
+			}
+		}
+	}
+
+	if err != nil && err != io.EOF {
+		return err
+	}
+
+	return nil
+}
+
 func (db *store) Commit(ctx context.Context, txn storage.Transaction) error {
 	underlying, err := db.underlying(txn)
 	if err != nil {
diff --git a/storage/inmem/inmem_test.go b/storage/inmem/inmem_test.go
index 8902d751fa..9dd66cf326 100644
--- a/storage/inmem/inmem_test.go
+++ b/storage/inmem/inmem_test.go
@@ -11,6 +11,10 @@ import (
 	"reflect"
 	"testing"
 
+	"github.com/open-policy-agent/opa/bundle"
+
+	"github.com/open-policy-agent/opa/internal/file/archive"
+
 	"github.com/open-policy-agent/opa/storage/internal/errors"
 
 	"github.com/open-policy-agent/opa/storage"
@@ -323,6 +327,136 @@ func TestInMemoryTxnMultipleWrites(t *testing.T) {
 	}
 }
 
+func TestTruncate(t *testing.T) {
+	ctx := context.Background()
+	store := NewFromObject(map[string]interface{}{})
+	txn := storage.NewTransactionOrDie(ctx, store, storage.WriteParams)
+
+	var archiveFiles = map[string]string{
+		"/a/b/c/data.json":   "[1,2,3]",
+		"/a/b/d/data.json":   "true",
+		"/data.json":         `{"x": {"y": true}, "a": {"b": {"z": true}}}`,
+		"/a/b/y/data.yaml":   `foo: 1`,
+		"/policy.rego":       "package foo\n p = 1",
+		"/roles/policy.rego": "package bar\n p = 1",
+	}
+	// Map iteration order is unspecified, so the order of files in the archive varies between runs.
+	var files [][2]string
+	for name, content := range archiveFiles {
+		files = append(files, [2]string{name, content})
+	}
+
+	buf := archive.MustWriteTarGz(files)
+	b, err := bundle.NewReader(buf).WithLazyLoadingMode(true).Read()
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	iterator := bundle.NewIterator(b.Raw)
+
+	err = store.Truncate(ctx, txn, storage.WriteParams, iterator)
+	if err != nil {
+		t.Fatalf("Unexpected truncate error: %v", err)
+	}
+
+	if err := store.Commit(ctx, txn); err != nil {
+		t.Fatalf("Unexpected commit error: %v", err)
+	}
+
+	txn = storage.NewTransactionOrDie(ctx, store)
+
+	actual, err := store.Read(ctx, txn, storage.MustParsePath("/"))
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	expected := `
+{
+	"a": {
+		"b": {
+			"c": [1,2,3],
+			"d": true,
+			"y": {
+				"foo": 1
+			},
+			"z": true
+		}
+	},
+	"x": {
+		"y": true
+	}
+}
+`
+	jsn := util.MustUnmarshalJSON([]byte(expected))
+
+	if !reflect.DeepEqual(jsn, actual) {
+		t.Fatalf("Expected reader's read to be %v but got: %v", jsn, actual)
+	}
+
+	store.Abort(ctx, txn)
+
+	txn = storage.NewTransactionOrDie(ctx, store)
+	ids, err := store.ListPolicies(ctx, txn)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	expectedIds := map[string]struct{}{"/policy.rego": {}, "/roles/policy.rego": {}}
+
+	for _, id := range ids {
+		if _, ok := expectedIds[id]; !ok {
+			t.Fatalf("Expected list policies to contain %v but got: %v", id, expectedIds)
+		}
+	}
+
+	bs, err := store.GetPolicy(ctx, txn, "/policy.rego")
+	expectedBytes := []byte("package foo\n p = 1")
+	if err != nil || !reflect.DeepEqual(expectedBytes, bs) {
+		t.Fatalf("Expected get policy to return %v but got: %v (err: %v)", expectedBytes, bs, err)
+	}
+
+	bs, err = store.GetPolicy(ctx, txn, "/roles/policy.rego")
+	expectedBytes = []byte("package bar\n p = 1")
+	if err != nil || !reflect.DeepEqual(expectedBytes, bs) {
+		t.Fatalf("Expected get policy to return %v but got: %v (err: %v)", expectedBytes, bs, err)
+	}
+}
+
+func TestTruncateBadRootWrite(t *testing.T) {
+	ctx := context.Background()
+	store := NewFromObject(map[string]interface{}{})
+	txn := storage.NewTransactionOrDie(ctx, store, storage.WriteParams)
+	// The root-level data.json holds an array, which Truncate is expected to reject.
+	var archiveFiles = map[string]string{
+		"/a/b/d/data.json":   "true",
+		"/data.json":         "[1,2,3]",
+		"/roles/policy.rego": "package bar\n p = 1",
+	}
+
+	var files [][2]string
+	for name, content := range archiveFiles {
+		files = append(files, [2]string{name, content})
+	}
+
+	buf := archive.MustWriteTarGz(files)
+	b, err := bundle.NewReader(buf).WithLazyLoadingMode(true).Read()
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	iterator := bundle.NewIterator(b.Raw)
+
+	err = store.Truncate(ctx, txn, storage.WriteParams, iterator)
+	if err == nil {
+		t.Fatal("Expected truncate error but got nil")
+	}
+
+	expected := "storage_invalid_patch_error: root must be object"
+	if err.Error() != expected {
+		t.Fatalf("Expected error %v but got %v", expected, err.Error())
+	}
+}
+
 func TestInMemoryTxnWriteFailures(t *testing.T) {
 
 	ctx := context.Background()
diff --git a/storage/interface.go b/storage/interface.go
index 38252f32da..20981a78cb 100644
--- a/storage/interface.go
+++ b/storage/interface.go
@@ -34,6 +34,11 @@ type Store interface {
 	// transaction must be automatically aborted by the Store implementation.
 	Commit(context.Context, Transaction) error
 
+	// Truncate is called to make a copy of the underlying store, write documents in the new store
+	// by creating multiple transactions in the new store as needed and finally swapping
+	// over to the new storage instance. 
This method must be called within a transaction on the original store.
+	Truncate(context.Context, Transaction, TransactionParams, Iterator) error
+
 	// Abort is called to cancel the transaction.
 	Abort(context.Context, Transaction)
 }
@@ -221,3 +226,16 @@ func (TriggersNotSupported) Register(context.Context, Transaction, TriggerConfig
 type TriggerHandle interface {
 	Unregister(context.Context, Transaction)
 }
+
+// Iterator yields the files of a directory one Update at a time, starting with files at the base of the
+// directory, then sub-directories etc. The in-memory store's Truncate treats io.EOF from Next as completion.
+type Iterator interface {
+	Next() (*Update, error)
+}
+
+// Update describes a single file produced by an Iterator: its path, its raw bytes and whether it is a policy.
+type Update struct {
+	Path     Path   // storage path; the in-memory Truncate treats an empty Path as a write at the root
+	Value    []byte // raw bytes: passed to UpsertPolicy when IsPolicy is set, otherwise decoded as a data document
+	IsPolicy bool   // true when the file is a policy rather than data
+}
diff --git a/topdown/topdown_test.go b/topdown/topdown_test.go
index 99e3cff74b..c918bb5027 100644
--- a/topdown/topdown_test.go
+++ b/topdown/topdown_test.go
@@ -769,6 +769,10 @@ func (*contextPropagationStore) Commit(context.Context, storage.Transaction) err
 func (*contextPropagationStore) Abort(context.Context, storage.Transaction) {
 }
 
+func (*contextPropagationStore) Truncate(context.Context, storage.Transaction, storage.TransactionParams, storage.Iterator) error {
+	return nil // no-op stub: ignores its arguments and reports success
+}
+
 func (m *contextPropagationStore) Read(ctx context.Context, txn storage.Transaction, path storage.Path) (interface{}, error) {
 	val := ctx.Value(contextPropagationMock{})
 	m.calls = append(m.calls, val)
@@ -825,6 +829,10 @@ func (*astStore) Commit(context.Context, storage.Transaction) error {
 
 func (*astStore) Abort(context.Context, storage.Transaction) {}
 
+func (*astStore) Truncate(context.Context, storage.Transaction, storage.TransactionParams, storage.Iterator) error {
+	return nil // no-op stub: ignores its arguments and reports success
+}
+
 func (a *astStore) Read(ctx context.Context, txn storage.Transaction, path storage.Path) (interface{}, error) {
 	if path.String() == a.path {
 		return a.value, nil