apacheGH-26153: [C++] Share common codes for RecordBatchStreamReader and StreamDecoder (apache#36344)

### Rationale for this change

RecordBatchStreamReader (pull-based) and StreamDecoder (push-based) must behave identically, so they should share their decoding logic.

### What changes are included in this PR?

This PR extracts reusable code from StreamDecoderImpl into StreamDecoderInternal. The external API of
RecordBatchStreamReader and StreamDecoder is unchanged.
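
To illustrate the idea of a pull-based reader sitting on top of a push-based decoding core, here is a minimal, Arrow-free sketch. Every name in it (`PushDecoder`, `PullReader`, the newline framing) is hypothetical and chosen only for illustration; it is not the StreamDecoderInternal code from this PR.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <optional>
#include <string>
#include <utility>
#include <vector>

namespace sketch {

// Push-based core: the caller feeds arbitrary chunks and complete messages
// are queued. Hypothetical framing: one message per '\n'-terminated line.
class PushDecoder {
 public:
  void Consume(const std::string& chunk) {
    for (char c : chunk) {
      if (c == '\n') {
        decoded_.push_back(std::move(partial_));
        partial_.clear();  // reset the moved-from buffer to a known state
      } else {
        partial_.push_back(c);
      }
    }
  }

  bool HasDecoded() const { return !decoded_.empty(); }

  std::string Pop() {
    assert(!decoded_.empty());
    std::string out = std::move(decoded_.front());
    decoded_.pop_front();
    return out;
  }

 private:
  std::string partial_;
  std::deque<std::string> decoded_;
};

// Pull-based wrapper: reads just enough input to produce the next message,
// delegating all decoding to the shared push-based core.
class PullReader {
 public:
  explicit PullReader(std::vector<std::string> chunks)
      : chunks_(std::move(chunks)) {}

  // Returns the next message, or std::nullopt at end of input.
  std::optional<std::string> ReadNext() {
    while (!decoder_.HasDecoded() && next_chunk_ < chunks_.size()) {
      decoder_.Consume(chunks_[next_chunk_++]);
    }
    if (!decoder_.HasDecoded()) return std::nullopt;
    return decoder_.Pop();
  }

 private:
  std::vector<std::string> chunks_;
  std::size_t next_chunk_ = 0;
  PushDecoder decoder_;
};

}  // namespace sketch
```

Because `PullReader` contains no decoding logic of its own, the two entry points cannot drift apart, which is the property the rationale asks for.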

This PR adds the following external APIs to support this:

* arrow::Status::ToStringWithoutContextLines(): This is only for testing. With it, ASSERT_RAISES_WITH_MESSAGE() gives a stable result whether or not -DARROW_EXTRA_ERROR_CONTEXT=ON is set.

  We can extract this and the related changes into a separate PR if we want.

* arrow::ipc::Listener::OnRecordBatchWithMetadataDecoded(): RecordBatchStreamReader needs the custom metadata as well as the RecordBatch, so OnRecordBatchWithMetadataDecoded() receives a RecordBatchWithMetadata. OnRecordBatchDecoded() still exists and is used by default for backward compatibility.

* arrow::ipc::CollectListener::metadatas(), arrow::ipc::CollectListener::num_record_batches(), arrow::ipc::CollectListener::PopRecordBatch(), arrow::ipc::CollectListener::PopRecordBatchWithMetadata(): With these APIs, we can use CollectListener in RecordBatchStreamReader. Alternatively, we can create an internal listener only for RecordBatchStreamReader if we don't want to extend CollectListener.
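
The listener additions listed above can be pictured with a simplified, Arrow-free sketch. The types `Batch`, `Metadata`, `Listener`, and `CollectingListener` below are stand-ins invented for this example; the real Arrow callbacks return `arrow::Status`, and the first-in-first-out pop order shown here is an assumption.

```cpp
#include <cassert>
#include <cstddef>
#include <memory>
#include <string>
#include <utility>
#include <vector>

namespace sketch {

struct Batch { int num_rows; };  // stand-in for arrow::RecordBatch
// Stand-in for the custom key/value metadata attached to a batch.
using Metadata = std::vector<std::pair<std::string, std::string>>;

struct BatchWithMetadata {
  std::shared_ptr<Batch> batch;
  std::shared_ptr<Metadata> custom_metadata;
};

class Listener {
 public:
  virtual ~Listener() = default;

  // Older callback: receives only the batch.
  virtual void OnBatchDecoded(std::shared_ptr<Batch> /*batch*/) {}

  // Newer callback: by default it forwards to the older one and drops the
  // metadata, so listeners written against the older callback keep working.
  virtual void OnBatchWithMetadataDecoded(BatchWithMetadata decoded) {
    OnBatchDecoded(std::move(decoded.batch));
  }
};

// Collects batches and metadata, and lets a pull-based reader pop them.
class CollectingListener : public Listener {
 public:
  void OnBatchWithMetadataDecoded(BatchWithMetadata decoded) override {
    batches_.push_back(std::move(decoded.batch));
    metadatas_.push_back(std::move(decoded.custom_metadata));
  }

  std::size_t num_batches() const { return batches_.size(); }

  // Removes and returns the oldest entry (assumed FIFO order); returns an
  // entry with a null batch when nothing has been collected.
  BatchWithMetadata PopBatchWithMetadata() {
    assert(batches_.size() == metadatas_.size());
    BatchWithMetadata out;
    if (batches_.empty()) return out;
    out.batch = std::move(batches_.front());
    out.custom_metadata = std::move(metadatas_.front());
    batches_.erase(batches_.begin());
    metadatas_.erase(metadatas_.begin());
    return out;
  }

 private:
  std::vector<std::shared_ptr<Batch>> batches_;
  std::vector<std::shared_ptr<Metadata>> metadatas_;
};

// A listener that only knows the older callback.
class LegacyRowCounter : public Listener {
 public:
  void OnBatchDecoded(std::shared_ptr<Batch> batch) override {
    rows_ += batch->num_rows;
  }
  int rows() const { return rows_; }

 private:
  int rows_ = 0;
};

}  // namespace sketch
```

A decoder that always calls the newer callback therefore serves both kinds of listener: `LegacyRowCounter` still sees every batch, while `CollectingListener` also keeps the metadata.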

### Are these changes tested?

Yes.

### Are there any user-facing changes?

Yes.

**This PR includes breaking changes to public APIs.**

`arrow::ipc::CollectListener::record_batches()` returns `const std::vector<std::shared_ptr<RecordBatch>>&` instead of `std::vector<std::shared_ptr<RecordBatch>>`.
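
As a rough illustration of what the new return type means for callers, the sketch below uses a hypothetical `Collector` class (not the Arrow one) whose accessor returns a const reference. Binding the result to `const auto&` gives a view that tracks later additions, while `auto` still takes an independent copy.

```cpp
#include <cassert>
#include <memory>
#include <vector>

namespace sketch {

struct Batch { int num_rows; };  // stand-in for arrow::RecordBatch

class Collector {
 public:
  void Add(int num_rows) {
    assert(num_rows >= 0);
    batches_.push_back(std::make_shared<Batch>(Batch{num_rows}));
  }

  // Returns a reference to the internal vector: no copy is made, and the
  // reference is only valid while this Collector is alive.
  const std::vector<std::shared_ptr<Batch>>& batches() const {
    return batches_;
  }

 private:
  std::vector<std::shared_ptr<Batch>> batches_;
};

}  // namespace sketch
```

Callers that relied on getting a private snapshot should keep using `auto` (or an explicit copy), and should not hold a reference past the lifetime of the listener.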

* Closes: apache#26153

Lead-authored-by: Sutou Kouhei <kou@clear-code.com>
Co-authored-by: Sutou Kouhei <kou@cozmixng.org>
Co-authored-by: Antoine Pitrou <pitrou@free.fr>
Signed-off-by: Sutou Kouhei <kou@clear-code.com>
3 people authored and westonpace committed Jul 7, 2023
1 parent e21543f commit 1facf30
Showing 6 changed files with 262 additions and 247 deletions.
25 changes: 12 additions & 13 deletions cpp/src/arrow/ipc/read_write_test.cc
@@ -2082,29 +2082,28 @@ TEST(TestRecordBatchStreamReader, NotEnoughDictionaries) {
   // error
   ASSERT_OK_AND_ASSIGN(auto buffer, out->Finish());

-  auto AssertFailsWith = [](std::shared_ptr<Buffer> stream, const std::string& ex_error) {
+  auto Read = [](std::shared_ptr<Buffer> stream) -> Status {
     io::BufferReader reader(stream);
-    ASSERT_OK_AND_ASSIGN(auto ipc_reader, RecordBatchStreamReader::Open(&reader));
+    ARROW_ASSIGN_OR_RAISE(auto ipc_reader, RecordBatchStreamReader::Open(&reader));
     std::shared_ptr<RecordBatch> batch;
-    Status s = ipc_reader->ReadNext(&batch);
-    ASSERT_TRUE(s.IsInvalid());
-    ASSERT_EQ(ex_error, s.message().substr(0, ex_error.size()));
+    return ipc_reader->ReadNext(&batch);
   };

   // Stream terminates before reading all dictionaries
   std::shared_ptr<Buffer> truncated_stream;
   SpliceMessages(buffer, {0, 1}, &truncated_stream);
-  std::string ex_message =
-      ("IPC stream ended without reading the expected number (3)"
-       " of dictionaries");
-  AssertFailsWith(truncated_stream, ex_message);
+  ASSERT_RAISES_WITH_MESSAGE(Invalid,
+                             "Invalid: IPC stream ended without "
+                             "reading the expected number (3) of dictionaries",
+                             Read(truncated_stream));

   // One of the dictionaries is missing, then we see a record batch
   SpliceMessages(buffer, {0, 1, 2, 4}, &truncated_stream);
-  ex_message =
-      ("IPC stream did not have the expected number (3) of dictionaries "
-       "at the start of the stream");
-  AssertFailsWith(truncated_stream, ex_message);
+  ASSERT_RAISES_WITH_MESSAGE(Invalid,
+                             "Invalid: IPC stream did not have "
+                             "the expected number (3) of dictionaries "
+                             "at the start of the stream",
+                             Read(truncated_stream));
 }

 TEST(TestRecordBatchStreamReader, MalformedInput) {
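
The updated test compares the whole status string, which is why trailing context lines need to be dropped when ARROW_EXTRA_ERROR_CONTEXT is enabled. The sketch below shows one way such stripping could work on a plain string. It is not the Arrow implementation of ToStringWithoutContextLines(); the helper names are hypothetical, and the context-line shape `<file>:<line>  <expression>` is an assumption made for this example.

```cpp
#include <cassert>
#include <cctype>
#include <string>

namespace sketch {

// Assumed context-line shape: "<file>:<line>  <expression>", that is, a
// colon, one or more digits, then two spaces.
inline bool LooksLikeContextLine(const std::string& line) {
  const auto sep = line.find("  ");
  if (sep == std::string::npos) return false;
  const auto colon = line.rfind(':', sep);
  if (colon == std::string::npos || colon + 1 >= sep) return false;
  for (auto i = colon + 1; i < sep; ++i) {
    if (!std::isdigit(static_cast<unsigned char>(line[i]))) return false;
  }
  return true;
}

// Drops trailing lines that look like context lines and keeps the rest of
// the message untouched.
inline std::string StripTrailingContextLines(std::string message) {
  while (true) {
    const auto newline = message.rfind('\n');
    if (newline == std::string::npos) break;
    if (!LooksLikeContextLine(message.substr(newline + 1))) break;
    message.erase(newline);
  }
  return message;
}

// Usage: the same error compares equal with and without context lines.
inline void Example() {
  const std::string bare = "Invalid: something failed";
  const std::string with_context =
      bare + "\n/hypothetical/path.cc:42  DoThing()";
  assert(StripTrailingContextLines(with_context) ==
         StripTrailingContextLines(bare));
}

}  // namespace sketch
```

With a normalization step like this, an exact-match assertion can use the same expected string in both build configurations.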