From 50aefc5347ac3adf1cb813c7804d1f223c53ab7d Mon Sep 17 00:00:00 2001
From: Anshul Khandelwal <12948312+k-anshul@users.noreply.github.com>
Date: Tue, 2 Jul 2024 17:11:35 +0530
Subject: [PATCH 1/5] Runtime: Modify blob limits (#5178)

* modify blob limits

* also update docs
---
 docs/docs/build/connect/glob-patterns.md     | 12 ++++++------
 docs/docs/reference/project-files/sources.md |  6 +++---
 runtime/drivers/blob/blobdownloader.go       |  9 +++++----
 runtime/drivers/blob/blobdownloader_test.go  |  2 +-
 4 files changed, 15 insertions(+), 14 deletions(-)

diff --git a/docs/docs/build/connect/glob-patterns.md b/docs/docs/build/connect/glob-patterns.md
index 3d2a712d9d6..75aa0871857 100644
--- a/docs/docs/build/connect/glob-patterns.md
+++ b/docs/docs/build/connect/glob-patterns.md
@@ -14,19 +14,19 @@ To ingest data using glob patterns, you include the pattern in the URI of the so
 gs://my-bucket/y=2023/m=01/*.parquet
 ```
 
-By default, Rill applies certain limits when using glob patterns to ingest data. The default limits are as follows:
-- **Total size of all matching files**: 10GB
-- **Total file matches**: 1000
-- **Total files listed**: 1 million
+By default, Rill can apply certain limits when using glob patterns to ingest data. The default limits are as follows:
+- **Total size of all matching files**: 100GB
+- **Total file matches**: unlimited
+- **Total files listed**: unlimited
 
 These limits can be configured in the `.yaml` file for the source. To modify the default limits, you can update the `.yaml` file with the following fields:
 - `glob.max_total_size`: The maximum total size (in bytes) of all objects.
 - `glob.max_objects_matched`: The total file matches allowed.
 - `glob.max_objects_listed`: The total files listed to match against the glob pattern.
 
-For example, to increase the limit on the total bytes downloaded to 100GB, you would add the following line to the `source.yaml` file:
+For example, to set the limit on the total bytes downloaded to 1GB, you would add the following line to the `source.yaml` file:
 
 ```yaml
-glob.max_total_size: 1073741824000
+glob.max_total_size: 1073741824
 ```
 
 ## Extract policies
diff --git a/docs/docs/reference/project-files/sources.md b/docs/docs/reference/project-files/sources.md
index 60d0a64910f..fc1d64f95cb 100644
--- a/docs/docs/reference/project-files/sources.md
+++ b/docs/docs/reference/project-files/sources.md
@@ -66,15 +66,15 @@ Files that are *nested at any level* under your native `sources` directory will
 
 **`glob.max_total_size`** — Applicable if the URI is a glob pattern. The max allowed total size (in bytes) of all objects matching the glob pattern _(optional)_.
-  - Default value is _`10737418240 (10GB)`_
+  - Default value is _`107374182400 (100GB)`_
 
 **`glob.max_objects_matched`** — Applicable if the URI is a glob pattern. The max allowed number of objects matching the glob pattern _(optional)_.
-  - Default value is _`1,000`_
+  - Default value is _`unlimited`_
 
 **`glob.max_objects_listed`** — Applicable if the URI is a glob pattern. The max number of objects to list and match against glob pattern, not inclusive of files already excluded by the glob prefix _(optional)_.
-  - Default value is _`1,000,000`_
+  - Default value is _`unlimited`_
 
 **`timeout`** — The maximum time to wait for source ingestion _(optional)_.
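Taken together, the updated docs describe a `source.yaml` like the following sketch for projects that want to reinstate explicit caps now that the defaults are mostly unlimited. The `type` and `uri` values are illustrative assumptions, not part of this patch; only the three `glob.*` keys come from the documentation above:

```yaml
# Hypothetical source.yaml reinstating explicit glob limits; all values are examples.
type: gcs
uri: "gs://my-bucket/y=2023/m=*/*.parquet"
glob.max_total_size: 10737418240     # 10GB cap on total bytes downloaded
glob.max_objects_matched: 1000       # error if more than 1,000 files match the glob
glob.max_objects_listed: 1000000     # list at most 1 million objects during matching
```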
diff --git a/runtime/drivers/blob/blobdownloader.go b/runtime/drivers/blob/blobdownloader.go index bccb8aee54c..d1a4c63f931 100644 --- a/runtime/drivers/blob/blobdownloader.go +++ b/runtime/drivers/blob/blobdownloader.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "io" + "math" "os" "path/filepath" "strings" @@ -62,14 +63,14 @@ type Options struct { // sets defaults if not set by user func (opts *Options) validate() { if opts.GlobMaxObjectsMatched == 0 { - opts.GlobMaxObjectsMatched = 1000 + opts.GlobMaxObjectsMatched = math.MaxInt } if opts.GlobMaxObjectsListed == 0 { - opts.GlobMaxObjectsListed = 1000 * 1000 + opts.GlobMaxObjectsListed = math.MaxInt64 } if opts.GlobMaxTotalSize == 0 { - // 10 GB - opts.GlobMaxTotalSize = 10 * 1024 * 1024 * 1024 + // 100 GB + opts.GlobMaxTotalSize = 100 * 1024 * 1024 * 1024 } if opts.GlobPageSize == 0 { opts.GlobPageSize = 1000 diff --git a/runtime/drivers/blob/blobdownloader_test.go b/runtime/drivers/blob/blobdownloader_test.go index 66d37eedf65..ad980bcc3f5 100644 --- a/runtime/drivers/blob/blobdownloader_test.go +++ b/runtime/drivers/blob/blobdownloader_test.go @@ -21,7 +21,7 @@ var filesData = map[string][]byte{ "2020/02/04/data.txt": []byte("test"), } -const TenGB = 10 * 1024 * 1024 +const TenGB = 10 * 1024 * 1024 * 1024 func TestFetchFileNames(t *testing.T) { type args struct { From 76a65f4a3c35f7941c4afbd91ac61c32cf67e547 Mon Sep 17 00:00:00 2001 From: Anshul Khandelwal <12948312+k-anshul@users.noreply.github.com> Date: Tue, 2 Jul 2024 17:16:36 +0530 Subject: [PATCH 2/5] Runtime: Better export limits (#5169) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * export limits * Update runtime/drivers/duckdb/model_executor_self_file.go Co-authored-by: Benjamin Egelund-Müller * interim review - 1 * use byte size for parquet exports * remove unused fields * use data_dir in tests --------- Co-authored-by: Benjamin Egelund-Müller --- .../duckdb/model_executor_self_file.go | 48 ++++++++ runtime/drivers/duckdb/olap.go | 5 +- runtime/drivers/file/model_executor.go | 5 +- .../drivers/file/model_executor_olap_self.go | 104 ++++++++++++------ runtime/drivers/registry.go | 7 +- runtime/metricsview/executor.go | 9 +- runtime/metricsview/executor_export.go | 14 +-- runtime/metricsview/executor_rewrite_limit.go | 12 +- runtime/server/downloads.go | 33 +----- runtime/testruntime/testruntime.go | 1 + 10 files changed, 144 insertions(+), 94 deletions(-) diff --git a/runtime/drivers/duckdb/model_executor_self_file.go b/runtime/drivers/duckdb/model_executor_self_file.go index 78fd2886459..92c0a8d6030 100644 --- a/runtime/drivers/duckdb/model_executor_self_file.go +++ b/runtime/drivers/duckdb/model_executor_self_file.go @@ -3,7 +3,11 @@ package duckdb import ( "context" "fmt" + "os" + "sync/atomic" + "time" + "github.com/c2h5oh/datasize" "github.com/mitchellh/mapstructure" "github.com/rilldata/rill/runtime/drivers" "github.com/rilldata/rill/runtime/drivers/file" @@ -47,15 +51,59 @@ func (e *selfToFileExecutor) Execute(ctx context.Context) (*drivers.ModelResult, return nil, err } + // Check the output file size does not exceed the configured limit. 
+ overLimit := atomic.Bool{} + if outputProps.FileSizeLimitBytes > 0 { + var cancel context.CancelFunc + // override the parent context + ctx, cancel = context.WithCancel(ctx) + defer cancel() + go func() { + ticker := time.NewTicker(time.Millisecond * 500) + defer ticker.Stop() + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + f, err := os.Stat(outputProps.Path) + if err != nil { // ignore error since file may not be created yet + continue + } + if f.Size() > outputProps.FileSizeLimitBytes { + overLimit.Store(true) + cancel() + } + } + } + }() + } + err = olap.Exec(ctx, &drivers.Statement{ Query: sql, Args: inputProps.Args, Priority: e.opts.Priority, }) if err != nil { + if overLimit.Load() { + return nil, fmt.Errorf("file exceeds size limit %q", datasize.ByteSize(outputProps.FileSizeLimitBytes).HumanReadable()) + } return nil, fmt.Errorf("failed to execute query: %w", err) } + // check the size again since duckdb writes data with high throughput + // and it is possible that the entire file is written + // before we check size in background goroutine + if outputProps.FileSizeLimitBytes > 0 { + f, err := os.Stat(outputProps.Path) + if err != nil { + return nil, err + } + if f.Size() > outputProps.FileSizeLimitBytes { + return nil, fmt.Errorf("file exceeds size limit %q", datasize.ByteSize(outputProps.FileSizeLimitBytes).HumanReadable()) + } + } + // Build result props resultProps := &file.ModelResultProperties{ Path: outputProps.Path, diff --git a/runtime/drivers/duckdb/olap.go b/runtime/drivers/duckdb/olap.go index f4bfd2362de..81a08740f44 100644 --- a/runtime/drivers/duckdb/olap.go +++ b/runtime/drivers/duckdb/olap.go @@ -744,10 +744,9 @@ func (c *connection) execWithLimits(parentCtx context.Context, stmt *drivers.Sta } // check current size - currentSize, _ := c.EstimateSize() - storageLimit -= currentSize + sz, _ := c.EstimateSize() // current size already exceeds limit - if storageLimit <= 0 { + if sz >= storageLimit { return drivers.ErrStorageLimitExceeded } diff --git a/runtime/drivers/file/model_executor.go b/runtime/drivers/file/model_executor.go index 63fff050d58..568fc329133 100644 --- a/runtime/drivers/file/model_executor.go +++ b/runtime/drivers/file/model_executor.go @@ -7,8 +7,9 @@ import ( ) type ModelOutputProperties struct { - Path string `mapstructure:"path"` - Format drivers.FileFormat `mapstructure:"format"` + Path string `mapstructure:"path"` + Format drivers.FileFormat `mapstructure:"format"` + FileSizeLimitBytes int64 `mapstructure:"file_size_limit_bytes"` } func (p *ModelOutputProperties) Validate() error { diff --git a/runtime/drivers/file/model_executor_olap_self.go b/runtime/drivers/file/model_executor_olap_self.go index fe3aeceb0ee..bb394e27e24 100644 --- a/runtime/drivers/file/model_executor_olap_self.go +++ b/runtime/drivers/file/model_executor_olap_self.go @@ -6,6 +6,7 @@ import ( "encoding/json" "errors" "fmt" + "io" "os" "time" @@ -13,6 +14,7 @@ import ( "github.com/apache/arrow/go/v14/arrow/array" "github.com/apache/arrow/go/v14/arrow/memory" "github.com/apache/arrow/go/v14/parquet/pqarrow" + "github.com/c2h5oh/datasize" "github.com/mitchellh/mapstructure" runtimev1 "github.com/rilldata/rill/proto/gen/rill/runtime/v1" "github.com/rilldata/rill/runtime/drivers" @@ -20,6 +22,8 @@ import ( "github.com/xuri/excelize/v2" ) +const maxParquetRowGroupSize = 512 * int64(datasize.MB) + type olapToSelfExecutor struct { c *connection olap drivers.OLAPStore @@ -61,19 +65,32 @@ func (e *olapToSelfExecutor) Execute(ctx context.Context) 
(*drivers.ModelResult, } defer res.Close() + f, err := os.Create(outputProps.Path) + if err != nil { + return nil, err + } + defer f.Close() + var fw io.Writer = f + if outputProps.FileSizeLimitBytes > 0 { + fw = &limitedWriter{W: fw, N: outputProps.FileSizeLimitBytes} + } + switch outputProps.Format { case drivers.FileFormatParquet: - err = writeParquet(res, outputProps.Path) + err = writeParquet(res, fw) case drivers.FileFormatCSV: - err = writeCSV(res, outputProps.Path) + err = writeCSV(res, fw) case drivers.FileFormatJSON: return nil, errors.New("json file output not currently supported") case drivers.FileFormatXLSX: - err = writeXLSX(res, outputProps.Path) + err = writeXLSX(res, fw) default: return nil, fmt.Errorf("unsupported output format %q", outputProps.Format) } if err != nil { + if errors.Is(err, io.ErrShortWrite) { + return nil, fmt.Errorf("file exceeds size limit %q", datasize.ByteSize(outputProps.FileSizeLimitBytes).HumanReadable()) + } return nil, fmt.Errorf("failed to write format %q: %w", outputProps.Format, err) } @@ -93,20 +110,14 @@ func (e *olapToSelfExecutor) Execute(ctx context.Context) (*drivers.ModelResult, }, nil } -func writeCSV(res *drivers.Result, path string) error { - f, err := os.Create(path) - if err != nil { - return err - } - defer f.Close() - - w := csv.NewWriter(f) +func writeCSV(res *drivers.Result, fw io.Writer) error { + w := csv.NewWriter(fw) strs := make([]string, len(res.Schema.Fields)) for i, f := range res.Schema.Fields { strs[i] = f.Name } - err = w.Write(strs) + err := w.Write(strs) if err != nil { return err } @@ -155,7 +166,7 @@ func writeCSV(res *drivers.Result, path string) error { return nil } -func writeXLSX(res *drivers.Result, path string) error { +func writeXLSX(res *drivers.Result, fw io.Writer) error { xf := excelize.NewFile() defer func() { _ = xf.Close() }() @@ -222,20 +233,14 @@ func writeXLSX(res *drivers.Result, path string) error { return err } - f, err := os.Create(path) - if err != nil { - return err - } - defer f.Close() - - err = xf.Write(f) + err = xf.Write(fw) if err != nil { return err } return nil } -func writeParquet(res *drivers.Result, path string) error { +func writeParquet(res *drivers.Result, fw io.Writer) error { fields := make([]arrow.Field, 0, len(res.Schema.Fields)) for _, f := range res.Schema.Fields { arrowField := arrow.Field{} @@ -274,6 +279,12 @@ func writeParquet(res *drivers.Result, path string) error { vals[i] = new(any) } + parquetwriter, err := pqarrow.NewFileWriter(schema, fw, nil, pqarrow.ArrowWriterProperties{}) + if err != nil { + return err + } + defer parquetwriter.Close() + var rows int64 for res.Next() { err := res.Scan(vals...) 
if err != nil { @@ -328,22 +339,51 @@ func writeParquet(res *drivers.Result, path string) error { recordBuilder.Field(i).(*array.BinaryBuilder).Append(v) } } + rows++ + if rows == 1000 { + rec := recordBuilder.NewRecord() + if err := parquetwriter.WriteBuffered(rec); err != nil { + rec.Release() + return err + } + rec.Release() + if parquetwriter.RowGroupTotalBytesWritten() >= maxParquetRowGroupSize { + // Also flushes the data to the disk freeing memory + parquetwriter.NewBufferedRowGroup() + } + rows = 0 + } } if res.Err() != nil { return res.Err() } - - f, err := os.Create(path) - if err != nil { - return err + if rows == 0 { + return nil } - defer f.Close() + rec := recordBuilder.NewRecord() + err = parquetwriter.Write(rec) + // release the record before returning the error + rec.Release() + return err +} - parquetwriter, err := pqarrow.NewFileWriter(schema, f, nil, pqarrow.ArrowWriterProperties{}) - if err != nil { - return err - } - defer parquetwriter.Close() +// A limitedWriter writes to W but limits the amount of +// data written to just N bytes. +// +// Modified from github.com/kubernetes/kubernetes/blob/master/pkg/kubelet/util/ioutils/ioutils.go +type limitedWriter struct { + W io.Writer // underlying writer + N int64 // max bytes remaining +} - return parquetwriter.Write(recordBuilder.NewRecord()) +func (l *limitedWriter) Write(p []byte) (n int, err error) { + if l.N <= 0 { + return 0, io.ErrShortWrite + } + if int64(len(p)) > l.N { + return 0, io.ErrShortWrite + } + n, err = l.W.Write(p) + l.N -= int64(n) + return } diff --git a/runtime/drivers/registry.go b/runtime/drivers/registry.go index eb6421c74d8..23e67d62179 100644 --- a/runtime/drivers/registry.go +++ b/runtime/drivers/registry.go @@ -5,6 +5,7 @@ import ( "fmt" "time" + "github.com/c2h5oh/datasize" "github.com/mitchellh/mapstructure" runtimev1 "github.com/rilldata/rill/proto/gen/rill/runtime/v1" ) @@ -71,8 +72,8 @@ type Instance struct { // For example, a variable "rill.stage_changes=true" would set the StageChanges field to true. // InstanceConfig should only be used for config that the user is allowed to change dynamically at runtime. type InstanceConfig struct { - // DownloadRowLimit is the row limit for interactive data exports. If set to 0, there is no limit. - DownloadRowLimit int64 `mapstructure:"rill.download_row_limit"` + // DownloadLimitBytes is the limit on size of exported file. If set to 0, there is no limit. + DownloadLimitBytes int64 `mapstructure:"rill.download_row_limit_bytes"` // PivotCellLimit is the maximum number of cells allowed in a single pivot query. // Note that it does not limit the UI's pivot table because it paginates the requests. 
PivotCellLimit int64 `mapstructure:"rill.pivot_cell_limit"` @@ -125,7 +126,7 @@ func (i *Instance) ResolveVariables() map[string]string { func (i *Instance) Config() (InstanceConfig, error) { // Default config res := InstanceConfig{ - DownloadRowLimit: 200_000, + DownloadLimitBytes: int64(datasize.MB * 128), PivotCellLimit: 2_000_000, InteractiveSQLRowLimit: 10_000, StageChanges: true, diff --git a/runtime/metricsview/executor.go b/runtime/metricsview/executor.go index fd8d741fe14..2873ed18548 100644 --- a/runtime/metricsview/executor.go +++ b/runtime/metricsview/executor.go @@ -152,8 +152,7 @@ func (e *Executor) Query(ctx context.Context, qry *Query, executionTime *time.Ti return nil, false, runtime.ErrForbidden } - export := qry.Label // TODO: Always set to false once all upstream code uses Export() for exports - if err := e.rewriteQueryLimit(qry, export); err != nil { + if err := e.rewriteQueryLimit(qry); err != nil { return nil, false, err } @@ -248,7 +247,7 @@ func (e *Executor) Query(ctx context.Context, qry *Query, executionTime *time.Ti }) } - limitCap := e.queryLimitCap(export) + limitCap := e.instanceCfg.InteractiveSQLRowLimit if limitCap > 0 { res.SetCap(limitCap) } @@ -266,10 +265,6 @@ func (e *Executor) Export(ctx context.Context, qry *Query, executionTime *time.T return "", runtime.ErrForbidden } - if err := e.rewriteQueryLimit(qry, true); err != nil { - return "", err - } - pivotAST, pivoting, err := e.rewriteQueryForPivot(qry) if err != nil { return "", err diff --git a/runtime/metricsview/executor_export.go b/runtime/metricsview/executor_export.go index 5448ba16e68..8d9af85c7f7 100644 --- a/runtime/metricsview/executor_export.go +++ b/runtime/metricsview/executor_export.go @@ -19,18 +19,12 @@ func (e *Executor) executeExport(ctx context.Context, format drivers.FileFormat, ctx, cancel := context.WithTimeout(ctx, defaultExportTimeout) defer cancel() - path := e.rt.TempDir(e.instanceID, "metrics_export") - err := os.MkdirAll(path, os.ModePerm) - if err != nil { - return "", err - } - name, err := randomString("export-", 16) if err != nil { return "", err } name = format.Filename(name) - path = filepath.Join(path, name) + path := filepath.Join(e.rt.TempDir(e.instanceID), name) ic, ir, err := e.rt.AcquireHandle(ctx, e.instanceID, inputConnector) if err != nil { @@ -59,8 +53,9 @@ func (e *Executor) executeExport(ctx context.Context, format drivers.FileFormat, OutputHandle: oc, OutputConnector: "file", OutputProperties: map[string]any{ - "path": path, - "format": format, + "path": path, + "format": format, + "file_size_limit_bytes": e.instanceCfg.DownloadLimitBytes, }, Priority: e.priority, } @@ -75,6 +70,7 @@ func (e *Executor) executeExport(ctx context.Context, format drivers.FileFormat, _, err = me.Execute(ctx) if err != nil { + _ = os.Remove(path) return "", fmt.Errorf("failed to execute export: %w", err) } diff --git a/runtime/metricsview/executor_rewrite_limit.go b/runtime/metricsview/executor_rewrite_limit.go index 19877b252fe..8c69fc7905c 100644 --- a/runtime/metricsview/executor_rewrite_limit.go +++ b/runtime/metricsview/executor_rewrite_limit.go @@ -4,8 +4,8 @@ import "fmt" // rewriteQueryLimit rewrites the query limit to enforce system limits. // For unlimited queries, it adds a limit just above the system limit. The result reader should then error if the cap is exceeded. 
-func (e *Executor) rewriteQueryLimit(qry *Query, export bool) error { - limitCap := e.queryLimitCap(export) +func (e *Executor) rewriteQueryLimit(qry *Query) error { + limitCap := e.instanceCfg.InteractiveSQLRowLimit // No magic if there is no cap if limitCap == 0 { @@ -26,11 +26,3 @@ func (e *Executor) rewriteQueryLimit(qry *Query, export bool) error { return nil } - -// queryLimitCap returns the system limit for the given query type. -func (e *Executor) queryLimitCap(export bool) int64 { - if export { - return e.instanceCfg.DownloadRowLimit - } - return e.instanceCfg.InteractiveSQLRowLimit -} diff --git a/runtime/server/downloads.go b/runtime/server/downloads.go index 7d90209eddd..73255f861e8 100644 --- a/runtime/server/downloads.go +++ b/runtime/server/downloads.go @@ -15,7 +15,6 @@ import ( runtimev1 "github.com/rilldata/rill/proto/gen/rill/runtime/v1" "github.com/rilldata/rill/runtime" - "github.com/rilldata/rill/runtime/drivers" "github.com/rilldata/rill/runtime/queries" "github.com/rilldata/rill/runtime/server/auth" "google.golang.org/grpc/codes" @@ -66,17 +65,6 @@ func (s *Server) Export(ctx context.Context, req *runtimev1.ExportRequest) (*run return nil, ErrForbidden } - cfg, err := s.runtime.InstanceConfig(ctx, req.InstanceId) - if err != nil { - return nil, status.Error(codes.FailedPrecondition, err.Error()) - } - - if cfg.DownloadRowLimit != 0 { - if req.Limit > cfg.DownloadRowLimit { - return nil, status.Errorf(codes.InvalidArgument, "limit must be less than or equal to %d", cfg.DownloadRowLimit) - } - } - if req.BakedQuery != "" { qry, err := UnbakeQuery(req.BakedQuery) if err != nil { @@ -107,19 +95,13 @@ func (s *Server) downloadHandler(w http.ResponseWriter, req *http.Request) { return } - cfg, err := s.runtime.InstanceConfig(req.Context(), request.InstanceId) - if err != nil { - http.Error(w, err.Error(), http.StatusBadRequest) - return - } - var q runtime.Query switch v := request.Query.Query.(type) { case *runtimev1.Query_MetricsViewAggregationRequest: r := v.MetricsViewAggregationRequest var limitPtr *int64 - limit := s.resolveExportLimit(cfg, request.Limit, r.Limit) + limit := s.resolveExportLimit(request.Limit, r.Limit) if limit != 0 { limitPtr = &limit } @@ -154,7 +136,7 @@ func (s *Server) downloadHandler(w http.ResponseWriter, req *http.Request) { r := v.MetricsViewToplistRequest var limitPtr *int64 - limit := s.resolveExportLimit(cfg, request.Limit, r.Limit) + limit := s.resolveExportLimit(request.Limit, r.Limit) if limit != 0 { limitPtr = &limit } @@ -183,7 +165,7 @@ func (s *Server) downloadHandler(w http.ResponseWriter, req *http.Request) { } var limitPtr *int64 - limit := s.resolveExportLimit(cfg, request.Limit, int64(r.Limit)) + limit := s.resolveExportLimit(request.Limit, int64(r.Limit)) if limit != 0 { limitPtr = &limit } @@ -222,7 +204,7 @@ func (s *Server) downloadHandler(w http.ResponseWriter, req *http.Request) { ComparisonMeasures: r.ComparisonMeasures, TimeRange: r.TimeRange, ComparisonTimeRange: r.ComparisonTimeRange, - Limit: s.resolveExportLimit(cfg, request.Limit, r.Limit), + Limit: s.resolveExportLimit(request.Limit, r.Limit), Offset: r.Offset, Sort: r.Sort, Filter: r.Filter, @@ -278,16 +260,11 @@ func (s *Server) downloadHandler(w http.ResponseWriter, req *http.Request) { } } -func (s *Server) resolveExportLimit(cfg drivers.InstanceConfig, base, override int64) int64 { +func (s *Server) resolveExportLimit(base, override int64) int64 { res := base if override < res { res = override } - if cfg.DownloadRowLimit != 0 { - if res == 0 || res > 
cfg.DownloadRowLimit { - res = cfg.DownloadRowLimit - } - } return res } diff --git a/runtime/testruntime/testruntime.go b/runtime/testruntime/testruntime.go index 89fcc22e244..ca95ae68ec0 100644 --- a/runtime/testruntime/testruntime.go +++ b/runtime/testruntime/testruntime.go @@ -61,6 +61,7 @@ func New(t TestingT) *runtime.Runtime { ControllerLogBufferCapacity: 10000, ControllerLogBufferSizeBytes: int64(datasize.MB * 16), AllowHostAccess: true, + DataDir: t.TempDir(), } logger := zap.NewNop() From ee63acf497c1956a0d8a997a53f219575b70f930 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Benjamin=20Egelund-M=C3=BCller?= Date: Wed, 3 Jul 2024 13:54:01 +0200 Subject: [PATCH 3/5] Snowflake driver: Reduce chatty debug logs (#5179) --- runtime/drivers/snowflake/sql_store.go | 21 ++++++++------------- 1 file changed, 8 insertions(+), 13 deletions(-) diff --git a/runtime/drivers/snowflake/sql_store.go b/runtime/drivers/snowflake/sql_store.go index 93387e90489..1a34a277c95 100644 --- a/runtime/drivers/snowflake/sql_store.go +++ b/runtime/drivers/snowflake/sql_store.go @@ -207,26 +207,27 @@ func (f *fileIterator) Next() ([]string, error) { // mutex to protect file writes var mu sync.Mutex batchesLeft := len(f.batches) + start := time.Now() for _, batch := range f.batches { b := batch errGrp.Go(func() error { - fetchStart := time.Now() records, err := b.Fetch() if err != nil { return err } - f.logger.Debug( - "fetched an arrow batch", - zap.Duration("duration", time.Since(fetchStart)), - zap.Int("row_count", b.GetRowCount()), - ) mu.Lock() defer mu.Unlock() - writeStart := time.Now() + for _, rec := range *records { if writer.RowGroupTotalBytesWritten() >= rowGroupBufferSize { writer.NewBufferedRowGroup() + f.logger.Debug( + "starting writing to new parquet row group", + zap.Float64("progress", float64(len(f.batches)-batchesLeft)/float64(len(f.batches))*100), + zap.Int("total_records", int(f.totalRecords)), + zap.Duration("elapsed", time.Since(start)), + ) } if err := writer.WriteBuffered(rec); err != nil { return err @@ -239,12 +240,6 @@ func (f *fileIterator) Next() ([]string, error) { } } batchesLeft-- - f.logger.Debug( - "wrote an arrow batch to a parquet file", - zap.Float64("progress", float64(len(f.batches)-batchesLeft)/float64(len(f.batches))*100), - zap.Int("row_count", b.GetRowCount()), - zap.Duration("write_duration", time.Since(writeStart)), - ) f.totalRecords += int64(b.GetRowCount()) return nil }) From 243f1065c2523cbe84eb8d9aa53d8b42543598ef Mon Sep 17 00:00:00 2001 From: Anshul Khandelwal <12948312+k-anshul@users.noreply.github.com> Date: Wed, 3 Jul 2024 17:24:59 +0530 Subject: [PATCH 4/5] Remove pivot cell limit (#5182) --- runtime/drivers/registry.go | 4 --- runtime/metricsview/executor_pivot.go | 50 +++------------------------ 2 files changed, 4 insertions(+), 50 deletions(-) diff --git a/runtime/drivers/registry.go b/runtime/drivers/registry.go index 23e67d62179..70a680f9264 100644 --- a/runtime/drivers/registry.go +++ b/runtime/drivers/registry.go @@ -74,9 +74,6 @@ type Instance struct { type InstanceConfig struct { // DownloadLimitBytes is the limit on size of exported file. If set to 0, there is no limit. DownloadLimitBytes int64 `mapstructure:"rill.download_row_limit_bytes"` - // PivotCellLimit is the maximum number of cells allowed in a single pivot query. - // Note that it does not limit the UI's pivot table because it paginates the requests. - PivotCellLimit int64 `mapstructure:"rill.pivot_cell_limit"` // InteractiveSQLRowLimit is the row limit for interactive SQL queries. 
It does not apply to exports of SQL queries. If set to 0, there is no limit.
	InteractiveSQLRowLimit int64 `mapstructure:"rill.interactive_sql_row_limit"`
	// StageChanges indicates whether to keep previously ingested tables for sources/models, and only override them if ingestion of a new table is successful.
@@ -127,7 +124,6 @@ func (i *Instance) Config() (InstanceConfig, error) {
 	// Default config
 	res := InstanceConfig{
 		DownloadLimitBytes:      int64(datasize.MB * 128),
-		PivotCellLimit:          2_000_000,
 		InteractiveSQLRowLimit:  10_000,
 		StageChanges:            true,
 		ModelDefaultMaterialize: false,
diff --git a/runtime/metricsview/executor_pivot.go b/runtime/metricsview/executor_pivot.go
index 054c43160e3..2ae4b6d0731 100644
--- a/runtime/metricsview/executor_pivot.go
+++ b/runtime/metricsview/executor_pivot.go
@@ -100,17 +100,6 @@ func (e *Executor) rewriteQueryForPivot(qry *Query) (*pivotAST, bool, error) {
 	qry.Offset = nil
 	qry.Label = false
 
-	// If we have a cell limit, apply a row limit just above it to the underlying query.
-	// This prevents the DB from scanning too much data before we can detect that the query will exceed the cell limit.
-	if e.instanceCfg.PivotCellLimit != 0 {
-		cols := int64(len(qry.Dimensions) + len(qry.Measures))
-		ast.underlyingCellCap = e.instanceCfg.PivotCellLimit
-		ast.underlyingRowCap = e.instanceCfg.PivotCellLimit / cols
-
-		tmp := ast.underlyingRowCap + 1
-		qry.Limit = &tmp
-	}
-
 	return ast, true, nil
 }
 
@@ -185,7 +174,7 @@ func (e *Executor) executePivotExport(ctx context.Context, ast *AST, pivot *pivo
 	}()
 
 	// Build the PIVOT query
-	pivotSQL, err := pivot.SQL(ast, alias, true)
+	pivotSQL, err := pivot.SQL(ast, alias)
 	if err != nil {
 		return err
 	}
@@ -215,50 +204,19 @@ type pivotAST struct {
 	limit  *int64
 	offset *int64
 
-	label             bool
-	dialect           drivers.Dialect
-	underlyingCellCap int64
-	underlyingRowCap  int64
+	label   bool
+	dialect drivers.Dialect
 }
 
 // SQL generates a query that outputs a pivoted table based on the pivot config and data in the underlying query.
 // The underlyingAlias must be an alias for a table that holds the data produced by underlyingAST.SQL().
-func (a *pivotAST) SQL(underlyingAST *AST, underlyingAlias string, checkCap bool) (string, error) {
+func (a *pivotAST) SQL(underlyingAST *AST, underlyingAlias string) (string, error) {
 	if !a.dialect.CanPivot() {
 		return "", fmt.Errorf("pivot queries not supported for dialect %q", a.dialect.String())
 	}
 
 	b := &strings.Builder{}
 
-	// Circumstances make it easiest to enforce the pivot cell limit in the PIVOT query itself. To do that, we need to be creative.
-	// We leverage CTEs and DuckDB's ERROR() function to enforce the limit.
-	// This is pretty DuckDB-specific, but that's also currently the only OLAP we use that supports pivoting.
-	// The query looks something like:
-	//
-	//   WITH t AS (SELECT * FROM <underlying> WHERE IF(EXISTS (SELECT COUNT(*) AS count FROM <underlying> HAVING count > <limit>), ERROR('pivot query exceeds limit'), TRUE))
-	//   PIVOT t2 ON ...
- // - if checkCap { - tmpAlias, err := randomString("t", 8) - if err != nil { - return "", fmt.Errorf("failed to generate random alias: %w", err) - } - - b.WriteString("WITH ") - b.WriteString(tmpAlias) - b.WriteString(" AS (SELECT * FROM ") - b.WriteString(underlyingAlias) - b.WriteString(" WHERE IF(EXISTS (SELECT COUNT(*) AS count FROM ") - b.WriteString(underlyingAlias) - b.WriteString(" HAVING count > ") - b.WriteString(strconv.FormatInt(a.underlyingRowCap, 10)) - b.WriteString("), ERROR('pivot query exceeds limit of ") - b.WriteString(strconv.FormatInt(a.underlyingCellCap, 10)) - b.WriteString(" cells'), TRUE)) ") - - underlyingAlias = tmpAlias - } - // If we need to label some fields (in practice, this will be non-pivoted dims during exports), // we emit a query like: SELECT d1 AS "L1", d2 AS "L2", * EXCLUDE (d1, d2) FROM (PIVOT ...) wrapWithLabels := a.label && len(a.keep) > 0 From 3e9cd10d189bb79daf120940d0c94e9167617b60 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Benjamin=20Egelund-M=C3=BCller?= Date: Wed, 3 Jul 2024 13:55:16 +0200 Subject: [PATCH 5/5] Admin: Fix lint error in assets job (#5183) --- admin/worker/delete_unsued_assets.go | 1 + 1 file changed, 1 insertion(+) diff --git a/admin/worker/delete_unsued_assets.go b/admin/worker/delete_unsued_assets.go index 54b6374b618..88d9b7f89a9 100644 --- a/admin/worker/delete_unsued_assets.go +++ b/admin/worker/delete_unsued_assets.go @@ -31,6 +31,7 @@ func (w *Worker) deleteUnusedAssets(ctx context.Context) error { group.SetLimit(8) var ids []string for _, asset := range assets { + asset := asset ids = append(ids, asset.ID) group.Go(func() error { parsed, err := url.Parse(asset.Path)
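As a reader's companion to patch 2, the sketch below exercises the `limitedWriter` added in `runtime/drivers/file/model_executor_olap_self.go` in isolation. The `limitedWriter` type is taken from the patch; the `main` function, the 16-byte budget, and the use of `strings.Builder` are illustrative assumptions only:

```go
package main

import (
	"errors"
	"fmt"
	"io"
	"strings"
)

// limitedWriter mirrors the type introduced in patch 2: it rejects any write
// once the remaining budget N is exhausted, and it rejects a chunk outright
// (rather than writing a partial chunk) if the chunk would exceed the budget.
type limitedWriter struct {
	W io.Writer // underlying writer
	N int64     // max bytes remaining
}

func (l *limitedWriter) Write(p []byte) (n int, err error) {
	if l.N <= 0 {
		return 0, io.ErrShortWrite
	}
	if int64(len(p)) > l.N {
		return 0, io.ErrShortWrite
	}
	n, err = l.W.Write(p)
	l.N -= int64(n)
	return
}

func main() {
	var sb strings.Builder
	w := &limitedWriter{W: &sb, N: 16} // hypothetical 16-byte export limit

	// Copying 64 bytes through the writer trips the limit on the first chunk.
	_, err := io.Copy(w, strings.NewReader(strings.Repeat("x", 64)))
	if errors.Is(err, io.ErrShortWrite) {
		// The export path in patch 2 maps this sentinel error to a
		// user-facing "file exceeds size limit" message.
		fmt.Println("export aborted: size limit exceeded")
	}
	fmt.Println("bytes written:", sb.Len()) // 0, since the whole chunk was rejected
}
```

Rejecting the whole chunk keeps the accounting simple at the cost of slightly under-filling the budget, which fits how the export path treats `io.ErrShortWrite` as a hard failure rather than a truncation point.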