[SPARK-26861][SQL] deprecate typed sum/count/average
## What changes were proposed in this pull request?

These built-in typed aggregate functions are not very useful:
1. Users can just call the untyped ones and turn the resulting DataFrame into a Dataset, which also performs better.
2. The typed aggregate functions behave subtly differently from the untyped ones on empty input: for example, the typed sum yields 0 over an empty Dataset, while the untyped `sum` yields null.

I think we should get rid of these built-in typed agg functions and suggest that users use the untyped ones.
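
For illustration, here is a minimal sketch of the suggested rewrite (not part of this commit; the object name `UntypedAggMigration` and the alias `sum_id` are made up). The untyped builtin runs the aggregation, and `.as[...]` turns the resulting DataFrame back into a typed Dataset:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.sum

object UntypedAggMigration {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.master("local").appName("untyped agg").getOrCreate()
    import spark.implicits._

    val ds = spark.range(20).select(('id % 3).as("key"), 'id).as[(Long, Long)]

    // Instead of ds.groupByKey(_._1).agg(typed.sumLong[(Long, Long)](_._2).toColumn),
    // call the untyped builtin and convert the DataFrame back to a Dataset:
    val sums = ds.groupBy($"key").agg(sum($"id").as("sum_id")).as[(Long, Long)]
    sums.show()

    spark.stop()
  }
}
```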

However, these functions are still useful as a demo of the `Aggregator` API, so I copied them to the example module.

## How was this patch tested?

N/A

Closes apache#23763 from cloud-fan/example.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: gatorsmile <gatorsmile@gmail.com>
cloud-fan authored and jackylee-ch committed Feb 18, 2019
1 parent 1b2ffaf commit e6f6ea1
Showing 3 changed files with 89 additions and 18 deletions.
examples/src/main/scala/org/apache/spark/examples/sql/SimpleTypedAggregator.scala
@@ -0,0 +1,86 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.examples.sql

import org.apache.spark.sql.{Encoder, Encoders, SparkSession}
import org.apache.spark.sql.expressions.Aggregator

// scalastyle:off println
object SimpleTypedAggregator {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder
      .master("local")
      .appName("common typed aggregator implementations")
      .getOrCreate()

    import spark.implicits._
    val ds = spark.range(20).select(('id % 3).as("key"), 'id).as[(Long, Long)]
    println("input data:")
    ds.show()

    println("running typed sum:")
    ds.groupByKey(_._1).agg(new TypedSum[(Long, Long)](_._2).toColumn).show()

    println("running typed count:")
    ds.groupByKey(_._1).agg(new TypedCount[(Long, Long)](_._2).toColumn).show()

    println("running typed average:")
    ds.groupByKey(_._1).agg(new TypedAverage[(Long, Long)](_._2.toDouble).toColumn).show()

    spark.stop()
  }
}
// scalastyle:on println

class TypedSum[IN](val f: IN => Long) extends Aggregator[IN, Long, Long] {
  override def zero: Long = 0L
  override def reduce(b: Long, a: IN): Long = b + f(a)
  override def merge(b1: Long, b2: Long): Long = b1 + b2
  override def finish(reduction: Long): Long = reduction

  override def bufferEncoder: Encoder[Long] = Encoders.scalaLong
  override def outputEncoder: Encoder[Long] = Encoders.scalaLong
}

class TypedCount[IN](val f: IN => Any) extends Aggregator[IN, Long, Long] {
  override def zero: Long = 0
  override def reduce(b: Long, a: IN): Long = {
    if (f(a) == null) b else b + 1
  }
  override def merge(b1: Long, b2: Long): Long = b1 + b2
  override def finish(reduction: Long): Long = reduction

  override def bufferEncoder: Encoder[Long] = Encoders.scalaLong
  override def outputEncoder: Encoder[Long] = Encoders.scalaLong
}

class TypedAverage[IN](val f: IN => Double) extends Aggregator[IN, (Double, Long), Double] {
  override def zero: (Double, Long) = (0.0, 0L)
  override def reduce(b: (Double, Long), a: IN): (Double, Long) = (f(a) + b._1, 1 + b._2)
  override def finish(reduction: (Double, Long)): Double = reduction._1 / reduction._2
  override def merge(b1: (Double, Long), b2: (Double, Long)): (Double, Long) = {
    (b1._1 + b2._1, b1._2 + b2._2)
  }

  override def bufferEncoder: Encoder[(Double, Long)] = {
    Encoders.tuple(Encoders.scalaDouble, Encoders.scalaLong)
  }
  override def outputEncoder: Encoder[Double] = Encoders.scalaDouble
}
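
(A side note, not part of the commit: the same `Aggregator` columns also work for a whole-Dataset aggregation, not just per key. A minimal sketch, assuming the `ds` defined in the example above:)

```scala
// Run TypedSum as a global aggregation rather than per key;
// Dataset.select accepts a TypedColumn directly.
val total = ds.select(new TypedSum[(Long, Long)](_._2).toColumn)
total.show()  // one row: 190, the sum of ids 0 through 19
```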
sql/core/src/main/java/org/apache/spark/sql/expressions/javalang/typed.java
@@ -17,8 +17,6 @@
 
 package org.apache.spark.sql.expressions.javalang;
 
-import org.apache.spark.annotation.Evolving;
-import org.apache.spark.annotation.Experimental;
 import org.apache.spark.api.java.function.MapFunction;
 import org.apache.spark.sql.TypedColumn;
 import org.apache.spark.sql.execution.aggregate.TypedAverage;
@@ -33,9 +31,9 @@
  * Scala users should use {@link org.apache.spark.sql.expressions.scalalang.typed}.
  *
  * @since 2.0.0
+ * @deprecated As of release 3.0.0, please use the untyped builtin aggregate functions.
  */
-@Experimental
-@Evolving
+@Deprecated
 public class typed {
   // Note: make sure to keep in sync with typed.scala
 
sql/core/src/main/scala/org/apache/spark/sql/expressions/scalalang/typed.scala
@@ -17,7 +17,6 @@
 
 package org.apache.spark.sql.expressions.scalalang
 
-import org.apache.spark.annotation.{Evolving, Experimental}
 import org.apache.spark.sql._
 import org.apache.spark.sql.execution.aggregate._
 
@@ -29,8 +28,7 @@ import org.apache.spark.sql.execution.aggregate._
  *
  * @since 2.0.0
  */
-@Experimental
-@Evolving
+@deprecated("please use untyped builtin aggregate functions.", "3.0.0")
 // scalastyle:off
 object typed {
   // scalastyle:on
@@ -76,15 +74,4 @@ object typed {
    * @since 2.0.0
    */
   def sumLong[IN](f: IN => Long): TypedColumn[IN, Long] = new TypedSumLong[IN](f).toColumn
-
-  // TODO:
-  // stddevOf: Double
-  // varianceOf: Double
-  // approx_count_distinct: Long
-
-  // minOf: T
-  // maxOf: T
-
-  // firstOf: T
-  // lastOf: T
 }
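
For reference (not part of this diff), a hedged sketch of the untyped builtins the deprecation message points to, one per deprecated typed function; the `ds` and column names follow the example file above, and the output aliases are made up:

```scala
import org.apache.spark.sql.functions.{avg, count, sum}

// typed.sum / typed.sumLong -> sum, typed.count -> count, typed.avg -> avg
val replaced = ds.groupBy($"key").agg(
  sum($"id").as("sum_id"),      // was typed.sumLong[(Long, Long)](_._2)
  count($"id").as("count_id"),  // was typed.count[(Long, Long)](_._2)
  avg($"id").as("avg_id"))      // was typed.avg[(Long, Long)](_._2.toDouble)
replaced.show()
```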
