executor: add hash join v2 #53208
Hi @windtalker. Thanks for your PR. PRs from untrusted users cannot be marked as trusted with I understand the commands that are listed here. Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes-sigs/prow repository. |
// See the License for the specific language governing permissions and
// limitations under the License.

package join
Nothing? Can we remove it?
Will add a basic test
Codecov Report
Attention: Patch coverage is
Additional details and impacted files
@@ Coverage Diff @@
## master #53208 +/- ##
=================================================
- Coverage 72.4538% 56.3978% -16.0561%
=================================================
Files 1508 1632 +124
Lines 432016 598190 +166174
=================================================
+ Hits 313012 337366 +24354
- Misses 99547 237685 +138138
- Partials 19457 23139 +3682
/ok-to-test
pkg/executor/builder.go
Outdated
BuildWorkers: make([]*join.BuildWorkerV2, v.Concurrency),
HashJoinCtxV2: &join.HashJoinCtxV2{
	OtherCondition: v.OtherConditions,
	PartitionNumber: mathutil.Min(int(v.Concurrency), 16),
- PartitionNumber: mathutil.Min(int(v.Concurrency), 16),
+ PartitionNumber: min(int(v.Concurrency), 16),
Done
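For context on the suggestion above: Go 1.21 added `min` and `max` as built-in functions, so a helper package is not needed for this case. A minimal sketch follows; the function name `partitionNumber`, the constant `maxPartitions`, and the `uint` parameter type are assumptions made for illustration, and only the cap of 16 comes from the diff.

```go
package main

import "fmt"

// maxPartitions mirrors the literal 16 in the diff; the name is hypothetical.
const maxPartitions = 16

// partitionNumber caps the partition count at maxPartitions using the
// built-in min (requires Go 1.21 or later).
func partitionNumber(concurrency uint) int {
	return min(int(concurrency), maxPartitions)
}

func main() {
	fmt.Println(partitionNumber(5), partitionNumber(64))
}
```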
811ae7b to b8de444
668ff10 to 6f78742
/test check-dev2
@windtalker: The specified target(s) for
Use In response to this:
43644ee to 150fdb7
/test unit-test
@windtalker: The specified target(s) for
Use In response to this:
4403be5 to 3d78136
3d78136 to 61cc71c
// step 1. fetch data from build side child and build a hash table;
// step 2. fetch data from probe child in a background goroutine and probe the hash table in multiple join workers.
func (e *HashJoinV2Exec) Next(ctx context.Context, req *chunk.Chunk) (err error) {
	if !e.prepared {
Should we use an atomic type for `e.prepared`? `Close` may be called at the same time.
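A minimal sketch of what an atomic flag could look like, assuming a stand-in type `joinExec` with a `builds` counter (both hypothetical, not the PR's code). It uses `atomic.Bool` from `sync/atomic` (Go 1.19+). Note that this only makes reads and writes of the flag safe; it does not by itself coordinate a `Close` that arrives while preparation is still running.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// joinExec is a hypothetical stand-in for the executor; only the
// prepared flag is of interest here.
type joinExec struct {
	prepared atomic.Bool
	builds   int // counts how often the one-time preparation ran
}

// Next runs the preparation only if the flag was still false.
// CompareAndSwap performs the check and the update as one atomic step.
func (e *joinExec) Next() {
	if e.prepared.CompareAndSwap(false, true) {
		e.builds++ // placeholder for fetching the build side and building the hash table
	}
}

// Close clears the flag and reports whether the executor had been prepared.
func (e *joinExec) Close() bool {
	return e.prepared.Swap(false)
}

func main() {
	e := &joinExec{}
	e.Next()
	e.Next()
	fmt.Println(e.builds, e.Close(), e.Close())
}
```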
61cc71c to 9aa750d
	return err
}

// 2. build rowtable
Maybe we need to add a `// 1.` comment to the preceding code.
Done
pkg/executor/join/join_row_table.go
Outdated
h := fnv.New64()
fakePartIndex := 0
for logicalRowIndex, physicalRowIndex := range b.usedRows {
	if (b.filterVector != nil && !b.filterVector[physicalRowIndex]) || (b.nullKeyVector != nil && b.nullKeyVector[physicalRowIndex]) {
		b.hashValue[logicalRowIndex] = uint64(fakePartIndex)
		b.partIdxVector[logicalRowIndex] = fakePartIndex
		fakePartIndex = (fakePartIndex + 1) % hashJoinCtx.PartitionNumber
		continue
	}
	h.Write(b.serializedKeyVectorBuffer[logicalRowIndex])
	hash := h.Sum64()
	b.hashValue[logicalRowIndex] = hash
	b.partIdxVector[logicalRowIndex] = int(hash % uint64(hashJoinCtx.PartitionNumber))
	h.Reset()
}
Maybe we can move this code into a separate function? Its purpose is clear, and a descriptive function name would let the reader grasp it at a glance.
Done
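One way the extraction discussed above could look, as a self-contained sketch rather than the code that was actually merged. The function name `partitionRows` and its slice parameters are hypothetical: `keys[i]` stands for the serialized join key of row i, and `skip[i]` stands for "filtered out or NULL key". The hashing follows the snippet under review (FNV-64, modulo the partition count, round-robin for skipped rows).

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// partitionRows computes a hash value and a partition index for every row.
// Rows marked in skip never match, so they are spread round-robin across
// partitions instead of being hashed.
func partitionRows(keys [][]byte, skip []bool, partitionNumber int) (hashValues []uint64, partIdx []int) {
	hashValues = make([]uint64, len(keys))
	partIdx = make([]int, len(keys))
	h := fnv.New64()
	fakePartIndex := 0
	for i, key := range keys {
		if skip[i] {
			hashValues[i] = uint64(fakePartIndex)
			partIdx[i] = fakePartIndex
			fakePartIndex = (fakePartIndex + 1) % partitionNumber
			continue
		}
		h.Reset() // start each key from a clean hash state
		h.Write(key)
		hash := h.Sum64()
		hashValues[i] = hash
		partIdx[i] = int(hash % uint64(partitionNumber))
	}
	return hashValues, partIdx
}

func main() {
	keys := [][]byte{[]byte("a"), nil, []byte("a"), nil}
	skip := []bool{false, true, false, true}
	_, partIdx := partitionRows(keys, skip, 4)
	fmt.Println(partIdx)
}
```

Returning the two slices keeps the helper free of builder state, which also makes it easy to test in isolation.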
|---------------------|-----------------|----------------------|-------------------------------|
           |                   |                   |                           |
           V                   V                   V                           V
     next_row_ptr          null_map    serialized_key/key_length           row_data
Maybe we can add each field's size in bytes to the introduction?
That seems meaningless, since only `next_row_ptr` has a fixed size (8 bytes); all the other fields are variable-length.
pkg/executor/join/join_row_table.go
Outdated
const sizeOfNextPtr = int(unsafe.Sizeof(unsafe.Pointer(nil)))
const sizeOfLengthField = int(unsafe.Sizeof(uint64(1)))
const sizeOfUInt64 = int(unsafe.Sizeof(uint64(1)))
const sizeOfInt = int(unsafe.Sizeof(int(1)))
const sizeOfFloat64 = int(unsafe.Sizeof(float64(1)))
Some of these are already defined in tidb/pkg/util/serialization/common_util.go.
Good suggestion, updated the code.
pkg/executor/join/join_row_table.go
Outdated
}

func (b *rowTableBuilder) appendToRowTable(chk *chunk.Chunk, hashJoinCtx *HashJoinCtxV2, workerID int) error {
	fakeAddrByte := make([]byte, 8)
Maybe we can make it a global variable or a field of rowTableBuilder, so that we avoid allocating it again on every call.
Done
pkg/executor/join/join_row_table.go
Outdated
// next_row_ptr
seg.rawData = append(seg.rawData, fakeAddrByte...)
rowLength += 8
// null_map
if nullMapLength := rowTableMeta.nullMapLength; nullMapLength > 0 {
	bitmap := make([]byte, nullMapLength)
	for colIndexInRowTable, colIndexInRow := range rowTableMeta.rowColumnsOrder {
		colIndexInBitMap := colIndexInRowTable + rowTableMeta.colOffsetInNullMap
		if row.IsNull(colIndexInRow) {
			bitmap[colIndexInBitMap/8] |= 1 << (7 - colIndexInBitMap%8)
		}
	}
	seg.rawData = append(seg.rawData, bitmap...)
	rowLength += nullMapLength
}
The bare `// next_row_ptr` and `// null_map` comments are confusing for the reader. I think it would be better to create a function such as `serializeNextRowPtr`. Similarly, serializing `null_map` and `serialized_key/key_length` could each be encapsulated in a function.
Done
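A rough sketch of how the two serialization steps might be split into helpers, under stated assumptions: the names `appendNextRowPtr` and `appendNullMap`, the package-level `fakeAddrPlaceholder`, and the `isNull []bool` parameter (standing in for `row.IsNull`) are all hypothetical. The bit layout follows the snippet under review: column i sets bit `7 - i%8` of byte `i/8`.

```go
package main

import "fmt"

// sizeOfNextPtr is 8 in the snippet under review (pointer size on 64-bit platforms).
const sizeOfNextPtr = 8

// fakeAddrPlaceholder is allocated once and reused for every row.
var fakeAddrPlaceholder = make([]byte, sizeOfNextPtr)

// appendNextRowPtr reserves space for next_row_ptr at the start of a row.
func appendNextRowPtr(rawData []byte) []byte {
	return append(rawData, fakeAddrPlaceholder...)
}

// appendNullMap appends nullMapLength zeroed bytes and sets one bit per NULL
// column, offset by colOffsetInNullMap.
func appendNullMap(rawData []byte, isNull []bool, colOffsetInNullMap, nullMapLength int) []byte {
	start := len(rawData)
	for i := 0; i < nullMapLength; i++ {
		rawData = append(rawData, 0)
	}
	for col, null := range isNull {
		if null {
			bit := col + colOffsetInNullMap
			rawData[start+bit/8] |= 1 << (7 - bit%8)
		}
	}
	return rawData
}

func main() {
	row := appendNextRowPtr(nil)
	row = appendNullMap(row, []bool{true, false, true}, 1, 1)
	fmt.Printf("%d bytes, null map %08b\n", len(row), row[sizeOfNextPtr])
}
```

Writing the bits directly into the appended bytes avoids the temporary bitmap slice per row; whether that matters in practice would need measuring.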
Signed-off-by: xufei <xufeixw@mail.ustc.edu.cn>
This reverts commit 4782340.
0624d4a to c58b20a
What problem does this PR solve?
Issue Number: ref #53127
Problem Summary:
What changed and how does it work?
Design doc: #53196
Currently, only inner join and left outer join are supported.
Check List
Tests
Side effects
Documentation
Release note