Skip to content

Commit

Permalink
Add Aggregate Knowledge HLL implementation
Browse files Browse the repository at this point in the history
along with the ability to choose which implementation is used.

* Choose HLL implementation by:
   - SparkSession config OR
   - explicit argument to functions
* Postgres interoperability tests for Aggregate Knowledge HLLs
   - launches Postgres, with the HLL extension installed, in a Docker container
* `hll_convert` for converting sketches from StreamLib to Aggregate Knowledge

Also:
* Add code style & apply
* Move release process info to DEVELOPMENT file
  • Loading branch information
pidge committed Oct 16, 2019
1 parent 24bee59 commit 98a3ead
Show file tree
Hide file tree
Showing 18 changed files with 949 additions and 274 deletions.
Binary file added .DS_Store
Binary file not shown.
5 changes: 5 additions & 0 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ jobs:

docker:
- image: swoopinc/spark-alchemy:adoptopenjdk-8u222-alpine-circleci-201909251709
- image: swoopinc/postgres-hll:11

working_directory: ~/spark-alchemy

Expand All @@ -34,6 +35,10 @@ jobs:
keys:
- v1-dependencies-{{ checksum "build.sbt" }}

- run:
name: Wait for postgres
command: dockerize -wait tcp://localhost:5432 -timeout 1m

# "cat /dev/null |" prevents sbt from running in interactive mode. One of many amazing
# hacks to get sbt working in a sane manner.
- run:
Expand Down
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -19,5 +19,8 @@ project/plugins/project/
#Markdown editing
.Ulysses-favorites.plist

#IntelliJ
.idea

metastore_db/
tmp/
13 changes: 11 additions & 2 deletions dev/release-process.md → DEVELOPMENT.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,13 @@
# Release Process
# Development

## Local tests

To run the `PostgresInteropTest` you need to have a working Docker
environment. On Mac, that means having Docker Desktop installed and
running. Then run `docker-compose up` in the repository root to start a
Postgres server.

## Release Process

1. Develop new code on feature branches.

Expand All @@ -12,7 +21,7 @@
* Publish the microsite to Github Pages
* Create a new release on the [Github Project Release Page](https://github.com/swoop-inc/spark-alchemy/releases)

## Project Version Numbers
### Project Version Numbers

* The `VERSION` file in the root of the project contains the version number that SBT will use for the `spark-alchemy` project.
* The format should follow [Semantic Versioning](https://semver.org/) with the patch number matching the Travis CI build number when deploying new releases.
Expand Down
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
0.4.0-SNAPSHOT
0.5.0-SNAPSHOT
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,8 @@ trait NativeFunctionRegistration extends FunctionRegistration {
}

/**
* Creates an [[ExpressionInfo]] for the function as defined by expression T using the given name.
*/
* Creates an [[ExpressionInfo]] for the function as defined by expression T using the given name.
*/
protected def expressionInfo[T <: Expression : ClassTag](name: String): ExpressionInfo = {
val clazz = scala.reflect.classTag[T].runtimeClass
val df = clazz.getAnnotation(classOf[ExpressionDescription])
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,16 @@ import org.apache.spark.sql.Column


/** Convenience trait to use HyperLogLog functions with the same error consistently.
* Spark's own [[sql.functions.approx_count_distinct()]] as well as the granular HLL
* [[HLLFunctions.hll_init()]] and [[HLLFunctions.hll_init_collection()]] will be
* automatically parameterized by [[BoundHLL.hllError]].
*/
* Spark's own [[sql.functions.approx_count_distinct()]] as well as the granular HLL
* [[HLLFunctions.hll_init()]] and [[HLLFunctions.hll_init_collection()]] will be
* automatically parameterized by [[BoundHLL.hllError]].
*/
trait BoundHLL extends Serializable {

def hllError: Double

def functions: HLLFunctions

def approx_count_distinct(col: Column): Column =
sql.functions.approx_count_distinct(col, hllError)

Expand Down Expand Up @@ -42,11 +44,16 @@ trait BoundHLL extends Serializable {

def hll_init_collection_agg(columnName: String): Column =
functions.hll_init_collection_agg(columnName, hllError)

}

object BoundHLL {
def apply(error: Double): BoundHLL = new BoundHLL {
/**
* @param error maximum estimation error allowed
* @param impl only affects the hll_* functions, not Spark's built-ins
*/
def apply(error: Double)(implicit impl: Implementation = null): BoundHLL = new BoundHLL {
def hllError: Double = error

val functions = HLLFunctions.withImpl(impl)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@ import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String

/**
* Hash function for Spark data values that is suitable for cardinality counting. Unlike Spark's built-in hashing,
* it differentiates between different data types and accounts for nulls.
*/
* Hash function for Spark data values that is suitable for cardinality counting. Unlike Spark's built-in hashing,
* it differentiates between different data types and accounts for nulls.
*/
abstract class CardinalityHashFunction extends InterpretedHashFunction {

override def hash(value: Any, dataType: DataType, seed: Long): Long = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ object HLLFunctionRegistration extends NativeFunctionRegistration {
expression[HyperLogLogMerge]("hll_merge"),
expression[HyperLogLogRowMerge]("hll_row_merge"),
expression[HyperLogLogCardinality]("hll_cardinality"),
expression[HyperLogLogIntersectionCardinality]("hll_intersect_cardinality")
expression[HyperLogLogIntersectionCardinality]("hll_intersect_cardinality"),
expression[HyperLogLogConvert]("hll_convert")
)

}
Loading

0 comments on commit 98a3ead

Please sign in to comment.