Skip to content

Commit

Permalink
Merge branch 'begelundmuller/retrofit-mv-rpcs' into begelundmuller/si…
Browse files Browse the repository at this point in the history
…mplify-unnest
  • Loading branch information
begelundmuller committed Jun 27, 2024
2 parents 6cd748b + 54d5757 commit 36062ee
Show file tree
Hide file tree
Showing 12 changed files with 70 additions and 39 deletions.
12 changes: 6 additions & 6 deletions runtime/drivers/duckdb/model_executor_self_file.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,22 +72,22 @@ func (e *selfToFileExecutor) Execute(ctx context.Context) (*drivers.ModelResult,
}, nil
}

func exportSQL(qry, path, format string) (string, error) {
func exportSQL(qry, path string, format drivers.FileFormat) (string, error) {
switch format {
case "parquet":
case drivers.FileFormatParquet:
return fmt.Sprintf("COPY (%s\n) TO '%s' (FORMAT PARQUET)", qry, path), nil
case "csv":
case drivers.FileFormatCSV:
return fmt.Sprintf("COPY (%s\n) TO '%s' (FORMAT CSV, HEADER true)", qry, path), nil
case "json":
case drivers.FileFormatJSON:
return fmt.Sprintf("COPY (%s\n) TO '%s' (FORMAT JSON)", qry, path), nil
default:
return "", fmt.Errorf("duckdb: unsupported export format %q", format)
}
}

func supportsExportFormat(format string) bool {
func supportsExportFormat(format drivers.FileFormat) bool {
switch format {
case "parquet", "csv", "json":
case drivers.FileFormatParquet, drivers.FileFormatCSV, drivers.FileFormatJSON:
return true
default:
return false
Expand Down
16 changes: 11 additions & 5 deletions runtime/drivers/file/model_executor.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
package file

import "fmt"
import (
"fmt"

"github.com/rilldata/rill/runtime/drivers"
)

type ModelOutputProperties struct {
Path string `mapstructure:"path"`
Format string `mapstructure:"format"`
Path string `mapstructure:"path"`
Format drivers.FileFormat `mapstructure:"format"`
}

func (p *ModelOutputProperties) Validate() error {
Expand All @@ -13,11 +17,13 @@ func (p *ModelOutputProperties) Validate() error {
}
if p.Format == "" {
return fmt.Errorf("missing property 'format'")
} else if !p.Format.Valid() {
return fmt.Errorf("invalid property 'format': %q", p.Format)
}
return nil
}

type ModelResultProperties struct {
Path string `mapstructure:"path"`
Format string `mapstructure:"format"`
Path string `mapstructure:"path"`
Format drivers.FileFormat `mapstructure:"format"`
}
10 changes: 6 additions & 4 deletions runtime/drivers/file/model_executor_olap_self.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,12 +62,14 @@ func (e *olapToSelfExecutor) Execute(ctx context.Context) (*drivers.ModelResult,
defer res.Close()

switch outputProps.Format {
case "csv":
case drivers.FileFormatParquet:
err = writeParquet(res, outputProps.Path)
case drivers.FileFormatCSV:
err = writeCSV(res, outputProps.Path)
case "xlsx":
case drivers.FileFormatJSON:
return nil, errors.New("json file output not currently supported")
case drivers.FileFormatXLSX:
err = writeXLSX(res, outputProps.Path)
case "parquet":
err = writeParquet(res, outputProps.Path)
default:
return nil, fmt.Errorf("unsupported output format %q", outputProps.Format)
}
Expand Down
22 changes: 22 additions & 0 deletions runtime/drivers/models.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,3 +40,25 @@ type ModelExecutorOptions struct {
IncrementalRun bool
PreviousResult *ModelResult
}

type FileFormat string

const (
FileFormatUnspecified FileFormat = ""
FileFormatParquet FileFormat = "parquet"
FileFormatCSV FileFormat = "csv"
FileFormatJSON FileFormat = "json"
FileFormatXLSX FileFormat = "xlsx"
)

func (f FileFormat) Filename(stem string) string {
return stem + "." + string(f)
}

func (f FileFormat) Valid() bool {
switch f {
case FileFormatParquet, FileFormatCSV, FileFormatJSON, FileFormatXLSX:
return true
}
return false
}
2 changes: 1 addition & 1 deletion runtime/metricsview/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,7 @@ func (e *Executor) Query(ctx context.Context, qry *Query, executionTime *time.Ti

// Export executes and exports the provided query against the metrics view.
// It returns a path to a temporary file containing the export. The caller is responsible for cleaning up the file.
func (e *Executor) Export(ctx context.Context, qry *Query, executionTime *time.Time, format string) (string, error) {
func (e *Executor) Export(ctx context.Context, qry *Query, executionTime *time.Time, format drivers.FileFormat) (string, error) {
if e.security != nil && !e.security.Access {
return "", runtime.ErrForbidden
}
Expand Down
4 changes: 2 additions & 2 deletions runtime/metricsview/executor_export.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (
// executeExport works by simulating a model that outputs to a file.
// This means it creates a ModelExecutor with the provided input connector and props as input,
// and with the "file" driver as the output connector targeting a temporary output path.
func (e *Executor) executeExport(ctx context.Context, format, inputConnector string, inputProps map[string]any) (string, error) {
func (e *Executor) executeExport(ctx context.Context, format drivers.FileFormat, inputConnector string, inputProps map[string]any) (string, error) {
ctx, cancel := context.WithTimeout(ctx, defaultExportTimeout)
defer cancel()

Expand All @@ -29,7 +29,7 @@ func (e *Executor) executeExport(ctx context.Context, format, inputConnector str
if err != nil {
return "", err
}
name = fmt.Sprintf("%s.%s", name, format)
name = format.Filename(name)
path = filepath.Join(path, name)

ic, ir, err := e.rt.AcquireHandle(ctx, e.instanceID, inputConnector)
Expand Down
2 changes: 1 addition & 1 deletion runtime/metricsview/executor_pivot.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ func (e *Executor) rewriteQueryForPivot(qry *Query) (*pivotAST, bool, error) {
}

// executePivotExport executes a PIVOT query prepared using rewriteQueryForPivot, and exports the result to a file in the given format.
func (e *Executor) executePivotExport(ctx context.Context, ast *AST, pivot *pivotAST, format string) (string, error) {
func (e *Executor) executePivotExport(ctx context.Context, ast *AST, pivot *pivotAST, format drivers.FileFormat) (string, error) {
ctx, cancel := context.WithTimeout(ctx, defaultPivotExportTimeout)
defer cancel()

Expand Down
13 changes: 7 additions & 6 deletions runtime/queries/metricsview_aggregation.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

runtimev1 "github.com/rilldata/rill/proto/gen/rill/runtime/v1"
"github.com/rilldata/rill/runtime"
"github.com/rilldata/rill/runtime/drivers"
"github.com/rilldata/rill/runtime/metricsview"
)

Expand All @@ -33,7 +34,7 @@ type MetricsViewAggregation struct {
Exact bool `json:"exact,omitempty"`

Result *runtimev1.MetricsViewAggregationResponse `json:"-"`
Exporting bool `json:"-"`
Exporting bool `json:"-"` // Deprecated: Remove when tests call Export directly
}

var _ runtime.Query = &MetricsViewAggregation{}
Expand Down Expand Up @@ -127,14 +128,14 @@ func (q *MetricsViewAggregation) Export(ctx context.Context, rt *runtime.Runtime
}
defer e.Close()

var format string
var format drivers.FileFormat
switch opts.Format {
case runtimev1.ExportFormat_EXPORT_FORMAT_CSV:
format = "csv"
format = drivers.FileFormatCSV
case runtimev1.ExportFormat_EXPORT_FORMAT_XLSX:
format = "xlsx"
format = drivers.FileFormatXLSX
case runtimev1.ExportFormat_EXPORT_FORMAT_PARQUET:
format = "parquet"
format = drivers.FileFormatParquet
default:
return fmt.Errorf("unsupported format: %s", opts.Format.String())
}
Expand Down Expand Up @@ -263,7 +264,7 @@ func (q *MetricsViewAggregation) rewriteToMetricsViewQuery(export bool) (*metric
qry.ComparisonTimeRange = res
}

if q.Filter != nil { // backwards backwards compatibility
if q.Filter != nil { // Backwards compatibility
if q.Where != nil {
return nil, fmt.Errorf("both filter and where is provided")
}
Expand Down
13 changes: 6 additions & 7 deletions runtime/queries/metricsview_comparison_toplist.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

runtimev1 "github.com/rilldata/rill/proto/gen/rill/runtime/v1"
"github.com/rilldata/rill/runtime"
"github.com/rilldata/rill/runtime/drivers"
"github.com/rilldata/rill/runtime/metricsview"
"github.com/rilldata/rill/runtime/pkg/pbutil"

Expand Down Expand Up @@ -89,7 +90,6 @@ func (q *MetricsViewComparison) Resolve(ctx context.Context, rt *runtime.Runtime
return err
}

// Attempt to route to metricsview executor
qry, err := q.rewriteToMetricsViewQuery(false)
if err != nil {
return fmt.Errorf("error rewriting to metrics query: %w", err)
Expand Down Expand Up @@ -181,7 +181,6 @@ func (q *MetricsViewComparison) Export(ctx context.Context, rt *runtime.Runtime,
return err
}

// Attempt to route to metricsview executor
qry, err := q.rewriteToMetricsViewQuery(true)
if err != nil {
return fmt.Errorf("error rewriting to metrics query: %w", err)
Expand All @@ -193,14 +192,14 @@ func (q *MetricsViewComparison) Export(ctx context.Context, rt *runtime.Runtime,
}
defer e.Close()

var format string
var format drivers.FileFormat
switch opts.Format {
case runtimev1.ExportFormat_EXPORT_FORMAT_CSV:
format = "csv"
format = drivers.FileFormatCSV
case runtimev1.ExportFormat_EXPORT_FORMAT_XLSX:
format = "xlsx"
format = drivers.FileFormatXLSX
case runtimev1.ExportFormat_EXPORT_FORMAT_PARQUET:
format = "parquet"
format = drivers.FileFormatParquet
default:
return fmt.Errorf("unsupported format: %s", opts.Format.String())
}
Expand Down Expand Up @@ -354,7 +353,7 @@ func (q *MetricsViewComparison) rewriteToMetricsViewQuery(export bool) (*metrics
})
}

if q.Filter != nil { // backwards backwards compatibility
if q.Filter != nil { // Backwards compatibility
if q.Where != nil {
return nil, fmt.Errorf("both filter and where is provided")
}
Expand Down
2 changes: 1 addition & 1 deletion runtime/queries/metricsview_timeseries.go
Original file line number Diff line number Diff line change
Expand Up @@ -326,7 +326,7 @@ func (q *MetricsViewTimeSeries) rewriteToMetricsViewQuery(timeDimension string)
})
}

if q.Filter != nil { // backwards backwards compatibility
if q.Filter != nil { // Backwards compatibility
if q.Where != nil {
return nil, fmt.Errorf("both filter and where is provided")
}
Expand Down
11 changes: 6 additions & 5 deletions runtime/queries/metricsview_toplist.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

runtimev1 "github.com/rilldata/rill/proto/gen/rill/runtime/v1"
"github.com/rilldata/rill/runtime"
"github.com/rilldata/rill/runtime/drivers"
"github.com/rilldata/rill/runtime/metricsview"
"google.golang.org/protobuf/types/known/timestamppb"
)
Expand Down Expand Up @@ -128,14 +129,14 @@ func (q *MetricsViewToplist) Export(ctx context.Context, rt *runtime.Runtime, in
}
defer e.Close()

var format string
var format drivers.FileFormat
switch opts.Format {
case runtimev1.ExportFormat_EXPORT_FORMAT_CSV:
format = "csv"
format = drivers.FileFormatCSV
case runtimev1.ExportFormat_EXPORT_FORMAT_XLSX:
format = "xlsx"
format = drivers.FileFormatXLSX
case runtimev1.ExportFormat_EXPORT_FORMAT_PARQUET:
format = "parquet"
format = drivers.FileFormatParquet
default:
return fmt.Errorf("unsupported format: %s", opts.Format.String())
}
Expand Down Expand Up @@ -201,7 +202,7 @@ func (q *MetricsViewToplist) rewriteToMetricsViewQuery(export bool) (*metricsvie
})
}

if q.Filter != nil { // backwards backwards compatibility
if q.Filter != nil { // Backwards compatibility
if q.Where != nil {
return nil, fmt.Errorf("both filter and where is provided")
}
Expand Down
2 changes: 1 addition & 1 deletion runtime/queries/metricsview_totals.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ func (q *MetricsViewTotals) rewriteToMetricsViewQuery(exporting bool) (*metricsv
qry.TimeRange = res
}

if q.Filter != nil { // backwards backwards compatibility
if q.Filter != nil { // Backwards compatibility
if q.Where != nil {
return nil, fmt.Errorf("both filter and where is provided")
}
Expand Down

0 comments on commit 36062ee

Please sign in to comment.