Skip to content

Commit

Permalink
apacheGH-34888: [C++][Parquet] Writer supports adding extra kv meta
Browse files Browse the repository at this point in the history
  • Loading branch information
wgtmac committed Apr 4, 2023
1 parent 6024678 commit 3e98473
Show file tree
Hide file tree
Showing 5 changed files with 105 additions and 36 deletions.
27 changes: 21 additions & 6 deletions cpp/src/parquet/file_writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,19 @@

#include "parquet/file_writer.h"

#include <cstddef>
#include <memory>
#include <ostream>
#include <string>
#include <utility>
#include <vector>

#include "arrow/util/key_value_metadata.h"
#include "parquet/column_writer.h"
#include "parquet/encryption/encryption_internal.h"
#include "parquet/encryption/internal_file_encryptor.h"
#include "parquet/exception.h"
#include "parquet/platform.h"
#include "parquet/schema.h"
#include "parquet/types.h"

using arrow::MemoryPool;

Expand Down Expand Up @@ -321,7 +320,7 @@ class FileSerializer : public ParquetFileWriter::Contents {
auto file_encryption_properties = properties_->file_encryption_properties();

if (file_encryption_properties == nullptr) { // Non encrypted file.
file_metadata_ = metadata_->Finish();
file_metadata_ = metadata_->Finish(key_value_metadata_);
WriteFileMetaData(*file_metadata_, sink_.get());
} else { // Encrypted file
CloseEncryptedFile(file_encryption_properties);
Expand Down Expand Up @@ -356,6 +355,15 @@ class FileSerializer : public ParquetFileWriter::Contents {

RowGroupWriter* AppendBufferedRowGroup() override { return AppendRowGroup(true); }

void AddKeyValueMetadata(
std::shared_ptr<const KeyValueMetadata> key_value_metadata) override {
if (key_value_metadata_ == nullptr) {
key_value_metadata_ = std::move(key_value_metadata);
} else if (key_value_metadata != nullptr) {
key_value_metadata_ = key_value_metadata_->Merge(*key_value_metadata);
}
}

~FileSerializer() override {
try {
Close();
Expand All @@ -374,7 +382,7 @@ class FileSerializer : public ParquetFileWriter::Contents {
properties_(std::move(properties)),
num_row_groups_(0),
num_rows_(0),
metadata_(FileMetaDataBuilder::Make(&schema_, properties_, key_value_metadata_)) {
metadata_(FileMetaDataBuilder::Make(&schema_, properties_)) {
PARQUET_ASSIGN_OR_THROW(int64_t position, sink_->Tell());
if (position == 0) {
StartFile();
Expand All @@ -387,7 +395,7 @@ class FileSerializer : public ParquetFileWriter::Contents {
// Encrypted file with encrypted footer
if (file_encryption_properties->encrypted_footer()) {
// encrypted footer
file_metadata_ = metadata_->Finish();
file_metadata_ = metadata_->Finish(key_value_metadata_);

PARQUET_ASSIGN_OR_THROW(int64_t position, sink_->Tell());
uint64_t metadata_start = static_cast<uint64_t>(position);
Expand All @@ -402,7 +410,7 @@ class FileSerializer : public ParquetFileWriter::Contents {
sink_->Write(reinterpret_cast<uint8_t*>(&footer_and_crypto_len), 4));
PARQUET_THROW_NOT_OK(sink_->Write(kParquetEMagic, 4));
} else { // Encrypted file with plaintext footer
file_metadata_ = metadata_->Finish();
file_metadata_ = metadata_->Finish(key_value_metadata_);
auto footer_signing_encryptor = file_encryptor_->GetFooterSigningEncryptor();
WriteEncryptedFileMetadata(*file_metadata_, sink_.get(), footer_signing_encryptor,
false);
Expand Down Expand Up @@ -574,6 +582,13 @@ RowGroupWriter* ParquetFileWriter::AppendRowGroup(int64_t num_rows) {
return AppendRowGroup();
}

void ParquetFileWriter::AddKeyValueMetadata(
std::shared_ptr<const KeyValueMetadata> key_value_metadata) {
if (contents_) {
contents_->AddKeyValueMetadata(std::move(key_value_metadata));
}
}

const std::shared_ptr<WriterProperties>& ParquetFileWriter::properties() const {
return contents_->properties();
}
Expand Down
10 changes: 10 additions & 0 deletions cpp/src/parquet/file_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,9 @@ class PARQUET_EXPORT ParquetFileWriter {
return key_value_metadata_;
}

virtual void AddKeyValueMetadata(
std::shared_ptr<const KeyValueMetadata> key_value_metadata) = 0;

// Return const-pointer to make it clear that this object is not to be copied
const SchemaDescriptor* schema() const { return &schema_; }

Expand Down Expand Up @@ -209,6 +212,13 @@ class PARQUET_EXPORT ParquetFileWriter {
/// until the next call to AppendRowGroup or AppendBufferedRowGroup or Close.
RowGroupWriter* AppendBufferedRowGroup();

/// \brief Add key-value metadata to the file.
/// \param[in] key_value_metadata the metadata to add.
/// \note This will overwrite any existing metadata with the same key.
/// It will not take effect if Close() has been called.
void AddKeyValueMetadata(
std::shared_ptr<const KeyValueMetadata> key_value_metadata);

/// Number of columns.
///
/// This number is fixed during the lifetime of the writer as it is determined via
Expand Down
43 changes: 21 additions & 22 deletions cpp/src/parquet/metadata.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1743,13 +1743,11 @@ void RowGroupMetaDataBuilder::Finish(int64_t total_bytes_written,
// TODO(PARQUET-595) Support key_value_metadata
class FileMetaDataBuilder::FileMetaDataBuilderImpl {
public:
explicit FileMetaDataBuilderImpl(
const SchemaDescriptor* schema, std::shared_ptr<WriterProperties> props,
std::shared_ptr<const KeyValueMetadata> key_value_metadata)
explicit FileMetaDataBuilderImpl(const SchemaDescriptor* schema,
std::shared_ptr<WriterProperties> props)
: metadata_(new format::FileMetaData()),
properties_(std::move(props)),
schema_(schema),
key_value_metadata_(std::move(key_value_metadata)) {
schema_(schema) {
if (properties_->file_encryption_properties() != nullptr &&
properties_->file_encryption_properties()->encrypted_footer()) {
crypto_metadata_.reset(new format::FileCryptoMetaData());
Expand All @@ -1763,21 +1761,22 @@ class FileMetaDataBuilder::FileMetaDataBuilderImpl {
return current_row_group_builder_.get();
}

std::unique_ptr<FileMetaData> Finish() {
std::unique_ptr<FileMetaData> Finish(
const std::shared_ptr<const KeyValueMetadata>& key_value_metadata) {
int64_t total_rows = 0;
for (auto row_group : row_groups_) {
total_rows += row_group.num_rows;
}
metadata_->__set_num_rows(total_rows);
metadata_->__set_row_groups(row_groups_);

if (key_value_metadata_) {
if (key_value_metadata) {
metadata_->key_value_metadata.clear();
metadata_->key_value_metadata.reserve(key_value_metadata_->size());
for (int64_t i = 0; i < key_value_metadata_->size(); ++i) {
metadata_->key_value_metadata.reserve(key_value_metadata->size());
for (int64_t i = 0; i < key_value_metadata->size(); ++i) {
format::KeyValue kv_pair;
kv_pair.__set_key(key_value_metadata_->key(i));
kv_pair.__set_value(key_value_metadata_->value(i));
kv_pair.__set_key(key_value_metadata->key(i));
kv_pair.__set_value(key_value_metadata->value(i));
metadata_->key_value_metadata.push_back(kv_pair);
}
metadata_->__isset.key_value_metadata = true;
Expand All @@ -1795,7 +1794,7 @@ class FileMetaDataBuilder::FileMetaDataBuilderImpl {
metadata_->__set_version(file_version);
metadata_->__set_created_by(properties_->created_by());

// Users cannot set the `ColumnOrder` since we donot not have user defined sort order
// Users cannot set the `ColumnOrder` since we do not have user defined sort order
// in the spec yet.
// We always default to `TYPE_DEFINED_ORDER`. We can expose it in
// the API once we have user defined sort orders in the Parquet format.
Expand Down Expand Up @@ -1866,29 +1865,29 @@ class FileMetaDataBuilder::FileMetaDataBuilderImpl {

std::unique_ptr<RowGroupMetaDataBuilder> current_row_group_builder_;
const SchemaDescriptor* schema_;
std::shared_ptr<const KeyValueMetadata> key_value_metadata_;
};

std::unique_ptr<FileMetaDataBuilder> FileMetaDataBuilder::Make(
const SchemaDescriptor* schema, std::shared_ptr<WriterProperties> props,
std::shared_ptr<const KeyValueMetadata> key_value_metadata) {
const SchemaDescriptor* schema, std::shared_ptr<WriterProperties> props) {
return std::unique_ptr<FileMetaDataBuilder>(
new FileMetaDataBuilder(schema, std::move(props), std::move(key_value_metadata)));
new FileMetaDataBuilder(schema, std::move(props)));
}

FileMetaDataBuilder::FileMetaDataBuilder(
const SchemaDescriptor* schema, std::shared_ptr<WriterProperties> props,
std::shared_ptr<const KeyValueMetadata> key_value_metadata)
: impl_{std::unique_ptr<FileMetaDataBuilderImpl>(new FileMetaDataBuilderImpl(
schema, std::move(props), std::move(key_value_metadata)))} {}
FileMetaDataBuilder::FileMetaDataBuilder(const SchemaDescriptor* schema,
std::shared_ptr<WriterProperties> props)
: impl_{std::unique_ptr<FileMetaDataBuilderImpl>(
new FileMetaDataBuilderImpl(schema, std::move(props)))} {}

FileMetaDataBuilder::~FileMetaDataBuilder() = default;

RowGroupMetaDataBuilder* FileMetaDataBuilder::AppendRowGroup() {
return impl_->AppendRowGroup();
}

std::unique_ptr<FileMetaData> FileMetaDataBuilder::Finish() { return impl_->Finish(); }
std::unique_ptr<FileMetaData> FileMetaDataBuilder::Finish(
const std::shared_ptr<const KeyValueMetadata>& key_value_metadata) {
return impl_->Finish(key_value_metadata);
}

std::unique_ptr<FileCryptoMetaData> FileMetaDataBuilder::GetCryptoMetaData() {
return impl_->BuildFileCryptoMetaData();
Expand Down
11 changes: 5 additions & 6 deletions cpp/src/parquet/metadata.h
Original file line number Diff line number Diff line change
Expand Up @@ -510,24 +510,23 @@ class PARQUET_EXPORT FileMetaDataBuilder {
public:
// API convenience to get a MetaData reader
static std::unique_ptr<FileMetaDataBuilder> Make(
const SchemaDescriptor* schema, std::shared_ptr<WriterProperties> props,
std::shared_ptr<const KeyValueMetadata> key_value_metadata = NULLPTR);
const SchemaDescriptor* schema, std::shared_ptr<WriterProperties> props);

~FileMetaDataBuilder();

// The prior RowGroupMetaDataBuilder (if any) is destroyed
RowGroupMetaDataBuilder* AppendRowGroup();

// Complete the Thrift structure
std::unique_ptr<FileMetaData> Finish();
std::unique_ptr<FileMetaData> Finish(
const std::shared_ptr<const KeyValueMetadata>& key_value_metadata = NULLPTR);

// crypto metadata
std::unique_ptr<FileCryptoMetaData> GetCryptoMetaData();

private:
explicit FileMetaDataBuilder(
const SchemaDescriptor* schema, std::shared_ptr<WriterProperties> props,
std::shared_ptr<const KeyValueMetadata> key_value_metadata = NULLPTR);
explicit FileMetaDataBuilder(const SchemaDescriptor* schema,
std::shared_ptr<WriterProperties> props);
// PIMPL Idiom
class FileMetaDataBuilderImpl;
std::unique_ptr<FileMetaDataBuilderImpl> impl_;
Expand Down
50 changes: 48 additions & 2 deletions cpp/src/parquet/metadata_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

#include "arrow/util/key_value_metadata.h"
#include "parquet/file_reader.h"
#include "parquet/file_writer.h"
#include "parquet/schema.h"
#include "parquet/statistics.h"
#include "parquet/test_util.h"
Expand Down Expand Up @@ -284,16 +285,61 @@ TEST(Metadata, TestKeyValueMetadata) {
auto kvmeta = std::make_shared<KeyValueMetadata>();
kvmeta->Append("test_key", "test_value");

auto f_builder = FileMetaDataBuilder::Make(&schema, props, kvmeta);
auto f_builder = FileMetaDataBuilder::Make(&schema, props);

// Read the metadata
auto f_accessor = f_builder->Finish();
auto f_accessor = f_builder->Finish(kvmeta);

// Key value metadata
ASSERT_TRUE(f_accessor->key_value_metadata());
EXPECT_TRUE(f_accessor->key_value_metadata()->Equals(*kvmeta));
}

TEST(Metadata, TestAddKeyValueMetadata) {
schema::NodeVector fields;
fields.push_back(schema::Int32("int_col", Repetition::REQUIRED));
auto schema = std::static_pointer_cast<schema::GroupNode>(
schema::GroupNode::Make("schema", Repetition::REQUIRED, fields));

auto kv_meta = std::make_shared<KeyValueMetadata>();
kv_meta->Append("test_key_1", "test_value_1");
kv_meta->Append("test_key_2", "test_value_2_");

auto sink = CreateOutputStream();
auto writer_props = parquet::WriterProperties::Builder().disable_dictionary()->build();
auto file_writer =
parquet::ParquetFileWriter::Open(sink, schema, writer_props, kv_meta);

// Key value metadata that will be added to the file.
auto kv_meta_added = std::make_shared<KeyValueMetadata>();
kv_meta_added->Append("test_key_2", "test_value_2");
kv_meta_added->Append("test_key_3", "test_value_3");

file_writer->AddKeyValueMetadata(kv_meta_added);
file_writer->Close();

// Key value metadata that will be ignored since file writer is closed.
auto kv_meta_ignored = std::make_shared<KeyValueMetadata>();
kv_meta_ignored->Append("test_key_4", "test_value_4");

PARQUET_ASSIGN_OR_THROW(auto buffer, sink->Finish());
auto source = std::make_shared<::arrow::io::BufferReader>(buffer);
auto file_reader = ParquetFileReader::Open(source);

ASSERT_NE(nullptr, file_reader->metadata());
ASSERT_NE(nullptr, file_reader->metadata()->key_value_metadata());
auto read_kv_meta = file_reader->metadata()->key_value_metadata();

// Verify keys that were added before file writer was closed are present.
for (int i = 1; i <= 3; ++i) {
auto index = std::to_string(i);
PARQUET_ASSIGN_OR_THROW(auto value, read_kv_meta->Get("test_key_" + index));
EXPECT_EQ("test_value_" + index, value);
}
// Verify keys that were added after file writer was closed are not present.
EXPECT_FALSE(read_kv_meta->Contains("test_key_4"));
}

TEST(Metadata, TestHasBloomFilter) {
std::string dir_string(parquet::test::get_data_dir());
std::string path = dir_string + "/data_index_bloom_encoding_stats.parquet";
Expand Down

0 comments on commit 3e98473

Please sign in to comment.