Skip to content

Commit

Permalink
Initial support for large bundle deployments
Browse files Browse the repository at this point in the history
Currently bundles are loaded into memory entirely,
even when disk storage is used; the parsed content
is then written to the store. Deserializing data into Go structs
is memory-intensive, so even if the user has configured disk
storage, OPA is still bound by the amount of memory
assigned to it. This change adds a new lazy loading mode
in which the data is not deserialized while the bundle is
being read. Hence, if the bundle contains large data files
and the user has enabled disk storage, OPA should be
able to handle this scenario without running out of memory (OOM).

Fixes: #4539

Signed-off-by: Ashutosh Narkar <anarkar4387@gmail.com>
  • Loading branch information
ashutosh-narkar committed Jun 27, 2022
1 parent 9413dff commit f60dfaf
Show file tree
Hide file tree
Showing 22 changed files with 2,834 additions and 174 deletions.
70 changes: 64 additions & 6 deletions bundle/bundle.go
Expand Up @@ -54,6 +54,16 @@ type Bundle struct {
PlanModules []PlanModuleFile
Patch Patch
Etag string
Raw []Raw

lazyLoadingMode bool
sizeLimitBytes int64
}

// Raw contains raw bytes representing the bundle's content
type Raw struct {
	// Path is the file's location within the bundle.
	Path string
	// Value holds the file's unparsed bytes.
	Value []byte
}

// Patch contains an array of objects wherein each object represents the patch operation to be
Expand Down Expand Up @@ -286,6 +296,10 @@ func (m *Manifest) validateAndInjectDefaults(b Bundle) error {
}
}

if b.lazyLoadingMode {
return nil
}

