This repository has been archived by the owner on Jun 14, 2024. It is now read-only.

Add new IndexLogEntryTags to cache InMemoryFileIndex #324

Merged (15 commits) on Jan 29, 2021

Conversation

@sezruby (Collaborator) commented Jan 18, 2021

What changes were proposed in this pull request?

This PR introduces 3 new IndexLogEntryTags for caching InMemoryFileIndex.
Currently, a new InMemoryFileIndex is created for every query execution. Building it triggers an extra Spark job to list files, which can take noticeably long in cluster mode. To avoid that overhead, we can cache the InMemoryFileIndex object for each index entry.

  // INMEMORYFILEINDEX_INDEX_ONLY stores InMemoryFileIndex for index only scan.
  val INMEMORYFILEINDEX_INDEX_ONLY: IndexLogEntryTag[InMemoryFileIndex] =
    IndexLogEntryTag[InMemoryFileIndex]("inMemoryFileIndexIndexOnly")

  // INMEMORYFILEINDEX_HYBRID_SCAN stores InMemoryFileIndex including index data files and also
  // appended files for Hybrid Scan.
  val INMEMORYFILEINDEX_HYBRID_SCAN: IndexLogEntryTag[InMemoryFileIndex] =
    IndexLogEntryTag[InMemoryFileIndex]("inMemoryFileIndexHybridScan")

  // INMEMORYFILEINDEX_HYBRID_SCAN_APPENDED stores InMemoryFileIndex including only appended files
  // for Hybrid Scan.
  val INMEMORYFILEINDEX_HYBRID_SCAN_APPENDED: IndexLogEntryTag[InMemoryFileIndex] =
    IndexLogEntryTag[InMemoryFileIndex]("inMemoryFileIndexHybridScanAppended")
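Conceptually, each tag acts as a per-index-entry cache slot, optionally keyed by the query plan. As a rough illustration, here is a hypothetical simplified model (not the actual Hyperspace implementation; only the method name getTagValueOrUpdate is taken from the snippets in this PR):

    import scala.collection.mutable

    // Hypothetical simplified model of per-entry tag caching, keyed by
    // (plan key, tag name). The real IndexLogEntry may differ.
    class TagCache[K] {
      private val tags = mutable.Map.empty[(K, String), Any]

      // Return the cached value for (key, tagName) if present; otherwise
      // evaluate `value` once, cache it, and return it.
      def getTagValueOrUpdate[T](key: K, tagName: String, value: => T): T =
        tags.getOrElseUpdate((key, tagName), value).asInstanceOf[T]

      // Clearing all tags (e.g. on refreshIndex) invalidates the cache.
      def unsetAll(): Unit = tags.clear()
    }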

Test result with cache

Test scripts:

hs.refreshIndex(indexName) // use refreshIndex to clear the tags
val linetable = spark.read.parquet(tableName)
val filter = linetable.filter(linetable("l_partkey") isin (1234, 12341234, 123456))
  .select("l_suppkey", "l_quantity", "l_shipdate", "l_extendedprice", "l_discount", "l_orderkey")
measure(filter.count)
val filter = linetable.filter(linetable("l_partkey") isin (1234, 12341234, 123456))
  .select("l_suppkey", "l_quantity", "l_shipdate", "l_extendedprice", "l_discount", "l_orderkey")
measure(filter.count) // second query, with cache

hs.refreshIndex(indexName)
val filter = linetable.filter(linetable("l_partkey") isin (1234, 12341234, 123456))
  .select("l_suppkey", "l_quantity", "l_shipdate", "l_extendedprice", "l_discount", "l_orderkey")
measure(filter.count)
hs.refreshIndex(indexName) // clear cache
val filter = linetable.filter(linetable("l_partkey") isin (1234, 12341234, 123456))
  .select("l_suppkey", "l_quantity", "l_shipdate", "l_extendedprice", "l_discount", "l_orderkey")
measure(filter.count) // second query, without cache
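The measure helper is not shown in the PR; a minimal timing wrapper along these lines (an assumption, not from the PR) would produce the duration output below:

    // Hypothetical timing helper: runs the action once and prints the
    // elapsed wall-clock time in milliseconds.
    def measure[T](action: => T): T = {
      val start = System.nanoTime()
      val result = action
      println(s"duration: ${(System.nanoTime() - start) / 1000000}")
      result
    }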

Result (durations as printed by measure; with the cache the second query drops from 2212 to 868, while without it the second query stays around 2121):

linetable: org.apache.spark.sql.DataFrame = [l_orderkey: bigint, l_partkey: bigint ... 14 more fields]
filter: org.apache.spark.sql.DataFrame = [l_suppkey: bigint, l_quantity: double ... 4 more fields]
duration: 2212
filter: org.apache.spark.sql.DataFrame = [l_suppkey: bigint, l_quantity: double ... 4 more fields]
duration: 868
filter: org.apache.spark.sql.DataFrame = [l_suppkey: bigint, l_quantity: double ... 4 more fields]
duration: 2235
filter: org.apache.spark.sql.DataFrame = [l_suppkey: bigint, l_quantity: double ... 4 more fields]
duration: 2121

Spark UI:
[screenshot of Spark jobs]

Does this PR introduce any user-facing change?

Yes. When the cached InMemoryFileIndex object is used, we avoid an unnecessary file-listing job on every query execution.

How was this patch tested?

Unit test

    // Before:
    val newLocation = new InMemoryFileIndex(spark, filesAppended, options, None)
    // After:
    val newLocation = index.getTagValueOrUpdate(originalPlan,
      IndexLogEntryTags.INMEMORYFILEINDEX_HYBRID_SCAN_APPENDED,
      new InMemoryFileIndex(spark, filesAppended, options, None))
Contributor:
Question: is it guaranteed that the cached file index will always have the same files (filesAppended in this case)?

Collaborator Author:

Yes, because the Hybrid Scan tags are keyed by the plan.

Contributor:
Cool. Can you add a test? It will be the opposite of what you already added: check that the cached file index is not used if the plan changes.
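In terms of the simplified TagCache model sketched in the description, the requested test boils down to checking that a value cached under one plan key is not returned for a different plan key (a hypothetical sketch, not the actual test code):

    val cache = new TagCache[String]
    val forPlan1 = cache.getTagValueOrUpdate("plan1", "hybridScan", Seq("file1"))
    val forPlan2 = cache.getTagValueOrUpdate("plan2", "hybridScan", Seq("file2"))
    // A different plan key must miss the cache and produce a new value.
    assert(!(forPlan1 eq forPlan2))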

@imback82 (Contributor), quoting the description:
Currently, InMemoryFileIndex is created for every query execution. However, it incurs a new spark job & sometimes takes longer in cluster mode.

Can you attach the screenshots of jobs from Spark UI if available (before and after)? It will be very useful to check the improvements.

@sezruby (Collaborator, Author) commented Jan 20, 2021:

(Posted the test scripts, results, and Spark UI screenshot that now appear in the PR description above.)

@imback82 (Contributor):

Great! Can you update the description with this info (you can just copy/paste)? Thanks!

@imback82 (Contributor):

@sezruby Can you fix the conflicts please?

@imback82 (Contributor) left a review:
LGTM (few minor comments), thanks @sezruby!

  def query(df: DataFrame): DataFrame = {
    df.filter("c3 == 'facebook'").select("c3", "c4")
  }

  def getQueryPlanKey(df: DataFrame): LogicalPlan = {
Contributor:
What do you mean "Key" here?

Collaborator Author:

Renamed it to getOriginalQueryPlan. I used "Key" because the plan before transformation is one of the hash keys for IndexLogEntryTags.

Comment on lines +387 to +393

    def fileIndex: InMemoryFileIndex =
      new InMemoryFileIndex(spark, filesToRead, Map(), None)
    val newLocation = if (filesToRead.length == index.content.files.size) {
      index.withCachedTag(IndexLogEntryTags.INMEMORYFILEINDEX_INDEX_ONLY)(fileIndex)
    } else {
      index.withCachedTag(plan, IndexLogEntryTags.INMEMORYFILEINDEX_HYBRID_SCAN)(fileIndex)
    }
Contributor:
I was thinking the following to make the intention clear, but the current approach is also fine:

        val tagForFileIndex = if (filesToRead.length == index.content.files.size) {
          IndexLogEntryTags.INMEMORYFILEINDEX_INDEX_ONLY
        } else {
          IndexLogEntryTags.INMEMORYFILEINDEX_HYBRID_SCAN
        }
        val newLocation = index.withCachedTag(plan, tagForFileIndex) {
          new InMemoryFileIndex(spark, filesToRead, Map(), None)
        }

Collaborator Author:

It's because INDEX_ONLY doesn't take a plan: the index-only file set is the same for every query, so that tag is cached per entry, while the Hybrid Scan file set depends on the plan and must be keyed by it.

Contributor:

missed that. thanks

  }

  private def equalsRef(a: Set[FileIndex], b: Set[FileIndex]): Boolean = {
    a.zip(b).forall(f => f._1 eq f._2)
Contributor:
Don't you need a.size == b.size &&?
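With that size check folded in, the method might read as follows (a sketch of the suggested fix, not necessarily the final committed code):

    import org.apache.spark.sql.execution.datasources.FileIndex

    // Without the size check, zip truncates to the shorter collection,
    // so sets of different sizes could compare equal vacuously.
    private def equalsRef(a: Set[FileIndex], b: Set[FileIndex]): Boolean = {
      a.size == b.size && a.zip(b).forall(f => f._1 eq f._2)
    }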

@imback82 imback82 merged commit cd9a632 into microsoft:master Jan 29, 2021
@imback82 imback82 added the enhancement New feature or request label Jan 29, 2021
@imback82 imback82 added this to the January 2021 milestone Jan 29, 2021
Linked issue: Cache InMemoryFileIndex for Index