Skip to content

Commit

Permalink
ddl: fix a bug that adding range partition meets error when value is …
Browse files Browse the repository at this point in the history
…negative (pingcap#11407)
  • Loading branch information
tiancaiamao committed Aug 2, 2019
1 parent 2ba7832 commit 1da5cc7
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 18 deletions.
16 changes: 16 additions & 0 deletions ddl/db_partition_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -495,6 +495,19 @@ func (s *testIntegrationSuite5) TestAlterTableAddPartition(c *C) {
partition p5 values less than maxvalue
);`
assertErrorCode(c, tk, sql7, tmysql.ErrSameNamePartition)

tk.MustExec("alter table table3 add partition (partition p3 values less than (2001 + 10))")

// less than value can be negative or expression.
tk.MustExec(`CREATE TABLE tt5 (
c3 bigint(20) NOT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin
PARTITION BY RANGE ( c3 ) (
PARTITION p0 VALUES LESS THAN (-3),
PARTITION p1 VALUES LESS THAN (-2)
);`)
tk.MustExec(`ALTER TABLE tt5 add partition ( partition p2 values less than (-1) );`)
tk.MustExec(`ALTER TABLE tt5 add partition ( partition p3 values less than (5-1) );`)
}

func (s *testIntegrationSuite6) TestAlterTableDropPartition(c *C) {
Expand Down Expand Up @@ -1436,6 +1449,9 @@ func (s *testIntegrationSuite4) TestPartitionErrorCode(c *C) {
_, err := tk.Exec("alter table employees add partition partitions 8;")
c.Assert(ddl.ErrUnsupportedAddPartition.Equal(err), IsTrue)

_, err = tk.Exec("alter table employees add partition (partition p5 values less than (42));")
c.Assert(ddl.ErrUnsupportedAddPartition.Equal(err), IsTrue)

// coalesce partition
tk.MustExec(`create table clients (
id int,
Expand Down
26 changes: 8 additions & 18 deletions ddl/ddl_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -2114,7 +2114,8 @@ func (d *ddl) AddTablePartitions(ctx sessionctx.Context, ident ast.Ident, spec *
}

meta := t.Meta()
if meta.GetPartitionInfo() == nil {
pi := meta.GetPartitionInfo()
if pi == nil {
return errors.Trace(ErrPartitionMgmtOnNonpartitioned)
}
// We don't support add hash type partition now.
Expand All @@ -2137,7 +2138,11 @@ func (d *ddl) AddTablePartitions(ctx sessionctx.Context, ident ast.Ident, spec *
return errors.Trace(err)
}

err = checkAddPartitionValue(meta, partInfo)
// partInfo contains only the new added partition, we have to combine it with the
// old partitions to check all partitions is strictly increasing.
tmp := *partInfo
tmp.Definitions = append(pi.Definitions, tmp.Definitions...)
err = checkCreatePartitionValue(ctx, meta, &tmp, t.Cols())
if err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -3270,27 +3275,12 @@ func buildPartitionInfo(meta *model.TableInfo, d *ddl, spec *ast.AlterTableSpec)
Columns: meta.Partition.Columns,
Enable: meta.Partition.Enable,
}

genIDs, err := d.genGlobalIDs(len(spec.PartDefinitions))
if err != nil {
return nil, err
}
buf := new(bytes.Buffer)
for ith, def := range spec.PartDefinitions {
for _, expr := range def.LessThan {
tp := expr.GetType().Tp
if len(part.Columns) == 0 {
// Partition by range.
if !(tp == mysql.TypeLong || tp == mysql.TypeLonglong) {
expr.Format(buf)
if strings.EqualFold(buf.String(), "MAXVALUE") {
continue
}
buf.Reset()
return nil, infoschema.ErrColumnNotExists.GenWithStackByArgs(buf.String(), "partition function")
}
}
// Partition by range columns if len(part.Columns) != 0.
}
piDef := model.PartitionDefinition{
Name: def.Name,
ID: genIDs[ith],
Expand Down

0 comments on commit 1da5cc7

Please sign in to comment.