// Validate data in bundle.
return dfs(b.Data, "", func(path string, node interface{}) (bool, error) {
path = strings.Trim(path, "/")
Expand Down Expand Up @@ -343,6 +357,8 @@ type Reader struct {
files map[string]FileInfo // files in the bundle signature payload
sizeLimitBytes int64
etag string
lazyLoadingMode bool
name string
}

// NewReader is deprecated. Use NewCustomReader instead.
Expand Down Expand Up @@ -413,18 +429,37 @@ func (r *Reader) WithBundleEtag(etag string) *Reader {
return r
}

// WithBundleName sets the name associated with the bundle being read and
// returns the reader so configuration calls can be chained.
func (r *Reader) WithBundleName(name string) *Reader {
	r.name = name
	return r
}

// WithLazyLoadingMode toggles lazy bundle loading and returns the reader for
// chaining. When enabled, data files in the bundle are not deserialized
// during reading, and the check that bundle data does not contain paths
// outside the bundle's roots is skipped at read time.
func (r *Reader) WithLazyLoadingMode(enabled bool) *Reader {
	r.lazyLoadingMode = enabled
	return r
}

// Read returns a new Bundle loaded from the reader.
func (r *Reader) Read() (Bundle, error) {

var bundle Bundle
var descriptors []*Descriptor
var err error
var raw []Raw

bundle.Signatures, bundle.Patch, descriptors, err = preProcessBundle(r.loader, r.skipVerify, r.sizeLimitBytes)
if err != nil {
return bundle, err
}

bundle.lazyLoadingMode = r.lazyLoadingMode
bundle.sizeLimitBytes = r.sizeLimitBytes

if bundle.Type() == SnapshotBundleType {
err = r.checkSignaturesAndDescriptors(bundle.Signatures)
if err != nil {
Expand Down Expand Up @@ -463,6 +498,17 @@ func (r *Reader) Read() (Bundle, error) {

if strings.HasSuffix(path, RegoExt) {
fullPath := r.fullPath(path)
bs := buf.Bytes()

if r.lazyLoadingMode {
p := fullPath
if r.name != "" {
p = modulePathWithPrefix(r.name, fullPath)
}

raw = append(raw, Raw{Path: p, Value: bs})
}

r.metrics.Timer(metrics.RegoModuleParse).Start()
module, err := ast.ParseModuleWithOpts(fullPath, buf.String(), ast.ParserOptions{ProcessAnnotation: r.processAnnotations})
r.metrics.Timer(metrics.RegoModuleParse).Stop()
Expand All @@ -473,11 +519,10 @@ func (r *Reader) Read() (Bundle, error) {
mf := ModuleFile{
URL: f.URL(),
Path: fullPath,
Raw: buf.Bytes(),
Raw: bs,
Parsed: module,
}
bundle.Modules = append(bundle.Modules, mf)

} else if filepath.Base(path) == WasmFile {
bundle.WasmModules = append(bundle.WasmModules, WasmModuleFile{
URL: f.URL(),
Expand All @@ -491,6 +536,11 @@ func (r *Reader) Read() (Bundle, error) {
Raw: buf.Bytes(),
})
} else if filepath.Base(path) == dataFile {
if r.lazyLoadingMode {
raw = append(raw, Raw{Path: path, Value: buf.Bytes()})
continue
}

var value interface{}

r.metrics.Timer(metrics.RegoDataParse).Start()
Expand All @@ -506,6 +556,10 @@ func (r *Reader) Read() (Bundle, error) {
}

} else if filepath.Base(path) == yamlDataFile {
if r.lazyLoadingMode {
raw = append(raw, Raw{Path: path, Value: buf.Bytes()})
continue
}

var value interface{}

Expand Down Expand Up @@ -592,6 +646,7 @@ func (r *Reader) Read() (Bundle, error) {
}

bundle.Etag = r.etag
bundle.Raw = raw

return bundle, nil
}
Expand Down Expand Up @@ -1200,7 +1255,13 @@ func rootContains(root []string, other []string) bool {
}

// insertValue places value into the bundle's data tree at the location
// derived from path, annotating any failure with the offending path.
func insertValue(b *Bundle, path string, value interface{}) error {
	err := b.insertData(getNormalizedPath(path), value)
	if err != nil {
		return fmt.Errorf("bundle load failed on %v: %w", path, err)
	}
	return nil
}

func getNormalizedPath(path string) []string {
// Remove leading / and . characters from the directory path. If the bundle
// was written with OPA then the paths will contain a leading slash. On the
// other hand, if the path is empty, filepath.Dir will return '.'.
Expand All @@ -1211,10 +1272,7 @@ func insertValue(b *Bundle, path string, value interface{}) error {
if dirpath != "" {
key = strings.Split(dirpath, "/")
}
if err := b.insertData(key, value); err != nil {
return fmt.Errorf("bundle load failed on %v: %w", path, err)
}
return nil
return key
}

func dfs(value interface{}, path string, fn func(string, interface{}) (bool, error)) error {
Expand Down
28 changes: 28 additions & 0 deletions bundle/bundle_test.go
Expand Up @@ -118,6 +118,34 @@ func TestReadWithSizeLimit(t *testing.T) {
}
}

// TestReadBundleInLazyMode verifies that reading a bundle with lazy loading
// enabled skips data deserialization (Bundle.Data stays empty) and instead
// records the raw file contents on Bundle.Raw.
func TestReadBundleInLazyMode(t *testing.T) {
	files := [][2]string{
		{"/a/b/c/data.json", "[1,2,3]"},
		{"/a/b/d/data.json", "true"},
		{"/a/b/y/data.yaml", `foo: 1`},
		{"/example/example.rego", `package example`},
		// NOTE: fixture previously had an unbalanced trailing '}' making it
		// invalid JSON; lazy mode never parses it, but the fixture should
		// still be well-formed for any consumer that does.
		{"/data.json", `{"x": {"y": true}, "a": {"b": {"z": true}}}`},
		{"/.manifest", `{"revision": "foo", "roots": ["example"]}`}, // data is outside roots but validation skipped in lazy mode
	}

	buf := archive.MustWriteTarGz(files)
	loader := NewTarballLoaderWithBaseURL(buf, "")
	br := NewCustomReader(loader).WithLazyLoadingMode(true)

	bundle, err := br.Read()
	if err != nil {
		t.Fatal(err)
	}

	// In lazy mode data files are not deserialized into the bundle.
	if len(bundle.Data) != 0 {
		t.Fatal("expected the bundle object to contain no data")
	}

	// The raw, unparsed file contents must be carried on the bundle instead.
	if len(bundle.Raw) == 0 {
		t.Fatal("raw bundle bytes not set on bundle object")
	}
}

func TestReadWithBundleEtag(t *testing.T) {

files := [][2]string{
Expand Down
75 changes: 75 additions & 0 deletions bundle/file.go
Expand Up @@ -9,8 +9,11 @@ import (
"os"
"path"
"path/filepath"
"sort"
"strings"
"sync"

"github.com/open-policy-agent/opa/storage"
)

// Descriptor contains information about a file and
Expand Down Expand Up @@ -192,6 +195,8 @@ type tarballLoader struct {
// file holds a single bundle file's name and contents while it is being
// loaded or iterated.
type file struct {
	name   string
	reader io.Reader
	// path is the storage path parsed from name; raw holds the file's
	// unparsed bytes. Both are populated by the lazy-loading iterator.
	path storage.Path
	raw  []byte
}

// NewTarballLoader is deprecated. Use NewTarballLoaderWithBaseURL instead.
Expand Down Expand Up @@ -265,3 +270,73 @@ func (t *tarballLoader) NextFile() (*Descriptor, error) {

return newDescriptor(path.Join(t.baseURL, f.name), f.name, f.reader), nil
}

// Next implements the storage.Iterator interface.
// It iterates to the next policy or data file in the directory tree
// and returns a storage.Update for the file.
func (it *iterator) Next() (*storage.Update, error) {

if it.files == nil {
it.files = []file{}

for _, item := range it.raw {
f := file{name: item.Path}

fpath := strings.TrimLeft(filepath.ToSlash(filepath.Dir(f.name)), "/.")
if strings.HasSuffix(f.name, RegoExt) {
fpath = strings.Trim(f.name, "/")
}

p, ok := storage.ParsePathEscaped("/" + fpath)
if !ok {
return nil, fmt.Errorf("storage path invalid: %v", f.name)
}
f.path = p

f.raw = item.Value

it.files = append(it.files, f)
}

sortFilePathAscend(it.files)
}

// If done reading files then just return io.EOF
// errors for each NextFile() call
if it.idx >= len(it.files) {
return nil, io.EOF
}

f := it.files[it.idx]
it.idx++

isPolicy := false
if strings.HasSuffix(f.name, RegoExt) {
isPolicy = true
}

return &storage.Update{
Path: f.path,
Value: f.raw,
IsPolicy: isPolicy,
}, nil
}

// iterator walks a bundle's raw files as storage updates, implementing
// storage.Iterator.
type iterator struct {
	raw []Raw
	// files and idx track iteration state; files is built lazily on the
	// first call to Next.
	files []file
	idx   int
}

// NewIterator returns a storage.Iterator that yields one update per raw
// bundle file.
func NewIterator(raw []Raw) storage.Iterator {
	return &iterator{raw: raw}
}

// sortFilePathAscend orders files by the length of their storage paths,
// shortest first, so parent paths are emitted before their children.
func sortFilePathAscend(files []file) {
	byPathLen := func(a, b int) bool {
		return len(files[a].path) < len(files[b].path)
	}
	sort.Slice(files, byPathLen)
}

0 comments on commit f60dfaf

Please sign in to comment.