Commit 918e45e: review updates
upsj committed Aug 26, 2022 (1 parent: 3ca6e51)
Showing 1 changed file with 63 additions and 67 deletions: cpp/src/io/text/multibyte_split.cu

@@ -141,32 +141,37 @@ struct PatternScan {
// it begins in. From there, each thread can then take deterministic action. In this case, the
// deterministic action is counting and outputting delimiter offsets when a delimiter is found.

- struct singlepass_offset {
- // magnitude stores the offset, sign bit stores whether we are past the end of the last delimiter
+ // This struct provides output offsets that are only incremented past a cutoff point.
+ struct cutoff_offset {
+ // magnitude stores the offset, sign bit stores whether we are past the cutoff
int64_t value;

- constexpr void init(bool is_delim, bool is_past_range)
+ constexpr cutoff_offset() : cutoff_offset{0} {}
+
+ explicit constexpr cutoff_offset(int64_t value) : value{value} {}
+
+ constexpr cutoff_offset(bool is_delim, bool is_past_cutoff)
+ : value{is_past_cutoff ? -is_delim : is_delim}
{
- value = is_past_range ? -is_delim : is_delim;
}

[[nodiscard]] constexpr int64_t offset() const { return value < 0 ? -value : value; }

[[nodiscard]] constexpr bool is_past_end() { return value < 0; }

- friend constexpr singlepass_offset operator+(singlepass_offset lhs, singlepass_offset rhs)
+ friend constexpr cutoff_offset operator+(cutoff_offset lhs, cutoff_offset rhs)
{
auto const past_end = lhs.is_past_end() || rhs.is_past_end();
auto const offset = lhs.offset() + (lhs.is_past_end() ? 0 : rhs.offset());
- return {past_end ? -offset : offset};
+ return cutoff_offset{past_end ? -offset : offset};
}
};
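
Not part of the commit: a minimal host-only sketch of how the new cutoff_offset behaves as a scan operator. Once the running value is flagged as past the cutoff, delimiter counts to its right stop accumulating while the flag itself is kept, which is what lets the scan keep emitting offsets only up to the end of the byte range. The struct body is copied from the added lines above; the inputs in main are invented for illustration.

```cpp
// cutoff_offset copied from the lines added above; example values are illustrative only.
#include <cassert>
#include <cstdint>

struct cutoff_offset {
  // magnitude stores the offset, sign bit stores whether we are past the cutoff
  int64_t value;

  constexpr cutoff_offset() : cutoff_offset{0} {}

  explicit constexpr cutoff_offset(int64_t value) : value{value} {}

  constexpr cutoff_offset(bool is_delim, bool is_past_cutoff)
    : value{is_past_cutoff ? -is_delim : is_delim}
  {
  }

  [[nodiscard]] constexpr int64_t offset() const { return value < 0 ? -value : value; }

  [[nodiscard]] constexpr bool is_past_end() { return value < 0; }

  friend constexpr cutoff_offset operator+(cutoff_offset lhs, cutoff_offset rhs)
  {
    auto const past_end = lhs.is_past_end() || rhs.is_past_end();
    auto const offset   = lhs.offset() + (lhs.is_past_end() ? 0 : rhs.offset());
    return cutoff_offset{past_end ? -offset : offset};
  }
};

int main()
{
  auto a = cutoff_offset{true, false} + cutoff_offset{true, false};  // two delimiters inside the range
  auto b = a + cutoff_offset{true, true};   // delimiter ending at/past the cutoff: counted, flag set
  auto c = b + cutoff_offset{true, false};  // later delimiter: no longer counted, flag sticks
  assert(b.offset() == 3 && b.is_past_end());
  assert(c.offset() == 3 && c.is_past_end());
  return 0;
}
```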

__global__ void multibyte_split_init_kernel(
cudf::size_type base_tile_idx,
cudf::size_type num_tiles,
cudf::io::text::detail::scan_tile_state_view<multistate> tile_multistates,
- cudf::io::text::detail::scan_tile_state_view<singlepass_offset> tile_output_offsets,
+ cudf::io::text::detail::scan_tile_state_view<cutoff_offset> tile_output_offsets,
cudf::io::text::detail::scan_tile_status status =
cudf::io::text::detail::scan_tile_status::invalid)
{
@@ -180,9 +185,9 @@ __global__ void multibyte_split_init_kernel(

__global__ void multibyte_split_seed_kernel(
cudf::io::text::detail::scan_tile_state_view<multistate> tile_multistates,
- cudf::io::text::detail::scan_tile_state_view<singlepass_offset> tile_output_offsets,
+ cudf::io::text::detail::scan_tile_state_view<cutoff_offset> tile_output_offsets,
multistate tile_multistate_seed,
- singlepass_offset tile_output_offset)
+ cutoff_offset tile_output_offset)
{
auto const thread_idx = blockIdx.x * blockDim.x + threadIdx.x;
if (thread_idx == 0) {
@@ -196,16 +201,16 @@ __global__ void multibyte_split_kernel(
int64_t base_input_offset,
int64_t base_offset_offset,
cudf::io::text::detail::scan_tile_state_view<multistate> tile_multistates,
- cudf::io::text::detail::scan_tile_state_view<singlepass_offset> tile_output_offsets,
+ cudf::io::text::detail::scan_tile_state_view<cutoff_offset> tile_output_offsets,
cudf::io::text::detail::trie_device_view trie,
cudf::device_span<char const> chunk_input_chars,
int64_t byte_range_end,
cudf::split_device_span<int64_t> output_offsets)
{
using InputLoad =
cub::BlockLoad<char, THREADS_PER_TILE, ITEMS_PER_THREAD, cub::BLOCK_LOAD_VECTORIZE>;
- using OffsetScan = cub::BlockScan<singlepass_offset, THREADS_PER_TILE>;
- using OffsetScanCallback = cudf::io::text::detail::scan_tile_state_callback<singlepass_offset>;
+ using OffsetScan = cub::BlockScan<cutoff_offset, THREADS_PER_TILE>;
+ using OffsetScanCallback = cudf::io::text::detail::scan_tile_state_callback<cutoff_offset>;

__shared__ union {
typename InputLoad::TempStorage input_load;
@@ -237,15 +242,13 @@ __global__ void multibyte_split_kernel(

// STEP 3: Flag matches

- singlepass_offset thread_offset{};
+ cutoff_offset thread_offset{0};

for (int32_t i = 0; i < ITEMS_PER_THREAD; i++) {
- bool const match = i < thread_input_size and trie.is_match(thread_states[i]);
- auto const match_end = base_input_offset + thread_input_offset + i + 1;
- bool const past_range = match_end >= byte_range_end;
- singlepass_offset local_offset{};
- local_offset.init(match, past_range);
- thread_offset = thread_offset + local_offset;
+ bool const is_match = i < thread_input_size and trie.is_match(thread_states[i]);
+ auto const match_end = base_input_offset + thread_input_offset + i + 1;
+ bool const is_past_range = match_end >= byte_range_end;
+ thread_offset = thread_offset + cutoff_offset{is_match, is_past_range};
}

// STEP 4: Scan flags to determine absolute thread output offset
@@ -259,12 +262,10 @@

for (int32_t i = 0; i < ITEMS_PER_THREAD and i < thread_input_size; i++) {
if (trie.is_match(thread_states[i]) && !thread_offset.is_past_end()) {
- auto const match_end = base_input_offset + thread_input_offset + i + 1;
- bool const past_range = match_end >= byte_range_end;
+ auto const match_end = base_input_offset + thread_input_offset + i + 1;
+ bool const is_past_range = match_end >= byte_range_end;
output_offsets[thread_offset.offset() - base_offset_offset] = match_end;
- singlepass_offset local_offset{};
- local_offset.init(true, past_range);
- thread_offset = thread_offset + local_offset;
+ thread_offset = thread_offset + cutoff_offset{true, is_past_range};
}
}
}
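
Not part of the commit: a sequential sketch of what steps 3 through 5 compute, with a single-character delimiter standing in for the trie and a plain running flag standing in for the cub::BlockScan over cutoff_offset. The input string, delimiter, and byte_range_end are invented for illustration.

```cpp
#include <cstdint>
#include <iostream>
#include <string>
#include <vector>

int main()
{
  std::string const input      = "ab|cd|e|f";  // illustrative input
  char const delimiter         = '|';
  int64_t const byte_range_end = 6;            // pretend the requested byte range ends here

  bool past_end = false;                // plays the role of thread_offset.is_past_end()
  std::vector<int64_t> output_offsets;  // delimiter end positions, as written in step 5

  for (int64_t i = 0; i < static_cast<int64_t>(input.size()); ++i) {
    bool const is_match      = input[i] == delimiter;
    int64_t const match_end  = i + 1;
    bool const is_past_range = match_end >= byte_range_end;
    if (is_match && !past_end) {
      output_offsets.push_back(match_end);
      // the first delimiter ending at/past the cutoff is the last one emitted
      if (is_past_range) { past_end = true; }
    }
  }

  for (auto o : output_offsets) { std::cout << o << ' '; }  // prints "3 6 "
  std::cout << '\n';
  return 0;
}
```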
Expand Down Expand Up @@ -308,44 +309,44 @@ std::vector<rmm::cuda_stream_view> get_streams(int32_t count, rmm::cuda_stream_p
}

template <typename T>
- class output_chunks {
+ class output_builder {
public:
using size_type = typename rmm::device_uvector<T>::size_type;

- output_chunks(size_type max_output_size,
- rmm::cuda_stream_view stream,
- rmm::mr::device_memory_resource* mr)
- : _size{0}, _max_output_size{max_output_size}
+ output_builder(size_type max_write_size,
+ rmm::cuda_stream_view stream,
+ rmm::mr::device_memory_resource* mr)
+ : _size{0}, _max_write_size{max_write_size}
{
- assert(max_output_size > 0);
+ assert(max_write_size > 0);
_chunks.emplace_back(0, stream, mr);
- _chunks.back().reserve(max_output_size * 2, stream);
+ _chunks.back().reserve(max_write_size * 2, stream);
}

- output_chunks(output_chunks&&) = delete;
- output_chunks(const output_chunks&) = delete;
- output_chunks& operator=(output_chunks&&) = delete;
- output_chunks& operator=(const output_chunks&) = delete;
+ output_builder(output_builder&&) = delete;
+ output_builder(const output_builder&) = delete;
+ output_builder& operator=(output_builder&&) = delete;
+ output_builder& operator=(const output_builder&) = delete;

[[nodiscard]] split_device_span<T> next_output(rmm::cuda_stream_view stream)
{
auto head_it = _chunks.end() - (_chunks.size() > 1 && _chunks.back().is_empty() ? 2 : 1);
- auto head_span = free_span(*head_it);
- if (head_span.size() >= _max_output_size) { return split_device_span<T>{head_span}; }
+ auto head_span = get_free_span(*head_it);
+ if (head_span.size() >= _max_write_size) { return split_device_span<T>{head_span}; }
if (head_it == _chunks.end() - 1) {
// insert a new vector of double size
auto const next_chunk_size = 2 * _chunks.back().capacity();
_chunks.emplace_back(0, stream, _chunks.back().memory_resource());
_chunks.back().reserve(next_chunk_size, stream);
}
- auto tail_span = free_span(_chunks.back());
- assert(head_span.size() + tail_span.size() >= _max_output_size);
+ auto tail_span = get_free_span(_chunks.back());
+ assert(head_span.size() + tail_span.size() >= _max_write_size);
return split_device_span<T>{head_span, tail_span};
}

void advance_output(size_type actual_size)
{
- assert(actual_size <= _max_output_size);
+ assert(actual_size <= _max_write_size);
// this dummy stream is used for resizing, since we know we won't reallocate,
// providing the correct stream is not necessary.
rmm::cuda_stream_view dummy_stream{};
@@ -365,25 +366,22 @@ class output_chunks {
_size += actual_size;
}

- [[nodiscard]] const rmm::device_uvector<T>& first_chunk() const { return _chunks.front(); }
-
- [[nodiscard]] const rmm::device_uvector<T>& last_nonempty_chunk() const
+ [[nodiscard]] T front_element(rmm::cuda_stream_view stream) const
{
- return _chunks.size() > 1 && _chunks.back().is_empty() ? *(_chunks.end() - 2) : _chunks.back();
+ return _chunks.front().front_element(stream);
}

- [[nodiscard]] size_type size() const
+ [[nodiscard]] T back_element(rmm::cuda_stream_view stream) const
{
- assert(_size ==
- std::accumulate(
- _chunks.begin(), _chunks.end(), size_type{}, [](size_type acc, const auto& chunk) {
- return acc + chunk.size();
- }));
- return _size;
+ auto const& last_nonempty_chunk =
+ _chunks.size() > 1 && _chunks.back().is_empty() ? *(_chunks.end() - 2) : _chunks.back();
+ return last_nonempty_chunk.back_element(stream);
}

- rmm::device_uvector<T> collect(rmm::cuda_stream_view stream,
- rmm::mr::device_memory_resource* mr) const
+ [[nodiscard]] size_type size() const { return _size; }
+
+ rmm::device_uvector<T> gather(rmm::cuda_stream_view stream,
+ rmm::mr::device_memory_resource* mr) const
{
rmm::device_uvector<T> output{size(), stream, mr};
auto output_it = output.begin();
@@ -395,13 +393,13 @@ class output_chunks {
}

private:
- static device_span<T> free_span(rmm::device_uvector<T>& vector)
+ static device_span<T> get_free_span(rmm::device_uvector<T>& vector)
{
return device_span<T>{vector.data() + vector.size(), vector.capacity() - vector.size()};
}

size_type _size;
- size_type _max_output_size;
+ size_type _max_write_size;
std::vector<rmm::device_uvector<T>> _chunks;
};
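
The class above amortizes device allocations by reserving geometrically growing buffers and handing each kernel launch a span over the free tail, split across two buffers when the current one is nearly full. Not the cuDF API: a CPU-only sketch of the growth policy using std::vector in place of rmm::device_uvector, with the split-span detail omitted.

```cpp
#include <cstddef>
#include <vector>

// Simplified, host-only sketch: guarantees room for max_write_size more elements
// before each write, growing by doubling; gather() concatenates all chunks.
class output_builder_sketch {
 public:
  explicit output_builder_sketch(std::size_t max_write_size) : _max_write_size{max_write_size}
  {
    _chunks.emplace_back();
    _chunks.back().reserve(2 * max_write_size);
  }

  // Call before writing up to _max_write_size elements; never reallocates an existing chunk.
  void prepare_next_output()
  {
    auto const free_in_back = _chunks.back().capacity() - _chunks.back().size();
    if (free_in_back < _max_write_size) {
      auto const next_capacity = 2 * _chunks.back().capacity();  // geometric growth
      _chunks.emplace_back();
      _chunks.back().reserve(next_capacity);
    }
  }

  void push_back(int value) { _chunks.back().push_back(value); }

  [[nodiscard]] std::vector<int> gather() const
  {
    std::vector<int> out;
    for (auto const& chunk : _chunks) { out.insert(out.end(), chunk.begin(), chunk.end()); }
    return out;
  }

 private:
  std::size_t _max_write_size;
  std::vector<std::vector<int>> _chunks;
};

int main()
{
  output_builder_sketch builder{4};
  for (int i = 0; i < 20; ++i) {
    builder.prepare_next_output();
    builder.push_back(i);
  }
  return builder.gather().size() == 20 ? 0 : 1;
}
```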

Expand Down Expand Up @@ -432,7 +430,7 @@ std::unique_ptr<cudf::column> multibyte_split(cudf::io::text::data_chunk_source
// best when at least 32 more than max possible concurrent tiles, due to rolling `invalid`s
auto num_tile_states = std::max(32, TILES_PER_CHUNK * concurrency + 32);
auto tile_multistates = scan_tile_state<multistate>(num_tile_states, stream);
- auto tile_offsets = scan_tile_state<singlepass_offset>(num_tile_states, stream);
+ auto tile_offsets = scan_tile_state<cutoff_offset>(num_tile_states, stream);

multibyte_split_init_kernel<<<TILES_PER_CHUNK,
THREADS_PER_TILE,
@@ -454,14 +452,14 @@ std::unique_ptr<cudf::column> multibyte_split(cudf::io::text::data_chunk_source
tile_multistates,
tile_offsets,
multistate_seed,
- {0});
+ {});

auto reader = source.create_reader();
auto chunk_offset = std::max<int64_t>(0, byte_range.offset() - delimiter.size());
auto const byte_range_end = byte_range.offset() + byte_range.size();
reader->skip_bytes(chunk_offset);
- output_chunks<int64_t> offset_storage(ITEMS_PER_CHUNK / delimiter.size() + 1, stream, mr);
- output_chunks<char> char_storage(ITEMS_PER_CHUNK, stream, mr);
+ output_builder<int64_t> offset_storage(ITEMS_PER_CHUNK / delimiter.size() + 1, stream, mr);
+ output_builder<char> char_storage(ITEMS_PER_CHUNK, stream, mr);

fork_stream(streams, stream);

@@ -484,8 +482,6 @@ std::unique_ptr<cudf::column> multibyte_split(cudf::io::text::data_chunk_source
auto tiles_in_launch =
cudf::util::div_rounding_up_safe(chunk->size(), static_cast<std::size_t>(ITEMS_PER_TILE));

- cudaStreamWaitEvent(scan_stream.value(), last_launch_event);
-
auto offset_output = offset_storage.next_output(scan_stream);

// reset the next chunk of tile state
@@ -498,6 +494,8 @@ std::unique_ptr<cudf::column> multibyte_split(cudf::io::text::data_chunk_source
tile_multistates,
tile_offsets);

+ cudaStreamWaitEvent(scan_stream.value(), last_launch_event);
+
multibyte_split_kernel<<<tiles_in_launch,
THREADS_PER_TILE,
0,
@@ -520,11 +518,9 @@ std::unique_ptr<cudf::column> multibyte_split(cudf::io::text::data_chunk_source
offset_storage.advance_output(next_tile_offset.offset() - offset_storage.size());
// determine if we found the first or last field offset for the byte range
if (next_tile_offset.offset() > 0 && !first_offset) {
- first_offset = offset_storage.first_chunk().front_element(scan_stream);
- }
- if (next_tile_offset.is_past_end()) {
- last_offset = offset_storage.last_nonempty_chunk().back_element(scan_stream);
+ first_offset = offset_storage.front_element(scan_stream);
}
+ if (next_tile_offset.is_past_end()) { last_offset = offset_storage.back_element(scan_stream); }
// copy over the characters we need, if we already encountered the first field delimiter
if (first_offset) {
auto const begin = chunk->data() + std::max<int64_t>(0, *first_offset - chunk_offset);
@@ -556,8 +552,8 @@ std::unique_ptr<cudf::column> multibyte_split(cudf::io::text::data_chunk_source
return make_empty_column(type_id::STRING);
}

- auto chars = char_storage.collect(stream, mr);
- auto global_offsets = offset_storage.collect(stream, mr);
+ auto chars = char_storage.gather(stream, mr);
+ auto global_offsets = offset_storage.gather(stream, mr);

bool const insert_begin = *first_offset == 0;
bool const insert_end = !last_offset.has_value() || last_offset == chunk_offset;