Merge branch 'main' into adityahegde/ui-based-deploy
AdityaHegde committed Jul 3, 2024
2 parents 25c11eb + 3e9cd10 commit c74f493
Showing 17 changed files with 172 additions and 171 deletions.
1 change: 1 addition & 0 deletions admin/worker/delete_unsued_assets.go
@@ -31,6 +31,7 @@ func (w *Worker) deleteUnusedAssets(ctx context.Context) error {
group.SetLimit(8)
var ids []string
for _, asset := range assets {
+asset := asset
ids = append(ids, asset.ID)
group.Go(func() error {
parsed, err := url.Parse(asset.Path)
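The added `asset := asset` line is the pre-Go 1.22 idiom for capturing the loop variable: without it, every closure passed to `group.Go` would share the same `asset` variable and could observe only its final value. A self-contained sketch of the pattern (the string slice stands in for the worker's assets):

```go
package main

import (
	"fmt"

	"golang.org/x/sync/errgroup"
)

func main() {
	assets := []string{"a", "b", "c"}

	var group errgroup.Group
	group.SetLimit(8) // run at most 8 goroutines concurrently

	for _, asset := range assets {
		asset := asset // capture this iteration's value for the closure
		group.Go(func() error {
			fmt.Println(asset)
			return nil
		})
	}
	if err := group.Wait(); err != nil {
		fmt.Println("error:", err)
	}
}
```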
12 changes: 6 additions & 6 deletions docs/docs/build/connect/glob-patterns.md
@@ -14,19 +14,19 @@ To ingest data using glob patterns, you include the pattern in the URI of the source:
```
gs://my-bucket/y=2023/m=01/*.parquet
```

-By default, Rill applies certain limits when using glob patterns to ingest data. The default limits are as follows:
-- **Total size of all matching files**: 10GB
-- **Total file matches**: 1000
-- **Total files listed**: 1 million
+By default, Rill can apply certain limits when using glob patterns to ingest data. The default limits are as follows:
+- **Total size of all matching files**: 100GB
+- **Total file matches**: unlimited
+- **Total files listed**: unlimited

These limits can be configured in the `.yaml` file for the source. To modify the default limits, you can update the `.yaml` file with the following fields:
- `glob.max_total_size`: The maximum total size (in bytes) of all objects.
- `glob.max_objects_matched`: The total file matches allowed.
- `glob.max_objects_listed`: The total files listed to match against the glob pattern.

-For example, to increase the limit on the total bytes downloaded to 100GB, you would add the following line to the `source.yaml` file:
+For example, to set the limit on the total bytes downloaded to 1GB, you would add the following line to the `source.yaml` file:
```yaml
-glob.max_total_size: 1073741824000
+glob.max_total_size: 1073741824
```
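Taken together, a minimal sketch of a `source.yaml` that overrides all three limits (the connector type, bucket path, and chosen values are illustrative, not taken from this page):

```yaml
type: gcs
uri: gs://my-bucket/y=2023/m=*/*.parquet
glob.max_total_size: 107374182400 # 100GB, in bytes
glob.max_objects_matched: 10000
glob.max_objects_listed: 2000000
```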
## Extract policies
6 changes: 3 additions & 3 deletions docs/docs/reference/project-files/sources.md
@@ -66,15 +66,15 @@ Files that are *nested at any level* under your native `sources` directory will

**`glob.max_total_size`**
— Applicable if the URI is a glob pattern. The max allowed total size (in bytes) of all objects matching the glob pattern _(optional)_.
-- Default value is _`10737418240 (10GB)`_
+- Default value is _`107374182400 (100GB)`_

**`glob.max_objects_matched`**
— Applicable if the URI is a glob pattern. The max allowed number of objects matching the glob pattern _(optional)_.
-- Default value is _`1,000`_
+- Default value is _`unlimited`_

**`glob.max_objects_listed`**
— Applicable if the URI is a glob pattern. The max number of objects to list and match against glob pattern, not inclusive of files already excluded by the glob prefix _(optional)_.
-- Default value is _`1,000,000`_
+- Default value is _`unlimited`_

**`timeout`**
— The maximum time to wait for source ingestion _(optional)_. An illustrative example combining these fields appears after this list.
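A hedged sketch of how these properties might sit together in a source file; the connector, URI, and timeout format are assumptions rather than values from this page:

```yaml
type: s3
uri: s3://my-bucket/events/*.parquet
glob.max_total_size: 10737418240 # lower the 100GB default to 10GB
timeout: 300 # assumed to be seconds
```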
9 changes: 5 additions & 4 deletions runtime/drivers/blob/blobdownloader.go
@@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"io"
"math"
"os"
"path/filepath"
"strings"
@@ -62,14 +63,14 @@ type Options struct {
// sets defaults if not set by user
func (opts *Options) validate() {
if opts.GlobMaxObjectsMatched == 0 {
-opts.GlobMaxObjectsMatched = 1000
+opts.GlobMaxObjectsMatched = math.MaxInt
}
if opts.GlobMaxObjectsListed == 0 {
-opts.GlobMaxObjectsListed = 1000 * 1000
+opts.GlobMaxObjectsListed = math.MaxInt64
}
if opts.GlobMaxTotalSize == 0 {
-// 10 GB
-opts.GlobMaxTotalSize = 10 * 1024 * 1024 * 1024
+// 100 GB
+opts.GlobMaxTotalSize = 100 * 1024 * 1024 * 1024
}
if opts.GlobPageSize == 0 {
opts.GlobPageSize = 1000
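The `validate` method above backfills defaults for any option the caller left at its zero value, with `math.MaxInt` and `math.MaxInt64` standing in for "unlimited". A minimal self-contained sketch of the same pattern, using an illustrative trimmed-down struct rather than the driver's actual `Options`:

```go
package main

import (
	"fmt"
	"math"
)

// options mirrors the shape of the driver's Options struct (trimmed down).
type options struct {
	MaxObjectsMatched int
	MaxTotalSize      int64
}

// validate backfills defaults for fields left at their zero value.
func (o *options) validate() {
	if o.MaxObjectsMatched == 0 {
		o.MaxObjectsMatched = math.MaxInt // effectively unlimited
	}
	if o.MaxTotalSize == 0 {
		o.MaxTotalSize = 100 * 1024 * 1024 * 1024 // 100 GB
	}
}

func main() {
	opts := &options{}
	opts.validate()
	fmt.Println(opts.MaxObjectsMatched, opts.MaxTotalSize)
}
```

One consequence of this zero-value convention: a caller cannot request a literal limit of 0, since 0 is reclaimed to mean "use the default".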
2 changes: 1 addition & 1 deletion runtime/drivers/blob/blobdownloader_test.go
@@ -21,7 +21,7 @@ var filesData = map[string][]byte{
"2020/02/04/data.txt": []byte("test"),
}

-const TenGB = 10 * 1024 * 1024
+const TenGB = 10 * 1024 * 1024 * 1024

func TestFetchFileNames(t *testing.T) {
type args struct {
48 changes: 48 additions & 0 deletions runtime/drivers/duckdb/model_executor_self_file.go
@@ -3,7 +3,11 @@ package duckdb
import (
"context"
"fmt"
"os"
"sync/atomic"
"time"

"github.com/c2h5oh/datasize"
"github.com/mitchellh/mapstructure"
"github.com/rilldata/rill/runtime/drivers"
"github.com/rilldata/rill/runtime/drivers/file"
@@ -47,15 +51,59 @@ func (e *selfToFileExecutor) Execute(ctx context.Context) (*drivers.ModelResult,
return nil, err
}

// Check the output file size does not exceed the configured limit.
overLimit := atomic.Bool{}
if outputProps.FileSizeLimitBytes > 0 {
var cancel context.CancelFunc
// override the parent context
ctx, cancel = context.WithCancel(ctx)
defer cancel()
go func() {
ticker := time.NewTicker(time.Millisecond * 500)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
f, err := os.Stat(outputProps.Path)
if err != nil { // ignore error since file may not be created yet
continue
}
if f.Size() > outputProps.FileSizeLimitBytes {
overLimit.Store(true)
cancel()
}
}
}
}()
}

err = olap.Exec(ctx, &drivers.Statement{
Query: sql,
Args: inputProps.Args,
Priority: e.opts.Priority,
})
if err != nil {
if overLimit.Load() {
return nil, fmt.Errorf("file exceeds size limit %q", datasize.ByteSize(outputProps.FileSizeLimitBytes).HumanReadable())
}
return nil, fmt.Errorf("failed to execute query: %w", err)
}

// check the size again since duckdb writes data with high throughput
// and it is possible that the entire file is written
// before we check size in background goroutine
if outputProps.FileSizeLimitBytes > 0 {
f, err := os.Stat(outputProps.Path)
if err != nil {
return nil, err
}
if f.Size() > outputProps.FileSizeLimitBytes {
return nil, fmt.Errorf("file exceeds size limit %q", datasize.ByteSize(outputProps.FileSizeLimitBytes).HumanReadable())
}
}

// Build result props
resultProps := &file.ModelResultProperties{
Path: outputProps.Path,
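The new code above enforces the output size limit twice: a watchdog goroutine polls the file's size every 500ms and cancels the query's context once the limit is crossed, and a final check after `olap.Exec` returns catches files that DuckDB finished writing between polls. A condensed, self-contained sketch of the watchdog half, where `runQuery` and the output path are hypothetical stand-ins:

```go
package main

import (
	"context"
	"fmt"
	"os"
	"sync/atomic"
	"time"
)

// watchFileSize polls path until ctx is done, calling cancel once the file
// grows past limit. The returned flag records whether the limit was crossed.
func watchFileSize(ctx context.Context, cancel context.CancelFunc, path string, limit int64) *atomic.Bool {
	over := &atomic.Bool{}
	go func() {
		ticker := time.NewTicker(500 * time.Millisecond)
		defer ticker.Stop()
		for {
			select {
			case <-ctx.Done():
				return
			case <-ticker.C:
				fi, err := os.Stat(path)
				if err != nil {
					continue // ignore error: the file may not be created yet
				}
				if fi.Size() > limit {
					over.Store(true)
					cancel() // abort the long-running write
				}
			}
		}
	}()
	return over
}

// runQuery stands in for a context-aware query that writes the output file.
func runQuery(ctx context.Context) error {
	select {
	case <-ctx.Done():
		return ctx.Err()
	case <-time.After(2 * time.Second):
		return nil
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	over := watchFileSize(ctx, cancel, "/tmp/out.parquet", 1<<30 /* 1 GB */)

	if err := runQuery(ctx); err != nil && over.Load() {
		fmt.Println("aborted: output file exceeded the size limit")
	}
}
```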
5 changes: 2 additions & 3 deletions runtime/drivers/duckdb/olap.go
@@ -744,10 +744,9 @@ func (c *connection) execWithLimits(parentCtx context.Context, stmt *drivers.Sta
}

// check current size
-currentSize, _ := c.EstimateSize()
-storageLimit -= currentSize
+sz, _ := c.EstimateSize()
// current size already exceeds limit
-if storageLimit <= 0 {
+if sz >= storageLimit {
return drivers.ErrStorageLimitExceeded
}

5 changes: 3 additions & 2 deletions runtime/drivers/file/model_executor.go
@@ -7,8 +7,9 @@
)

type ModelOutputProperties struct {
-Path   string             `mapstructure:"path"`
-Format drivers.FileFormat `mapstructure:"format"`
+Path               string             `mapstructure:"path"`
+Format             drivers.FileFormat `mapstructure:"format"`
+FileSizeLimitBytes int64              `mapstructure:"file_size_limit_bytes"`
}

func (p *ModelOutputProperties) Validate() error {
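Given the `mapstructure` tags above, the new limit would presumably be configured next to `path` and `format` in a model's output properties. A hypothetical snippet; the surrounding keys and their placement are assumptions, not taken from this diff:

```yaml
output:
  connector: file
  path: exports/orders.csv
  format: csv
  file_size_limit_bytes: 104857600 # 100MB
```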
104 changes: 72 additions & 32 deletions runtime/drivers/file/model_executor_olap_self.go
@@ -6,20 +6,24 @@ import (
"encoding/json"
"errors"
"fmt"
"io"
"os"
"time"

"github.com/apache/arrow/go/v14/arrow"
"github.com/apache/arrow/go/v14/arrow/array"
"github.com/apache/arrow/go/v14/arrow/memory"
"github.com/apache/arrow/go/v14/parquet/pqarrow"
"github.com/c2h5oh/datasize"
"github.com/mitchellh/mapstructure"
runtimev1 "github.com/rilldata/rill/proto/gen/rill/runtime/v1"
"github.com/rilldata/rill/runtime/drivers"
"github.com/rilldata/rill/runtime/pkg/jsonval"
"github.com/xuri/excelize/v2"
)

const maxParquetRowGroupSize = 512 * int64(datasize.MB)

type olapToSelfExecutor struct {
c *connection
olap drivers.OLAPStore
@@ -61,19 +65,32 @@ func (e *olapToSelfExecutor) Execute(ctx context.Context) (*drivers.ModelResult,
}
defer res.Close()

f, err := os.Create(outputProps.Path)
if err != nil {
return nil, err
}
defer f.Close()
var fw io.Writer = f
if outputProps.FileSizeLimitBytes > 0 {
fw = &limitedWriter{W: fw, N: outputProps.FileSizeLimitBytes}
}

switch outputProps.Format {
case drivers.FileFormatParquet:
-err = writeParquet(res, outputProps.Path)
+err = writeParquet(res, fw)
case drivers.FileFormatCSV:
-err = writeCSV(res, outputProps.Path)
+err = writeCSV(res, fw)
case drivers.FileFormatJSON:
return nil, errors.New("json file output not currently supported")
case drivers.FileFormatXLSX:
-err = writeXLSX(res, outputProps.Path)
+err = writeXLSX(res, fw)
default:
return nil, fmt.Errorf("unsupported output format %q", outputProps.Format)
}
if err != nil {
if errors.Is(err, io.ErrShortWrite) {
return nil, fmt.Errorf("file exceeds size limit %q", datasize.ByteSize(outputProps.FileSizeLimitBytes).HumanReadable())
}
return nil, fmt.Errorf("failed to write format %q: %w", outputProps.Format, err)
}

@@ -93,20 +110,14 @@ func (e *olapToSelfExecutor) Execute(ctx context.Context) (*drivers.ModelResult,
}, nil
}

-func writeCSV(res *drivers.Result, path string) error {
-f, err := os.Create(path)
-if err != nil {
-return err
-}
-defer f.Close()
-
-w := csv.NewWriter(f)
+func writeCSV(res *drivers.Result, fw io.Writer) error {
+w := csv.NewWriter(fw)

strs := make([]string, len(res.Schema.Fields))
for i, f := range res.Schema.Fields {
strs[i] = f.Name
}
-err = w.Write(strs)
+err := w.Write(strs)
if err != nil {
return err
}
@@ -155,7 +166,7 @@ func writeCSV(res *drivers.Result, path string) error {
return nil
}

-func writeXLSX(res *drivers.Result, path string) error {
+func writeXLSX(res *drivers.Result, fw io.Writer) error {
xf := excelize.NewFile()
defer func() { _ = xf.Close() }()

Expand Down Expand Up @@ -222,20 +233,14 @@ func writeXLSX(res *drivers.Result, path string) error {
return err
}

-f, err := os.Create(path)
-if err != nil {
-return err
-}
-defer f.Close()
-
-err = xf.Write(f)
+err = xf.Write(fw)
if err != nil {
return err
}
return nil
}

-func writeParquet(res *drivers.Result, path string) error {
+func writeParquet(res *drivers.Result, fw io.Writer) error {
fields := make([]arrow.Field, 0, len(res.Schema.Fields))
for _, f := range res.Schema.Fields {
arrowField := arrow.Field{}
@@ -274,6 +279,12 @@
vals[i] = new(any)
}

parquetwriter, err := pqarrow.NewFileWriter(schema, fw, nil, pqarrow.ArrowWriterProperties{})
if err != nil {
return err
}
defer parquetwriter.Close()
var rows int64
for res.Next() {
err := res.Scan(vals...)
if err != nil {
@@ -328,22 +339,51 @@
recordBuilder.Field(i).(*array.BinaryBuilder).Append(v)
}
}
rows++
if rows == 1000 {
rec := recordBuilder.NewRecord()
if err := parquetwriter.WriteBuffered(rec); err != nil {
rec.Release()
return err
}
rec.Release()
if parquetwriter.RowGroupTotalBytesWritten() >= maxParquetRowGroupSize {
// Also flushes the data to the disk freeing memory
parquetwriter.NewBufferedRowGroup()
}
rows = 0
}
}
if res.Err() != nil {
return res.Err()
}

-f, err := os.Create(path)
-if err != nil {
-return err
-}
-defer f.Close()
-
-parquetwriter, err := pqarrow.NewFileWriter(schema, f, nil, pqarrow.ArrowWriterProperties{})
-if err != nil {
-return err
-}
-defer parquetwriter.Close()
-
-return parquetwriter.Write(recordBuilder.NewRecord())
+if rows == 0 {
+return nil
+}
+rec := recordBuilder.NewRecord()
+err = parquetwriter.Write(rec)
+// release the record before returning the error
+rec.Release()
+return err
}

// A limitedWriter writes to W but limits the amount of
// data written to just N bytes.
//
// Modified from github.com/kubernetes/kubernetes/blob/master/pkg/kubelet/util/ioutils/ioutils.go
type limitedWriter struct {
W io.Writer // underlying writer
N int64 // max bytes remaining
}

func (l *limitedWriter) Write(p []byte) (n int, err error) {
if l.N <= 0 {
return 0, io.ErrShortWrite
}
if int64(len(p)) > l.N {
return 0, io.ErrShortWrite
}
n, err = l.W.Write(p)
l.N -= int64(n)
return
}
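A quick usage sketch of the `limitedWriter` defined above, mirroring how the executor wraps its output file: once the byte budget is spent, writes are rejected with `io.ErrShortWrite`. The file path is illustrative, and the snippet assumes it lives in the same package as the type.

```go
package main

import (
	"errors"
	"fmt"
	"io"
	"os"
)

func main() {
	f, err := os.Create("/tmp/capped.txt")
	if err != nil {
		panic(err)
	}
	defer f.Close()

	// Allow at most 10 bytes through to the underlying file.
	w := &limitedWriter{W: f, N: 10}

	_, err = w.Write([]byte("0123456789")) // exactly the budget: succeeds
	fmt.Println(err)                       // <nil>

	_, err = w.Write([]byte("x"))                 // budget exhausted: rejected
	fmt.Println(errors.Is(err, io.ErrShortWrite)) // true
}
```

Note the all-or-nothing behavior: a write that would only partially fit is rejected outright rather than truncated, which keeps output files from ending mid-record.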