Skip to content

Commit

Permalink
Merge pull request #5 from treeverse/feature/rename-table-operator
Browse files Browse the repository at this point in the history
Rename table operator
  • Loading branch information
arielshaqed committed Aug 21, 2023
2 parents 6db9bea + b533555 commit f4824f5
Show file tree
Hide file tree
Showing 3 changed files with 11 additions and 11 deletions.
8 changes: 4 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,10 @@ Run `sbt package`, then add

### Refs data diff

`schema_diff` is a Spark SQL table-valued function. The expression
`refs_data_diff` is a Spark SQL table-valued function. The expression

```sql
schema_diff(PREFIX, FROM_SCHEMA, TO_SCHEMA, TABLE)
refs_data_diff(PREFIX, FROM_SCHEMA, TO_SCHEMA, TABLE)
```

yields a relation that compares the "from" table `PREFIX.FROM_SCHEMA.TABLE`
Expand All @@ -46,7 +46,7 @@ with the "to" table `PREFIX.TO_SCHEMA.TABLE`. Elements of "to" but not
For instance,

```sql
SELECT lakefs_change, Player, COUNT(*) FROM schema_diff('lakefs', 'main~', 'main', 'db.allstar_games')
SELECT lakefs_change, Player, COUNT(*) FROM refs_data_diff('lakefs', 'main~', 'main', 'db.allstar_games')
GROUP BY lakefs_change, Player;
```

Expand All @@ -58,5 +58,5 @@ you can set up a view with it:

```sql
CREATE TEMPORARY VIEW diff_allstar_games_main_last_commit AS
schema_diff('lakefs', 'main~', 'main', 'db.allstar_games');
refs_data_diff('lakefs', 'main~', 'main', 'db.allstar_games');
```
12 changes: 6 additions & 6 deletions src/main/scala/io/lakefs/iceberg/extension/Extension.scala
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import org.apache.spark.sql.catalyst.expressions.StringLiteral

// A table-valued function that computes the data difference between the same
// table in two schemas.
object SchemaDiff {
object TableDataDiff {
private def computeString(e: Expression): String = {
val literalValue = StringLiteral.unapply(e)
literalValue match {
Expand Down Expand Up @@ -53,15 +53,15 @@ object SchemaDiff {
spark.sql(sqlString).queryExecution.logical
}

val function = (FunctionIdentifier("schema_diff"),
new ExpressionInfo("io.lakefs.iceberg.extension.SchemaDiff$",
"", "schema_diff", "schema_diff('TABLE_PREFIX', 'FROM_SCHEMA', 'TO_SCHEMA', 'TABLE_SUFFIX')",
"schema_diff('TABLE_PREFIX', 'FROM_SCHEMA', 'TO_SCHEMA', 'TABLE_SUFFIX')"),
val function = (FunctionIdentifier("refs_data_diff"),
new ExpressionInfo("io.lakefs.iceberg.extension.TableDataDiff$",
"", "refs_data_diff", "refs_data_diff('TABLE_PREFIX', 'FROM_SCHEMA', 'TO_SCHEMA', 'TABLE_SUFFIX')",
"refs_data_diff('TABLE_PREFIX', 'FROM_SCHEMA', 'TO_SCHEMA', 'TABLE_SUFFIX')"),
tdfBuilder _)
}

class LakeFSSparkSessionExtensions extends (SparkSessionExtensions => Unit) {
override def apply(extensions: SparkSessionExtensions): Unit = {
extensions.injectTableFunction(SchemaDiff.function)
extensions.injectTableFunction(TableDataDiff.function)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ class ExtensionSpec extends AnyFunSpec
val df2 = Seq(("a", 1), ("xyzzy", 2), ("c", 3), ("d", 4)).toDF
df2.writeTo("spark_catalog.second.table").create()

val diff = spark.sql("SELECT * FROM schema_diff('spark_catalog', 'first', 'second', 'table')")
val diff = spark.sql("SELECT * FROM refs_data_diff('spark_catalog', 'first', 'second', 'table')")
.collect()
.toSet
diff should equal(Set(Row("-", "b", 2), Row("+", "xyzzy", 2), Row("+", "d", 4)))
Expand Down

0 comments on commit f4824f5

Please sign in to comment.