Skip to content

Commit

Permalink
util: refine chunk.SwapColumn to rebuild the column reference (#7841) (
Browse files Browse the repository at this point in the history
  • Loading branch information
XuHuaiyu authored and shenli committed Oct 10, 2018
1 parent 0e01dbe commit dabb9b8
Show file tree
Hide file tree
Showing 2 changed files with 85 additions and 1 deletion.
40 changes: 39 additions & 1 deletion util/chunk/chunk.go
Expand Up @@ -110,9 +110,47 @@ func (c *Chunk) MakeRef(srcColIdx, dstColIdx int) {
c.columns[dstColIdx] = c.columns[srcColIdx]
}

// SwapColumn swaps column "c.columns[colIdx]" with column "other.columns[otherIdx]".
// SwapColumn swaps column "c.columns[colIdx]" with column
// "other.columns[otherIdx]". If there exists columns refer to the column to be
// swapped, we need to re-build the reference.
func (c *Chunk) SwapColumn(colIdx int, other *Chunk, otherIdx int) {
// Find the leftmost column of the reference which is the actual column to
// be swapped.
for i := 0; i < colIdx; i++ {
if c.columns[i] == c.columns[colIdx] {
colIdx = i
}
}
for i := 0; i < otherIdx; i++ {
if other.columns[i] == other.columns[otherIdx] {
otherIdx = i
}
}

// Find the columns which refer to the actual column to be swapped.
refColsIdx := make([]int, 0, len(c.columns)-colIdx)
for i := colIdx; i < len(c.columns); i++ {
if c.columns[i] == c.columns[colIdx] {
refColsIdx = append(refColsIdx, i)
}
}
refColsIdx4Other := make([]int, 0, len(other.columns)-otherIdx)
for i := otherIdx; i < len(other.columns); i++ {
if other.columns[i] == other.columns[otherIdx] {
refColsIdx4Other = append(refColsIdx4Other, i)
}
}

// Swap columns from two chunks.
c.columns[colIdx], other.columns[otherIdx] = other.columns[otherIdx], c.columns[colIdx]

// Rebuild the reference.
for _, i := range refColsIdx {
c.MakeRef(colIdx, i)
}
for _, i := range refColsIdx4Other {
other.MakeRef(otherIdx, i)
}
}

// SwapColumns swaps columns with another Chunk.
Expand Down
46 changes: 46 additions & 0 deletions util/chunk/chunk_test.go
Expand Up @@ -456,6 +456,52 @@ func (s *testChunkSuite) TestChunkMemoryUsage(c *check.C) {
c.Assert(memUsage, check.Equals, int64(expectedUsage))
}

func (s *testChunkSuite) TestSwapColumn(c *check.C) {
fieldTypes := make([]*types.FieldType, 0, 2)
fieldTypes = append(fieldTypes, &types.FieldType{Tp: mysql.TypeFloat})
fieldTypes = append(fieldTypes, &types.FieldType{Tp: mysql.TypeFloat})
fieldTypes = append(fieldTypes, &types.FieldType{Tp: mysql.TypeFloat})

// chk1: column1 refers to column0
chk1 := NewChunkWithCapacity(fieldTypes, 1)
chk1.AppendFloat64(0, 1)
chk1.MakeRef(0, 1)
chk1.AppendFloat64(2, 3)

// chk2: column1 refers to column0
chk2 := NewChunkWithCapacity(fieldTypes, 1)
chk2.AppendFloat64(0, 1)
chk2.MakeRef(0, 1)
chk2.AppendFloat64(2, 3)

c.Assert(chk1.columns[0] == chk1.columns[1], check.IsTrue)
c.Assert(chk2.columns[0] == chk2.columns[1], check.IsTrue)

checkRef := func() {
c.Assert(chk1.columns[0] == chk1.columns[1], check.IsTrue)
c.Assert(chk1.columns[0] == chk2.columns[0], check.IsFalse)
c.Assert(chk2.columns[0] == chk2.columns[1], check.IsTrue)
}

chk1.SwapColumn(0, chk2, 0)
checkRef()

chk1.SwapColumn(0, chk2, 1)
checkRef()

chk2.SwapColumn(1, chk2, 0)
checkRef()

chk2.SwapColumn(1, chk2, 1)
checkRef()

chk2.SwapColumn(1, chk2, 2)
checkRef()

chk2.SwapColumn(2, chk2, 0)
checkRef()
}

func BenchmarkAppendInt(b *testing.B) {
b.ReportAllocs()
chk := newChunk(8)
Expand Down

0 comments on commit dabb9b8

Please sign in to comment.