Skip to content

Commit

Permalink
Support struct column reading with different schemas (facebookincubat…
Browse files Browse the repository at this point in the history
  • Loading branch information
rui-mo authored and glutenperfbot committed May 15, 2024
1 parent ed70919 commit 543558c
Show file tree
Hide file tree
Showing 13 changed files with 214 additions and 33 deletions.
23 changes: 17 additions & 6 deletions velox/connectors/hive/SplitReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -366,12 +366,23 @@ std::vector<TypePtr> SplitReader::adaptColumns(
} else {
auto fileTypeIdx = fileType->getChildIdxIfExists(fieldName);
if (!fileTypeIdx.has_value()) {
// Column is missing. Most likely due to schema evolution.
VELOX_CHECK(tableSchema);
childSpec->setConstantValue(BaseVector::createNullConstant(
tableSchema->findChild(fieldName),
1,
connectorQueryCtx_->memoryPool()));
// If the field name exists in the user-specified output type, set the
// column to a null constant. Related PR:
// https://github.com/facebookincubator/velox/pull/6427.
auto outputTypeIdx = readerOutputType_->getChildIdxIfExists(fieldName);
if (outputTypeIdx.has_value()) {
childSpec->setConstantValue(BaseVector::createNullConstant(
readerOutputType_->childAt(outputTypeIdx.value()),
1,
connectorQueryCtx_->memoryPool()));
} else {
// Column is missing. Most likely due to schema evolution.
VELOX_CHECK(tableSchema);
childSpec->setConstantValue(BaseVector::createNullConstant(
tableSchema->findChild(fieldName),
1,
connectorQueryCtx_->memoryPool()));
}
} else {
// Column no longer missing, reset constant value set on the spec.
childSpec->setConstantValue(nullptr);
Expand Down
4 changes: 1 addition & 3 deletions velox/dwio/common/SelectiveStructColumnReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,6 @@ void SelectiveStructColumnReaderBase::read(
}

auto& childSpecs = scanSpec_->children();
VELOX_CHECK(!childSpecs.empty());
for (size_t i = 0; i < childSpecs.size(); ++i) {
auto& childSpec = childSpecs[i];
VELOX_TRACE_HISTORY_PUSH("read %s", childSpec->fieldName().c_str());
Expand Down Expand Up @@ -243,7 +242,7 @@ bool SelectiveStructColumnReaderBase::isChildConstant(
fileType_->type()->kind() !=
TypeKind::MAP && // If this is the case it means this is a flat map,
// so it can't have "missing" fields.
childSpec.channel() >= fileType_->size());
!fileType_->containsChild(childSpec.fieldName()));
}

namespace {
Expand Down Expand Up @@ -327,7 +326,6 @@ void setNullField(
void SelectiveStructColumnReaderBase::getValues(
RowSet rows,
VectorPtr* result) {
VELOX_CHECK(!scanSpec_->children().empty());
VELOX_CHECK_NOT_NULL(
*result, "SelectiveStructColumnReaderBase expects a non-null result");
VELOX_CHECK(
Expand Down
39 changes: 39 additions & 0 deletions velox/dwio/common/TypeWithId.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,22 @@ std::vector<std::shared_ptr<const TypeWithId>> toShared(
}
return result;
}

// Returns a ROW type whose immediate field names are the lower-case ASCII
// form of the field names of 'type'. If 'type' is not a ROW, it is returned
// unchanged.
//
// Only the names at this level are rewritten: the child types are copied from
// the input as-is, so field names of nested ROW types are not touched here.
TypePtr adjustNameAsLowerCase(const TypePtr& type) {
  const auto row = asRowType(type);
  if (!row) {
    return type;
  }

  const auto& originalNames = row->names();
  std::vector<std::string> loweredNames;
  loweredNames.reserve(originalNames.size());
  for (size_t i = 0; i < originalNames.size(); ++i) {
    std::string lowered = originalNames[i];
    folly::toLowerAscii(lowered);
    loweredNames.emplace_back(std::move(lowered));
  }

  // Child types are reused unchanged; only the names differ from the input.
  std::vector<TypePtr> childTypes = row->children();
  return TypeFactory<TypeKind::ROW>::create(
      std::move(loweredNames), std::move(childTypes));
}
} // namespace

TypeWithId::TypeWithId(
Expand All @@ -57,6 +73,29 @@ std::unique_ptr<TypeWithId> TypeWithId::create(
return create(root, next, 0);
}

// Creates a deep copy of this node and of every node below it. Each copy is
// constructed with the id_, maxId_ and column_ of the node it was copied
// from.
//
// @param nameAsLowerCase When true, the type of every copied node is passed
// through adjustNameAsLowerCase(), which lower-cases the field names of ROW
// types. When false, the copy is constructed with the same type_ pointer as
// the original node.
// @return A newly allocated tree owned by the caller.
std::unique_ptr<TypeWithId> TypeWithId::duplicate(bool nameAsLowerCase) const {
  // A leaf node has no children, so the loop below does nothing and an empty
  // vector is handed to the constructor. No separate leaf branch is required.
  std::vector<std::unique_ptr<TypeWithId>> children;
  children.reserve(children_.size());
  for (const auto& child : children_) {
    children.emplace_back(child->duplicate(nameAsLowerCase));
  }
  return std::make_unique<TypeWithId>(
      nameAsLowerCase ? adjustNameAsLowerCase(type_) : type_,
      std::move(children),
      id_,
      maxId_,
      column_);
}

// Returns the number of entries in children_, narrowed to uint32_t.
uint32_t TypeWithId::size() const {
  const auto childCount = children_.size();
  return static_cast<uint32_t>(childCount);
}
Expand Down
7 changes: 7 additions & 0 deletions velox/dwio/common/TypeWithId.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ class TypeWithId : public velox::Tree<std::shared_ptr<const TypeWithId>> {
const std::shared_ptr<const velox::Type>& root,
uint32_t next = 0);

std::unique_ptr<TypeWithId> duplicate(bool nameAsLowerCase) const;

uint32_t size() const override;

const std::shared_ptr<const velox::Type>& type() const {
Expand All @@ -63,6 +65,11 @@ class TypeWithId : public velox::Tree<std::shared_ptr<const TypeWithId>> {

const std::shared_ptr<const TypeWithId>& childAt(uint32_t idx) const override;

// Reports whether the ROW type held by this node contains a child named
// 'name', by delegating to the ROW type's own containsChild(). The
// VELOX_CHECK_EQ below fails if this node's type is not a ROW.
bool containsChild(const std::string& name) const {
  VELOX_CHECK_EQ(type_->kind(), velox::TypeKind::ROW);
  const auto& rowType = type_->as<velox::TypeKind::ROW>();
  return rowType.containsChild(name);
}

const std::shared_ptr<const TypeWithId>& childByName(
const std::string& name) const {
VELOX_CHECK_EQ(type_->kind(), velox::TypeKind::ROW);
Expand Down
9 changes: 5 additions & 4 deletions velox/dwio/parquet/reader/ParquetColumnReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ std::unique_ptr<dwio::common::SelectiveColumnReader> ParquetColumnReader::build(
const std::shared_ptr<const dwio::common::TypeWithId>& requestedType,
const std::shared_ptr<const dwio::common::TypeWithId>& fileType,
ParquetParams& params,
common::ScanSpec& scanSpec) {
common::ScanSpec& scanSpec,
memory::MemoryPool& pool) {
auto colName = scanSpec.fieldName();

switch (fileType->type()->kind()) {
Expand All @@ -56,19 +57,19 @@ std::unique_ptr<dwio::common::SelectiveColumnReader> ParquetColumnReader::build(

case TypeKind::ROW:
return std::make_unique<StructColumnReader>(
requestedType, fileType, params, scanSpec);
requestedType, fileType, params, scanSpec, pool);

case TypeKind::VARBINARY:
case TypeKind::VARCHAR:
return std::make_unique<StringColumnReader>(fileType, params, scanSpec);

case TypeKind::ARRAY:
return std::make_unique<ListColumnReader>(
requestedType, fileType, params, scanSpec);
requestedType, fileType, params, scanSpec, pool);

case TypeKind::MAP:
return std::make_unique<MapColumnReader>(
requestedType, fileType, params, scanSpec);
requestedType, fileType, params, scanSpec, pool);

case TypeKind::BOOLEAN:
return std::make_unique<BooleanColumnReader>(
Expand Down
3 changes: 2 additions & 1 deletion velox/dwio/parquet/reader/ParquetColumnReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ class ParquetColumnReader {
const std::shared_ptr<const dwio::common::TypeWithId>& requestedType,
const std::shared_ptr<const dwio::common::TypeWithId>& fileType,
ParquetParams& params,
common::ScanSpec& scanSpec);
common::ScanSpec& scanSpec,
memory::MemoryPool& pool);
};
} // namespace facebook::velox::parquet
9 changes: 6 additions & 3 deletions velox/dwio/parquet/reader/ParquetReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -760,14 +760,17 @@ class ParquetRowReader::Impl {
}
ParquetParams params(
pool_, columnReaderStats_, readerBase_->fileMetaData());
auto columnSelector = std::make_shared<ColumnSelector>(
ColumnSelector::apply(options_.getSelector(), readerBase_->schema()));
auto columnSelector = options_.getSelector()
? options_.getSelector()
: std::make_shared<ColumnSelector>(ColumnSelector::apply(
options_.getSelector(), readerBase_->schema()));
requestedType_ = columnSelector->getSchemaWithId();
columnReader_ = ParquetColumnReader::build(
requestedType_,
readerBase_->schemaWithId(), // Id is schema id
params,
*options_.getScanSpec());
*options_.getScanSpec(),
pool_);

filterRowGroups();
if (!rowGroupIds_.empty()) {
Expand Down
23 changes: 18 additions & 5 deletions velox/dwio/parquet/reader/RepeatedColumnReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ PageReader* readLeafRepDefs(
return nullptr;
}
auto pageReader = reader->formatData().as<ParquetData>().reader();
if (pageReader == nullptr) {
return nullptr;
}
pageReader->decodeRepDefs(numTop);
return pageReader;
}
Expand Down Expand Up @@ -113,7 +116,8 @@ MapColumnReader::MapColumnReader(
const std::shared_ptr<const dwio::common::TypeWithId>& requestedType,
const std::shared_ptr<const dwio::common::TypeWithId>& fileType,
ParquetParams& params,
common::ScanSpec& scanSpec)
common::ScanSpec& scanSpec,
memory::MemoryPool& pool)
: dwio::common::SelectiveMapColumnReader(
requestedType,
fileType,
Expand All @@ -123,9 +127,17 @@ MapColumnReader::MapColumnReader(
auto& keyChildType = requestedType->childAt(0);
auto& elementChildType = requestedType->childAt(1);
keyReader_ = ParquetColumnReader::build(
keyChildType, fileType_->childAt(0), params, *scanSpec.children()[0]);
keyChildType,
fileType_->childAt(0),
params,
*scanSpec.children()[0],
pool);
elementReader_ = ParquetColumnReader::build(
elementChildType, fileType_->childAt(1), params, *scanSpec.children()[1]);
elementChildType,
fileType_->childAt(1),
params,
*scanSpec.children()[1],
pool);
reinterpret_cast<const ParquetTypeWithId*>(fileType.get())
->makeLevelInfo(levelInfo_);
children_ = {keyReader_.get(), elementReader_.get()};
Expand Down Expand Up @@ -223,15 +235,16 @@ ListColumnReader::ListColumnReader(
const std::shared_ptr<const dwio::common::TypeWithId>& requestedType,
const std::shared_ptr<const dwio::common::TypeWithId>& fileType,
ParquetParams& params,
common::ScanSpec& scanSpec)
common::ScanSpec& scanSpec,
memory::MemoryPool& pool)
: dwio::common::SelectiveListColumnReader(
requestedType,
fileType,
params,
scanSpec) {
auto& childType = requestedType->childAt(0);
child_ = ParquetColumnReader::build(
childType, fileType_->childAt(0), params, *scanSpec.children()[0]);
childType, fileType_->childAt(0), params, *scanSpec.children()[0], pool);
reinterpret_cast<const ParquetTypeWithId*>(fileType.get())
->makeLevelInfo(levelInfo_);
children_ = {child_.get()};
Expand Down
6 changes: 4 additions & 2 deletions velox/dwio/parquet/reader/RepeatedColumnReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,8 @@ class MapColumnReader : public dwio::common::SelectiveMapColumnReader {
const std::shared_ptr<const dwio::common::TypeWithId>& requestedType,
const std::shared_ptr<const dwio::common::TypeWithId>& fileType,
ParquetParams& params,
common::ScanSpec& scanSpec);
common::ScanSpec& scanSpec,
memory::MemoryPool& pool);

void prepareRead(
vector_size_t offset,
Expand Down Expand Up @@ -115,7 +116,8 @@ class ListColumnReader : public dwio::common::SelectiveListColumnReader {
const std::shared_ptr<const dwio::common::TypeWithId>& requestedType,
const std::shared_ptr<const dwio::common::TypeWithId>& fileType,
ParquetParams& params,
common::ScanSpec& scanSpec);
common::ScanSpec& scanSpec,
memory::MemoryPool& pool);

void prepareRead(
vector_size_t offset,
Expand Down
42 changes: 34 additions & 8 deletions velox/dwio/parquet/reader/StructColumnReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,22 +30,46 @@ StructColumnReader::StructColumnReader(
const std::shared_ptr<const dwio::common::TypeWithId>& requestedType,
const std::shared_ptr<const dwio::common::TypeWithId>& fileType,
ParquetParams& params,
common::ScanSpec& scanSpec)
common::ScanSpec& scanSpec,
memory::MemoryPool& pool)
: SelectiveStructColumnReader(requestedType, fileType, params, scanSpec) {
auto& childSpecs = scanSpec_->stableChildren();
std::vector<int> missingFields;
for (auto i = 0; i < childSpecs.size(); ++i) {
auto childSpec = childSpecs[i];
if (childSpecs[i]->isConstant()) {
continue;
}
auto childFileType = fileType_->childByName(childSpec->fieldName());
auto childRequestedType =
requestedType_->childByName(childSpec->fieldName());
const auto& fieldName = childSpec->fieldName();
if (!fileType_->containsChild(fieldName)) {
missingFields.emplace_back(i);
continue;
}
auto childFileType = fileType_->childByName(fieldName);
auto childRequestedType = requestedType_->childByName(fieldName);
addChild(ParquetColumnReader::build(
childRequestedType, childFileType, params, *childSpec));

childRequestedType, childFileType, params, *childSpec, pool));
childSpecs[i]->setSubscript(children_.size() - 1);
}

if (missingFields.size() > 0) {
// Set the struct as null if all the child fields in the output type are
// missing and the number of child fields is more than one.
if (childSpecs.size() > 1 && missingFields.size() == childSpecs.size()) {
scanSpec_->setConstantValue(
BaseVector::createNullConstant(requestedType_->type(), 1, &pool));
} else {
// Set a null constant for each missing child field of the output type.
for (int channel : missingFields) {
childSpecs[channel]->setConstantValue(BaseVector::createNullConstant(
requestedType_->childByName(childSpecs[channel]->fieldName())
->type(),
1,
&pool));
}
}
}

auto type = reinterpret_cast<const ParquetTypeWithId*>(fileType_.get());
if (type->parent()) {
levelMode_ = reinterpret_cast<const ParquetTypeWithId*>(fileType_.get())
Expand All @@ -55,7 +79,10 @@ StructColumnReader::StructColumnReader(
// this and the child.
auto child = childForRepDefs_;
for (;;) {
assert(child);
if (child == nullptr) {
levelMode_ = LevelMode::kNulls;
break;
}
if (child->fileType().type()->kind() == TypeKind::ARRAY ||
child->fileType().type()->kind() == TypeKind::MAP) {
levelMode_ = LevelMode::kStructOverLists;
Expand Down Expand Up @@ -92,7 +119,6 @@ StructColumnReader::findBestLeaf() {
best = child;
}
}
assert(best);
return best;
}

Expand Down
3 changes: 2 additions & 1 deletion velox/dwio/parquet/reader/StructColumnReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ class StructColumnReader : public dwio::common::SelectiveStructColumnReader {
const std::shared_ptr<const dwio::common::TypeWithId>& requestedType,
const std::shared_ptr<const dwio::common::TypeWithId>& fileType,
ParquetParams& params,
common::ScanSpec& scanSpec);
common::ScanSpec& scanSpec,
memory::MemoryPool& pool);

void read(vector_size_t offset, RowSet rows, const uint64_t* incomingNulls)
override;
Expand Down
Binary file not shown.
Loading

0 comments on commit 543558c

Please sign in to comment.