Skip to content

Commit

Permalink
apacheGH-26153: [C++] Share common codes for RecordBatchStreamReader …
Browse files Browse the repository at this point in the history
…and StreamDecoder

Because they (pull-based and push-based) must have the same behavior.

This PR extracts reusable codes to StreamDecoderInternal from
StreamDecoderImpl. External API isn't changed for
RecordBatchStreamReader and StreamDecoder.

This PR adds some external API to implement this:

* arrow::Status::ToStringWithoutContextLines(): This is only for
  testing. We can get stable result of ASSERT_RAISES_WITH_MESSAGE()
  with/without -DARROW_EXTRA_ERROR_CONTEXT=ON by this.

  We can extract this and related changes to separated PR if we want.

* arrow::ipc::Listener::OnRecordBatchWithMetadataDecoded(): Because
  RecordBatchStreamReader wants not only RecordBatch but also custom
  metadata. OnRecordBatchWithMetadataDecoded() receives
  RecordBatchWithMetadata. OnRecordBatchDecoded() still exists and
  it's used by default for backward compatibility.

* arrow::ipc::CollectListener::metadatas(),
  arrow::ipc::CollectListener::num_record_batches(),
  arrow::ipc::CollectListener::PopRecordBatch(),
  arrow::ipc::CollectListener::PopRecordBatchWithMetadat(): If we add
  these APIs, we can use CollectListner in RecordBatchStreamReader. We
  can create an internal listener only for RecordBatchStreamReader if
  don't want to extend CollectListener.
  • Loading branch information
kou committed Jul 3, 2023
1 parent 0b7bd74 commit 3434a8b
Show file tree
Hide file tree
Showing 6 changed files with 260 additions and 246 deletions.
25 changes: 12 additions & 13 deletions cpp/src/arrow/ipc/read_write_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2082,29 +2082,28 @@ TEST(TestRecordBatchStreamReader, NotEnoughDictionaries) {
// error
ASSERT_OK_AND_ASSIGN(auto buffer, out->Finish());

auto AssertFailsWith = [](std::shared_ptr<Buffer> stream, const std::string& ex_error) {
auto Read = [](std::shared_ptr<Buffer> stream) -> Status {
io::BufferReader reader(stream);
ASSERT_OK_AND_ASSIGN(auto ipc_reader, RecordBatchStreamReader::Open(&reader));
ARROW_ASSIGN_OR_RAISE(auto ipc_reader, RecordBatchStreamReader::Open(&reader));
std::shared_ptr<RecordBatch> batch;
Status s = ipc_reader->ReadNext(&batch);
ASSERT_TRUE(s.IsInvalid());
ASSERT_EQ(ex_error, s.message().substr(0, ex_error.size()));
return ipc_reader->ReadNext(&batch);
};

// Stream terminates before reading all dictionaries
std::shared_ptr<Buffer> truncated_stream;
SpliceMessages(buffer, {0, 1}, &truncated_stream);
std::string ex_message =
("IPC stream ended without reading the expected number (3)"
" of dictionaries");
AssertFailsWith(truncated_stream, ex_message);
ASSERT_RAISES_WITH_MESSAGE(Invalid,
"Invalid: IPC stream ended without "
"reading the expected number (3) of dictionaries",
Read(truncated_stream));

// One of the dictionaries is missing, then we see a record batch
SpliceMessages(buffer, {0, 1, 2, 4}, &truncated_stream);
ex_message =
("IPC stream did not have the expected number (3) of dictionaries "
"at the start of the stream");
AssertFailsWith(truncated_stream, ex_message);
ASSERT_RAISES_WITH_MESSAGE(Invalid,
"Invalid: IPC stream did not have "
"the expected number (3) of dictionaries "
"at the start of the stream",
Read(truncated_stream));
}

TEST(TestRecordBatchStreamReader, MalformedInput) {
Expand Down

0 comments on commit 3434a8b

Please sign in to comment.