Skip to content

Aggregation using Algebird Aggregators

MansurAshraf edited this page Dec 4, 2014 · 24 revisions

#Aggregators

Aggregators enable creation of reusable and compassable aggregation functions. There are three main functions on Aggregator trait.

trait Aggregator[-A, B, +C]  { 
  /**
   * Transform the input before the reduction.
   */
  def prepare(input: A): B
   /**
   * Combine two values to produce a new value.
   */
  def reduce(l: B, r: B): B 
  /**
   * Transform the output of the reduction.
   */
  def present(reduction: B): C
}

##Examples

In this section we will use the data below to show SQL aggregate functions and how to build similar aggregate functions in Scalding.

OrderID OrderDate OrderPrice OrderQuantity CustomerName
1 12/22/2005 160 2 Smith
2 08/10/2005 190 2 Johnson
3 07/13/2005 500 5 Baldwin
4 07/15/2005 420 2 Smith
5 12/22/2005 1000 4 Wood
6 10/2/2005 820 4 Smith
7 11/03/2005 2000 2 Baldwin
 case class Order(orderId: Int, orderDate: String, orderPrice: Long, orderQuantity: Long, 
                  customerName: String)

 val orders = List(
    Order(1, "12/22/2005", 160, 2, "Smith"),
    Order(2, "08/10/2005", 190, 2, "Johnson"),
    Order(3, "07/13/2005", 500, 5, "Baldwin"),
    Order(4, "07/15/2005", 420, 2, "Smith"),
    Order(5, "12/22/2005", 1000, 4, "Wood"),
    Order(6, "10/2/2005", 820, 4, "Smith"),
    Order(7, "11/03/2005", 2000, 2, "Baldwin"))

Count

The SQL COUNT function returns the number of rows in a table satisfying the criteria specified in the WHERE clause.

SQL:
SELECT COUNT (*) FROM Orders
WHERE CustomerName = 'Smith'
//Scalding:
import com.twitter.algebird.Aggregator.count

TypedPipe.from(orders)
      .aggregate(count(_.customerName == "Smith"))

If you don’t specify a WHERE clause when using COUNT, your statement will simply return the total number of rows in the table

SQL:
SELECT COUNT(*) FROM Orders
//Scalding:
import com.twitter.algebird.Aggregator.size

TypedPipe.from(orders)
      .aggregate(size)

You can also use aggregate functions with Group By.

SQL:
Select CustomerName, Count(CustomerName)
From Orders
Group by CustomerName 
//Scalding:
import com.twitter.algebird.Aggregator.size

TypedPipe.from(orders)
      .groupBy(_.customerName)
      .aggregate(size)

Output:
(Baldwin,2)
(Johnson,1)
(Smith,3)
(Wood,1)

Sum

The SQL SUM function is used to return the sum of an expression in a SELECT statement

SQL:
SELECT SUM(OrderQuantity) 
FROM Orders
GROUP BY CustomerName
//Scalding:
import com.twitter.algebird.Aggregator.sumBy

TypedPipe.from(orders)
      .groupBy(_.customerName)
      .aggregate(sumBy(_.orderQuantity))

Output:
(Baldwin,7)
(Johnson,2)
(Smith,8)
(Wood,4)

Max

The SQL MAX function retrieves the maximum numeric value from a column.

SQL:
SELECT CustomerName, MAX(OrderQuantity) 
FROM Order
GROUP By CustomerName
//Scalding:
import com.twitter.algebird.Aggregator.maxBy

 val maxOp = maxBy[Order, Long](_.orderQuantity)
      .andThenPresent(_.orderQuantity)

 TypedPipe.from(orders)
      .groupBy(_.customerName)
      .aggregate(maxOp)

Output:
(Baldwin,5)
(Johnson,2)
(Smith,4)
(Wood,4)

Min

The SQL MIN function selects the smallest number from a column.

SQL:
SELECT CustomerName, MIN(OrderQuantity) 
FROM Order
GROUP By CustomerName
//Scalding:
import com.twitter.algebird.Aggregator.minBy

 val minOp = minBy[Order, Long](_.orderQuantity)
      .andThenPresent(_.orderQuantity)

 TypedPipe.from(orders)
      .groupBy(_.customerName)
      .aggregate(minOp)

