Skip to content
This repository has been archived by the owner on Jan 28, 2021. It is now read-only.

Commit

Permalink
sql: implement EXPLODE and generators (#720)
Browse files Browse the repository at this point in the history
sql: implement EXPLODE and generators
  • Loading branch information
ajnavarro committed Jun 3, 2019
2 parents 4dcaf78 + 728f747 commit ab5656f
Show file tree
Hide file tree
Showing 15 changed files with 1,010 additions and 66 deletions.
60 changes: 60 additions & 0 deletions engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2459,6 +2459,66 @@ func TestDescribeNoPruneColumns(t *testing.T) {
require.Len(p.Schema(), 3)
}

// generatorQueries holds the end-to-end cases for the EXPLODE generator: each
// entry pairs a query with the rows it must produce when run against the
// table built in TestGenerators.
var generatorQueries = []struct {
	query    string
	expected []sql.Row
}{
	// Plain EXPLODE: every array element yields its own output row.
	{
		query: `SELECT a, EXPLODE(b), c FROM t`,
		expected: []sql.Row{
			{int64(1), "a", "first"},
			{int64(1), "b", "first"},
			{int64(2), "c", "second"},
			{int64(2), "d", "second"},
			{int64(3), "e", "third"},
			{int64(3), "f", "third"},
		},
	},
	// Aliased EXPLODE produces the same rows as the plain form.
	{
		query: `SELECT a, EXPLODE(b) AS x, c FROM t`,
		expected: []sql.Row{
			{int64(1), "a", "first"},
			{int64(1), "b", "first"},
			{int64(2), "c", "second"},
			{int64(2), "d", "second"},
			{int64(3), "e", "third"},
			{int64(3), "f", "third"},
		},
	},
	// The alias of the exploded column can be used in the WHERE clause.
	{
		query: `SELECT a, EXPLODE(b) AS x, c FROM t WHERE x = 'e'`,
		expected: []sql.Row{
			{int64(3), "e", "third"},
		},
	},
}

// TestGenerators runs every query in generatorQueries against an in-memory
// table "t" whose column "b" is an array of text, and checks the rows returned
// by the engine.
func TestGenerators(t *testing.T) {
	schema := sql.Schema{
		{Name: "a", Type: sql.Int64, Source: "t"},
		{Name: "b", Type: sql.Array(sql.Text), Source: "t"},
		{Name: "c", Type: sql.Text, Source: "t"},
	}
	table := mem.NewPartitionedTable("t", schema, testNumPartitions)

	// Three rows, each carrying a two-element array in column "b".
	insertRows(
		t, table,
		sql.NewRow(int64(1), []interface{}{"a", "b"}, "first"),
		sql.NewRow(int64(2), []interface{}{"c", "d"}, "second"),
		sql.NewRow(int64(3), []interface{}{"e", "f"}, "third"),
	)

	database := mem.NewDatabase("db")
	database.AddTable("t", table)

	catalog := sql.NewCatalog()
	catalog.AddDatabase(database)

	engine := sqle.New(catalog, analyzer.NewDefault(catalog), new(sqle.Config))

	for i := range generatorQueries {
		testQuery(t, engine, generatorQueries[i].query, generatorQueries[i].expected)
	}
}

func insertRows(t *testing.T, table sql.Inserter, rows ...sql.Row) {
t.Helper()

Expand Down
17 changes: 13 additions & 4 deletions server/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,12 @@ func (h *Handler) ComQuery(
return err
}

r.Rows = append(r.Rows, rowToSQL(schema, row))
outputRow, err := rowToSQL(schema, row)
if err != nil {
return err
}

r.Rows = append(r.Rows, outputRow)
r.RowsAffected++
}

Expand Down Expand Up @@ -203,13 +208,17 @@ func (h *Handler) handleKill(conn *mysql.Conn, query string) (bool, error) {
return true, nil
}

// rowToSQL converts every value of row into its SQL representation, using the
// type of the schema column found at the same position.
// NOTE(review): this hunk is rendered without +/- markers, so each changed
// line appears twice: the pre-change version first, immediately followed by
// its replacement. The replacement lines additionally propagate the error
// returned by Type.SQL to the caller.
func rowToSQL(s sql.Schema, row sql.Row) []sqltypes.Value {
func rowToSQL(s sql.Schema, row sql.Row) ([]sqltypes.Value, error) {
o := make([]sqltypes.Value, len(row))
var err error
for i, v := range row {
o[i] = s[i].Type.SQL(v)
o[i], err = s[i].Type.SQL(v)
if err != nil {
return nil, err
}
}

return o
return o, nil
}

func schemaToFields(s sql.Schema) []*query.Field {
Expand Down
97 changes: 97 additions & 0 deletions sql/analyzer/resolve_generators.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
package analyzer

import (
"gopkg.in/src-d/go-errors.v1"
"github.com/src-d/go-mysql-server/sql"
"github.com/src-d/go-mysql-server/sql/expression"
"github.com/src-d/go-mysql-server/sql/expression/function"
"github.com/src-d/go-mysql-server/sql/plan"
)

var (
// errMultipleGenerators is returned by findGenerator when a projection
// contains more than one EXPLODE expression.
errMultipleGenerators = errors.NewKind("there can't be more than 1 instance of EXPLODE in a SELECT")
// errExplodeNotArray is returned by findGenerator when the type of the
// generator expression is not an array; the offending type is the single
// format argument.
errExplodeNotArray = errors.NewKind("argument of type %q given to EXPLODE, expecting array")
)

// resolveGenerators rewrites every Project node whose projection contains an
// EXPLODE expression (optionally wrapped in an alias). The EXPLODE column is
// replaced with its Generate counterpart and the resulting Project is wrapped
// in a plan.Generate node that points at that column. Project nodes without a
// generator, and all other nodes, are returned untouched.
//
// An error is returned when findGenerator rejects the projection (more than
// one generator, or a generator whose type is not an array).
func resolveGenerators(ctx *sql.Context, a *Analyzer, n sql.Node) (sql.Node, error) {
	return n.TransformUp(func(n sql.Node) (sql.Node, error) {
		p, ok := n.(*plan.Project)
		if !ok {
			return n, nil
		}

		g, err := findGenerator(p.Projections)
		if err != nil {
			return nil, err
		}

		// There might be no generator in the project, in that case we don't
		// have to do anything.
		if g == nil {
			return n, nil
		}

		// Work on a copy of the projection list: writing into p.Projections
		// directly would modify the slice shared with the node being
		// transformed instead of only affecting the node returned here.
		projection := make([]sql.Expression, len(p.Projections))
		copy(projection, p.Projections)
		projection[g.idx] = g.expr

		// Prefer the explicit name (e.g. an alias) and fall back to the
		// textual form of the expression.
		var name string
		if nameable, ok := g.expr.(sql.Nameable); ok {
			name = nameable.Name()
		} else {
			name = g.expr.String()
		}

		return plan.NewGenerate(
			plan.NewProject(projection, p.Child),
			expression.NewGetField(g.idx, g.expr.Type(), name, g.expr.IsNullable()),
		), nil
	})
}

// generator describes the generator column found in a projection by
// findGenerator.
type generator struct {
// idx is the position of the generator within the projection list.
idx int
// expr is the expression that replaces the original EXPLODE: a Generate
// expression, or an Alias wrapping one when the EXPLODE was aliased.
expr sql.Expression
}

// findGenerator looks for a generator column in the given projection, that
// is, an EXPLODE expression either on its own or directly wrapped in an alias.
// When one is found, the returned generator carries its position and the
// expression that must replace it, with Explode swapped for Generate (keeping
// the alias, if any).
// If the projection has no generator, it returns nil and no error.
// If there is more than one generator, or the argument given to EXPLODE is not
// an array, it fails.
func findGenerator(exprs []sql.Expression) (*generator, error) {
	var result *generator
	for pos, candidate := range exprs {
		var replacement sql.Expression
		if explode, ok := candidate.(*function.Explode); ok {
			replacement = function.NewGenerate(explode.Child)
		} else if alias, ok := candidate.(*expression.Alias); ok {
			if explode, ok := alias.Child.(*function.Explode); ok {
				replacement = expression.NewAlias(
					function.NewGenerate(explode.Child),
					alias.Name(),
				)
			}
		}

		// Not a generator column, keep looking.
		if replacement == nil {
			continue
		}

		// A generator was already seen earlier in the projection.
		if result != nil {
			return nil, errMultipleGenerators.New()
		}

		if !sql.IsArray(replacement.Type()) {
			return nil, errExplodeNotArray.New(replacement.Type())
		}

		result = &generator{idx: pos, expr: replacement}
	}

	return result, nil
}
117 changes: 117 additions & 0 deletions sql/analyzer/resolve_generators_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
package analyzer

import (
"testing"

"github.com/stretchr/testify/require"
"gopkg.in/src-d/go-errors.v1"
"github.com/src-d/go-mysql-server/sql"
"github.com/src-d/go-mysql-server/sql/expression"
"github.com/src-d/go-mysql-server/sql/expression/function"
"github.com/src-d/go-mysql-server/sql/plan"
)

// TestResolveGenerators checks the resolveGenerators rule: EXPLODE columns in
// a Project (with or without alias) must be turned into Generate and wrapped
// in a plan.Generate node, while non-array arguments and multiple generators
// must be rejected with the matching error kind.
func TestResolveGenerators(t *testing.T) {
	// Column references shared by the cases below.
	intA := expression.NewGetField(0, sql.Int64, "a", false)
	arrayB := expression.NewGetField(1, sql.Array(sql.Int64), "b", false)
	intC := expression.NewGetField(2, sql.Int64, "c", false)

	cases := []struct {
		name     string
		node     sql.Node
		expected sql.Node
		err      *errors.Kind
	}{
		{
			name: "regular explode",
			node: plan.NewProject(
				[]sql.Expression{
					intA,
					function.NewExplode(arrayB),
					intC,
				},
				plan.NewUnresolvedTable("foo", ""),
			),
			expected: plan.NewGenerate(
				plan.NewProject(
					[]sql.Expression{
						intA,
						function.NewGenerate(arrayB),
						intC,
					},
					plan.NewUnresolvedTable("foo", ""),
				),
				expression.NewGetField(1, sql.Array(sql.Int64), "EXPLODE(b)", false),
			),
		},
		{
			name: "explode with alias",
			node: plan.NewProject(
				[]sql.Expression{
					intA,
					expression.NewAlias(function.NewExplode(arrayB), "x"),
					intC,
				},
				plan.NewUnresolvedTable("foo", ""),
			),
			expected: plan.NewGenerate(
				plan.NewProject(
					[]sql.Expression{
						intA,
						expression.NewAlias(function.NewGenerate(arrayB), "x"),
						intC,
					},
					plan.NewUnresolvedTable("foo", ""),
				),
				expression.NewGetField(1, sql.Array(sql.Int64), "x", false),
			),
		},
		{
			name: "non array type on explode",
			node: plan.NewProject(
				[]sql.Expression{
					intA,
					function.NewExplode(expression.NewGetField(1, sql.Int64, "b", false)),
				},
				plan.NewUnresolvedTable("foo", ""),
			),
			err: errExplodeNotArray,
		},
		{
			name: "more than one generator",
			node: plan.NewProject(
				[]sql.Expression{
					intA,
					function.NewExplode(arrayB),
					function.NewExplode(expression.NewGetField(2, sql.Array(sql.Int64), "c", false)),
				},
				plan.NewUnresolvedTable("foo", ""),
			),
			err: errMultipleGenerators,
		},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			assert := require.New(t)
			got, err := resolveGenerators(sql.NewEmptyContext(), nil, tc.node)

			// Success cases: the plan must match exactly.
			if tc.err == nil {
				assert.NoError(err)
				assert.Equal(tc.expected, got)
				return
			}

			// Failure cases: only the error kind is checked.
			assert.Error(err)
			assert.True(tc.err.Is(err))
		})
	}
}
1 change: 1 addition & 0 deletions sql/analyzer/rules.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ var OnceBeforeDefault = []Rule{
// OnceAfterDefault contains the rules to be applied just once after the
// DefaultRules.
var OnceAfterDefault = []Rule{
{"resolve_generators", resolveGenerators},
{"remove_unnecessary_converts", removeUnnecessaryConverts},
{"assign_catalog", assignCatalog},
{"prune_columns", pruneColumns},
Expand Down
32 changes: 32 additions & 0 deletions sql/analyzer/validation_rules.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ const (
validateIndexCreationRule = "validate_index_creation"
validateCaseResultTypesRule = "validate_case_result_types"
validateIntervalUsageRule = "validate_interval_usage"
validateExplodeUsageRule = "validate_explode_usage"
)

var (
Expand Down Expand Up @@ -51,6 +52,11 @@ var (
"invalid use of an interval, which can only be used with DATE_ADD, " +
"DATE_SUB and +/- operators to subtract from or add to a date",
)
// ErrExplodeInvalidUse is returned when an EXPLODE function is used
// outside a Project node.
ErrExplodeInvalidUse = errors.NewKind(
"using EXPLODE is not supported outside a Project node",
)
)

// DefaultValidationRules to apply while analyzing nodes.
Expand All @@ -63,6 +69,7 @@ var DefaultValidationRules = []Rule{
{validateIndexCreationRule, validateIndexCreation},
{validateCaseResultTypesRule, validateCaseResultTypes},
{validateIntervalUsageRule, validateIntervalUsage},
{validateExplodeUsageRule, validateExplodeUsage},
}

func validateIsResolved(ctx *sql.Context, a *Analyzer, n sql.Node) (sql.Node, error) {
Expand Down Expand Up @@ -290,6 +297,31 @@ func validateIntervalUsage(ctx *sql.Context, a *Analyzer, n sql.Node) (sql.Node,
return n, nil
}

// validateExplodeUsage fails with ErrExplodeInvalidUse if any Explode
// expression is still present in the tree; otherwise it returns the node
// unchanged.
func validateExplodeUsage(ctx *sql.Context, a *Analyzer, n sql.Node) (sql.Node, error) {
	found := false
	plan.InspectExpressions(n, func(expr sql.Expression) bool {
		// Once an Explode has been seen there is nothing left to check.
		if found {
			return false
		}

		// Any remaining Explode is a misuse: the ones inside projects have
		// already been converted to Generate by the time this rule runs, so
		// those are the only ones to look for.
		_, found = expr.(*function.Explode)
		return true
	})

	if !found {
		return n, nil
	}

	return nil, ErrExplodeInvalidUse.New()
}

func stringContains(strs []string, target string) bool {
for _, s := range strs {
if s == target {
Expand Down
Loading

0 comments on commit ab5656f

Please sign in to comment.