# Hyperspace for Delta Lake

[Hyperspace](https://github.com/microsoft/hyperspace) supports Delta Lake as a data source. This notebook shows how Hyperspace indexes work with Delta Lake tables and how they handle updates to those tables.


### Setup configurations

In [None]:
val sessionId = scala.util.Random.nextInt(1000000)
val dataPath = s"/hyperspace/data-$sessionId"
val indexLocation = s"/hyperspace/indexes-$sessionId"

// Use a random index location to avoid conflicts while using the notebook.
spark.conf.set("spark.hyperspace.system.path", indexLocation)
// Use HTML as a display mode.
spark.conf.set("spark.hyperspace.explain.displayMode", "html")
// Raise the Hybrid Scan thresholds so that indexes stay usable regardless of the amount of data being appended/deleted.
spark.conf.set("spark.hyperspace.index.hybridscan.maxAppendedRatio", "0.99") // default: 0.3
spark.conf.set("spark.hyperspace.index.hybridscan.maxDeletedRatio", "0.99") // default: 0.2
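
The two ratio settings above act as upper bounds on how much the source data may have changed before an index is no longer considered. The following is a minimal sketch of that gating idea in plain Scala, not Hyperspace's implementation: the names `HybridScanGate`, `SourceChange` and `canUseIndex` are hypothetical, and measuring the ratios in bytes of source files is an assumption.

```scala
// Hypothetical model of the Hybrid Scan thresholds; not Hyperspace's actual code.
object HybridScanGate {
  // Assumption: changes are measured in bytes of source files.
  final case class SourceChange(
      indexedBytes: Long,  // size of the source data the index was built from
      appendedBytes: Long, // size of files added since the index was built
      deletedBytes: Long)  // size of indexed files removed since then

  def canUseIndex(
      change: SourceChange,
      maxAppendedRatio: Double = 0.3, // default taken from the config comment
      maxDeletedRatio: Double = 0.2): Boolean = {
    // Current table size = indexed data that still exists + appended data.
    val currentBytes = change.indexedBytes - change.deletedBytes + change.appendedBytes
    val appendedRatio =
      if (currentBytes == 0) 0.0 else change.appendedBytes.toDouble / currentBytes
    val deletedRatio =
      if (change.indexedBytes == 0) 0.0 else change.deletedBytes.toDouble / change.indexedBytes
    appendedRatio <= maxAppendedRatio && deletedRatio <= maxDeletedRatio
  }
}
```

Under this model, a table that grew from 1000 to 1600 bytes is rejected with the default thresholds but accepted once both are raised to 0.99, which is why this notebook raises them for its small sample tables.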

### Data preparation

In [None]:
import spark.implicits._
import org.apache.spark.sql.DataFrame

// Sample department records
val departments = Seq(
  (10, "Accounting", "New York"),
  (20, "Research", "Dallas"),
  (30, "Sales", "Chicago"),
  (40, "Operations", "Boston"))

// Sample employee records
val employees = Seq(
  (7369, "SMITH", 20),
  (7499, "ALLEN", 30),
  (7521, "WARD", 30),
  (7566, "JONES", 20),
  (7698, "BLAKE", 30),
  (7782, "CLARK", 10),
  (7788, "SCOTT", 20),
  (7839, "KING", 10),
  (7844, "TURNER", 30),
  (7876, "ADAMS", 20),
  (7900, "JAMES", 30),
  (7934, "MILLER", 10),
  (7902, "FORD", 20),
  (7654, "MARTIN", 30))

val empData = employees.toDF("empId", "empName", "deptId")
val deptData = departments.toDF("deptId", "deptName", "location")
val empLocation = s"$dataPath/employees"
val deptLocation = s"$dataPath/departments"
empData.write.format("delta").mode("overwrite").save(empLocation)
deptData.write.format("delta").mode("overwrite").save(deptLocation)

In [None]:
val empDF = spark.read.format("delta").load(empLocation)
val deptDF = spark.read.format("delta").load(deptLocation)

// Disable BroadcastHashJoin so that Spark™ uses SortMergeJoin, which Hyperspace indexes can optimize.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

val eqJoin =
  empDF.
  join(deptDF, empDF("deptId") === deptDF("deptId")).
  select(empDF("empName"), deptDF("deptName"))

eqJoin.show

### Create Hyperspace indexes over Delta Lake tables

Hyperspace supports Delta Lake through an extensible data source builder framework.
To create and apply Hyperspace indexes on Delta Lake tables, you need to register the Delta Lake source builder:

spark.conf.set("spark.hyperspace.index.sources.fileBasedBuilders",
  "com.microsoft.hyperspace.index.sources.delta.DeltaLakeFileBasedSourceBuilder,com.microsoft.hyperspace.index.sources.default.DefaultFileBasedSourceBuilder")


In [None]:
import com.microsoft.hyperspace._
import com.microsoft.hyperspace.index._

// Register delta table source builder.
spark.conf.set(
  "spark.hyperspace.index.sources.fileBasedBuilders",
  "com.microsoft.hyperspace.index.sources.delta.DeltaLakeFileBasedSourceBuilder," +
    "com.microsoft.hyperspace.index.sources.default.DefaultFileBasedSourceBuilder")

// Enable source lineage to support the scenario with deleted files.
spark.conf.set("spark.hyperspace.index.lineage.enabled", "true")

val hyperspace = Hyperspace()

val empIndexConfig = IndexConfig("empIndex", Seq("deptId"), Seq("empName"))
val deptIndexConfig = IndexConfig("deptIndex", Seq("deptId"), Seq("deptName"))

hyperspace.createIndex(empDF, empIndexConfig)
hyperspace.createIndex(deptDF, deptIndexConfig)

In [None]:
// Enable Hyperspace to apply indexes.
// For simplicity, FilterIndexRule is disabled in this demo.
spark.enableHyperspace()

In [None]:
// Scenario: check if newly created indexes are applied.
val eqJoin =
  empDF.
  join(deptDF, empDF("deptId") === deptDF("deptId")).
  select(empDF("empName"), deptDF("deptName"))

eqJoin.show

hyperspace.explain(eqJoin, verbose = true) { displayHTML(_) }

### Append data

With Hybrid Scan, you can still utilize Hyperspace indexes after appending data to the table.


In [None]:
// Add new employees.
val newEmployees = Seq(
  (8000, "NEW-EMPLOYEE-1", 30),
  (8001, "NEW-EMPLOYEE-2", 10),
  (8002, "NEW-EMPLOYEE-3", 20),
  (8003, "NEW-EMPLOYEE-4", 30))

newEmployees.toDF("empId", "empName", "deptId").write.format("delta").mode("append").save(empLocation)

val latestEmpDF = spark.read.format("delta").load(empLocation)
latestEmpDF.show

In [None]:
// Scenario: Hybrid Scan is off.
spark.conf.set("spark.hyperspace.index.hybridscan.enabled", "false")

val eqJoin =
  latestEmpDF.
  join(deptDF, latestEmpDF("deptId") === deptDF("deptId")).
  select(latestEmpDF("empName"), deptDF("deptName"))

eqJoin.show

hyperspace.explain(eqJoin, verbose = true) { displayHTML(_) }

In [None]:
// Scenario: Hybrid Scan is on.
spark.conf.set("spark.hyperspace.index.hybridscan.enabled", "true")

val eqJoin =
  latestEmpDF.
  join(deptDF, latestEmpDF("deptId") === deptDF("deptId")).
  select(latestEmpDF("empName"), deptDF("deptName"))

hyperspace.explain(eqJoin, verbose = true) { displayHTML(_) }

eqJoin.show

### Incremental refresh

As an alternative to Hybrid Scan, you can refresh a Hyperspace index incrementally, so that only the appended and deleted data is processed instead of rebuilding the whole index.
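
Conceptually, an incremental refresh starts from a file-level comparison between what the index already covers and what the table currently contains. The sketch below illustrates that comparison in plain Scala; `RefreshPlan`, `FileDiff` and the file names are hypothetical, and the actual Hyperspace refresh logic may differ.

```scala
// Hypothetical sketch of the file-level diff behind an incremental refresh.
object RefreshPlan {
  final case class FileDiff(appended: Set[String], deleted: Set[String])

  // indexedFiles: source files covered by the existing index.
  // currentFiles: source files in the latest table snapshot.
  def diff(indexedFiles: Set[String], currentFiles: Set[String]): FileDiff =
    FileDiff(
      appended = currentFiles.diff(indexedFiles), // files that still need index data
      deleted = indexedFiles.diff(currentFiles))  // files whose index rows are stale
}
```

For an append-only change such as the new employees above, `RefreshPlan.diff(Set("part-0"), Set("part-0", "part-1"))` yields one appended file and no deleted files, so only the new file would need to be indexed.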

In [None]:
// Incrementally build index on new employees only.
hyperspace.refreshIndex("empIndex", "incremental")

In [None]:
// Show that the refreshed index version contains only the new data.
spark.read.parquet(s"$indexLocation/empIndex/v__=1").show

In [None]:
// Scenario: Check if refreshed index is applied.
val eqJoin =
  latestEmpDF.
  join(deptDF, latestEmpDF("deptId") === deptDF("deptId")).
  select(latestEmpDF("empName"), deptDF("deptName"))

hyperspace.explain(eqJoin, verbose = true) { displayHTML(_) }

eqJoin.show

### Update data

Updates to the table are handled as a combination of deleted and appended data, using either Hybrid Scan or incremental refresh.
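
A Delta Lake update rewrites the data files that contain the matching rows, so from the index's point of view an old file disappears and a new file appears. The sketch below models how lineage lets a scan combine index data with such a change; it is a simplified illustration in plain Scala, and `LineageHybridScan`, `IndexRow`, `SourceRow` and the file names are hypothetical rather than Hyperspace's internal types.

```scala
// Hypothetical model of Hybrid Scan with lineage; not Hyperspace's actual code.
object LineageHybridScan {
  // An index row remembers which source file it came from (its lineage).
  final case class IndexRow(deptId: Int, empName: String, sourceFile: String)
  final case class SourceRow(deptId: Int, empName: String)

  def scan(
      indexRows: Seq[IndexRow],
      deletedFiles: Set[String],
      appendedRows: Seq[SourceRow]): Seq[SourceRow] = {
    // Drop index rows that originate from files no longer in the table.
    val stillValid = indexRows
      .filterNot(row => deletedFiles.contains(row.sourceFile))
      .map(row => SourceRow(row.deptId, row.empName))
    // Rows from appended files are read from the source and merged in.
    stillValid ++ appendedRows
  }
}
```

In this model, the rewritten file is passed in `deletedFiles` and the rows of its replacement in `appendedRows`, so the stale name never reaches the result. This is the reason lineage is enabled before the indexes are created in this notebook.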

In [None]:
import io.delta.tables._
import org.apache.spark.sql.functions._

val empDeltaTable = DeltaTable.forPath(spark, empLocation)

// Append "-SPECIAL" to the name of "NEW-EMPLOYEE-2".
empDeltaTable.update(
  col("empName") === "NEW-EMPLOYEE-2",
  Map("empName" -> concat(col("empName"), lit("-SPECIAL"))))

empDeltaTable.history.show(truncate = false)
empDeltaTable.toDF.show(truncate = false)

In [None]:
// Scenario: handle updated data.
val updatedEmpDF = empDeltaTable.toDF
val eqJoin =
  updatedEmpDF.
  join(deptDF, updatedEmpDF("deptId") === deptDF("deptId")).
  select(updatedEmpDF("empName"), deptDF("deptName"))

eqJoin.show(truncate = false)

hyperspace.explain(eqJoin, verbose = true) { displayHTML(_) }

### Enhancement of Delta Lake time travel query

For a time travel query against an old table version, the latest version of the index can be used with Hybrid Scan, but there are usually many appended and/or deleted files relative to that table version, which reduces the benefit of the index.
To optimize this, Hyperspace tracks the index version and the table version at each refresh and selects the closest index version based on that history.

Note that this feature is not available in the current Hyperspace version and will be delivered in the next release.
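
The selection step can be pictured as a lookup over the recorded (index version, table version) pairs. The sketch below is a plain Scala illustration under a stated assumption: "closest" is taken to mean the smallest gap between table version numbers, which may not match the criterion Hyperspace actually uses. `ClosestIndexVersion` and `select` are hypothetical names.

```scala
// Hypothetical sketch of picking an index version for a time travel query.
object ClosestIndexVersion {
  // history: (indexVersion, tableVersion) pairs recorded at each refresh.
  // Assumption: "closest" means the smallest gap in table version numbers.
  // On a tie, the entry that appears first in `history` wins.
  def select(history: Seq[(Int, Long)], queriedTableVersion: Long): Option[Int] =
    if (history.isEmpty) None
    else {
      val (indexVersion, _) = history.minBy { case (_, tableVersion) =>
        math.abs(tableVersion - queriedTableVersion)
      }
      Some(indexVersion)
    }
}
```

With a history of `Seq((0, 0L), (1, 1L))`, a query using `versionAsOf` 0 would map to index version 0 in this model, avoiding the need to undo the later append through Hybrid Scan.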

In [None]:
// Scenario: Time travel to initial version of employees.
val oldEmpOnlyDF = spark.read.format("delta").option("versionAsOf", 0).load(empLocation)

val eqJoin =
  oldEmpOnlyDF.
  join(deptDF, oldEmpOnlyDF("deptId") === deptDF("deptId")).
  select(oldEmpOnlyDF("empName"), deptDF("deptName"))

hyperspace.explain(eqJoin, verbose = true) { displayHTML(_) }

eqJoin.show