
Replace LogicalPlanSerDeUtils #99

Merged · 12 commits · Aug 2, 2020

Conversation

@sezruby (Collaborator) commented Jul 29, 2020

What changes were proposed in this pull request?

Instead of using KryoSerializer, reconstruct a DataFrame from {rootPaths, dataSchema}. In order to do this, we need to keep the necessary info in IndexLogEntry.

Note that this approach can only be applied to Covering Index & HadoopFsRelation, which matches the limitations of the current version of Hyperspace.
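
Conceptually, the reconstruction looks like the following (a minimal sketch, not the actual Hyperspace code; dataSchemaJson, fileFormat, and rootPaths stand for the fields kept in the "relations" metadata shown below):

import org.apache.spark.sql.types.{DataType, StructType}

// Rebuild the schema captured at index creation time.
val dataSchema = DataType.fromJson(dataSchemaJson).asInstanceOf[StructType]

// Reconstruct the DataFrame without deserializing the original plan.
val df = spark.read
  .schema(dataSchema)   // skip schema inference on the source files
  .format(fileFormat)   // e.g. "parquet"
  .load(rootPaths: _*)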

Why are the changes needed?

KryoSerializer is not compatible across different versions of Spark, so refreshing an index built with another Spark version would fail.

Fixes #98

Does this PR introduce any user-facing change?

Yes, indexes created before this change should be removed and recreated.
IndexLogEntry (metadata) will be changed; a new "relations" field is added:

...
"source" : {
    "plan" : {
      "properties" : {
        "relations" : [ {
          "rootPaths" : [ "file:/<>/table2" ],
          "data" : {
            "properties" : {
              "content" : {
                "root" : "",
                "directories" : [ {
                  "path" : "",
                  "files" : [ "file:/<>/table2/part-00000-2547e7e7-2577-4e07-b796-0a36e3e5444e-c000.snappy.parquet", "file:/<>/table2/part-00001-2547e7e7-2577-4e07-b796-0a36e3e5444d-c000.snappy.parquet" ],
                  "fingerprint" : {
                    "kind" : "NoOp",
                    "properties" : { }
                  }
                } ]
              }
            },
            "kind" : "HDFS"
          },
          "dataSchemaJson" : "{\"type\":\"struct\",\"fields\":[{\"name\":\"id\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"name\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"age\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}}]}",
          "fileFormat" : "parquet"
        } ],
        "rawPlan" : null,
        "sql" : null,
        "fingerprint" : {
          "properties" : {
            "signatures" : [ {
              "provider" : "com.microsoft.hyperspace.index.IndexSignatureProvider",
              "value" : "921391350ed46e5ac8761bc4fc5a466a"
            } ]
          },
          "kind" : "LogicalPlan"
        }
      },
      "kind" : "Spark"
    }
}

How was this patch tested?

refreshIndex was tested locally in both directions between Spark 2.4.6 + Scala 2.11 and Spark 3.0.0 + Scala 2.12.

@sezruby (Collaborator, Author) commented Jul 29, 2020

@imback82 Could you do a quick code review? Also, I'm not sure whether we should keep the previous version for backward compatibility.

@rapoth requested a review from imback82 July 29, 2020 21:28
@sezruby changed the title from "[WIP] Replace LogicalPlanSerDeUtils" to "Replace LogicalPlanSerDeUtils" Jul 30, 2020
@apoorvedave1 self-requested a review July 30, 2020 20:27
@sezruby (Collaborator, Author) commented Jul 31, 2020

@ reviewers, should I modify the CreateActionEvent code below accordingly?

  final override protected def event(appInfo: AppInfo, message: String): HyperspaceEvent = {
    // LogEntry instantiation may fail if index config is invalid. Hence the 'Try'.
    val index = Try(logEntry.asInstanceOf[IndexLogEntry]).toOption
    CreateActionEvent(appInfo, indexConfig, index, df.queryExecution.logical.toString, message)
  }

.write
.option("header", "true")
.csv(refreshTestLocation)
val df = spark.read.option("header", "true").csv(refreshTestLocation)
Contributor commented:

Looks like this is the only difference compared to the parquet test? If so, can we refactor the common code or do:

Seq(("parquet", None), ("csv", Some(Map("header" -> "true")))).foreach {
  case (format, option) =>
    ...
}

Either one is fine as long as the duplicate code is removed.
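
For reference, a fuller sketch of the parameterized version (the write/read lines and refreshTestLocation stand in for the existing test code):

Seq(("parquet", Map.empty[String, String]), ("csv", Map("header" -> "true"))).foreach {
  case (format, options) =>
    // Write and re-read the test data in the given format, under a per-format path.
    df.write.options(options).format(format).save(s"$refreshTestLocation/$format")
    val readDf = spark.read.options(options).format(format).load(s"$refreshTestLocation/$format")
    // ... shared refreshIndex assertions go here ...
}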

@sezruby (Collaborator, Author) replied:

While trying to fix this test, I realized that we also need to capture the options field.
Let me know if there are any other fields of HadoopFsRelation that might be useful later.

case class HadoopFsRelation(
    location: FileIndex,          // file listing; rootPaths come from here
    partitionSchema: StructType,  // repopulated from rootPaths during read
    dataSchema: StructType,       // captured as dataSchemaJson
    bucketSpec: Option[BucketSpec],
    fileFormat: FileFormat,       // captured as fileFormat
    options: Map[String, String])(val sparkSession: SparkSession)  // to be captured as well

Contributor replied:

Good catch on capturing options! I think that should be all. partitionSchema would be populated based on the rootPaths during read, so I think we should be good. @apoorvedave1 can double-confirm.

@imback82 (Contributor) left a comment:

Generally looking good to me except for a few minor comments.

@imback82 previously approved these changes Aug 2, 2020
@imback82 (Contributor) left a comment:

LGTM (except for a few nit comments), thanks @sezruby!

// Currently we only support creating an index on a LogicalRelation
assert(rels.size == 1)
val dataSchema = DataType.fromJson(rels.head.dataSchemaJson).asInstanceOf[StructType]
// "path" key in options incurs to read data twice unexpectedly
Contributor commented:

nit: can we end the sentence with "."? Btw, did you mean listing files (not reading data)? e.g., // Remove "path" key in options to prevent listing files twice.

Contributor replied:

Oh I see. The data is actually read twice.

@imback82 (Contributor) commented Aug 2, 2020:

Ah, I see what's happening and I think this could be a bug in Spark. I have a potential fix and will follow up on this.

// Both are the same and will read the data twice.
spark.read.option("path", "/tmp/parquet_data/").parquet("/tmp/parquet_data/").show
spark.read.parquet("/tmp/parquet_data/", "/tmp/parquet_data/").show
// But the following is fine:
spark.read.option("path", "/tmp/parquet_data/").format("parquet").load("/tmp/parquet_data/").show

Putting the above behavior aside, suppose the user created a df with:

// Intention is to read the data three times.
val df = spark.read.option("path", "/tmp/parquet_data").format("parquet").load("/tmp/parquet_data/", "/tmp/parquet_data/")

If the user's intention was reading the data three times, we can't remove "path" from the options. This is an edge case, but I think we still want to document this behavior.

@sezruby (Collaborator, Author) commented Aug 2, 2020:

I think this could be a bug. In our refresh-full test, only the relation from the csv format has the "path" key in its options, even though I read it without the "path" option.

@sezruby (Collaborator, Author) commented Aug 2, 2020:

And it seems rootPaths are stored correctly if the user puts the same path multiple times at load:

val df = spark.read.option("path", "/tmp/parquet_data").format("parquet").load("/tmp/parquet_data/", "/tmp/parquet_data/")

In this case rootPaths might be ["/tmp/parquet_data", "/tmp/parquet_data/", "/tmp/parquet_data/"].

So I think excluding the "path" key here would be okay.
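
A minimal sketch of the exclusion, assuming the captured options end up exposed as a Map on the relation metadata (rels.head.options is a hypothetical accessor):

// Drop the Spark-injected "path" entry so the root paths are not read twice.
val readOptions = rels.head.options - "path"
val df = spark.read.options(readOptions).schema(dataSchema).format(fileFormat).load(rootPaths: _*)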

@sezruby (Collaborator, Author) commented Aug 2, 2020:

There is inconsistent behavior (observed in both Spark 2.4.6 and 3.0.0):

scala> val df = spark.read.option("path","table2").load("table2","table2")
df: org.apache.spark.sql.DataFrame = [id: int, name: string ... 1 more field]

scala> df.show
+---+-----+---+
| id| name|age|
+---+-----+---+
|  1|name1|  4|
|  2|name2|  3|
|  1|name1|  4|
|  2|name2|  3|
|  1|name1|  4|
|  2|name2|  3|
+---+-----+---+


scala> val df = spark.read.option("path","table2").load("table2")
df: org.apache.spark.sql.DataFrame = [id: int, name: string ... 1 more field]

scala> df.show
+---+-----+---+
| id| name|age|
+---+-----+---+
|  1|name1|  4|
|  2|name2|  3|
+---+-----+---+

@imback82 (Contributor) commented Aug 2, 2020:

There is inconsistent behavior (observed in both Spark 2.4.6 and 3.0.0):

Yup, that's what I meant here; I am creating a PR against OSS Spark for this. Actually, this is a bit different: "path" is prepended if there is more than one path in load, but "replaced" if there is only one path in load. For the multiple-paths scenario, though, the behavior is consistent whether it's format("parquet").load(...) or parquet(...). I will ask whether we should always append the path from the option regardless of the number of paths.

Here are the follow-up action items:

  1. Should we move removing "path" to the creation side so that it's not in the log entry? The "path" can be added by Spark (not by the user), so it is not useful for debuggability anyway. Let me know if there is a use case for storing "path" in the log entry.
  2. Can we add a test covering all of the following scenarios (useful for reference as well)? See the sketch after this list.
spark.read.format("parquet").load("f1")
spark.read.format("parquet").load("f1", "f2")
spark.read.format("parquet").option("path", "blah").load("f1")
spark.read.format("parquet").option("path", "blah").load("f1", "f2")
spark.read.parquet("f1")
spark.read.parquet("f1", "f2")
spark.read.option("path", "blah").parquet("f1")
spark.read.option("path", "blah").parquet("f1", "f2")
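
A hedged sketch of how one scenario could be verified (the expected contents reflect the prepend behavior described above and should be double-checked):

import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}

val df = spark.read.option("path", "blah").parquet("f1", "f2")
// Pull the resolved HadoopFsRelation out of the query plan.
val fsRelation = df.queryExecution.optimizedPlan.collect {
  case LogicalRelation(fs: HadoopFsRelation, _, _, _) => fs
}.head
// Expected (to verify): "blah" is prepended, i.e. root paths for ("blah", "f1", "f2").
println(fsRelation.location.rootPaths)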

assert(indexCount == 3)

FileUtils.delete(new Path(refreshTestLocation))

Contributor commented:

nit: remove empty line

@imback82 merged commit 6dac4f1 into microsoft:master Aug 2, 2020
@imback82 (Contributor) commented Aug 2, 2020:

@sezruby I merged this PR, but can you address this as a follow-up PR? Thanks!

@sezruby deleted the i98_logicalrelation branch August 3, 2020 02:37