Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Spark list hashing #11292

Merged
merged 48 commits into from
Jul 28, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
48 commits
Select commit Hold shift + click to select a range
ebf079a
Add test for Spark list-of-list hashing.
bdice Jun 28, 2022
5632e7c
Improve test.
bdice Jul 7, 2022
d721597
Merge remote-tracking branch 'upstream/branch-22.08' into spark-list-…
bdice Jul 7, 2022
aee8fe0
Merge remote-tracking branch 'upstream/branch-22.08' into spark-list-…
bdice Jul 18, 2022
6e55198
Copy experimental row hasher for modification.
bdice Jul 19, 2022
95f25cb
Make preprocessed table methods public.
bdice Jul 19, 2022
ac21705
Use structured binding to make names map more clearly to the objects.
bdice Jul 19, 2022
4106aef
Add note about hash types, fix tests.
bdice Jul 19, 2022
ca8557d
Align hashing behavior closer to Spark. Some tests passing, but not all.
bdice Jul 19, 2022
703956f
Remove commented code, fix dispatch for decimal32 and decimal128.
bdice Jul 19, 2022
a98c130
Reorder members to match constructor signature.
bdice Jul 19, 2022
659bca9
Update nested type hashing to act more like the previous serial hash …
bdice Jul 19, 2022
fcca18a
Use seed 42 for consistency with Spark.
bdice Jul 19, 2022
189f3bf
Clean up.
bdice Jul 20, 2022
ee754a7
Fix bug in test (wrong index).
bdice Jul 20, 2022
b163b49
Template row_hasher on device_row_hasher class.
bdice Jul 20, 2022
387390a
Fix up friends / private methods.
bdice Jul 20, 2022
931bd33
Merge remote-tracking branch 'pub/branch-22.08' into bdice/list_hashing
rwlee Jul 21, 2022
2218338
Enable list types in spark32BitMurmurHash3.
bdice Jul 22, 2022
bad0b5e
Remove template, always use Spark hash functor.
bdice Jul 22, 2022
2d30d2c
Add summary of differences in Spark hashing.
bdice Jul 22, 2022
4fee0f8
clang-format
bdice Jul 22, 2022
1194723
Switch back to template parameter so the device row hasher matches th…
bdice Jul 22, 2022
bec00fa
Merge remote-tracking branch 'pub/pull-request/11292' into bdice/list…
rwlee Jul 25, 2022
ee6cfda
Merge remote-tracking branch 'upstream/branch-22.08' into spark-list-…
bdice Jul 25, 2022
873452c
Use has_nested_nulls instead of has_nulls.
bdice Jul 25, 2022
2826655
Rename to spark_device_row_hasher.
bdice Jul 25, 2022
3e70ac5
Fix copyright.
bdice Jul 26, 2022
c4586f3
Add test for structs of lists.
bdice Jul 26, 2022
481ac82
Merge branch 'spark-list-hashing' of github.com:bdice/cudf into spark…
bdice Jul 26, 2022
1f75acf
Add failing test for lists of structs.
bdice Jul 26, 2022
e179a4c
Fail if a table with lists of structs is passed.
bdice Jul 26, 2022
f057bc6
Use spark_hash_value_type.
bdice Jul 26, 2022
929c589
Change template name to DeviceRowHasher.
bdice Jul 26, 2022
64727c9
Merge remote-tracking branch 'pub/pull-request/11292' into bdice/list…
rwlee Jul 26, 2022
248cfaa
Improve comment.
bdice Jul 26, 2022
f90af13
Rename to spark_murmur_device_row_hasher.
bdice Jul 26, 2022
4a80876
Use spark_hash_value_type more consistently, and enforce that null ha…
bdice Jul 26, 2022
5be3ea0
Added developer documentation.
bdice Jul 26, 2022
cf6f6cd
Use list initialization.
bdice Jul 26, 2022
6475517
add list test
rwlee Jul 26, 2022
0c0a1fb
Merge remote-tracking branch 'pub/pull-request/11292' into bdice/list…
rwlee Jul 27, 2022
028b0bc
nested struct java test
rwlee Jul 27, 2022
5d05984
Remove deleted constructor.
bdice Jul 27, 2022
9281aee
Require SparkMurmurHash3_32 in constructor.
bdice Jul 27, 2022
fa50d2c
Remove deleted public constructor because the constructor is declared…
bdice Jul 27, 2022
ef0ad3b
Template only the device_hasher method.
bdice Jul 27, 2022
aaadac2
Temporarily remove JNI tests due to an outstanding allocation bug.
bdice Jul 28, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,7 @@ add_library(
src/hash/hashing.cu
src/hash/md5_hash.cu
src/hash/murmur_hash.cu
src/hash/spark_murmur_hash.cu
bdice marked this conversation as resolved.
Show resolved Hide resolved
src/interop/dlpack.cpp
src/interop/from_arrow.cu
src/interop/to_arrow.cu
Expand Down
6 changes: 6 additions & 0 deletions cpp/include/cudf/detail/hashing.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,12 @@ std::unique_ptr<column> murmur_hash3_32(
rmm::cuda_stream_view stream = cudf::default_stream_value,
rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource());

std::unique_ptr<column> spark_murmur_hash3_32(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder why the APIs here don't have doxygen?

Copy link
Contributor Author

@bdice bdice Jul 26, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

They're detail APIs, which don't require docs. The public API is cudf::hash.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The build does not require detail APIs to have doxygen but programmers would still appreciate documentation.
You can see many detail functions are documented with @copydoc tags for example.
I can add some detail to the https://github.com/rapidsai/cudf/blob/branch-22.08/cpp/docs/DOCUMENTATION.md

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There will be some significant changes in this file with other planned work (#11296), so I'm going to defer on this until I can do it for the whole file. I added a note to myself to improve this later for all hash functions. #10081 (comment)

table_view const& input,
uint32_t seed = cudf::DEFAULT_HASH_SEED,
rmm::cuda_stream_view stream = cudf::default_stream_value,
rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource());

template <template <typename> class hash_function>
std::unique_ptr<column> serial_murmur_hash3_32(
table_view const& input,
Expand Down
21 changes: 11 additions & 10 deletions cpp/include/cudf/table/experimental/row_operators.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -825,7 +825,7 @@ class two_table_comparator {

namespace hash {
class row_hasher;
}
} // namespace hash

namespace equality {

Expand Down Expand Up @@ -1378,9 +1378,9 @@ class element_hasher {
CUDF_UNREACHABLE("Unsupported type in hash.");
}

Nullate _check_nulls; ///< Whether to check for nulls
uint32_t _seed; ///< The seed to use for hashing
hash_value_type _null_hash; ///< Hash value to use for null elements
Nullate _check_nulls; ///< Whether to check for nulls
};

/**
Expand All @@ -1394,8 +1394,6 @@ class device_row_hasher {
friend class row_hasher; ///< Allow row_hasher to access private members.

public:
device_row_hasher() = delete;

/**
* @brief Return the hash value of a row in the given table.
*
Expand Down Expand Up @@ -1484,12 +1482,12 @@ class device_row_hasher {
CUDF_HOST_DEVICE device_row_hasher(Nullate check_nulls,
table_device_view t,
uint32_t seed = DEFAULT_HASH_SEED) noexcept
: _table{t}, _seed(seed), _check_nulls{check_nulls}
: _check_nulls{check_nulls}, _table{t}, _seed(seed)
{
}

table_device_view const _table;
Nullate const _check_nulls;
table_device_view const _table;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just curious, is there a style guideline or other reasoning for the ordering change?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm guessing one of two possibilities:

  • He just chose to alphabetize.
  • The order in which members are initialized is based on the order that they are declared here, not the order that they appear in the initializer list (the part after the : in the constructor). Since check_nulls comes before t in the constructor signature, he may have reordered the initializer list to match, and then reordering this bit becomes necessary to avoid creating an asymmetry that could catch unwary developers off guard (there are subtle bugs that can come from the wrong initialization order if the constructor makes some invalid assumptions).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is about initialization order matching member order. Compilers sometimes throw warnings about this, and it’s good practice to make the constructor argument order match the initialization order and member order when possible.

uint32_t const _seed;
};

Expand Down Expand Up @@ -1539,11 +1537,14 @@ class row_hasher {
* @param seed The seed to use for the hash function
* @return A hash operator to use on the device
*/
template <template <typename> class hash_function = detail::default_hash, typename Nullate>
device_row_hasher<hash_function, Nullate> device_hasher(Nullate nullate = {},
uint32_t seed = DEFAULT_HASH_SEED) const
template <template <typename> class hash_function = detail::default_hash,
template <template <typename> class, typename>
class DeviceRowHasher = device_row_hasher,
typename Nullate>
DeviceRowHasher<hash_function, Nullate> device_hasher(Nullate nullate = {},
uint32_t seed = DEFAULT_HASH_SEED) const
{
return device_row_hasher<hash_function, Nullate>(nullate, *d_t, seed);
return DeviceRowHasher<hash_function, Nullate>(nullate, *d_t, seed);
}

private:
Expand Down
3 changes: 1 addition & 2 deletions cpp/src/hash/hashing.cu
Original file line number Diff line number Diff line change
Expand Up @@ -99,8 +99,7 @@ std::unique_ptr<column> hash(table_view const& input,
case (hash_id::HASH_MURMUR3): return murmur_hash3_32(input, seed, stream, mr);
case (hash_id::HASH_SERIAL_MURMUR3):
return serial_murmur_hash3_32<MurmurHash3_32>(input, seed, stream, mr);
case (hash_id::HASH_SPARK_MURMUR3):
return serial_murmur_hash3_32<SparkMurmurHash3_32>(input, seed, stream, mr);
case (hash_id::HASH_SPARK_MURMUR3): return spark_murmur_hash3_32(input, seed, stream, mr);
case (hash_id::HASH_MD5): return md5_hash(input, stream, mr);
default: CUDF_FAIL("Unsupported hash function.");
}
Expand Down
222 changes: 222 additions & 0 deletions cpp/src/hash/spark_murmur_hash.cu
Original file line number Diff line number Diff line change
@@ -0,0 +1,222 @@
/*
* Copyright (c) 2022, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <cudf/column/column_factories.hpp>
#include <cudf/detail/hashing.hpp>
#include <cudf/detail/utilities/hash_functions.cuh>
#include <cudf/detail/utilities/vector_factories.hpp>
#include <cudf/table/experimental/row_operators.cuh>
#include <cudf/table/table_device_view.cuh>

#include <rmm/cuda_stream_view.hpp>
#include <rmm/exec_policy.hpp>

#include <thrust/tabulate.h>

namespace cudf {
namespace detail {

namespace {

// TODO: Spark uses int32_t hash values, but libcudf defines hash_value_type as
// uint32_t elsewhere. I plan to move the SparkMurmurHash3_32 functor into this
// file (since it is only used here), and replace its use of hash_value_type
// with spark_hash_value_type. --bdice
using spark_hash_value_type = int32_t;

/**
* @brief Computes the hash value of a row in the given table.
*
* This functor uses Spark conventions for Murmur hashing, which differs from
* the Murmur implementation used in the rest of libcudf. These differences
* include:
* - Serially using the output hash as an input seed for the next item
* - Ignorance of null values
*
* The serial use of hashes as seeds means that data of different nested types
* can exhibit hash collisions. For example, a row of an integer column
* containing a 1 will have the same hash as a lists column of integers
* containing a list of [1] and a struct column of a single integer column
* containing a struct of {1}.
*
* As a consequence of ignoring null values, inputs like [1], [1, null], and
* [null, 1] have the same hash (an expected hash collision). This kind of
* collision can also occur across a table of nullable columns and with nulls
* in structs ({1, null} and {null, 1} have the same hash). The seed value (the
* previous element's hash value) is returned as the hash if an element is
* null.
*
* For additional differences such as special tail processing and decimal type
* handling, refer to the SparkMurmurHash3_32 functor.
*
* @tparam hash_function Hash functor to use for hashing elements. Must be SparkMurmurHash3_32.
* @tparam Nullate A cudf::nullate type describing whether to check for nulls.
*/
template <template <typename> class hash_function, typename Nullate>
class spark_murmur_device_row_hasher {
friend class cudf::experimental::row::hash::row_hasher; ///< Allow row_hasher to access private
///< members.

public:
/**
* @brief Return the hash value of a row in the given table.
*
* @param row_index The row index to compute the hash value of
* @return The hash value of the row
*/
__device__ auto operator()(size_type row_index) const noexcept
{
return detail::accumulate(
_table.begin(),
_table.end(),
_seed,
[row_index, nulls = this->_check_nulls] __device__(auto hash, auto column) {
return cudf::type_dispatcher(
column.type(), element_hasher_adapter<hash_function>{nulls, hash}, column, row_index);
});
}

private:
/**
* @brief Computes the hash value of an element in the given column.
*
* When the column is non-nested, this is a simple wrapper around the element_hasher.
* When the column is nested, this uses a seed value to serially compute each
* nested element, with the output hash becoming the seed for the next value.
* This requires constructing a new hash functor for each nested element,
* using the new seed from the previous element's hash. The hash of a null
* element is the input seed (the previous element's hash).
*/
template <template <typename> class hash_fn>
class element_hasher_adapter {
public:
__device__ element_hasher_adapter(Nullate check_nulls, uint32_t seed) noexcept
: _check_nulls(check_nulls), _seed(seed)
{
}

using hash_functor = cudf::experimental::row::hash::element_hasher<hash_fn, Nullate>;

template <typename T, CUDF_ENABLE_IF(not cudf::is_nested<T>())>
__device__ spark_hash_value_type operator()(column_device_view const& col,
size_type row_index) const noexcept
{
auto const hasher = hash_functor{_check_nulls, _seed, _seed};
return hasher.template operator()<T>(col, row_index);
}

template <typename T, CUDF_ENABLE_IF(cudf::is_nested<T>())>
__device__ spark_hash_value_type operator()(column_device_view const& col,
size_type row_index) const noexcept
{
column_device_view curr_col = col.slice(row_index, 1);
while (is_nested(curr_col.type())) {
if (curr_col.type().id() == type_id::STRUCT) {
if (curr_col.num_child_columns() == 0) { return _seed; }
// Non-empty structs are assumed to be decomposed and contain only one child
curr_col = detail::structs_column_device_view(curr_col).get_sliced_child(0);
} else if (curr_col.type().id() == type_id::LIST) {
curr_col = detail::lists_column_device_view(curr_col).get_sliced_child();
}
}

return detail::accumulate(
thrust::counting_iterator(0),
thrust::counting_iterator(curr_col.size()),
_seed,
[curr_col, nulls = this->_check_nulls] __device__(auto hash, auto element_index) {
auto const hasher = hash_functor{nulls, hash, hash};
return cudf::type_dispatcher<cudf::experimental::dispatch_void_if_nested>(
curr_col.type(), hasher, curr_col, element_index);
});
}

Nullate const _check_nulls; ///< Whether to check for nulls
uint32_t const _seed; ///< The seed to use for hashing, also returned for null elements
};

CUDF_HOST_DEVICE spark_murmur_device_row_hasher(Nullate check_nulls,
table_device_view t,
uint32_t seed = DEFAULT_HASH_SEED) noexcept
: _check_nulls{check_nulls}, _table{t}, _seed(seed)
{
// Error out if passed an unsupported hash_function
static_assert(
std::is_base_of_v<SparkMurmurHash3_32<int>, hash_function<int>>,
"spark_murmur_device_row_hasher only supports the SparkMurmurHash3_32 hash function");
}

Nullate const _check_nulls;
table_device_view const _table;
uint32_t const _seed;
};

void check_hash_compatibility(table_view const& input)
{
using column_checker_fn_t = std::function<void(column_view const&)>;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why a function wrapper is used here? Why can't just use a lambda?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good question - I asked this on a previous PR that did something like this (can't find the reference). The problem is that lambda functions defined as auto don't work when they must be called recursively. https://stackoverflow.com/questions/2067988/recursive-lambda-functions-in-c11

You can see this in other places in libcudf:

column_checker_fn_t check_column = [&](column_view const& c) {


column_checker_fn_t check_column = [&](column_view const& c) {
if (c.type().id() == type_id::LIST) {
auto const& list_col = lists_column_view(c);
CUDF_EXPECTS(list_col.child().type().id() != type_id::STRUCT,
"Cannot compute hash of a table with a LIST of STRUCT columns.");
check_column(list_col.child());
} else if (c.type().id() == type_id::STRUCT) {
for (auto child = c.child_begin(); child != c.child_end(); ++child) {
check_column(*child);
}
}
};

for (column_view const& c : input) {
check_column(c);
}
}

} // namespace

std::unique_ptr<column> spark_murmur_hash3_32(table_view const& input,
uint32_t seed,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr)
{
auto output = make_numeric_column(data_type(type_to_id<spark_hash_value_type>()),
input.num_rows(),
mask_state::UNALLOCATED,
stream,
mr);

// Return early if there's nothing to hash
if (input.num_columns() == 0 || input.num_rows() == 0) { return output; }

// Lists of structs are not supported
check_hash_compatibility(input);

bool const nullable = has_nested_nulls(input);
auto const row_hasher = cudf::experimental::row::hash::row_hasher(input, stream);
auto output_view = output->mutable_view();

// Compute the hash value for each row
thrust::tabulate(
rmm::exec_policy(stream),
output_view.begin<spark_hash_value_type>(),
output_view.end<spark_hash_value_type>(),
row_hasher.device_hasher<SparkMurmurHash3_32, spark_murmur_device_row_hasher>(nullable, seed));

return output;
}

} // namespace detail
} // namespace cudf
8 changes: 4 additions & 4 deletions cpp/src/table/row_operators.cu
Original file line number Diff line number Diff line change
Expand Up @@ -358,13 +358,13 @@ std::shared_ptr<preprocessed_table> preprocessed_table::create(table_view const&
{
check_eq_compatibility(t);

auto null_pushed_table = structs::detail::superimpose_parent_nulls(t, stream);
auto struct_offset_removed_table = remove_struct_child_offsets(std::get<0>(null_pushed_table));
auto [verticalized_lhs, _, __, ___] = decompose_structs(struct_offset_removed_table);
auto [null_pushed_table, null_masks] = structs::detail::superimpose_parent_nulls(t, stream);
auto struct_offset_removed_table = remove_struct_child_offsets(null_pushed_table);
auto [verticalized_lhs, _, __, ___] = decompose_structs(struct_offset_removed_table);

auto d_t = table_device_view_owner(table_device_view::create(verticalized_lhs, stream));
return std::shared_ptr<preprocessed_table>(
new preprocessed_table(std::move(d_t), std::move(std::get<1>(null_pushed_table))));
new preprocessed_table(std::move(d_t), std::move(null_masks)));
}

two_table_comparator::two_table_comparator(table_view const& left,
Expand Down
Loading