Skip to content

Commit

Permalink
GrouperFastImpl: removing unaligned access in SwissTable
Browse files Browse the repository at this point in the history
  • Loading branch information
michalursa committed May 6, 2021
1 parent ede35ca commit cddc9a2
Show file tree
Hide file tree
Showing 6 changed files with 161 additions and 129 deletions.
4 changes: 3 additions & 1 deletion cpp/src/arrow/compute/kernels/hash_aggregate.cc
Original file line number Diff line number Diff line change
Expand Up @@ -481,7 +481,9 @@ struct GrouperFastImpl : Grouper {
impl->key_types_[icol] = key;
}

impl->encoder_.Init(impl->col_metadata_, &impl->encode_ctx_, /* row_alignment = */ sizeof(uint64_t), /* string_alignment = */ sizeof(uint64_t));
impl->encoder_.Init(impl->col_metadata_, &impl->encode_ctx_,
/* row_alignment = */ sizeof(uint64_t),
/* string_alignment = */ sizeof(uint64_t));
RETURN_NOT_OK(impl->rows_.Init(ctx->memory_pool(), impl->encoder_.row_metadata()));
RETURN_NOT_OK(
impl->rows_minibatch_.Init(ctx->memory_pool(), impl->encoder_.row_metadata()));
Expand Down
170 changes: 92 additions & 78 deletions cpp/src/arrow/engine/key_encode.cc
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ Status KeyEncoder::KeyRowArray::AppendEmpty(uint32_t num_rows_to_append,
num_rows_ += num_rows_to_append;
if (metadata_.row_alignment > 1 || metadata_.string_alignment > 1) {
memset(rows_->mutable_data(), 0, bytes_capacity_);
}
}
return Status::OK();
}

Expand Down Expand Up @@ -652,8 +652,8 @@ void KeyEncoder::EncoderBinary::Decode(uint32_t start_row, uint32_t num_rows,

#if defined(ARROW_HAVE_AVX2)
if (ctx->has_avx2()) {
DecodeHelper_avx2(is_row_fixed_length, start_row, num_rows, offset_within_row,
rows, col);
DecodeHelper_avx2(is_row_fixed_length, start_row, num_rows, offset_within_row, rows,
col);
} else {
#endif
if (is_row_fixed_length) {
Expand Down Expand Up @@ -685,7 +685,8 @@ void KeyEncoder::EncoderBinary::EncodeImp(uint32_t offset_within_row, KeyRowArra
}
if ((length % 8) > 0) {
uint64_t mask_last = ~0ULL >> (8 * (8 * (istripe + 1) - length));
dst64[istripe] = (dst64[istripe] & ~mask_last) | (util::SafeLoad(src64 + istripe) & mask_last);
dst64[istripe] = (dst64[istripe] & ~mask_last) |
(util::SafeLoad(src64 + istripe) & mask_last);
}
});
}
Expand Down Expand Up @@ -1082,13 +1083,15 @@ void KeyEncoder::EncoderOffsets::EncodeImp(
col_length = 0;
}

offset_within_row += KeyRowMetadata::padding_for_alignment(offset_within_row, string_alignment);
offset_within_row +=
KeyRowMetadata::padding_for_alignment(offset_within_row, string_alignment);
offset_within_row += col_length;

varbinary_end[col] = offset_within_row;
}