AVG

The SQL AVG function calculates average value of a numeric column.

SQL:
SELECT CustomerName, AVG(OrderQuantity) 
FROM Order
GROUP BY CustomerName
import com.twitter.algebird._

 val avg = AveragedValue.aggregator.composePrepare[Order](_.orderQuantity)

 TypedPipe.from(orders)
      .groupBy(_.customerName)
      .aggregate(avg)

Output:
(Baldwin,3.5)
(Johnson,2.0)
(Smith,2.66)
(Wood,4.0)

Distinct

The SQL DISTINCT function selects distinct values from a column.

SQL:
SELECT DISTINCT CustomerName 
FROM Order
//Scalding:
import com.twitter.algebird.HyperLogLogAggregator

val unique = HyperLogLogAggregator
      .sizeAggregator(4)
      .composePrepare[Order](_.customerName.getBytes("UTF-8"))
    
TypedPipe.from(orders)
      .aggregate(unique)

Output:
3.0

Top K

 import com.twitter.algebird.mutable.PriorityQueueToListAggregator

 val topK = new PriorityQueueToListAggregator[Long](2)
             .composePrepare[Order](_.orderQuantity)

 TypedPipe.from(orders)
      .groupBy(_.customerName)
      .aggregate(topK)

Output:
(Baldwin,List(2, 5))
(Johnson,List(2))
(Smith,List(2, 2))
(Wood,List(4))

Composing Aggregators

Aggregators can be composed to perform multiple aggregation in one pass.

  import com.twitter.algebird.Aggregator._

  val maxOp = maxBy[Order, Long](_.orderQuantity)
  val minOp = minBy[Order, Long](_.orderPrice)
  val combinedMetric = maxOp.join(minOp)

  TypedPipe.from(orders)
      .groupBy(_.customerName)
      .aggregate(combinedMetric)

Output:
(Baldwin,(5,500))
(Johnson,(2,190))
(Smith,(4,160))
(Wood,(4,1000))

composition can also be used to combine two or more aggregators to derive a new aggregate function.

 import com.twitter.algebird.Aggregator._

 val sumAggregator = sumBy[Order, Long](_.orderQuantity)
 val sizeAggregator = size
 val avg = sumAggregator.join(sizeAggregator)
            .andThenPresent{ case (sum, count) => sum.toDouble / count.toDouble }

 TypedPipe.from(orders)
      .groupBy(_.customerName)
      .aggregate(avg)

Output:
(Baldwin,3.5)
(Johnson,2.0)
(Smith,2.66)
(Wood,4.0)

you can join up to 22 aggregators by using GeneratedTupleAggregator. Example below show calculating Max, Min, Sum, Count, Mean and Standard Deviation in one pass by joining different aggregators.

   import com.twitter.algebird.Aggregator._
   import com.twitter.algebird.{GeneratedTupleAggregator, MomentsAggregator, Moments }

   val maxOp = maxBy[Order, Long](_.orderPrice)
   val minOp = minBy[Order, Long](_.orderPrice)
   val sum = sumBy[Order, Long](_.orderPrice)
   val moments = Moments.aggregator.composePrepare[Order](_.orderPrice.toDouble)

   val multiAggregator = GeneratedTupleAggregator
      .from4(maxOp, minOp, sum, moments)
      .andThenPresent {
        case (mmax, mmin, ssum, moment) => 
              (mmax.orderPrice, mmin.orderPrice, ssum, moment.count, moment.mean, moment.stddev)
      }

    TypedPipe.from(orders)
        .groupBy(_.customerName)
        .aggregate(multiAggregator)

Output:
(Baldwin,(2000,500,2500,2,1250.0,750.0))
(Johnson,(190,190,190,1,190.0,0.0))
(Smith,(820,160,1400,3,466.66,271.46))
(Wood,(1000,1000,1000,1,1000.0,0.0))

Contents

Getting help

Documentation

Matrix API

Third Party Modules

Videos

How-tos

Tutorials

Articles

Other

Clone this wiki locally