Skip to content
This repository has been archived by the owner on Jan 28, 2021. It is now read-only.

Commit

Permalink
sql: implement EXPLODE and generators
Browse files Browse the repository at this point in the history
This pull request implements the EXPLODE expressions and generators,
which are used together to provide generation of rows based on a
column or expression that can yield multiple values.

For this, there have been some quite core changes:
- SQL method of sql.Type now returns the sqltypes.Value and an error.
  This avoids panics in this method. Since generators can yield errors
  it's better to just return an error and not just kill the server.
- Array type can now internally handle either arrays or generators,
  making it transparent for the user. If it's used in an EXPLODE
  expression, the generator will be used. Otherwise, it will be
  converted automatically to an array and used without the user even
  knowing what happened behind the scenes. That way, we don't need
  yet another type to represent generators.

Aside from that, there are some new additions:
- New EXPLODE function, which is just a placeholder. EXPLODE type is the
  underlying type of its argument for resolution purposes. During analysis
  Explode nodes will be replaced by Generate functions and a Generate node.
- New Generate function, which is the non-placeholder version of EXPLODE
  and the one that goes into the final execution tree. This one returns
  the same type as its argument and let's the Generate plan node be the
  one that returns the underlying type once the values are generated.
- Generate node, which wraps a Project in which there is one and only one
  explode expression.
- resolve_generators analysis rule, which will turn projects with an
  Explode expression into a Generate node with the project as a children,
  replacing the Explode expressions with Generate expressions.
- validate_explode_usage validation rule, which will ensure explode is
  not used outside a Project node.

Signed-off-by: Miguel Molina <miguel@erizocosmi.co>
  • Loading branch information
