
Integrated changes due to rebasing on master
The new `IndexHadoopFsRelation` wrapper was not used after rebasing
Added wrapper functions to maintain the closure context
andrei-ionescu committed Jan 29, 2021
1 parent 7cd532f commit 37100fa
Showing 1 changed file with 43 additions and 35 deletions.
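
For context on the "wrapper functions to maintain the closure context" note above: rather than passing the index and plan into a helper so it can decide which cache tag to apply, the caller builds a small function that closes over those values and hands the helper only a `(Seq[Path], InMemoryFileIndex) => InMemoryFileIndex`. The following is a minimal, illustrative sketch of that pattern using simplified stand-in types (`Index`, `FileIndex`, `CacheTag`), not the actual Spark or Hyperspace classes.

```scala
// Illustrative sketch only: simplified stand-in types, not the real Spark/Hyperspace classes.
object ClosureWrapperSketch {
  case class FileIndex(paths: Seq[String])

  sealed trait CacheTag
  case object IndexOnly extends CacheTag
  case object HybridScan extends CacheTag

  // Stand-in for an index log entry; the real withCachedTag caches the location under a tag.
  case class Index(allFiles: Seq[String]) {
    def withCachedTag(tag: CacheTag)(location: => FileIndex): FileIndex = location
  }

  // The helper no longer takes the index itself; it only receives a wrapper that
  // already closed over the index (and the tag selection) in the caller's scope.
  def buildIndexRelation(
      filesToRead: Seq[String],
      wrapLocation: (Seq[String], FileIndex) => FileIndex): FileIndex = {
    val location = FileIndex(filesToRead)
    wrapLocation(filesToRead, location)
  }

  def main(args: Array[String]): Unit = {
    val index = Index(allFiles = Seq("a.parquet", "b.parquet"))

    // The wrapper captures `index`, so the cache-tag decision stays with the caller.
    val wrapIndex = (files: Seq[String], fi: FileIndex) =>
      if (files.length == index.allFiles.length) index.withCachedTag(IndexOnly)(fi)
      else index.withCachedTag(HybridScan)(fi)

    println(buildIndexRelation(Seq("a.parquet"), wrapIndex))
  }
}
```
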
src/main/scala/com/microsoft/hyperspace/index/rules/RuleUtils.scala (78 changes: 43 additions & 35 deletions)
@@ -280,7 +280,7 @@ object RuleUtils {
       plan: LogicalPlan,
       useBucketSpec: Boolean): LogicalPlan = {

-    def makeHadoopFsRelation(index: IndexLogEntry, relation: LeafNode) = {
+    def makeHadoopFsRelation(index: IndexLogEntry, relation: LogicalPlan) = {
       val location = index.withCachedTag(IndexLogEntryTags.INMEMORYFILEINDEX_INDEX_ONLY) {
         new InMemoryFileIndex(spark, index.content.files, Map(), None)
       }
@@ -387,28 +387,25 @@ object RuleUtils {
       }
     }

-    def getIndexHadoopFsRelation(indexLogEntry: IndexLogEntry, plan: LogicalPlan,
-        filesDeleted: Set[FileInfo], filesToRead: Seq[Path]) = {
+    def getIndexHadoopFsRelation(plan: LogicalPlan, filesToRead: Seq[Path],
+        filesForDelete: Set[FileInfo],
+        wrapLocation: (Seq[Path], InMemoryFileIndex) => InMemoryFileIndex) = {
       // In order to handle deleted files, read index data with the lineage column so that
       // we could inject Filter-Not-In conditions on the lineage column to exclude the indexed
       // rows from the deleted files.
       val newSchema = StructType(
-        indexLogEntry.schema.filter(s =>
-          plan.schema.contains(s) || (filesDeleted.nonEmpty && s.name.equals(
+        index.schema.filter(s =>
+          plan.schema.contains(s) || (filesForDelete.nonEmpty && s.name.equals(
             IndexConstants.DATA_FILE_NAME_ID))))

       def fileIndex: InMemoryFileIndex =
         new InMemoryFileIndex(spark, filesToRead, Map(), None)
-      val newLocation = if (filesToRead.length == index.content.files.size) {
-        index.withCachedTag(IndexLogEntryTags.INMEMORYFILEINDEX_INDEX_ONLY)(fileIndex)
-      } else {
-        index.withCachedTag(plan, IndexLogEntryTags.INMEMORYFILEINDEX_HYBRID_SCAN)(fileIndex)
-      }
+
       new IndexHadoopFsRelation(
-        newLocation,
+        wrapLocation(filesToRead, fileIndex),
         new StructType(),
         newSchema,
-        if (useBucketSpec) Some(indexLogEntry.bucketSpec) else None,
+        if (useBucketSpec) Some(index.bucketSpec) else None,
         new ParquetFileFormat,
         Map(IndexConstants.INDEX_RELATION_IDENTIFIER))(spark, index)
     }
@@ -429,23 +426,20 @@ object RuleUtils {
       }
     }

+    val wrapIndex = (files: Seq[Path], fi: InMemoryFileIndex) => {
+      if (files.length == index.content.files.size) {
+        index.withCachedTag(IndexLogEntryTags.INMEMORYFILEINDEX_INDEX_ONLY)(fi)
+      } else {
+        index.withCachedTag(plan, IndexLogEntryTags.INMEMORYFILEINDEX_HYBRID_SCAN)(fi)
+      }
+    }
+
     // Get transformed plan with index data and appended files if applicable.
     val indexPlan = plan transformUp {
       // Use transformUp here as currently one Logical Relation is allowed (pre-requisite).
       // The transformed plan will have LogicalRelation as a child; for example, LogicalRelation
       // can be transformed to 'Project -> Filter -> Logical Relation'. Thus, with transformDown,
       // it will be matched again and transformed recursively which causes stack overflow exception.
-
-      case v2Relation @ DataSourceV2Relation(_, baseOutput, _, _, userSchema) =>
-        val (filesDeleted, filesAppended) = getAppendedAndDeletedFiles(v2Relation)
-        val (filesToRead, unhandledFiles) = getFilesToReadAndUnhandled(filesDeleted, filesAppended,
-          userSchema.getOrElse(StructType(Nil)))
-        unhandledAppendedFiles = unhandledFiles
-        val relation = getIndexHadoopFsRelation(index, v2Relation, filesDeleted, filesToRead)
-
-        injectFilterNotForDeletedFiles(filesDeleted, relation, baseOutput,
-          (r, attrs) => new LogicalRelation(r, attrs, None, false))
-
       case baseRelation @ LogicalRelation(
           _ @HadoopFsRelation(location: FileIndex, _, _, _, _, _),
           baseOutput,
@@ -455,10 +449,20 @@ object RuleUtils {
         val (filesToRead, unhandledFiles) =
           getFilesToReadAndUnhandled(filesDeleted, filesAppended, location.partitionSchema)
         unhandledAppendedFiles = unhandledFiles
-        val relation = getIndexHadoopFsRelation(index, baseRelation, filesDeleted, filesToRead)
+        val relation = getIndexHadoopFsRelation(baseRelation, filesToRead, filesDeleted, wrapIndex)

         injectFilterNotForDeletedFiles(filesDeleted, relation, baseOutput,
           (r, attrs) => baseRelation.copy(relation = r, output = attrs))
+
+      case v2Relation @ DataSourceV2Relation(_, baseOutput, _, _, userSchema) =>
+        val (filesDeleted, filesAppended) = getAppendedAndDeletedFiles(v2Relation)
+        val (filesToRead, unhandledFiles) = getFilesToReadAndUnhandled(filesDeleted, filesAppended,
+          userSchema.getOrElse(StructType(Nil)))
+        unhandledAppendedFiles = unhandledFiles
+        val relation = getIndexHadoopFsRelation(v2Relation, filesToRead, filesDeleted, wrapIndex)
+
+        injectFilterNotForDeletedFiles(filesDeleted, relation, baseOutput,
+          (r, attrs) => new LogicalRelation(r, attrs, None, false))
     }

     if (unhandledAppendedFiles.nonEmpty) {
@@ -513,7 +517,8 @@ object RuleUtils {
       originalPlan: LogicalPlan,
       filesAppended: Seq[Path]): LogicalPlan = {

-    def getFileIndexLocation(plan: LogicalPlan, files: Seq[Path]) = {
+    def getFileIndexLocation(plan: LogicalPlan, files: Seq[Path],
+        wrapLocation: InMemoryFileIndex => InMemoryFileIndex) = {
       // Set "basePath" so that partitioned columns are also included in the output schema.
       val opts = plan match {
         case _ @ LogicalRelation(
@@ -528,11 +533,12 @@ object RuleUtils {
       }
       val options = opts.flatMap(basePath =>
         basePath.map(o => Map("basePath" -> o))).getOrElse(Map())
-      index.withCachedTag(
-        originalPlan,
-        IndexLogEntryTags.INMEMORYFILEINDEX_HYBRID_SCAN_APPENDED) {
-        new InMemoryFileIndex(spark, files, options, None)
-      }
+      wrapLocation(new InMemoryFileIndex(spark, files, options, None))
     }
+
+    val wrapIndex = (fi: InMemoryFileIndex) => {
+      index.withCachedTag(originalPlan,
+        IndexLogEntryTags.INMEMORYFILEINDEX_HYBRID_SCAN_APPENDED)(fi)
+    }

     // Transform the location of LogicalRelation with appended files.
@@ -542,7 +548,8 @@ object RuleUtils {
           baseOutput,
           _,
           _) =>
-        val newLocation = getFileIndexLocation(baseRelation, filesAppended)
+        val newLocation = getFileIndexLocation(baseRelation, filesAppended, wrapIndex)
+
         // Set the same output schema with the index plan to merge them using BucketUnion.
         // Include partition columns for data loading.
         val partitionColumns = location.partitionSchema.map(_.name)
@@ -558,13 +565,13 @@ object RuleUtils {
         baseRelation.copy(relation = newRelation, output = updatedOutput)

       case v2Relation @ DataSourceV2Relation(_, output, options, _, userSpecifiedSchema) =>
-        val newLocation = getFileIndexLocation(v2Relation, filesAppended)
+        val newLocation = getFileIndexLocation(v2Relation, filesAppended, wrapIndex)
         val partitionSchema = userSpecifiedSchema.getOrElse(StructType(Nil))
         val partitionColumns = partitionSchema.map(_.name)
         val updatedSchema = StructType(v2Relation.schema.filter(col =>
-          indexSchema.contains(col) || partitionSchema.contains(col)))
+          index.schema.contains(col) || partitionSchema.contains(col)))
         val updatedOutput = output.filter(attr =>
-          indexSchema.fieldNames.contains(attr.name) || partitionColumns.contains(attr.name))
+          index.schema.fieldNames.contains(attr.name) || partitionColumns.contains(attr.name))
         new LogicalRelation(
           HadoopFsRelation(
             newLocation,
@@ -608,7 +615,8 @@ object RuleUtils {
       Some(r, getIndexedAttrs(r, bucketSpec.bucketColumnNames), false)
   }

-  private def getIndexedAttrs(plan: LogicalPlan,
+  private def getIndexedAttrs(
+      plan: LogicalPlan,
       indexedColumns: Seq[String]): Seq[Option[Attribute]] = {
     val attrMap = plan.output.attrs.map(attr => (attr.name, attr)).toMap
     indexedColumns.map(colName => attrMap.get(colName))

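The same pattern appears in the appended-files path: `getFileIndexLocation` now takes a single-argument `InMemoryFileIndex => InMemoryFileIndex` wrapper, and the caller's `wrapIndex` closure captures `index` and `originalPlan` to apply the `INMEMORYFILEINDEX_HYBRID_SCAN_APPENDED` cache tag. Below is a minimal sketch of that shape, again using hypothetical stand-in types rather than the real classes.

```scala
// Illustrative sketch only: simplified stand-in types, not the real Spark/Hyperspace classes.
object AppendedFilesWrapperSketch {
  case class FileIndex(paths: Seq[String])

  // Stand-in for an index log entry; the real withCachedTag caches the location per (plan, tag).
  case class Index(name: String) {
    def withCachedTag(plan: String, tag: String)(location: => FileIndex): FileIndex = location
  }

  // Builds the location for appended files; caching is delegated to the wrapper.
  def getLocationForAppendedFiles(
      files: Seq[String],
      wrapLocation: FileIndex => FileIndex): FileIndex =
    wrapLocation(FileIndex(files))

  def main(args: Array[String]): Unit = {
    val index = Index("myIndex")
    val originalPlan = "Scan(table)"

    // The wrapper closes over `index` and `originalPlan`, so the helper
    // does not need either of them as a parameter.
    val wrapIndex = (fi: FileIndex) =>
      index.withCachedTag(originalPlan, "INMEMORYFILEINDEX_HYBRID_SCAN_APPENDED")(fi)

    println(getLocationForAppendedFiles(Seq("new1.parquet", "new2.parquet"), wrapIndex))
  }
}
```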