executor: Do not save long-lived unsafe.Pointer in hash join v2 #54085

Open · wants to merge 7 commits into base: master

Changes from all commits
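This PR replaces the long-lived unsafe.Pointer slots in hash join v2 (hash table buckets, row locations, matched-row headers) with uintptr, rebuilding the real pointer only at the point of use. Presumably the point is to cut GC cost: a large []unsafe.Pointer gives the collector one pointer slot to scan per row, while a []uintptr is pointer-free data it can skip entirely. A minimal sketch of the store/load idiom used throughout the diff (illustrative only, not code from this PR; it assumes, as the new runtime-size check below does, that pointer bits survive a round trip through uintptr, and that the pointed-to memory is kept alive by an ordinary reference elsewhere):

```go
package main

import (
	"fmt"
	"runtime"
	"unsafe"
)

func main() {
	rawData := make([]byte, 8)  // kept alive by this ordinary reference
	slots := make([]uintptr, 1) // pointer-free: the GC does not scan it

	// Store: write the pointer bits by reinterpreting the slot's address,
	// rather than converting the pointer value itself to uintptr.
	*(*unsafe.Pointer)(unsafe.Pointer(&slots[0])) = unsafe.Pointer(&rawData[0])

	// Load: rebuild the unsafe.Pointer only at the point of use.
	p := *(*unsafe.Pointer)(unsafe.Pointer(&slots[0]))
	*(*int64)(p) = 42

	fmt.Println(*(*int64)(unsafe.Pointer(&rawData[0]))) // prints 42
	runtime.KeepAlive(rawData)                          // the uintptr alone keeps nothing alive
}
```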
2 changes: 1 addition & 1 deletion pkg/executor/builder.go
@@ -1672,7 +1672,7 @@ func (b *executorBuilder) buildHashJoinV2(v *plannercore.PhysicalHashJoin) exec.
}

func (b *executorBuilder) buildHashJoin(v *plannercore.PhysicalHashJoin) exec.Executor {
- if join.EnableHashJoinV2.Load() && v.CanUseHashJoinV2() {
+ if join.IsHashJoinV2Enabled() && v.CanUseHashJoinV2() {
return b.buildHashJoinV2(v)
}
leftExec := b.build(v.Children()[0])
2 changes: 1 addition & 1 deletion pkg/executor/join/BUILD.bazel
@@ -78,7 +78,7 @@ go_test(
],
embed = [":join"],
flaky = True,
- shard_count = 40,
+ shard_count = 41,
deps = [
"//pkg/config",
"//pkg/domain",
24 changes: 15 additions & 9 deletions pkg/executor/join/base_join_probe.go
@@ -77,11 +77,17 @@ type matchedRowInfo struct {
// probeRowIndex means the probe-side index of the current matched row
probeRowIndex int
// buildRowStart marks the start of the build row for the current matched row
- buildRowStart unsafe.Pointer
+ buildRowStart uintptr
// buildRowOffset is the current offset within the BuildRow, used to construct column data from the BuildRow
buildRowOffset int
}

+ func createMatchRowInfo(probeRowIndex int, buildRowStart unsafe.Pointer) *matchedRowInfo {
+ 	ret := &matchedRowInfo{probeRowIndex: probeRowIndex}
+ 	*(*unsafe.Pointer)(unsafe.Pointer(&ret.buildRowStart)) = buildRowStart
+ 	return ret
+ }

type posAndHashValue struct {
hashValue uint64
pos int
@@ -96,8 +102,8 @@ type baseJoinProbe struct {
selRows []int
usedRows []int
// matchedRowsHeaders and serializedKeys are indexed by logical row index
- matchedRowsHeaders []unsafe.Pointer // the start address of each matched rows
- serializedKeys [][]byte // used for save serialized keys
+ matchedRowsHeaders []uintptr // the start address of each matched row
+ serializedKeys [][]byte // used to save serialized keys
// filterVector and nullKeyVector are indexed by physical row index because the return vector of VectorizedFilter is based on physical row index
filterVector []bool // if there is filter before probe, filterVector saves the filter result
nullKeyVector []bool // nullKeyVector[i] = true if any of the key is null
@@ -164,7 +170,7 @@ func (j *baseJoinProbe) SetChunkForProbe(chk *chunk.Chunk) (err error) {
if cap(j.matchedRowsHeaders) >= logicalRows {
j.matchedRowsHeaders = j.matchedRowsHeaders[:logicalRows]
} else {
- j.matchedRowsHeaders = make([]unsafe.Pointer, logicalRows)
+ j.matchedRowsHeaders = make([]uintptr, logicalRows)
}
for i := 0; i < j.ctx.PartitionNumber; i++ {
j.hashValues[i] = j.hashValues[i][:0]
@@ -213,7 +219,7 @@
for logicalRowIndex, physicalRowIndex := range j.usedRows {
if (j.filterVector != nil && !j.filterVector[physicalRowIndex]) || (j.nullKeyVector != nil && j.nullKeyVector[physicalRowIndex]) {
// explicitly reset matchedRowsHeaders[logicalRowIndex] to indicate there are no matched rows
- j.matchedRowsHeaders[logicalRowIndex] = nil
+ j.matchedRowsHeaders[logicalRowIndex] = 0
continue
}
hash.Reset()
@@ -337,14 +343,14 @@ func (j *baseJoinProbe) appendBuildRowToChunkInternal(chk *chunk.Chunk, usedCols
if ok {
currentColumn = chk.Column(indexInDstChk)
for index := range j.cachedBuildRows {
- currentColumn.AppendNullBitmap(!meta.isColumnNull(j.cachedBuildRows[index].buildRowStart, columnIndex))
- j.cachedBuildRows[index].buildRowOffset = chunk.AppendCellFromRawData(currentColumn, j.cachedBuildRows[index].buildRowStart, j.cachedBuildRows[index].buildRowOffset)
+ currentColumn.AppendNullBitmap(!meta.isColumnNull(*(*unsafe.Pointer)(unsafe.Pointer(&j.cachedBuildRows[index].buildRowStart)), columnIndex))
+ j.cachedBuildRows[index].buildRowOffset = chunk.AppendCellFromRawData(currentColumn, *(*unsafe.Pointer)(unsafe.Pointer(&j.cachedBuildRows[index].buildRowStart)), j.cachedBuildRows[index].buildRowOffset)
}
} else {
// not used so don't need to insert into chk, but still need to advance rowData
if meta.columnsSize[columnIndex] < 0 {
for index := range j.cachedBuildRows {
- size := *(*uint64)(unsafe.Add(j.cachedBuildRows[index].buildRowStart, j.cachedBuildRows[index].buildRowOffset))
+ size := *(*uint64)(unsafe.Add(*(*unsafe.Pointer)(unsafe.Pointer(&j.cachedBuildRows[index].buildRowStart)), j.cachedBuildRows[index].buildRowOffset))
j.cachedBuildRows[index].buildRowOffset += sizeOfLengthField + int(size)
}
} else {
@@ -497,7 +503,7 @@ func NewJoinProbe(ctx *HashJoinCtxV2, workID uint, joinType core.JoinType, keyIn
}
}
base.cachedBuildRows = make([]*matchedRowInfo, 0, batchBuildRowSize)
- base.matchedRowsHeaders = make([]unsafe.Pointer, 0, chunk.InitialCapacity)
+ base.matchedRowsHeaders = make([]uintptr, 0, chunk.InitialCapacity)
base.selRows = make([]int, 0, chunk.InitialCapacity)
for i := 0; i < chunk.InitialCapacity; i++ {
base.selRows = append(base.selRows, i)
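The buildRowStart field above now carries pointer bits in a uintptr, written once in createMatchRowInfo and reinterpreted back at each use site in appendBuildRowToChunkInternal. A standalone sketch of that idiom, with hypothetical names rather than the actual TiDB types:

```go
package sketch

import "unsafe"

// rowRef mirrors matchedRowInfo's trick: the pointer bits live in a uintptr
// field, so a batch of rowRef values contains nothing for the GC to scan.
type rowRef struct {
	start uintptr
}

func newRowRef(p unsafe.Pointer) *rowRef {
	r := &rowRef{}
	// Store: write the pointer through the reinterpreted field address.
	*(*unsafe.Pointer)(unsafe.Pointer(&r.start)) = p
	return r
}

func (r *rowRef) ptr() unsafe.Pointer {
	// Load: rebuild the pointer only at the point of use.
	return *(*unsafe.Pointer)(unsafe.Pointer(&r.start))
}
```

This stays sound only because the row segments' rawData byte slices remain referenced by the row tables for the lifetime of the probe; the uintptr itself keeps nothing alive, which is exactly what makes it cheap for the GC.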
32 changes: 32 additions & 0 deletions pkg/executor/join/bench_test.go
@@ -15,7 +15,9 @@
package join

import (
"runtime"
"testing"
"unsafe"

"github.com/pingcap/tidb/pkg/util"
)
@@ -55,3 +57,33 @@ func BenchmarkHashTableConcurrentBuild(b *testing.B) {
}
wg.Wait()
}

+ func BenchmarkTestUnsafePointer(b *testing.B) {
+ 	size := int(1e3)
+ 	a := make([]byte, size*8)
+ 	p := make([]unsafe.Pointer, size)
+ 	b.StopTimer()
+ 	for i := 0; i < size; i++ {
+ 		p[i] = unsafe.Pointer(&a[i*8])
+ 	}
+ 	for i := 0; i < size; i++ {
+ 		*(*int64)(p[i]) = int64(i)
+ 	}
+ 	runtime.KeepAlive(a)
+ 	runtime.KeepAlive(p)
+ }

+ func BenchmarkTestUseUintptrAsUnsafePointer(b *testing.B) {
+ 	size := int(1e3)
+ 	a := make([]byte, size*8)
+ 	p := make([]uintptr, size)
+ 	b.StopTimer()
+ 	for i := 0; i < size; i++ {
+ 		*(*unsafe.Pointer)(unsafe.Pointer(&p[i])) = unsafe.Pointer(&a[i*8])
+ 	}
+ 	for i := 0; i < size; i++ {
+ 		*(*int64)(*(*unsafe.Pointer)(unsafe.Pointer(&p[i]))) = int64(i)
+ 	}
+ 	runtime.KeepAlive(a)
+ 	runtime.KeepAlive(p)
+ }
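As committed, both benchmarks run their loops exactly once and only after b.StopTimer(), so they read more as correctness probes of the two idioms than as timed comparisons. A conventionally structured b.N variant of the uintptr case might look like this (a sketch, not part of the PR; the name is hypothetical and it reuses this file's imports):

```go
func BenchmarkUintptrRoundTrip(b *testing.B) {
	size := int(1e3)
	a := make([]byte, size*8)
	p := make([]uintptr, size)
	for i := 0; i < size; i++ {
		*(*unsafe.Pointer)(unsafe.Pointer(&p[i])) = unsafe.Pointer(&a[i*8])
	}
	b.ResetTimer()
	for n := 0; n < b.N; n++ {
		for i := 0; i < size; i++ {
			// Rebuild the pointer from the uintptr slot, then write through it.
			*(*int64)(*(*unsafe.Pointer)(unsafe.Pointer(&p[i]))) = int64(i)
		}
	}
	runtime.KeepAlive(a)
	runtime.KeepAlive(p)
}
```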
22 changes: 16 additions & 6 deletions pkg/executor/join/hash_join_v2.go
@@ -23,7 +23,6 @@ import (
"sync"
"sync/atomic"
"time"
"unsafe"

"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/pkg/executor/internal/exec"
@@ -41,10 +40,23 @@

var (
_ exec.Executor = &HashJoinV2Exec{}
- // EnableHashJoinV2 is a variable used only in test
- EnableHashJoinV2 = atomic.Bool{}
+ // enableHashJoinV2 is a variable used only in test
+ enableHashJoinV2 = atomic.Bool{}
)

+ // IsHashJoinV2Enabled returns true if hash join v2 is enabled
+ func IsHashJoinV2Enabled() bool {
+ 	// sizeOfUintptr should always equal sizeOfUnsafePointer because, according to Go's
+ 	// documentation, a Pointer can be converted to a uintptr. Keep this check in case a
+ 	// future Go runtime changes that.
+ 	return enableHashJoinV2.Load() && sizeOfUintptr >= sizeOfUnsafePointer
+ }
+
+ // SetEnableHashJoinV2 enables/disables hash join v2
+ func SetEnableHashJoinV2(enable bool) {
+ 	enableHashJoinV2.Store(enable)
+ }

type hashTableContext struct {
// rowTables is used during split partition stage, each buildWorker has
// its own rowTable
@@ -77,9 +89,7 @@ func (htc *hashTableContext) getCurrentRowSegment(workerID, partitionID int, tab

func (htc *hashTableContext) finalizeCurrentSeg(workerID, partitionID int, builder *rowTableBuilder) {
seg := htc.getCurrentRowSegment(workerID, partitionID, nil, false)
- 	for _, pos := range builder.startPosInRawData[partitionID] {
- 		seg.rowLocations = append(seg.rowLocations, unsafe.Pointer(&seg.rawData[pos]))
- 	}
+ 	seg.rowStartOffset = append(seg.rowStartOffset, builder.startPosInRawData[partitionID]...)
builder.crrntSizeOfRowTable[partitionID] = 0
builder.startPosInRawData[partitionID] = builder.startPosInRawData[partitionID][:0]
failpoint.Inject("finalizeCurrentSegPanic", nil)
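The sizeOfUintptr and sizeOfUnsafePointer constants compared in IsHashJoinV2Enabled are not defined in this diff; presumably they are derived from unsafe.Sizeof, along these lines (assumed definitions, shown only for context):

```go
// Both expressions are compile-time constants, so the guard in
// IsHashJoinV2Enabled folds down to a constant comparison.
const (
	sizeOfUintptr       = int(unsafe.Sizeof(uintptr(0)))
	sizeOfUnsafePointer = int(unsafe.Sizeof(unsafe.Pointer(nil)))
)
```

On every platform Go currently supports the two sizes are equal; the check only defends against a hypothetical future runtime where a uintptr could no longer hold a pointer's bits.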
20 changes: 10 additions & 10 deletions pkg/executor/join/hash_table_v2.go
@@ -21,13 +21,13 @@ import (

type subTable struct {
rowData *rowTable
- hashTable []unsafe.Pointer
+ hashTable []uintptr
posMask uint64
isRowTableEmpty bool
isHashTableEmpty bool
}

- func (st *subTable) lookup(hashValue uint64) unsafe.Pointer {
+ func (st *subTable) lookup(hashValue uint64) uintptr {
return st.hashTable[hashValue&st.posMask]
}

@@ -56,21 +56,21 @@ func newSubTable(table *rowTable) *subTable {
ret.isHashTableEmpty = true
}
hashTableLength := max(nextPowerOfTwo(table.validKeyCount()), uint64(1024))
- ret.hashTable = make([]unsafe.Pointer, hashTableLength)
+ ret.hashTable = make([]uintptr, hashTableLength)
ret.posMask = hashTableLength - 1
return ret
}

func (st *subTable) updateHashValue(pos uint64, rowAddress unsafe.Pointer) {
- prev := st.hashTable[pos]
- st.hashTable[pos] = rowAddress
+ prev := *(*unsafe.Pointer)(unsafe.Pointer(&st.hashTable[pos]))
+ *(*unsafe.Pointer)(unsafe.Pointer(&st.hashTable[pos])) = rowAddress
setNextRowAddress(rowAddress, prev)
}

func (st *subTable) atomicUpdateHashValue(pos uint64, rowAddress unsafe.Pointer) {
for {
- prev := atomic.LoadPointer(&st.hashTable[pos])
- if atomic.CompareAndSwapPointer(&st.hashTable[pos], prev, rowAddress) {
+ prev := atomic.LoadPointer((*unsafe.Pointer)(unsafe.Pointer(&st.hashTable[pos])))
+ if atomic.CompareAndSwapPointer((*unsafe.Pointer)(unsafe.Pointer(&st.hashTable[pos])), prev, rowAddress) {
setNextRowAddress(rowAddress, prev)
break
}
@@ -81,7 +81,7 @@ func (st *subTable) build(startSegmentIndex int, endSegmentIndex int) {
if startSegmentIndex == 0 && endSegmentIndex == len(st.rowData.segments) {
for i := startSegmentIndex; i < endSegmentIndex; i++ {
for _, index := range st.rowData.segments[i].validJoinKeyPos {
- rowAddress := st.rowData.segments[i].rowLocations[index]
+ rowAddress := st.rowData.segments[i].getRowPointer(index)
hashValue := st.rowData.segments[i].hashValues[index]
pos := hashValue & st.posMask
st.updateHashValue(pos, rowAddress)
@@ -90,7 +90,7 @@
} else {
for i := startSegmentIndex; i < endSegmentIndex; i++ {
for _, index := range st.rowData.segments[i].validJoinKeyPos {
- rowAddress := st.rowData.segments[i].rowLocations[index]
+ rowAddress := st.rowData.segments[i].getRowPointer(index)
hashValue := st.rowData.segments[i].hashValues[index]
pos := hashValue & st.posMask
st.atomicUpdateHashValue(pos, rowAddress)
@@ -117,7 +117,7 @@ type rowIter struct {
}

func (ri *rowIter) getValue() unsafe.Pointer {
- return ri.table.tables[ri.currentPos.subTableIndex].rowData.segments[ri.currentPos.rowSegmentIndex].rowLocations[ri.currentPos.rowIndex]
+ return ri.table.tables[ri.currentPos.subTableIndex].rowData.segments[ri.currentPos.rowSegmentIndex].getRowPointer(int(ri.currentPos.rowIndex))
}

func (ri *rowIter) next() {
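atomicUpdateHashValue above is a lock-free prepend onto a bucket's chain: load the current head, publish the new row as head via compare-and-swap, then link the new row to the previous head. Reduced to a standalone sketch with a hypothetical node type (the PR instead embeds the next-row link in the row's raw bytes via setNextRowAddress):

```go
package sketch

import (
	"sync/atomic"
	"unsafe"
)

type node struct {
	next unsafe.Pointer // *node
	key  uint64
}

// push atomically prepends n to the chain rooted at *head.
func push(head *unsafe.Pointer, n *node) {
	for {
		prev := atomic.LoadPointer(head)
		n.next = prev // link before publishing
		if atomic.CompareAndSwapPointer(head, prev, unsafe.Pointer(n)) {
			return
		}
	}
}
```

The sketch links before the CAS, while the PR calls setNextRowAddress after a successful swap; that ordering is presumably safe there because nothing traverses the chains until the whole build phase has finished.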
35 changes: 21 additions & 14 deletions pkg/executor/join/hash_table_v2_test.go
@@ -41,7 +41,7 @@ func createMockRowTable(maxRowsPerSeg int, segmentCount int, fixedSize bool) *ro
}
rowSeg.rawData = make([]byte, rows)
for j := 0; j < rows; j++ {
- rowSeg.rowLocations = append(rowSeg.rowLocations, unsafe.Pointer(&rowSeg.rawData[j]))
+ rowSeg.rowStartOffset = append(rowSeg.rowStartOffset, uint64(j))
rowSeg.validJoinKeyPos = append(rowSeg.validJoinKeyPos, j)
}
ret.segments = append(ret.segments, rowSeg)
@@ -117,20 +117,22 @@ func TestBuild(t *testing.T) {
subTable.build(0, len(rowTable.segments))
rowSet := make(map[unsafe.Pointer]struct{}, rowTable.rowCount())
for _, seg := range rowTable.segments {
- for _, loc := range seg.rowLocations {
+ for index := range seg.rowStartOffset {
+ 	loc := seg.getRowPointer(index)
_, ok := rowSet[loc]
require.False(t, ok)
rowSet[loc] = struct{}{}
}
}
rowCount := 0
- for _, loc := range subTable.hashTable {
- 	for loc != nil {
+ for _, locHolder := range subTable.hashTable {
+ 	for locHolder != 0 {
		rowCount++
+ 		loc := *(*unsafe.Pointer)(unsafe.Pointer(&locHolder))
_, ok := rowSet[loc]
require.True(t, ok)
delete(rowSet, loc)
- loc = getNextRowAddress(loc)
+ locHolder = getNextRowAddress(loc)
}
}
require.Equal(t, 0, len(rowSet))
@@ -157,18 +159,20 @@ func TestConcurrentBuild(t *testing.T) {
wg.Wait()
rowSet := make(map[unsafe.Pointer]struct{}, rowTable.rowCount())
for _, seg := range rowTable.segments {
- for _, loc := range seg.rowLocations {
+ for index := range seg.rowStartOffset {
+ 	loc := seg.getRowPointer(index)
_, ok := rowSet[loc]
require.False(t, ok)
rowSet[loc] = struct{}{}
}
}
- for _, loc := range subTable.hashTable {
- 	for loc != nil {
+ for _, locHolder := range subTable.hashTable {
+ 	for locHolder != 0 {
+ 		loc := *(*unsafe.Pointer)(unsafe.Pointer(&locHolder))
_, ok := rowSet[loc]
require.True(t, ok)
delete(rowSet, loc)
- loc = getNextRowAddress(loc)
+ locHolder = getNextRowAddress(loc)
}
}
require.Equal(t, 0, len(rowSet))
@@ -182,16 +186,18 @@ func TestLookup(t *testing.T) {
subTable.build(0, len(rowTable.segments))

for _, seg := range rowTable.segments {
- for index, loc := range seg.rowLocations {
+ for index := range seg.rowStartOffset {
hashValue := seg.hashValues[index]
candidate := subTable.lookup(hashValue)
+ loc := seg.getRowPointer(index)
found := false
- for candidate != nil {
- 	if candidate == loc {
+ for candidate != 0 {
+ 	candidatePtr := *(*unsafe.Pointer)(unsafe.Pointer(&candidate))
+ 	if candidatePtr == loc {
found = true
break
}
- candidate = getNextRowAddress(candidate)
+ candidate = getNextRowAddress(candidatePtr)
}
require.True(t, found)
}
@@ -204,7 +210,8 @@ func checkRowIter(t *testing.T, table *hashTableV2, scanConcurrency int) {
rowSet := make(map[unsafe.Pointer]struct{}, totalRowCount)
for _, rt := range table.tables {
for _, seg := range rt.rowData.segments {
- for _, loc := range seg.rowLocations {
+ for index := range seg.rowStartOffset {
+ 	loc := seg.getRowPointer(index)
_, ok := rowSet[loc]
require.False(t, ok)
rowSet[loc] = struct{}{}
7 changes: 4 additions & 3 deletions pkg/executor/join/inner_join_probe.go
@@ -16,6 +16,7 @@ package join

import (
"sync/atomic"
"unsafe"

"github.com/pingcap/tidb/pkg/expression"
"github.com/pingcap/tidb/pkg/util/chunk"
@@ -45,11 +46,11 @@ func (j *innerJoinProbe) Probe(joinResult *hashjoinWorkerResult, sqlKiller *sqlk
defer joinedChk.SetInCompleteChunk(isInCompleteChunk)

for remainCap > 0 && j.currentProbeRow < j.chunkRows {
- if j.matchedRowsHeaders[j.currentProbeRow] != nil {
- 	candidateRow := j.matchedRowsHeaders[j.currentProbeRow]
+ if j.matchedRowsHeaders[j.currentProbeRow] != 0 {
+ 	candidateRow := *(*unsafe.Pointer)(unsafe.Pointer(&j.matchedRowsHeaders[j.currentProbeRow]))
if isKeyMatched(meta.keyMode, j.serializedKeys[j.currentProbeRow], candidateRow, meta) {
// key matched, convert row to column for build side
- j.appendBuildRowToCachedBuildRowsAndConstructBuildRowsIfNeeded(&matchedRowInfo{probeRowIndex: j.currentProbeRow, buildRowStart: candidateRow}, joinedChk, 0, hasOtherCondition)
+ j.appendBuildRowToCachedBuildRowsAndConstructBuildRowsIfNeeded(createMatchRowInfo(j.currentProbeRow, candidateRow), joinedChk, 0, hasOtherCondition)
j.matchedRowsForCurrentProbeRow++
remainCap--
} else {
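Putting the pieces together: the probe now treats each entry of matchedRowsHeaders as a uintptr chain head (0 meaning no candidates) and rebuilds a real pointer per candidate before the key comparison. The chain walk it performs reduces to this self-contained sketch (the helper signatures are stand-ins, not the PR's API):

```go
package sketch

import "unsafe"

// walkChain follows a bucket chain whose head and links are held as uintptr,
// invoking visit on each rebuilt row pointer. getNext stands in for the PR's
// getNextRowAddress.
func walkChain(head uintptr, getNext func(unsafe.Pointer) uintptr, visit func(unsafe.Pointer)) {
	for head != 0 {
		p := *(*unsafe.Pointer)(unsafe.Pointer(&head))
		visit(p)
		head = getNext(p)
	}
}
```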