erizocosmico committed May 22, 2019
1 parent 3f7b3fe commit 814b219
Show file tree
Hide file tree
Showing 15 changed files with 1,012 additions and 68 deletions.
64 changes: 62 additions & 2 deletions engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1076,8 +1076,8 @@ var queries = []struct {
{
`SELECT ARRAY_LENGTH(JSON_EXTRACT('[{"i":0}, {"i":1, "y":"yyy"}, {"i":2, "x":"xxx"}]', '$.i'))`,
[]sql.Row{{int32(3)}},
},
{
},
{
`SELECT GREATEST(1, 2, 3, 4)`,
[]sql.Row{{int64(4)}},
},
Expand Down Expand Up @@ -2346,6 +2346,66 @@ func TestDescribeNoPruneColumns(t *testing.T) {
require.Len(p.Schema(), 3)
}

var generatorQueries = []struct {
query string
expected []sql.Row
}{
{
`SELECT a, EXPLODE(b), c FROM t`,
[]sql.Row{
{int64(1), "a", "first"},
{int64(1), "b", "first"},
{int64(2), "c", "second"},
{int64(2), "d", "second"},
{int64(3), "e", "third"},
{int64(3), "f", "third"},
},
},
{
`SELECT a, EXPLODE(b) AS x, c FROM t`,
[]sql.Row{
{int64(1), "a", "first"},
{int64(1), "b", "first"},
{int64(2), "c", "second"},
{int64(2), "d", "second"},
{int64(3), "e", "third"},
{int64(3), "f", "third"},
},
},
{
`SELECT a, EXPLODE(b) AS x, c FROM t WHERE x = 'e'`,
[]sql.Row{
{int64(3), "e", "third"},
},
},
}

func TestGenerators(t *testing.T) {
table := mem.NewPartitionedTable("t", sql.Schema{
{Name: "a", Type: sql.Int64, Source: "t"},
{Name: "b", Type: sql.Array(sql.Text), Source: "t"},
{Name: "c", Type: sql.Text, Source: "t"},
}, testNumPartitions)

insertRows(
t, table,
sql.NewRow(int64(1), []interface{}{"a", "b"}, "first"),
sql.NewRow(int64(2), []interface{}{"c", "d"}, "second"),
sql.NewRow(int64(3), []interface{}{"e", "f"}, "third"),
)

db := mem.NewDatabase("db")
db.AddTable("t", table)

catalog := sql.NewCatalog()
catalog.AddDatabase(db)
e := sqle.New(catalog, analyzer.NewDefault(catalog), new(sqle.Config))

for _, q := range generatorQueries {
testQuery(t, e, q.query, q.expected)
}
}

func insertRows(t *testing.T, table sql.Inserter, rows ...sql.Row) {
t.Helper()

Expand Down
17 changes: 13 additions & 4 deletions server/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,12 @@ func (h *Handler) ComQuery(
return err
}

r.Rows = append(r.Rows, rowToSQL(schema, row))
outputRow, err := rowToSQL(schema, row)
if err != nil {
return err
}

r.Rows = append(r.Rows, outputRow)
r.RowsAffected++
}

Expand Down Expand Up @@ -208,13 +213,17 @@ func (h *Handler) handleKill(conn *mysql.Conn, query string) (bool, error) {
return true, nil
}

func rowToSQL(s sql.Schema, row sql.Row) []sqltypes.Value {
func rowToSQL(s sql.Schema, row sql.Row) ([]sqltypes.Value, error) {
o := make([]sqltypes.Value, len(row))
var err error
for i, v := range row {
o[i] = s[i].Type.SQL(v)
o[i], err = s[i].Type.SQL(v)
if err != nil {
return nil, err
}
}

return o
return o, nil
}

func schemaToFields(s sql.Schema) []*query.Field {
Expand Down
97 changes: 97 additions & 0 deletions sql/analyzer/resolve_generators.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
package analyzer

import (
"gopkg.in/src-d/go-errors.v1"
"gopkg.in/src-d/go-mysql-server.v0/sql"
"gopkg.in/src-d/go-mysql-server.v0/sql/expression"
"gopkg.in/src-d/go-mysql-server.v0/sql/expression/function"
"gopkg.in/src-d/go-mysql-server.v0/sql/plan"
)

var (
errMultipleGenerators = errors.NewKind("there can't be more than 1 instance of EXPLODE in a SELECT")
errExplodeNotArray = errors.NewKind("argument of type %q given to EXPLODE, expecting array")
)

func resolveGenerators(ctx *sql.Context, a *Analyzer, n sql.Node) (sql.Node, error) {
return n.TransformUp(func(n sql.Node) (sql.Node, error) {
p, ok := n.(*plan.Project)
if !ok {
return n, nil
}

projection := p.Projections

g, err := findGenerator(projection)
if err != nil {
return nil, err
}

// There might be no generator in the project, in that case we don't
// have to do anything.
if g == nil {
return n, nil
}

projection[g.idx] = g.expr

var name string
if n, ok := g.expr.(sql.Nameable); ok {
name = n.Name()
} else {
name = g.expr.String()
}

return plan.NewGenerate(
plan.NewProject(projection, p.Child),
expression.NewGetField(g.idx, g.expr.Type(), name, g.expr.IsNullable()),
), nil
})
}

type generator struct {
idx int
expr sql.Expression
}

// findGenerator will find in the given projection a generator column. If there
// is no generator, it will return nil.
// If there are is than one generator or the argument to explode is not an
// array it will fail.
// All occurrences of Explode will be replaced with Generate.
func findGenerator(exprs []sql.Expression) (*generator, error) {
var g = &generator{idx: -1}
for i, e := range exprs {
var found bool
switch e := e.(type) {
case *function.Explode:
found = true
g.expr = function.NewGenerate(e.Child)
case *expression.Alias:
if exp, ok := e.Child.(*function.Explode); ok {
found = true
g.expr = expression.NewAlias(
function.NewGenerate(exp.Child),
e.Name(),
)
}
}

if found {
if g.idx >= 0 {
return nil, errMultipleGenerators.New()
}
g.idx = i

if !sql.IsArray(g.expr.Type()) {
return nil, errExplodeNotArray.New(g.expr.Type())
}
}
}

if g.expr == nil {
return nil, nil
}

return g, nil
}
117 changes: 117 additions & 0 deletions sql/analyzer/resolve_generators_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
package analyzer

import (
"testing"

"github.com/stretchr/testify/require"
"gopkg.in/src-d/go-errors.v1"
"gopkg.in/src-d/go-mysql-server.v0/sql"
"gopkg.in/src-d/go-mysql-server.v0/sql/expression"
"gopkg.in/src-d/go-mysql-server.v0/sql/expression/function"
"gopkg.in/src-d/go-mysql-server.v0/sql/plan"
)

func TestResolveGenerators(t *testing.T) {
testCases := []struct {
name string
node sql.Node
expected sql.Node
err *errors.Kind
}{
{
name: "regular explode",
node: plan.NewProject(
[]sql.Expression{
expression.NewGetField(0, sql.Int64, "a", false),
function.NewExplode(expression.NewGetField(1, sql.Array(sql.Int64), "b", false)),
expression.NewGetField(2, sql.Int64, "c", false),
},
plan.NewUnresolvedTable("foo", ""),
),
expected: plan.NewGenerate(
plan.NewProject(
[]sql.Expression{
expression.NewGetField(0, sql.Int64, "a", false),
function.NewGenerate(expression.NewGetField(1, sql.Array(sql.Int64), "b", false)),
expression.NewGetField(2, sql.Int64, "c", false),
},
plan.NewUnresolvedTable("foo", ""),
),
expression.NewGetField(1, sql.Array(sql.Int64), "EXPLODE(b)", false),
),
err: nil,
},
{
name: "explode with alias",
node: plan.NewProject(
[]sql.Expression{
expression.NewGetField(0, sql.Int64, "a", false),
expression.NewAlias(
function.NewExplode(
expression.NewGetField(1, sql.Array(sql.Int64), "b", false),
),
"x",
),
expression.NewGetField(2, sql.Int64, "c", false),
},
plan.NewUnresolvedTable("foo", ""),
),
expected: plan.NewGenerate(
plan.NewProject(
[]sql.Expression{
expression.NewGetField(0, sql.Int64, "a", false),
expression.NewAlias(
function.NewGenerate(
expression.NewGetField(1, sql.Array(sql.Int64), "b", false),
),
"x",
),
expression.NewGetField(2, sql.Int64, "c", false),
},
plan.NewUnresolvedTable("foo", ""),
),
expression.NewGetField(1, sql.Array(sql.Int64), "x", false),
),
err: nil,
},
{
name: "non array type on explode",
node: plan.NewProject(
[]sql.Expression{
expression.NewGetField(0, sql.Int64, "a", false),
function.NewExplode(expression.NewGetField(1, sql.Int64, "b", false)),
},
plan.NewUnresolvedTable("foo", ""),
),
expected: nil,
err: errExplodeNotArray,
},
{
name: "more than one generator",
node: plan.NewProject(
[]sql.Expression{
expression.NewGetField(0, sql.Int64, "a", false),
function.NewExplode(expression.NewGetField(1, sql.Array(sql.Int64), "b", false)),
function.NewExplode(expression.NewGetField(2, sql.Array(sql.Int64), "c", false)),
},
plan.NewUnresolvedTable("foo", ""),
),
expected: nil,
err: errMultipleGenerators,
},
}

for _, tt := range testCases {
t.Run(tt.name, func(t *testing.T) {
require := require.New(t)
result, err := resolveGenerators(sql.NewEmptyContext(), nil, tt.node)
if tt.err != nil {
require.Error(err)
require.True(tt.err.Is(err))
} else {
require.NoError(err)
require.Equal(tt.expected, result)
}
})
}
}
1 change: 1 addition & 0 deletions sql/analyzer/rules.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ var OnceBeforeDefault = []Rule{
// OnceAfterDefault contains the rules to be applied just once after the
// DefaultRules.
var OnceAfterDefault = []Rule{
{"resolve_generators", resolveGenerators},
{"remove_unnecessary_converts", removeUnnecessaryConverts},
{"assign_catalog", assignCatalog},
{"prune_columns", pruneColumns},
Expand Down
32 changes: 32 additions & 0 deletions sql/analyzer/validation_rules.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ const (
validateIndexCreationRule = "validate_index_creation"
validateCaseResultTypesRule = "validate_case_result_types"
validateIntervalUsageRule = "validate_interval_usage"
validateExplodeUsageRule = "validate_explode_usage"
)

var (
Expand Down Expand Up @@ -51,6 +52,11 @@ var (
"invalid use of an interval, which can only be used with DATE_ADD, " +
"DATE_SUB and +/- operators to subtract from or add to a date",
)
// ErrExplodeInvalidUse is returned when an EXPLODE function is used
// outside a Project node.
ErrExplodeInvalidUse = errors.NewKind(
"using EXPLODE is not supported outside a Project node",
)
)

// DefaultValidationRules to apply while analyzing nodes.
Expand All @@ -63,6 +69,7 @@ var DefaultValidationRules = []Rule{
{validateIndexCreationRule, validateIndexCreation},
{validateCaseResultTypesRule, validateCaseResultTypes},
{validateIntervalUsageRule, validateIntervalUsage},
{validateExplodeUsageRule, validateExplodeUsage},
}

func validateIsResolved(ctx *sql.Context, a *Analyzer, n sql.Node) (sql.Node, error) {
Expand Down Expand Up @@ -290,6 +297,31 @@ func validateIntervalUsage(ctx *sql.Context, a *Analyzer, n sql.Node) (sql.Node,
return n, nil
}

func validateExplodeUsage(ctx *sql.Context, a *Analyzer, n sql.Node) (sql.Node, error) {
var invalid bool
plan.InspectExpressions(n, func(e sql.Expression) bool {
// If it's already invalid just skip everything else.
if invalid {
return false
}

// All usage of Explode will be incorrect because the ones in projects
// would have already been converted to Generate, so we only have to
// look for those.
if _, ok := e.(*function.Explode); ok {
invalid = true
}

return true
})

if invalid {
return nil, ErrExplodeInvalidUse.New()
}

return n, nil
}

func stringContains(strs []string, target string) bool {
for _, s := range strs {
if s == target {
Expand Down
Loading

0 comments on commit 814b219

Please sign in to comment.