Add spark submodule to convert SparkSql's LogicalPlan to Substrait Rel. #90
Conversation
Similar to this PR, we are working on the conversion from the SparkSql physical plan to a Substrait plan. Currently, we have finished read, filter, project, aggregation, and join. Some expressions used in TPC-H are also supported. We found there are some common parts between the conversions of the logical and physical plans. Can we also upstream our code to substrait-java?
The project Rui refers to: https://github.com/oap-project/gluten
@jinfengni and @JamesRTaylor, can you review this?
Sorry, I missed reading the earlier message. Will do the review.
My overall impression is that this PR is still at a work-in-progress stage. It defines a type converter, an expression converter, and a logical plan converter, but each converter supports very limited inputs. If I understand correctly, it can probably convert only a query like "select col from t". My suggestion is that we split this PR into smaller-scoped building blocks for a Spark plan converter. Maybe start with the type converter, covering the majority of types in Spark, then continue with the expression converter covering the majority of expressions, and then deal with the logical plan converter. For each converter, add unit tests to cover the types, expressions, and logical relation operators that are supported.
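A rough sketch of the kind of per-converter unit test suggested above (ScalaTest, the SparkTypeConverterSuite name, and the SparkTypeConverter.convert entry point are assumptions for illustration, not code from this PR):

import org.apache.spark.sql.types.{IntegerType, StringType}
import org.scalatest.funsuite.AnyFunSuite

class SparkTypeConverterSuite extends AnyFunSuite {
  test("primitive Spark types keep their nullability when converted") {
    // convert(dataType, nullable) mirrors the call shape used elsewhere in this PR
    val intType = SparkTypeConverter.convert(IntegerType, true)
    val strType = SparkTypeConverter.convert(StringType, false)
    assert(intType.nullable())   // io.substrait.type.Type carries nullability
    assert(!strType.nullable())
  }
}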
@jinfengni Yeah, this PR just gives a skeleton for building a rel producer from the Spark logical plan, so other people can continue to add more conversions. The next step is to support logical-level expression conversion (WIP).
One general question: what's the use case for converting Spark SQL to Substrait at the logical level?
What's the goal of substrait-java? My understanding is that it's a standalone module without any dependency on the application level. We used it in Gluten's query plan conversion, but the conversion should be part of the application instead of substrait-java. That's why we never thought to contribute it here. If we do contribute, a separate repo could be more useful.
substrait-java is a place to house Substrait-related Java programs. There are several modules/components within Substrait Java. We have a core library for arbitrary use. We also have Isthmus, which is an application for converting between SQL and Substrait. Having more modules like this one (Spark to Substrait) is appropriate here. Really just focused on anything Substrait and JVM.
spark/src/main/scala/io/substrait/spark/SparkTypeConverter.scala
@jacques-n Since you mention Spark: substrait-java only supports Java 17 and above. I know Java 8 is at end of life, but the mainstream JVM for Spark is still Java 8. Any suggestion for Java 8? Thanks
I think the latest spark
@winningsix
Java 8 is still very commonly used by Spark customers.
We use Substrait to pass the Spark plan to a native library. One issue we met is that the Spark plan doesn't map 1:1 to the native library's plan. Our current solution is that, in Gluten, we convert one Spark operator into one or more Substrait operators; then, in the native library, we use a 1:1 mapping from Substrait to native operators. Does trino-sql have a 1:1 mapping to Spark plan nodes?
No, different SQL engines have different logical/physical SQL plans, so it's difficult to do a 1:1 mapping between these query plans. Substrait was created to provide a standard and open format for these query plans, but there is still a lot of early work to do. For example: @jacques-n, a question about
This is my concern with the conversion here. If we transform Spark's logical plan 1:1 into a Substrait plan, I need another layer, either in Java or C++, to modify the Substrait plan into the native library's plan. Currently we do that modification when we create the Substrait plan. Are you going to create a 1:1 mapping between the Spark logical plan and the Substrait plan?
Yes, you need to follow the Substrait spec for field references.
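For concreteness, the Substrait spec expresses column accesses as ordinal (index-based) field references rather than by name. A minimal sketch of such a reference built from the generated protobuf bindings (assuming the standard io.substrait.proto classes; illustrative only, not code from this PR):

import io.substrait.proto.Expression

// Reference the first (index 0) column of the relation's input record.
val fieldRef = Expression.FieldReference.newBuilder()
  .setDirectReference(
    Expression.ReferenceSegment.newBuilder()
      .setStructField(Expression.ReferenceSegment.StructField.newBuilder().setField(0)))
  .setRootReference(Expression.FieldReference.RootReference.getDefaultInstance())
  .build()

// Wrap it as a full Substrait expression.
val expr = Expression.newBuilder().setSelection(fieldRef).build()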
We already integrated substrait-spark in Gluten (apache/incubator-gluten#515). Although Gluten only needs to convert the physical plan to a Substrait rel, we still developed a logical plan to Substrait rel transformation for unit tests. Due to schedule pressure, we put it in Gluten for now. Would you like to do a migration from Gluten to substrait-java?
Minor Scala suggestions:
var schema: StructType = null
var tableNames: List[String] = null;
plan match {
  case logicalRelation: LogicalRelation =>
    schema = logicalRelation.schema
    tableNames = logicalRelation.catalogTable.get.identifier.unquotedString.split("\\.").toList
    buildNamedScan(schema, tableNames)
  case dataSourceV2ScanRelation: DataSourceV2ScanRelation =>
    schema = dataSourceV2ScanRelation.schema
    tableNames = dataSourceV2ScanRelation.relation.identifier.get.toString.split("\\.").toList
    buildNamedScan(schema, tableNames)
  case dataSourceV2Relation: DataSourceV2Relation =>
    schema = dataSourceV2Relation.schema
    tableNames = dataSourceV2Relation.identifier.get.toString.split("\\.").toList
    buildNamedScan(schema, tableNames)
  case hiveTableRelation: HiveTableRelation =>
    schema = hiveTableRelation.schema
    tableNames = hiveTableRelation.tableMeta.identifier.unquotedString.split("\\.").toList
    buildNamedScan(schema, tableNames)
  // TODO: LocalRelation, Range => Virtual Table, LogicalRelation(HadoopFsRelation) => LocalFiles
  case _ =>
    throw new UnsupportedOperationException("Unable to convert the plan to a substrait AbstractReadRel: " + plan)
}
Suggested change (compute the schema and table names in a single match expression instead of the mutable vars):

val (schema, tableNames) =
  plan match {
    case logicalRelation: LogicalRelation =>
      (logicalRelation.schema, logicalRelation.catalogTable.get.identifier.unquotedString.split("\\.").toList)
    case dataSourceV2ScanRelation: DataSourceV2ScanRelation =>
      (dataSourceV2ScanRelation.schema, dataSourceV2ScanRelation.relation.identifier.get.toString.split("\\.").toList)
    case dataSourceV2Relation: DataSourceV2Relation =>
      (dataSourceV2Relation.schema, dataSourceV2Relation.identifier.get.toString.split("\\.").toList)
    case hiveTableRelation: HiveTableRelation =>
      (hiveTableRelation.schema, hiveTableRelation.tableMeta.identifier.unquotedString.split("\\.").toList)
    // TODO: LocalRelation, Range => Virtual Table, LogicalRelation(HadoopFsRelation) => LocalFiles
    case _ =>
      throw new UnsupportedOperationException("Unable to convert the plan to a substrait AbstractReadRel: " + plan)
  }
buildNamedScan(schema, tableNames)
val creator = Type.withNullability(true)
val names = new java.util.ArrayList[String]
val children = new java.util.ArrayList[Type]
schema.fields.foreach(field => {
  names.add(field.name)
  children.add(convert(field.dataType, field.nullable))
})
val struct = creator.struct(children)
NamedStruct.of(names, struct)
Suggested change:

import scala.jdk.CollectionConverters._ // available on Scala 2.13+; on 2.12 use scala.collection.JavaConverters
val creator = Type.withNullability(true)
val names = schema.fields.map(_.name).toList.asJava
val struct = creator.struct(
  schema.fields.map(field => convert(field.dataType, field.nullable)).toList.asJava
)
NamedStruct.of(names, struct)
// TODO: we currently lose the nested StructType's field names; do we need them?
//val names = new java.util.ArrayList[String]
val children = new java.util.ArrayList[Type]
fields.foreach(field => {
Could this use `map` instead?
@baibaichen I'd be willing to migrate your substrait-spark code. Before it was removed (apache/incubator-gluten#4609) from the Gluten repo, we had already started using it as part of a project here (IBM Research), so we are interested in keeping it alive and open source. Let me know, then I will prepare a separate PR. Thanks!
implementation("org.apache.spark:spark-sql_2.12:3.3.0")
testImplementation("org.apache.spark:spark-hive_2.12:3.3.0")
// testImplementation("org.apache.spark:spark-sql_2.12:3.3.0:tests")
Remove?
I guess this PR has been superseded by #271, which was merged recently, so we could close this one.
Obsoleted by #271
At this moment, it aims to build a Substrait rel producer for SparkSql at the logical level.
Currently only the type converter and parts of the Read operators of Spark are finished; some related work is in progress.
The test cases also integrate a hive-standalone-metastore Docker container, which makes it convenient to test some operators, such as the Iceberg/Hive datasource relation operators.
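For orientation, a hypothetical end-to-end usage sketch (SparkLogicalPlanConverter.convert is an assumed entry point for illustration, not the API defined in this PR):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("substrait-example").getOrCreate()
spark.sql("CREATE TABLE t (col INT) USING parquet")

// Take Spark's optimized logical plan for a query ...
val logicalPlan = spark.sql("SELECT col FROM t").queryExecution.optimizedPlan

// ... and hand it to the converter to obtain a Substrait rel tree.
val substraitRel = SparkLogicalPlanConverter.convert(logicalPlan) // hypothetical entry point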