This repository has been archived by the owner on Jun 14, 2024. It is now read-only.

Support DataSourceV2 sources #321

Closed · wants to merge 6 commits
@@ -43,7 +43,7 @@ class CreateAction(

final override def validate(): Unit = {
// We currently only support createIndex() over HDFS file based scan nodes.
if (!LogicalPlanUtils.isLogicalRelation(df.queryExecution.optimizedPlan)) {
if (!LogicalPlanUtils.isSupportedRelation(df.queryExecution.optimizedPlan)) {
throw HyperspaceException(
"Only creating index over HDFS file based scan nodes is supported.")
}
@@ -18,13 +18,16 @@ package com.microsoft.hyperspace.actions

import org.apache.hadoop.fs.Path
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
import org.apache.spark.sql.functions.input_file_name
import org.apache.spark.sql.types.StructType

import com.microsoft.hyperspace.{Hyperspace, HyperspaceException}
import com.microsoft.hyperspace.index._
import com.microsoft.hyperspace.index.DataFrameWriterExtensions.Bucketizer
import com.microsoft.hyperspace.util.{HyperspaceConf, PathUtils, ResolverUtils}
import com.microsoft.hyperspace.util.{HyperspaceConf, LogicalPlanUtils, PathUtils, ResolverUtils}

/**
* CreateActionBase provides functionality to write dataframe as covering index.
@@ -64,7 +67,7 @@ private[actions] abstract class CreateActionBase(dataManager: IndexDataManager)
signatureProvider.signature(df.queryExecution.optimizedPlan) match {
case Some(s) =>
val relations = sourceRelations(spark, df)
// Currently we only support to create an index on a LogicalRelation.
// Currently, we only support creating an index on one relation.
assert(relations.size == 1)

val sourcePlanProperties = SparkPlan.Properties(
@@ -97,8 +100,8 @@ private[actions] abstract class CreateActionBase(dataManager: IndexDataManager)
private def hasParquetAsSourceFormatProperty(
spark: SparkSession,
df: DataFrame): Option[(String, String)] = {
val relation = df.queryExecution.optimizedPlan.asInstanceOf[LogicalRelation]
if (Hyperspace.getContext(spark).sourceProviderManager.hasParquetAsSourceFormat(relation)) {
if (Hyperspace.getContext(spark).sourceProviderManager
.hasParquetAsSourceFormat(df.queryExecution.optimizedPlan)) {
Some(IndexConstants.HAS_PARQUET_AS_SOURCE_FORMAT_PROPERTY -> "true")
} else {
None
@@ -115,7 +118,7 @@ private[actions] abstract class CreateActionBase(dataManager: IndexDataManager)

protected def sourceRelations(spark: SparkSession, df: DataFrame): Seq[Relation] =
df.queryExecution.optimizedPlan.collect {
case p: LogicalRelation =>
case p: LogicalPlan if LogicalPlanUtils.isSupportedRelation(p) =>
Hyperspace.getContext(spark).sourceProviderManager.createRelation(p, fileIdTracker)
}

@@ -190,9 +193,8 @@ private[actions] abstract class CreateActionBase(dataManager: IndexDataManager)
// + file:/C:/hyperspace/src/test/part-00003.snappy.parquet
import spark.implicits._
val dataPathColumn = "_data_path"
val relation = df.queryExecution.optimizedPlan.asInstanceOf[LogicalRelation]
val lineagePairs =
Hyperspace.getContext(spark).sourceProviderManager.lineagePairs(relation, fileIdTracker)
val lineagePairs = Hyperspace.getContext(spark).sourceProviderManager
.lineagePairs(df.queryExecution.optimizedPlan, fileIdTracker)
val lineageDF = lineagePairs.toDF(dataPathColumn, IndexConstants.DATA_FILE_NAME_ID)

df.withColumn(dataPathColumn, input_file_name())
@@ -211,6 +213,7 @@ private[actions] abstract class CreateActionBase(dataManager: IndexDataManager)
// Extract partition keys, if original data is partitioned.
val partitionSchemas = df.queryExecution.optimizedPlan.collect {
case LogicalRelation(HadoopFsRelation(_, pSchema, _, _, _, _), _, _, _) => pSchema
case DataSourceV2Relation(_, _, _, _, uSchema) => uSchema.getOrElse(StructType(Nil))
Contributor:
Hmm, I don't think we should have DataSourceV2Relation specific code here. Can we move this to the source provider API?

Contributor Author:
Both LogicalRelation and DataSourceV2Relation are on the same level; both directly extend LeafNode. If LogicalRelation is present here, I would say that DataSourceV2Relation should also be here, as in this PR we open up to the DataSourceV2 Spark API.

Contributor:
Sorry, I didn't get your argument about them being on the same level. Why can't we introduce partitionSchema to the source provider? I think we missed moving this into the source provider since default/delta have the same implementation; we can have a different implementation (matching FileIndex) for them.

Contributor Author:
  1. Delta is not built on top of the DataSourceV2 Spark API, thus it's not the same implementation.
  2. Both LogicalRelation and DataSourceV2Relation are a direct "child" of LeafNode; both directly extend LeafNode.
             LeafNode
              //  \\
             //    \\
            //      \\
LogicalRelation    DataSourceV2Relation
  3. This is the PR addressing support for DataSourceV2, which is Spark, not Iceberg.

Contributor (@imback82, Feb 8, 2021):
My point is that rules shouldn't directly work with LogicalRelation or DataSourceV2Relation. I think we can abstract that out. Source provider can choose which relation it supports.

Contributor:
Btw, I will create a PR to your branch this week.

}

// Currently we only support creating an index on a single LogicalRelation.
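
Following the review discussion above about keeping DataSourceV2Relation-specific code out of rules and actions, below is a minimal sketch of how the partition-schema extraction could be pushed behind the source provider API. The trait and method names (`PartitionSchemaSupport`, `partitionSchema`) are hypothetical illustrations of the reviewer's suggestion, not the actual Hyperspace source provider interface:

```scala
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
import org.apache.spark.sql.types.StructType

// Hypothetical source provider hook: each provider decides how to extract the
// partition schema from the relation type it supports, so rules and actions
// never pattern match on LogicalRelation or DataSourceV2Relation directly.
trait PartitionSchemaSupport {
  def partitionSchema(plan: LogicalPlan): Option[StructType]
}

// Default (HDFS file-based) provider: partition columns come from HadoopFsRelation.
class DefaultPartitionSchemaSupport extends PartitionSchemaSupport {
  override def partitionSchema(plan: LogicalPlan): Option[StructType] = plan match {
    case LogicalRelation(fsRelation: HadoopFsRelation, _, _, _) =>
      Some(fsRelation.partitionSchema)
    case _ => None
  }
}

// DataSourceV2 provider: mirror the handling in this PR and fall back to the
// user-specified schema, if any.
class DataSourceV2PartitionSchemaSupport extends PartitionSchemaSupport {
  override def partitionSchema(plan: LogicalPlan): Option[StructType] = plan match {
    case DataSourceV2Relation(_, _, _, _, userSchema) =>
      Some(userSchema.getOrElse(StructType(Nil)))
    case _ => None
  }
}
```

Each provider then handles only the relation type it owns, and callers such as `CreateActionBase` would ask the source provider manager instead of matching on concrete relation classes.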
@@ -18,6 +18,7 @@ package com.microsoft.hyperspace.index

import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation

import com.microsoft.hyperspace.Hyperspace
import com.microsoft.hyperspace.util.HashingUtils
Expand Down Expand Up @@ -49,7 +50,7 @@ class FileBasedSignatureProvider extends LogicalPlanSignatureProvider {
private def fingerprintVisitor(logicalPlan: LogicalPlan): Option[String] = {
var fingerprint = ""
logicalPlan.foreachUp {
case p: LogicalRelation =>
case p @ (_: LogicalRelation | _: DataSourceV2Relation) =>
fingerprint ++= Hyperspace.getContext.sourceProviderManager.signature(p)
case _ =>
}
@@ -21,14 +21,13 @@ import org.apache.spark.sql.catalyst.analysis.CleanupAliases
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression}
import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.datasources._

import com.microsoft.hyperspace.{ActiveSparkSession, Hyperspace}
import com.microsoft.hyperspace.actions.Constants
import com.microsoft.hyperspace.index.IndexLogEntry
import com.microsoft.hyperspace.index.rankers.FilterIndexRanker
import com.microsoft.hyperspace.telemetry.{AppInfo, HyperspaceEventLogging, HyperspaceIndexUsageEvent}
import com.microsoft.hyperspace.util.{HyperspaceConf, ResolverUtils}
import com.microsoft.hyperspace.util.{HyperspaceConf, LogicalPlanUtils, ResolverUtils}

/**
* FilterIndex rule looks for opportunities in a logical plan to replace
@@ -50,7 +49,7 @@ object FilterIndexRule
// 1. The index covers all columns from the filter predicate and output columns list, and
// 2. Filter predicate's columns include the first 'indexed' column of the index.
plan transformDown {
case ExtractFilterNode(originalPlan, filter, outputColumns, filterColumns, _, _) =>
case ExtractFilterNode(originalPlan, filter, outputColumns, filterColumns) =>
try {
val candidateIndexes =
findCoveringIndexes(filter, outputColumns, filterColumns)
@@ -136,7 +135,6 @@ object FilterIndexRule
* @param filterColumns List of columns in filter predicate.
* @param indexedColumns List of indexed columns (e.g. from an index being checked)
* @param includedColumns List of included columns (e.g. from an index being checked)
* @param fileFormat FileFormat for input relation in original logical plan.
* @return 'true' if
* 1. Index fully covers output and filter columns, and
* 2. Filter predicate contains first column in index's 'indexed' columns.
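
The two conditions in the scaladoc above summarize when a candidate index can serve a filter query. A standalone sketch of that check (hypothetical method name and simplified exact-string column matching; the real code resolves column names via ResolverUtils):

```scala
// Sketch of the coverage check described above:
// 1. indexed + included columns cover all output and filter columns, and
// 2. the filter predicate references the first 'indexed' column.
def coversFilterQuery(
    outputColumns: Seq[String],
    filterColumns: Seq[String],
    indexedColumns: Seq[String],
    includedColumns: Seq[String]): Boolean = {
  val indexColumns = indexedColumns ++ includedColumns
  (outputColumns ++ filterColumns).forall(indexColumns.contains) &&
    filterColumns.contains(indexedColumns.head)
}
```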
@@ -160,34 +158,34 @@ object ExtractFilterNode {
LogicalPlan, // original plan
Filter,
Seq[String], // output columns
Seq[String], // filter columns
LogicalRelation,
HadoopFsRelation)
Seq[String]) // filter columns

def unapply(plan: LogicalPlan): Option[returnType] = plan match {
case project @ Project(
_,
filter @ Filter(
condition: Expression,
logicalRelation @ LogicalRelation(fsRelation: HadoopFsRelation, _, _, _)))
if !RuleUtils.isIndexApplied(fsRelation) =>
p: LogicalPlan))
if LogicalPlanUtils.isSupportedRelation(p) &&
!RuleUtils.isIndexApplied(p) =>
val projectColumnNames = CleanupAliases(project)
.asInstanceOf[Project]
.projectList
.map(_.references.map(_.asInstanceOf[AttributeReference].name))
.flatMap(_.toSeq)
val filterColumnNames = condition.references.map(_.name).toSeq

Some(project, filter, projectColumnNames, filterColumnNames, logicalRelation, fsRelation)
Some(project, filter, projectColumnNames, filterColumnNames)

case filter @ Filter(
condition: Expression,
logicalRelation @ LogicalRelation(fsRelation: HadoopFsRelation, _, _, _))
if !RuleUtils.isIndexApplied(fsRelation) =>
val relationColumnsName = logicalRelation.output.map(_.name)
p: LogicalPlan)
if LogicalPlanUtils.isSupportedRelation(p) &&
!RuleUtils.isIndexApplied(p) =>
val relationColumnsName = p.output.map(_.name)
val filterColumnNames = condition.references.map(_.name).toSeq

Some(filter, filter, relationColumnsName, filterColumnNames, logicalRelation, fsRelation)
Some(filter, filter, relationColumnsName, filterColumnNames)

case _ => None // plan does not match with any of filter index rule patterns
}
@@ -21,16 +21,18 @@ import scala.util.Try

import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.analysis.CleanupAliases
import org.apache.spark.sql.catalyst.expressions.{Alias, And, Attribute, AttributeReference, AttributeSet, EqualTo, Expression}
import org.apache.spark.sql.catalyst.plans.logical.{Join, LogicalPlan, Project}
import org.apache.spark.sql.catalyst.expressions.{And, Attribute, AttributeReference, EqualTo, Expression}
import org.apache.spark.sql.catalyst.plans.logical.{Join, LogicalPlan}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}
import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation

import com.microsoft.hyperspace.{ActiveSparkSession, Hyperspace}
import com.microsoft.hyperspace.actions.Constants
import com.microsoft.hyperspace.index._
import com.microsoft.hyperspace.index.rankers.JoinIndexRanker
import com.microsoft.hyperspace.telemetry.{AppInfo, HyperspaceEventLogging, HyperspaceIndexUsageEvent}
import com.microsoft.hyperspace.util.LogicalPlanUtils
import com.microsoft.hyperspace.util.ResolverUtils._

/**
@@ -166,8 +168,8 @@ object JoinIndexRule
*/
private def isPlanModified(plan: LogicalPlan): Boolean = {
plan.find {
case LogicalRelation(fsRelation: HadoopFsRelation, _, _, _) =>
RuleUtils.isIndexApplied(fsRelation)
case p: LogicalRelation =>
RuleUtils.isIndexApplied(p)
case _ => false
}.isDefined
}
@@ -338,7 +340,9 @@ object JoinIndexRule
}

private def relationOutputs(l: LogicalPlan): Seq[Attribute] = {
l.collectLeaves().filter(_.isInstanceOf[LogicalRelation]).flatMap(_.output)
l.collectLeaves()
.filter(LogicalPlanUtils.isSupportedRelation)
.flatMap(_.output)
}

/**
@@ -379,7 +383,7 @@ object JoinIndexRule
private def allRequiredCols(plan: LogicalPlan): Seq[String] = {
val cleaned = CleanupAliases(plan)
val allReferences = cleaned.collect {
case _: LogicalRelation => Seq()
case _ @ (_: LogicalRelation | _: DataSourceV2Relation) => Seq()
case p => p.references
}.flatten
val topLevelOutputs = cleaned.outputSet.toSeq