GH-37567: [C++] Migrate JSON Integration code to Result<> (apache#37573)

### Rationale for this change

The JSON Integration APIs still use the old style of returning a `Status` together with a `T*` out-parameter.
While these are internal APIs, migrating them to the newer idiom of returning a `Result<T>` still improves ease of use.
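
For illustration, here is a minimal sketch of the two idioms; the `MakeGreeting*` functions below are hypothetical, invented only to contrast the styles:

```cpp
#include <string>

#include "arrow/result.h"
#include "arrow/status.h"

// Old style: the caller passes a `std::string*` out-parameter and checks the
// returned Status separately.
arrow::Status MakeGreetingOld(bool ok, std::string* out) {
  if (!ok) return arrow::Status::Invalid("cannot greet");
  *out = "hello";
  return arrow::Status::OK();
}

// New style: a single Result<std::string> carries either the value or the
// error Status.
arrow::Result<std::string> MakeGreetingNew(bool ok) {
  if (!ok) return arrow::Status::Invalid("cannot greet");
  return std::string("hello");
}
```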

### What changes are included in this PR?

Migrate the relevant APIs from `Status` to `Result`. Since the APIs are internal, no deprecation period is introduced.
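
As a rough sketch of what the migration means at a call site (the `ReadFirstBatch` wrapper below is hypothetical; `IntegrationJsonReader::Open` and `ReadRecordBatch` are the real APIs changed in this PR):

```cpp
#include <memory>
#include <utility>

#include "arrow/buffer.h"
#include "arrow/record_batch.h"
#include "arrow/result.h"
#include "arrow/status.h"
#include "arrow/testing/json_integration.h"

// ARROW_ASSIGN_OR_RAISE unwraps each Result<T> into a local variable, or
// early-returns the error Status from the enclosing function.
arrow::Status ReadFirstBatch(std::shared_ptr<arrow::Buffer> json_data) {
  ARROW_ASSIGN_OR_RAISE(
      auto reader, arrow::testing::IntegrationJsonReader::Open(std::move(json_data)));
  ARROW_ASSIGN_OR_RAISE(auto batch, reader->ReadRecordBatch(0));
  return batch->ValidateFull();
}
```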

Also, some very minor style and testing cleanups here and there.

### Are these changes tested?

By existing tests.

### Are there any user-facing changes?

No.

* Closes: apache#37567

Authored-by: Antoine Pitrou <antoine@python.org>
Signed-off-by: Antoine Pitrou <antoine@python.org>
pitrou authored and loicalleyne committed Nov 13, 2023
1 parent e1e951c commit 61a4538
Showing 8 changed files with 232 additions and 323 deletions.
```diff
@@ -56,11 +56,10 @@ namespace integration_tests {
 /// \brief Helper to read all batches from a JsonReader
 Status ReadBatches(std::unique_ptr<testing::IntegrationJsonReader>& reader,
                    std::vector<std::shared_ptr<RecordBatch>>* chunks) {
-  std::shared_ptr<RecordBatch> chunk;
   for (int i = 0; i < reader->num_record_batches(); i++) {
-    RETURN_NOT_OK(reader->ReadRecordBatch(i, &chunk));
+    ARROW_ASSIGN_OR_RAISE(auto chunk, reader->ReadRecordBatch(i));
     RETURN_NOT_OK(chunk->ValidateFull());
-    chunks->push_back(chunk);
+    chunks->push_back(std::move(chunk));
   }
   return Status::OK();
 }
@@ -150,11 +149,10 @@ class IntegrationTestScenario : public Scenario {
   FlightDescriptor descr{FlightDescriptor::PATH, "", {FLAGS_path}};

   // 1. Put the data to the server.
-  std::unique_ptr<testing::IntegrationJsonReader> reader;
   std::cout << "Opening JSON file '" << FLAGS_path << "'" << std::endl;
   auto in_file = *io::ReadableFile::Open(FLAGS_path);
-  ABORT_NOT_OK(
-      testing::IntegrationJsonReader::Open(default_memory_pool(), in_file, &reader));
+  ARROW_ASSIGN_OR_RAISE(auto reader, testing::IntegrationJsonReader::Open(
+                                         default_memory_pool(), in_file));

   std::shared_ptr<Schema> original_schema = reader->schema();
   std::vector<std::shared_ptr<RecordBatch>> original_data;
```
3 changes: 3 additions & 0 deletions cpp/src/arrow/ipc/type_fwd.h

```diff
@@ -56,6 +56,9 @@ class RecordBatchStreamReader;
 class RecordBatchFileReader;
 class RecordBatchWriter;

+class DictionaryFieldMapper;
+class DictionaryMemo;
+
 namespace feather {

 class Reader;
```
58 changes: 27 additions & 31 deletions cpp/src/arrow/testing/json_integration.cc

```diff
@@ -37,8 +37,6 @@
 #include <rapidjson/stringbuffer.h>
 #include <rapidjson/writer.h>

-using std::size_t;
-
 namespace arrow {

 using ipc::DictionaryFieldMapper;
@@ -82,12 +80,11 @@ class IntegrationJsonWriter::Impl {
     return Status::OK();
   }

-  Status Finish(std::string* result) {
+  Result<std::string> Finish() {
     writer_->EndArray();  // Record batches
     writer_->EndObject();

-    *result = string_buffer_.GetString();
-    return Status::OK();
+    return string_buffer_.GetString();
   }

   Status WriteRecordBatch(const RecordBatch& batch) {
@@ -115,15 +112,14 @@ IntegrationJsonWriter::IntegrationJsonWriter(const std::shared_ptr<Schema>& schema)

 IntegrationJsonWriter::~IntegrationJsonWriter() {}

-Status IntegrationJsonWriter::Open(const std::shared_ptr<Schema>& schema,
-                                   std::unique_ptr<IntegrationJsonWriter>* writer) {
-  *writer = std::unique_ptr<IntegrationJsonWriter>(new IntegrationJsonWriter(schema));
-  return (*writer)->impl_->Start();
+Result<std::unique_ptr<IntegrationJsonWriter>> IntegrationJsonWriter::Open(
+    const std::shared_ptr<Schema>& schema) {
+  auto writer = std::unique_ptr<IntegrationJsonWriter>(new IntegrationJsonWriter(schema));
+  RETURN_NOT_OK(writer->impl_->Start());
+  return writer;
 }

-Status IntegrationJsonWriter::Finish(std::string* result) {
-  return impl_->Finish(result);
-}
+Result<std::string> IntegrationJsonWriter::Finish() { return impl_->Finish(); }

 Status IntegrationJsonWriter::WriteRecordBatch(const RecordBatch& batch) {
   return impl_->WriteRecordBatch(batch);
@@ -144,7 +140,7 @@ class IntegrationJsonReader::Impl {
       return Status::IOError("JSON parsing failed");
     }

-    RETURN_NOT_OK(json::ReadSchema(doc_, pool_, &dictionary_memo_, &schema_));
+    ARROW_ASSIGN_OR_RAISE(schema_, json::ReadSchema(doc_, pool_, &dictionary_memo_));

     auto it = std::as_const(doc_).FindMember("batches");
     RETURN_NOT_ARRAY("batches", it, doc_);
@@ -153,13 +149,13 @@
     return Status::OK();
   }

-  Status ReadRecordBatch(int i, std::shared_ptr<RecordBatch>* batch) {
+  Result<std::shared_ptr<RecordBatch>> ReadRecordBatch(int i) {
     DCHECK_GE(i, 0) << "i out of bounds";
     DCHECK_LT(i, static_cast<int>(record_batches_->GetArray().Size()))
         << "i out of bounds";

     return json::ReadRecordBatch(record_batches_->GetArray()[i], schema_,
-                                 &dictionary_memo_, pool_, batch);
+                                 &dictionary_memo_, pool_);
   }

   std::shared_ptr<Schema> schema() const { return schema_; }
@@ -179,29 +175,30 @@ class IntegrationJsonReader::Impl {
 };

 IntegrationJsonReader::IntegrationJsonReader(MemoryPool* pool,
-                                             const std::shared_ptr<Buffer>& data) {
-  impl_.reset(new Impl(pool, data));
+                                             std::shared_ptr<Buffer> data) {
+  impl_.reset(new Impl(pool, std::move(data)));
 }

 IntegrationJsonReader::~IntegrationJsonReader() {}

-Status IntegrationJsonReader::Open(const std::shared_ptr<Buffer>& data,
-                                   std::unique_ptr<IntegrationJsonReader>* reader) {
-  return Open(default_memory_pool(), data, reader);
+Result<std::unique_ptr<IntegrationJsonReader>> IntegrationJsonReader::Open(
+    MemoryPool* pool, std::shared_ptr<Buffer> data) {
+  auto reader = std::unique_ptr<IntegrationJsonReader>(
+      new IntegrationJsonReader(pool, std::move(data)));
+  RETURN_NOT_OK(reader->impl_->ParseAndReadSchema());
+  return reader;
 }

-Status IntegrationJsonReader::Open(MemoryPool* pool, const std::shared_ptr<Buffer>& data,
-                                   std::unique_ptr<IntegrationJsonReader>* reader) {
-  *reader = std::unique_ptr<IntegrationJsonReader>(new IntegrationJsonReader(pool, data));
-  return (*reader)->impl_->ParseAndReadSchema();
+Result<std::unique_ptr<IntegrationJsonReader>> IntegrationJsonReader::Open(
+    std::shared_ptr<Buffer> data) {
+  return Open(default_memory_pool(), std::move(data));
 }

-Status IntegrationJsonReader::Open(MemoryPool* pool,
-                                   const std::shared_ptr<io::ReadableFile>& in_file,
-                                   std::unique_ptr<IntegrationJsonReader>* reader) {
+Result<std::unique_ptr<IntegrationJsonReader>> IntegrationJsonReader::Open(
+    MemoryPool* pool, const std::shared_ptr<io::ReadableFile>& in_file) {
   ARROW_ASSIGN_OR_RAISE(int64_t file_size, in_file->GetSize());
   ARROW_ASSIGN_OR_RAISE(auto json_buffer, in_file->Read(file_size));
-  return Open(pool, json_buffer, reader);
+  return Open(pool, std::move(json_buffer));
 }

 std::shared_ptr<Schema> IntegrationJsonReader::schema() const { return impl_->schema(); }
@@ -210,9 +207,8 @@ int IntegrationJsonReader::num_record_batches() const {
   return impl_->num_record_batches();
 }

-Status IntegrationJsonReader::ReadRecordBatch(int i,
-                                              std::shared_ptr<RecordBatch>* batch) const {
-  return impl_->ReadRecordBatch(i, batch);
+Result<std::shared_ptr<RecordBatch>> IntegrationJsonReader::ReadRecordBatch(int i) const {
+  return impl_->ReadRecordBatch(i);
 }

 }  // namespace testing
```
59 changes: 22 additions & 37 deletions cpp/src/arrow/testing/json_integration.h

```diff
@@ -22,21 +22,12 @@
 #include <memory>
 #include <string>

-#include "arrow/status.h"
+#include "arrow/io/type_fwd.h"
+#include "arrow/result.h"
 #include "arrow/testing/visibility.h"
+#include "arrow/type_fwd.h"

-namespace arrow {
-
-class Buffer;
-class MemoryPool;
-class RecordBatch;
-class Schema;
-
-namespace io {
-class ReadableFile;
-}  // namespace io
-
-namespace testing {
+namespace arrow::testing {

 /// \class IntegrationJsonWriter
 /// \brief Write the JSON representation of an Arrow record batch file or stream
@@ -49,19 +40,17 @@ class ARROW_TESTING_EXPORT IntegrationJsonWriter {
   /// \brief Create a new JSON writer that writes to memory
   ///
   /// \param[in] schema the schema of record batches
-  /// \param[out] out the returned writer object
-  /// \return Status
-  static Status Open(const std::shared_ptr<Schema>& schema,
-                     std::unique_ptr<IntegrationJsonWriter>* out);
+  /// \return the created writer object
+  static Result<std::unique_ptr<IntegrationJsonWriter>> Open(
+      const std::shared_ptr<Schema>& schema);

   /// \brief Append a record batch
   Status WriteRecordBatch(const RecordBatch& batch);

   /// \brief Finish the JSON payload and return as a std::string
   ///
-  /// \param[out] result the JSON as as a std::string
-  /// \return Status
-  Status Finish(std::string* result);
+  /// \return the JSON payload as a string
+  Result<std::string> Finish();

  private:
   explicit IntegrationJsonWriter(const std::shared_ptr<Schema>& schema);
@@ -83,27 +72,24 @@ class ARROW_TESTING_EXPORT IntegrationJsonReader {
   ///
   /// \param[in] pool a MemoryPool to use for buffer allocations
   /// \param[in] data a Buffer containing the JSON data
-  /// \param[out] reader the returned reader object
-  /// \return Status
-  static Status Open(MemoryPool* pool, const std::shared_ptr<Buffer>& data,
-                     std::unique_ptr<IntegrationJsonReader>* reader);
+  /// \return the created JSON reader
+  static Result<std::unique_ptr<IntegrationJsonReader>> Open(
+      MemoryPool* pool, std::shared_ptr<Buffer> data);

   /// \brief Create a new JSON reader that uses the default memory pool
   ///
   /// \param[in] data a Buffer containing the JSON data
-  /// \param[out] reader the returned reader object
-  /// \return Status
-  static Status Open(const std::shared_ptr<Buffer>& data,
-                     std::unique_ptr<IntegrationJsonReader>* reader);
+  /// \return the created JSON reader
+  static Result<std::unique_ptr<IntegrationJsonReader>> Open(
+      std::shared_ptr<Buffer> data);

   /// \brief Create a new JSON reader from a file
   ///
   /// \param[in] pool a MemoryPool to use for buffer allocations
   /// \param[in] in_file a ReadableFile containing JSON data
-  /// \param[out] reader the returned reader object
-  /// \return Status
-  static Status Open(MemoryPool* pool, const std::shared_ptr<io::ReadableFile>& in_file,
-                     std::unique_ptr<IntegrationJsonReader>* reader);
+  /// \return the created JSON reader
+  static Result<std::unique_ptr<IntegrationJsonReader>> Open(
+      MemoryPool* pool, const std::shared_ptr<io::ReadableFile>& in_file);

   /// \brief Return the schema read from the JSON
   std::shared_ptr<Schema> schema() const;
@@ -114,16 +100,15 @@
   /// \brief Read a particular record batch from the file
   ///
   /// \param[in] i the record batch index, does not boundscheck
-  /// \param[out] batch the read record batch
-  Status ReadRecordBatch(int i, std::shared_ptr<RecordBatch>* batch) const;
+  /// \return the record batch read
+  Result<std::shared_ptr<RecordBatch>> ReadRecordBatch(int i) const;

  private:
-  IntegrationJsonReader(MemoryPool* pool, const std::shared_ptr<Buffer>& data);
+  IntegrationJsonReader(MemoryPool* pool, std::shared_ptr<Buffer> data);

   // Hide RapidJSON details from public API
   class Impl;
   std::unique_ptr<Impl> impl_;
 };

-}  // namespace testing
-}  // namespace arrow
+}  // namespace arrow::testing
```