offset_within_row += KeyRowMetadata::padding_for_alignment(offset_within_row, row_alignment);
offset_within_row +=
KeyRowMetadata::padding_for_alignment(offset_within_row, row_alignment);
row_offset += offset_within_row;
row_offsets[i + 1] = row_offset;
}
Expand Down Expand Up @@ -1140,7 +1143,8 @@ void KeyEncoder::EncoderOffsets::Decode(
// Update the offset of each column
uint32_t offset_within_row = rows.metadata().fixed_length;
for (size_t col = 0; col < varbinary_cols->size(); ++col) {
offset_within_row += KeyRowMetadata::padding_for_alignment(offset_within_row, string_alignment);
offset_within_row +=
KeyRowMetadata::padding_for_alignment(offset_within_row, string_alignment);
uint32_t length = varbinary_ends[col] - offset_within_row;
offset_within_row = varbinary_ends[col];
uint32_t* col_offsets = (*varbinary_cols)[col].mutable_offsets();
Expand Down Expand Up @@ -1202,7 +1206,8 @@ void KeyEncoder::EncoderVarBinary::EncodeImp(uint32_t varbinary_col_id, KeyRowAr
}
if ((length % 8) > 0) {
uint64_t mask_last = ~0ULL >> (8 * (8 * (istripe + 1) - length));
dst64[istripe] = (dst64[istripe] & ~mask_last) | (util::SafeLoad(src64 + istripe) & mask_last);
dst64[istripe] = (dst64[istripe] & ~mask_last) |
(util::SafeLoad(src64 + istripe) & mask_last);
}
});
}
Expand Down Expand Up @@ -1302,11 +1307,13 @@ bool KeyEncoder::KeyRowMetadata::is_compatible(const KeyRowMetadata& other) cons
if (other.num_cols() != num_cols()) {
return false;
}
if (row_alignment != other.row_alignment || string_alignment != other.string_alignment) {
if (row_alignment != other.row_alignment ||
string_alignment != other.string_alignment) {
return false;
}
for (size_t i = 0; i < column_metadatas.size(); ++i) {
if (column_metadatas[i].is_fixed_length != other.column_metadatas[i].is_fixed_length) {
if (column_metadatas[i].is_fixed_length !=
other.column_metadatas[i].is_fixed_length) {
return false;
}
if (column_metadatas[i].fixed_length != other.column_metadatas[i].fixed_length) {
Expand All @@ -1317,9 +1324,8 @@ bool KeyEncoder::KeyRowMetadata::is_compatible(const KeyRowMetadata& other) cons
}

void KeyEncoder::KeyRowMetadata::FromColumnMetadataVector(
const std::vector<KeyColumnMetadata> &cols,
int in_row_alignment, int in_string_alignment) {

const std::vector<KeyColumnMetadata>& cols, int in_row_alignment,
int in_string_alignment) {
column_metadatas.resize(cols.size());
for (size_t i = 0; i < cols.size(); ++i) {
column_metadatas[i] = cols[i];
Expand All @@ -1329,40 +1335,44 @@ void KeyEncoder::KeyRowMetadata::FromColumnMetadataVector(

// Sort columns.
// Columns are sorted based on the size in bytes of their fixed-length part.
// For the varying-length column, the fixed-length part is the 32-bit field storing
// For the varying-length column, the fixed-length part is the 32-bit field storing
// cumulative length of varying-length fields.
// The rules are:
// a) Boolean column, marked with fixed-length 0, is considered to have fixed-length part of 1 byte.
// b) Columns with fixed-length part being power of 2 or multiple of row alignment precede other columns.
// They are sorted among themselves based on size of fixed-length part.
// c) Fixed-length columns precede varying-length columns when both have the same size fixed-length part.
// a) Boolean column, marked with fixed-length 0, is considered to have fixed-length
//    part of 1 byte.
// b) Columns with fixed-length part being power of 2 or multiple of row alignment
//    precede other columns. They are sorted among themselves based on size of
//    fixed-length part.
// c) Fixed-length columns precede varying-length columns when both have the same size
//    fixed-length part.
column_order.resize(num_cols);
for (uint32_t i = 0; i < num_cols; ++i) {
column_order[i] = i;
}
std::sort(column_order.begin(), column_order.end(),
[&cols](uint32_t left, uint32_t right) {
bool is_left_pow2 = !cols[left].is_fixed_length || ARROW_POPCOUNT64(cols[left].fixed_length) <= 1;
bool is_right_pow2 = !cols[right].is_fixed_length || ARROW_POPCOUNT64(cols[right].fixed_length) <= 1;
bool is_left_fixedlen = cols[left].is_fixed_length;
bool is_right_fixedlen = cols[right].is_fixed_length;
uint32_t width_left = cols[left].is_fixed_length ? cols[left].fixed_length : sizeof(uint32_t);
uint32_t width_right = cols[right].is_fixed_length ? cols[right].fixed_length : sizeof(uint32_t);
if (is_left_pow2 != is_right_pow2) {
return is_left_pow2;
}
if (!is_left_pow2) {
std::sort(
column_order.begin(), column_order.end(), [&cols](uint32_t left, uint32_t right) {
bool is_left_pow2 =
!cols[left].is_fixed_length || ARROW_POPCOUNT64(cols[left].fixed_length) <= 1;
bool is_right_pow2 = !cols[right].is_fixed_length ||
ARROW_POPCOUNT64(cols[right].fixed_length) <= 1;
bool is_left_fixedlen = cols[left].is_fixed_length;
bool is_right_fixedlen = cols[right].is_fixed_length;
uint32_t width_left =
cols[left].is_fixed_length ? cols[left].fixed_length : sizeof(uint32_t);
uint32_t width_right =
cols[right].is_fixed_length ? cols[right].fixed_length : sizeof(uint32_t);
if (is_left_pow2 != is_right_pow2) {
return is_left_pow2;
}
if (!is_left_pow2) {
return left < right;
}
if (width_left != width_right) {
return width_left > width_right;
}
if (is_left_fixedlen != is_right_fixedlen) {
return is_left_fixedlen;
}
return left < right;
}
if (width_left != width_right) {
return width_left > width_right;
}
if (is_left_fixedlen != is_right_fixedlen) {
return is_left_fixedlen;
}
return left < right;
}
);
});

row_alignment = in_row_alignment;
string_alignment = in_string_alignment;
Expand All @@ -1373,13 +1383,15 @@ void KeyEncoder::KeyRowMetadata::FromColumnMetadataVector(
uint32_t offset_within_row = 0;
for (uint32_t i = 0; i < num_cols; ++i) {
const KeyColumnMetadata& col = cols[column_order[i]];
offset_within_row += KeyRowMetadata::padding_for_alignment(offset_within_row, string_alignment, col);
offset_within_row +=
KeyRowMetadata::padding_for_alignment(offset_within_row, string_alignment, col);
column_offsets[i] = offset_within_row;
if (!col.is_fixed_length) {
if (num_varbinary_cols == 0) {
varbinary_end_array_offset = offset_within_row;
}
ARROW_DCHECK(column_offsets[i] - varbinary_end_array_offset == num_varbinary_cols * sizeof(uint32_t));
ARROW_DCHECK(column_offsets[i] - varbinary_end_array_offset ==
num_varbinary_cols * sizeof(uint32_t));
++num_varbinary_cols;
offset_within_row += sizeof(uint32_t);
} else {
Expand All @@ -1395,7 +1407,10 @@ void KeyEncoder::KeyRowMetadata::FromColumnMetadataVector(
}

is_fixed_length = (num_varbinary_cols == 0);
fixed_length = offset_within_row + KeyRowMetadata::padding_for_alignment(offset_within_row, num_varbinary_cols == 0 ? row_alignment : string_alignment);
fixed_length =
offset_within_row +
KeyRowMetadata::padding_for_alignment(
offset_within_row, num_varbinary_cols == 0 ? row_alignment : string_alignment);

// We set the number of bytes per row storing null masks of individual key columns
// to be a power of two. This is not required. It could be also set to the minimal
Expand All @@ -1406,8 +1421,8 @@ void KeyEncoder::KeyRowMetadata::FromColumnMetadataVector(
}
}

void KeyEncoder::Init(const std::vector<KeyColumnMetadata>& cols,
KeyEncoderContext* ctx, int row_alignment, int string_alignment) {
void KeyEncoder::Init(const std::vector<KeyColumnMetadata>& cols, KeyEncoderContext* ctx,
int row_alignment, int string_alignment) {
ctx_ = ctx;
row_metadata_.FromColumnMetadataVector(cols, row_alignment, string_alignment);
uint32_t num_cols = row_metadata_.num_cols();
Expand All @@ -1417,11 +1432,11 @@ void KeyEncoder::Init(const std::vector<KeyColumnMetadata>& cols,
batch_varbinary_cols_base_offsets_.resize(num_varbinary_cols);
}

void KeyEncoder::PrepareKeyColumnArrays(
int64_t start_row, int64_t num_rows, const std::vector<KeyColumnArray>& cols_in) {
void KeyEncoder::PrepareKeyColumnArrays(int64_t start_row, int64_t num_rows,
const std::vector<KeyColumnArray>& cols_in) {
uint32_t num_cols = static_cast<uint32_t>(cols_in.size());
ARROW_DCHECK(batch_all_cols_.size() == num_cols);

uint32_t num_varbinary_visited = 0;
for (uint32_t i = 0; i < num_cols; ++i) {
const KeyColumnArray& col = cols_in[row_metadata_.column_order[i]];
Expand Down Expand Up @@ -1465,8 +1480,8 @@ Status KeyEncoder::PrepareOutputForEncode(int64_t start_row, int64_t num_rows,
num_bytes_required = fixed_part + var_part;

rows->Clean();
RETURN_NOT_OK(rows->AppendEmpty(static_cast<uint32_t>(num_rows),
static_cast<uint32_t>(num_bytes_required)));
RETURN_NOT_OK(rows->AppendEmpty(static_cast<uint32_t>(num_rows),
static_cast<uint32_t>(num_bytes_required)));

return Status::OK();
}
Expand All @@ -1479,14 +1494,14 @@ void KeyEncoder::Encode(int64_t start_row, int64_t num_rows, KeyRowArray* rows,
// Create two temp vectors with 16-bit elements
auto temp_buffer_holder_A =
util::TempVectorHolder<uint16_t>(ctx_->stack, static_cast<uint32_t>(num_rows));
auto temp_buffer_A =
KeyColumnArray(KeyColumnMetadata(true, sizeof(uint16_t)), num_rows, nullptr,
reinterpret_cast<uint8_t*>(temp_buffer_holder_A.mutable_data()), nullptr);
auto temp_buffer_A = KeyColumnArray(
KeyColumnMetadata(true, sizeof(uint16_t)), num_rows, nullptr,
reinterpret_cast<uint8_t*>(temp_buffer_holder_A.mutable_data()), nullptr);
auto temp_buffer_holder_B =
util::TempVectorHolder<uint16_t>(ctx_->stack, static_cast<uint32_t>(num_rows));
auto temp_buffer_B =
KeyColumnArray(KeyColumnMetadata(true, sizeof(uint16_t)), num_rows, nullptr,
reinterpret_cast<uint8_t*>(temp_buffer_holder_B.mutable_data()), nullptr);
auto temp_buffer_B = KeyColumnArray(
KeyColumnMetadata(true, sizeof(uint16_t)), num_rows, nullptr,
reinterpret_cast<uint8_t*>(temp_buffer_holder_B.mutable_data()), nullptr);

bool is_row_fixed_length = row_metadata_.is_fixed_length;
if (!is_row_fixed_length) {
Expand All @@ -1510,14 +1525,13 @@ void KeyEncoder::Encode(int64_t start_row, int64_t num_rows, KeyRowArray* rows,
i += 1;
continue;
}
bool can_process_pair =
(i + 1 < num_cols) &&
batch_all_cols_[i + 1].metadata().is_fixed_length &&
bool can_process_pair =
(i + 1 < num_cols) && batch_all_cols_[i + 1].metadata().is_fixed_length &&
EncoderBinaryPair::CanProcessPair(batch_all_cols_[i].metadata(),
batch_all_cols_[i + 1].metadata());
if (!can_process_pair) {
EncoderBinary::Encode(row_metadata_.column_offsets[i], rows, batch_all_cols_[i], ctx_,
&temp_buffer_A);
EncoderBinary::Encode(row_metadata_.column_offsets[i], rows, batch_all_cols_[i],
ctx_, &temp_buffer_A);
i += 1;
} else {
EncoderBinaryPair::Encode(row_metadata_.column_offsets[i], rows, batch_all_cols_[i],
Expand All @@ -1541,19 +1555,19 @@ void KeyEncoder::DecodeFixedLengthBuffers(int64_t start_row_input,
// Create two temp vectors with 16-bit elements
auto temp_buffer_holder_A =
util::TempVectorHolder<uint16_t>(ctx_->stack, static_cast<uint32_t>(num_rows));
auto temp_buffer_A =
KeyColumnArray(KeyColumnMetadata(true, sizeof(uint16_t)), num_rows, nullptr,
reinterpret_cast<uint8_t*>(temp_buffer_holder_A.mutable_data()), nullptr);
auto temp_buffer_A = KeyColumnArray(
KeyColumnMetadata(true, sizeof(uint16_t)), num_rows, nullptr,
reinterpret_cast<uint8_t*>(temp_buffer_holder_A.mutable_data()), nullptr);
auto temp_buffer_holder_B =
util::TempVectorHolder<uint16_t>(ctx_->stack, static_cast<uint32_t>(num_rows));
auto temp_buffer_B =
KeyColumnArray(KeyColumnMetadata(true, sizeof(uint16_t)), num_rows, nullptr,
reinterpret_cast<uint8_t*>(temp_buffer_holder_B.mutable_data()), nullptr);
auto temp_buffer_B = KeyColumnArray(
KeyColumnMetadata(true, sizeof(uint16_t)), num_rows, nullptr,
reinterpret_cast<uint8_t*>(temp_buffer_holder_B.mutable_data()), nullptr);

bool is_row_fixed_length = row_metadata_.is_fixed_length;
if (!is_row_fixed_length) {
EncoderOffsets::Decode(static_cast<uint32_t>(start_row_input), static_cast<uint32_t>(num_rows),
rows, &batch_varbinary_cols_,
EncoderOffsets::Decode(static_cast<uint32_t>(start_row_input),
static_cast<uint32_t>(num_rows), rows, &batch_varbinary_cols_,
batch_varbinary_cols_base_offsets_, ctx_);
}

Expand All @@ -1565,14 +1579,14 @@ void KeyEncoder::DecodeFixedLengthBuffers(int64_t start_row_input,
continue;
}
bool can_process_pair =
(i + 1 < num_cols) &&
batch_all_cols_[i + 1].metadata().is_fixed_length &&
(i + 1 < num_cols) && batch_all_cols_[i + 1].metadata().is_fixed_length &&
EncoderBinaryPair::CanProcessPair(batch_all_cols_[i].metadata(),
batch_all_cols_[i + 1].metadata());
if (!can_process_pair) {
EncoderBinary::Decode(static_cast<uint32_t>(start_row_input), static_cast<uint32_t>(num_rows),
row_metadata_.column_offsets[i], rows,
&batch_all_cols_[i], ctx_, &temp_buffer_A);
EncoderBinary::Decode(static_cast<uint32_t>(start_row_input),
static_cast<uint32_t>(num_rows),
row_metadata_.column_offsets[i], rows, &batch_all_cols_[i],
ctx_, &temp_buffer_A);
i += 1;
} else {
EncoderBinaryPair::Decode(
Expand All @@ -1584,8 +1598,8 @@ void KeyEncoder::DecodeFixedLengthBuffers(int64_t start_row_input,
}

// Process nulls
EncoderNulls::Decode(static_cast<uint32_t>(start_row_input), static_cast<uint32_t>(num_rows),
rows, &batch_all_cols_);
EncoderNulls::Decode(static_cast<uint32_t>(start_row_input),
static_cast<uint32_t>(num_rows), rows, &batch_all_cols_);
}

void KeyEncoder::DecodeVaryingLengthBuffers(int64_t start_row_input,
Expand All @@ -1601,8 +1615,8 @@ void KeyEncoder::DecodeVaryingLengthBuffers(int64_t start_row_input,
for (uint32_t i = 0; i < num_varbinary_cols; ++i) {
// Memcpy varbinary fields into the positions in the output row buffer
// that were precomputed in the previous step.
EncoderVarBinary::Decode(static_cast<uint32_t>(start_row_input), static_cast<uint32_t>(num_rows),
i, rows,
EncoderVarBinary::Decode(static_cast<uint32_t>(start_row_input),
static_cast<uint32_t>(num_rows), i, rows,
&batch_varbinary_cols_[i], ctx_);
}
}
Expand Down
Loading

0 comments on commit cddc9a2

Please sign in to comment.