This repository has been archived by the owner on Jun 14, 2024. It is now read-only.

Add lineage column(s) to index records #121

Merged: 29 commits merged into microsoft:master on Aug 23, 2020

Conversation

pirz
Contributor

@pirz pirz commented Aug 11, 2020

What changes were proposed in this pull request?

This PR proposes to fix #104 by adding lineage columns to index records along with test cases for validating the change.

For non-partitioned data, lineage is captured by adding a single column to each index record which stores the source file name. For partitioned data, besides the file name, we add columns for the partition key(s) if they are not already among the user-specified columns in the index config.

Adding lineage to an index is optional (it is disabled by default for now); it can be enabled/disabled via the Spark config below:

  • INDEX_LINEAGE_ENABLED = "spark.hyperspace.index.lineage.enabled"
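
A minimal sketch, assuming an active SparkSession named spark, of toggling this config at runtime:

// Enable lineage for indexes created from this point onward (disabled by default).
spark.conf.set("spark.hyperspace.index.lineage.enabled", "true")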

Example. Assume the input data has a schema with five columns: Date, RGUID, Query, imprs, clicks, and it is partitioned on two columns: Date and Query. For an index created with IndexConfig("index1", Seq("RGUID"), Seq("Date")):

  • Sample index records without lineage (current behavior in Hyperspace) look like this:
RGUID                              Date
50d690516ca641438166049a6303650c   2018-09-03
576ed96b0d5340aa98a47de15c9f87ce   2018-09-03
  • Sample index records with lineage look like this. _data_file_name (which contains the full file name, including the extension) and Query (a partitioning key which was not already among the index config columns) are added to capture lineage; Date, the other partitioning key, is already among the index config columns.
RGUID                              Date         Query    _data_file_name
50d690516ca641438166049a6303650c   2018-09-03   donde    part-00001-20a62dfd-f0e0-4f17-8a70-93b90523ac38-c000.snappy.parquet
576ed96b0d5340aa98a47de15c9f87ce   2018-09-03   ibraco   part-00001-77a49590-b385-454c-9877-51ab3b626e89-c000.snappy.parquet
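
A sketch of the flow that produces the second table above, with the lineage config enabled as shown earlier. The source path is hypothetical and the Hyperspace import paths are assumed from the package layout:

import com.microsoft.hyperspace.Hyperspace
import com.microsoft.hyperspace.index.IndexConfig

val hyperspace = new Hyperspace(spark)
// Source data partitioned on Date and Query, read from a hypothetical location.
val df = spark.read.parquet("/data/sample")
// With lineage enabled, the created index also carries the Query and
// _data_file_name columns shown in the second table.
hyperspace.createIndex(df, IndexConfig("index1", Seq("RGUID"), Seq("Date")))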

Why are the changes needed?

Lineage is required for supporting DELETE during index refresh when one or more source data files are removed.
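
A rough sketch, not this PR's implementation, of how the lineage column could support that: index rows whose source file has been deleted can be dropped by filtering on _data_file_name. The index location and the list of remaining file names below are hypothetical:

import org.apache.spark.sql.functions.col

val indexDf = spark.read.parquet("/indexes/index1")
val remainingSourceFiles = Seq(
  "part-00001-20a62dfd-f0e0-4f17-8a70-93b90523ac38-c000.snappy.parquet")
// Keep only index records whose originating source file still exists.
val refreshedDf = indexDf.filter(col("_data_file_name").isin(remainingSourceFiles: _*))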

Does this PR introduce any user-facing change?

No.

How was this patch tested?

Unit test cases are added in CreateIndexTests.scala.

@pirz pirz self-assigned this Aug 11, 2020
@pirz pirz requested a review from thrajput August 11, 2020 00:12
@pirz pirz added the enhancement New feature or request label Aug 11, 2020
@imback82
Contributor

@pirz Can you fix the build please?

@pirz
Contributor Author

pirz commented Aug 11, 2020

@imback82 It is green now

val relation = HadoopFsRelation(
  location,
  new StructType(),
  StructType(index.schema.filter(baseRelation.schema.contains(_))),
Collaborator

How about defining a new function for this like index.originalSchema?

And I think index.schema.filter(field => field.name != IndexConstants.DATA_FILE_NAME_COLUMN) would be better?
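
A sketch of the suggested helper as a standalone function (hypothetical; the lineage column name literal is taken from the example in the PR description):

import org.apache.spark.sql.types.StructType

// The index schema with the lineage column filtered out, leaving only the
// columns that also exist in the source data schema.
def originalSchema(indexSchema: StructType, lineageColumn: String = "_data_file_name"): StructType =
  StructType(indexSchema.filter(_.name != lineageColumn))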

Contributor Author

@pirz pirz Aug 18, 2020

Currently we only have one extra column, and your suggested code works fine. However, I think it is better if we rely on the source data schema to pick the schema for index usage. This way, if we add more extra column(s) later, this code works as expected with no changes required.

@sezruby Please let me know if above is clear and makes sense to you.

Collaborator

Thanks for the detailed explanation, but it's a bit unclear to me. The current version also just uses index.schema, so does that mean the current version cannot handle the case you described?
I could be wrong, but this schema of HadoopFsRelation seems to be for reading index data, and updatedOutput will filter unused columns out?

Contributor Author

As the current version uses index.schema, it has a similar issue; however, I updated my comment as I realized this behavior is actually desired. The reason is that if the source data at query time is a subset of the original source data, then we should not use the index, as it could have extra rows that were part of the original data but are now excluded from the source data at query time. Imagine the index was created on source data with two partitions: .../data/A=a1 and .../data/A=a2. An index created on the .../data path has records from both partitions a1 and a2. However, if the data path at query time is set to .../data/A=a1, then records from the a2 partition should be excluded from the final results, and if we use the index during the query we may get those records back, depending on the query and index config (i.e. potentially wrong query results). Moreover, our current signature computation/comparison does not mark such an index as a candidate for the query - and this is correct behavior.
Regarding your point about HadoopFsRelation, if I understood you correctly: yes, that schema is used for reading data; however, any column in the schema is expected to be already resolved (i.e. to have an ExprId) by the time we switch to an index. Once we switch to an index, we still use the same ExprId that Spark had assigned to the columns when analyzing the plan with the source data. Hence, if the index schema has some column which was not present during the earlier analysis phase, the query fails after transformation as such a column has no ExprId.
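
A sketch of the first scenario described above (paths, index name, and column names are illustrative; imports are assumed from the Hyperspace package layout):

import com.microsoft.hyperspace.Hyperspace
import com.microsoft.hyperspace.index.IndexConfig

val hyperspace = new Hyperspace(spark)

// Index built over the full dataset, which has partitions A=a1 and A=a2.
val fullDf = spark.read.parquet("/data")
hyperspace.createIndex(fullDf, IndexConfig("idx", Seq("B"), Seq("A")))

// At query time only partition a1 is read. The index still contains rows from
// partition a2, so using it could return rows that should be excluded; the
// signature check correctly keeps it from being picked as a candidate.
val partialDf = spark.read.parquet("/data/A=a1")
partialDf.filter("B = 'x'").select("B")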

Collaborator

Ok, I got the point. I might also need to investigate ExprId related things for hybrid scan. Thanks :)

@imback82
Contributor

imback82 commented Aug 14, 2020

Could you add to the description the output of df.show, where df is created from the index files, compared with the previous version of the index? (Both for partitioned and non-partitioned source data.)
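
One way to produce such output, assuming the index data files are Parquet and their location is known (the path below is hypothetical):

// Read the index files directly and show a few records, including the
// lineage column when it is present.
val indexDf = spark.read.parquet("/path/to/hyperspace/indexes/index1")
indexDf.show(truncate = false)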


val df = spark.read.parquet(samplePartitionedParquetDataLocation)
val indexConfig =
  IndexConfig("filterIndex", Seq("imprs", "Query"), Seq("Date", "RGUID", "clicks"))
Contributor

Is it possible to use c1, c2, c3, etc. similar to other tests in this suite?

Contributor Author

Fixed it.

Contributor

@imback82 imback82 left a comment

I have a few comments, but generally it looks good to me.

}

private val getFileName: UserDefinedFunction = udf(
  (fullFilePath: String) => FilenameUtils.getBaseName(fullFilePath))
Contributor

This will strip off the extension as well. Is that what we want?

Contributor Author

You are correct - I cannot think of a case where dropping the extension could cause an issue. But keeping it does not hurt either: it will be repeated per file name, and most likely Parquet's compression will take care of storing it efficiently. I can make sure we keep it if you have a preference to capture it.

Contributor

Since these are "source" data, I could have "/tmp/1.csv" and "/tmp/1.parquet". If I tried to create an index on a dataframe created by spark.read.csv("/tmp/1.csv"), would this be OK?

Contributor Author

I switched to getName to keep the extension.
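
For reference, the difference discussed here, plus a sketch of how the file name might be attached to index records; whether the PR wires it up via input_file_name() exactly this way is an assumption:

import org.apache.commons.io.FilenameUtils
import org.apache.spark.sql.functions.{input_file_name, udf}

// getBaseName drops the last extension; getName keeps the full file name.
FilenameUtils.getBaseName("/tmp/part-00001-c000.snappy.parquet") // "part-00001-c000.snappy"
FilenameUtils.getName("/tmp/part-00001-c000.snappy.parquet")     // "part-00001-c000.snappy.parquet"

// Sketch: add the lineage column by mapping the input file path of each row.
val getFileName = udf((fullFilePath: String) => FilenameUtils.getName(fullFilePath))
val df = spark.read.parquet("/data/sample") // hypothetical source path
val dfWithLineage = df.withColumn("_data_file_name", getFileName(input_file_name()))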

("2019-10-03", "ff60e4838b92421eafc3e6ee59a9e9f1", "mi perro", 2, 2000),
("2019-10-03", "187696fe0a6a40cc9516bc6e47c70bc1", "facebook", 4, 3000))

def saveTestDataNonPartitioned(spark: SparkSession, path: String, colNames: String*): Unit = {
Contributor

How about combining these two methods into

def save(spark: SparkSession, path: String, columns: Seq[String], partitionColumns: Option[Seq[String]] = None): Unit

Contributor Author

sure, merged them.
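
A sketch of the merged helper along the lines suggested above; the exact signature and the sample rows are assumptions based on this thread:

import org.apache.spark.sql.SparkSession

def save(
    spark: SparkSession,
    path: String,
    columns: Seq[String],
    partitionColumns: Option[Seq[String]] = None): Unit = {
  import spark.implicits._
  // Test rows shaped like the tuples shown in the diff above.
  val sampleData = Seq(
    ("2019-10-03", "ff60e4838b92421eafc3e6ee59a9e9f1", "mi perro", 2, 2000),
    ("2019-10-03", "187696fe0a6a40cc9516bc6e47c70bc1", "facebook", 4, 3000))
  val df = sampleData.toDF(columns: _*)
  partitionColumns match {
    case Some(cols) => df.write.partitionBy(cols: _*).parquet(path)
    case None => df.write.parquet(path)
  }
}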

assert(
  indexRecordsDF.schema.fieldNames.sorted.corresponds(
    (indexConfig1.indexedColumns ++ indexConfig1.includedColumns ++
      Seq(IndexConstants.DATA_FILE_NAME_COLUMN)).sorted)(_.equals(_)))
Contributor

Seems simpler if you just do the following?

      assert(
        indexRecordsDF.schema.fieldNames.sorted ===
          (indexConfig1.indexedColumns ++ indexConfig1.includedColumns ++
            Seq(IndexConstants.DATA_FILE_NAME_COLUMN)).sorted)

Contributor Author

yes, modified it.

@@ -159,12 +200,37 @@ class E2EHyperspaceRulesTests extends HyperspaceSuite with SQLHelper {
getIndexFilesPath(rightDfIndexConfig.indexName)))
}

test("E2E test for join query on partitioned data with lineage.") {
Contributor

This looks the same as the test above except for the data. If so, can we do

Seq(sampleNonPartitionedParquetDataLocation, samplePartitionedParquetDataLocation).foreach { loc =>
  // common code here.
}

Contributor Author

Yes, I consolidated/expanded this case along with the two scenarios for the Filter query. Now the Filter query (2 cases: one with Project and one with select *) and the Join query all check the 4 combinations of partitioned/non-partitioned data with/without lineage.

private val testDir = "src/test/resources/e2eTests/"
private val sampleParquetDataLocation = testDir + "sampleparquet"
private val sampleNonPartitionedParquetDataLocation = testDir + "sampleparquet"
Contributor

Can we just name these as nonPartitionedDataPath and partitionedDataPath?

Contributor Author

Sure, renamed it.

private val sampleParquetDataLocation = "src/test/resources/sampleparquet"
private val testDir = "src/test/resources/createIndexTests/"
private val sampleNonPartitionedParquetDataLocation = testDir + "sampleparquet"
private val samplePartitionedParquetDataLocation = testDir + "samplepartitionedparquet"
Contributor

Same here; check the comment below.

Contributor Author

Renamed it.

FileUtils.delete(new Path(testDir), true)

val dataColumns = Seq("Date", "RGUID", "Query", "imprs", "clicks")
// save test non-partitioned.
Contributor

test data?

Contributor Author

fixed it.

Contributor

@imback82 imback82 left a comment

Minor comments on tests, but generally looks good to me.


clearCache()
fileSystem.delete(systemPath, true)
spark.disableHyperspace()
Contributor

Can you define the following in HyperspaceSuite.scala:

  /**
   * Vacuum indexes with the given names after calling `f`.
   */
  protected def withIndex(indexNames: String*)(f: => Unit): Unit = {
    try f
    finally {
      val hs = new Hyperspace(spark)
      indexNames.foreach { name =>
        hs.deleteIndex(name)
        hs.vacuumIndex(name)
      }
    }
  }

Then you can modify this function to:

  test("E2E test for filter query on partitioned/non-partitioned data with/without lineage.") {
    Seq(nonPartitionedDataPath, partitionedDataPath).foreach { loc =>
      Seq(true, false).foreach { enableLineage =>
        withSQLConf(IndexConstants.INDEX_LINEAGE_ENABLED -> enableLineage.toString) {
          withIndex("filterIndex") {
            val df = spark.read.parquet(loc)
            val indexConfig = IndexConfig("filterIndex", Seq("c3"), Seq("c1"))

            hyperspace.createIndex(df, indexConfig)

            def query(): DataFrame = df.filter("c3 == 'facebook'").select("c3", "c1")

            verifyIndexUsage(query, Seq(getIndexFilesPath(indexConfig.indexName)))
          }
        }
      }
    }
  }

And you don't need to repeat these clean ups.

Contributor Author

Nice! Much better - I added it, thanks!

Contributor

@imback82 imback82 left a comment

LGTM except for a few nit comments.

Pouria Pirzadeh and others added 2 commits August 23, 2020 11:23
@@ -284,7 +291,7 @@ class IndexManagerTests extends SparkFunSuite with SparkInvolvedSuite {
IndexLogEntry.schemaString(schema),
IndexConstants.INDEX_NUM_BUCKETS_DEFAULT)),
Content(
s"$indexStorageLocation/${indexConfig.indexName}" +
s"${systemPath.toUri.toString}/${indexConfig.indexName}" +
Contributor

This can be just $systemPath. I pushed a commit to change these.

Contributor

@imback82 imback82 left a comment

LGTM, thanks @pirz!

@imback82 imback82 merged commit 96d65ea into microsoft:master Aug 23, 2020
@imback82
Contributor

@pirz can you update the description to include the extension for the filename as well?

Labels
enhancement New feature or request
Development

Successfully merging this pull request may close these issues.

Add lineage to covering index records
3 participants