Aggregation using Algebird Aggregators

MansurAshraf edited this page Dec 4, 2014 · 24 revisions

#Aggregators

Aggregators enable the creation of reusable and composable aggregation functions. The Aggregator trait has three main functions:

trait Aggregator[-A, B, +C] {
  /**
   * Transform the input before the reduction.
   */
  def prepare(input: A): B
  /**
   * Combine two values to produce a new value.
   */
  def reduce(l: B, r: B): B
  /**
   * Transform the output of the reduction.
   */
  def present(reduction: B): C
}
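
To make the roles of the three methods concrete, here is a minimal sketch that redeclares the simplified trait and applies it to an in-memory collection. The `run` helper and the `countWhere` aggregator are hypothetical names used only for illustration; they are not part of the Algebird API, and the input is assumed to be non-empty.

```scala
object AggregatorSketch {
  // Simplified trait with the same three methods as described above.
  trait Aggregator[-A, B, +C] {
    def prepare(input: A): B
    def reduce(l: B, r: B): B
    def present(reduction: B): C
  }

  // Hypothetical helper: prepare every input, reduce pairwise, then present.
  // Assumes `inputs` is non-empty, because `reduce` has no zero element.
  def run[A, B, C](agg: Aggregator[A, B, C], inputs: Iterable[A]): C =
    agg.present(inputs.iterator.map(a => agg.prepare(a)).reduce((l, r) => agg.reduce(l, r)))

  // Hypothetical aggregator: count the inputs that satisfy a predicate.
  def countWhere[A](pred: A => Boolean): Aggregator[A, Long, Long] =
    new Aggregator[A, Long, Long] {
      def prepare(input: A): Long = if (pred(input)) 1L else 0L
      def reduce(l: Long, r: Long): Long = l + r
      def present(reduction: Long): Long = reduction
    }

  val customers = List("Smith", "Johnson", "Baldwin", "Smith", "Wood", "Smith", "Baldwin")

  // Three of the seven names are "Smith".
  val smithCount: Long = run(countWhere[String](_ == "Smith"), customers)
}
```

Because `reduce` only ever combines two intermediate values, the same aggregator can be applied to partial results computed on different machines and then merged.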

##Examples

In this section we will use the data below to show SQL aggregate functions and how to build similar aggregate functions in Scalding.

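| OrderID | OrderDate | OrderPrice | OrderQuantity | CustomerName |
|---|---|---|---|---|
| 1 | 12/22/2005 | 160 | 2 | Smith |
| 2 | 08/10/2005 | 190 | 2 | Johnson |
| 3 | 07/13/2005 | 500 | 5 | Baldwin |
| 4 | 07/15/2005 | 420 | 2 | Smith |
| 5 | 12/22/2005 | 1000 | 4 | Wood |
| 6 | 10/2/2005 | 820 | 4 | Smith |
| 7 | 11/03/2005 | 2000 | 2 | Baldwin |
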
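// Assumed imports for the snippets on this page: TypedPipe comes from Scalding,
// and unqualified helpers such as count, size, maxBy, and minBy are assumed to be
// in scope via Algebird's Aggregator companion object.
import com.twitter.scalding.typed.TypedPipe
import com.twitter.algebird.Aggregator._

case class Order(orderId: Int, orderDate: String, orderPrice: Long, orderQuantity: Long,
                 customerName: String)

val orders = List(
  Order(1, "12/22/2005", 160, 2, "Smith"),
  Order(2, "08/10/2005", 190, 2, "Johnson"),
  Order(3, "07/13/2005", 500, 5, "Baldwin"),
  Order(4, "07/15/2005", 420, 2, "Smith"),
  Order(5, "12/22/2005", 1000, 4, "Wood"),
  Order(6, "10/2/2005", 820, 4, "Smith"),
  Order(7, "11/03/2005", 2000, 2, "Baldwin"))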

###Count

The SQL COUNT function returns the number of rows in a table satisfying the criteria specified in the WHERE clause.

SQL:
SELECT COUNT(*) FROM Orders
WHERE CustomerName = 'Smith'
//Scalding:
TypedPipe.from(orders)
      .aggregate(count(_.customerName == "Smith"))

If you don’t specify a WHERE clause when using COUNT, the statement simply returns the total number of rows in the table.

SQL:
SELECT COUNT(*) FROM Orders
//Scalding:
TypedPipe.from(orders)
      .aggregate(size)

You can also use aggregate functions with Group By.

SQL:
SELECT CustomerName, COUNT(CustomerName)
FROM Orders
GROUP BY CustomerName
//Scalding:
TypedPipe.from(orders)
      .groupBy(_.customerName)
      .aggregate(size)

Output:
(Baldwin,2)
(Johnson,1)
(Smith,3)
(Wood,1)

###Sum

The SQL SUM function returns the sum of an expression in a SELECT statement.

SQL:
SELECT CustomerName, SUM(OrderQuantity)
FROM Orders
GROUP BY CustomerName
//Scalding:
TypedPipe.from(orders)
      .groupBy(_.customerName)
      .aggregate(sumBy(_.orderQuantity))

Output:
(Baldwin,7)
(Johnson,2)
(Smith,8)
(Wood,4)
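
The grouped COUNT and SUM results above can be cross-checked on plain Scala collections. The sketch below is only an illustration of the group-then-reduce shape of `groupBy(...).aggregate(...)`; the `aggregateByKey` helper is a hypothetical name, not a Scalding or Algebird function, and the `Order` class is trimmed to the fields that are needed.

```scala
object GroupedAggregationSketch {
  // Trimmed-down version of the Order record from the example data.
  case class Order(orderId: Int, customerName: String, orderQuantity: Long)

  val orders = List(
    Order(1, "Smith", 2), Order(2, "Johnson", 2), Order(3, "Baldwin", 5),
    Order(4, "Smith", 2), Order(5, "Wood", 4), Order(6, "Smith", 4),
    Order(7, "Baldwin", 2))

  // Hypothetical helper: group by a key, map each record to an intermediate
  // value, and combine the intermediate values of each group pairwise.
  def aggregateByKey[A, K, B](xs: Seq[A])(key: A => K)(prep: A => B)(combine: (B, B) => B): Map[K, B] =
    xs.groupBy(key).map { case (k, group) => k -> group.map(prep).reduce(combine) }

  // COUNT(*) ... GROUP BY CustomerName
  val countByCustomer: Map[String, Long] =
    aggregateByKey(orders)(_.customerName)(_ => 1L)(_ + _)

  // SUM(OrderQuantity) ... GROUP BY CustomerName
  val quantityByCustomer: Map[String, Long] =
    aggregateByKey(orders)(_.customerName)(_.orderQuantity)(_ + _)
}
```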

###Max

The SQL MAX function retrieves the maximum numeric value from a column.

SQL:
SELECT CustomerName, MAX(OrderQuantity) 
FROM Orders
GROUP BY CustomerName
//Scalding:
 val max = maxBy[Order, Long](_.orderQuantity)
      .andThenPresent(_.orderQuantity)

 TypedPipe.from(orders)
      .groupBy(_.customerName)
      .aggregate(max)

Output:
(Baldwin,5)
(Johnson,2)
(Smith,4)
(Wood,4)

###Min

The SQL MIN function selects the smallest number from a column.

SQL:
SELECT CustomerName, MIN(OrderQuantity) 
FROM Orders
GROUP BY CustomerName
//Scalding:
 val min = minBy[Order, Long](_.orderQuantity)
      .andThenPresent(_.orderQuantity)

 TypedPipe.from(orders)
      .groupBy(_.customerName)
      .aggregate(min)

###Distinct

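The SQL DISTINCT keyword selects the distinct values of a column. The Scalding example below estimates how many distinct values there are, which corresponds to COUNT(DISTINCT ...), using a HyperLogLog aggregator.

SQL:
SELECT COUNT(DISTINCT CustomerName)
FROM Orders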
//Scalding:
val unique = HyperLogLogAggregator
      .sizeAggregator(4)
      .composePrepare[Order](_.customerName.getBytes("UTF-8"))
    
TypedPipe.from(orders)
      .aggregate(unique)

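Output (an estimate: HyperLogLog is approximate, and with only 4 bits it is coarse; the sample data contains 4 distinct customer names):
3.0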
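
For comparison, an exact distinct count can be expressed with the same prepare / reduce / present shape by using sets as the intermediate value. This is a minimal sketch on plain Scala collections, not the HyperLogLog aggregator used above; its memory use grows with the number of distinct values, which is the cost an approximate sketch avoids.

```scala
object ExactDistinctSketch {
  // Customer names from the example data, in order of OrderID.
  val customerNames = List("Smith", "Johnson", "Baldwin", "Smith", "Wood", "Smith", "Baldwin")

  // prepare: wrap each name in a singleton set
  // reduce:  union two sets
  // present: take the size of the final set
  val exactDistinct: Int =
    customerNames.map(name => Set(name)).reduce(_ ++ _).size
}
```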

##Composing Aggregators

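Aggregators can be composed to perform multiple aggregations in one pass.

  // maxBy and minBy return the whole Order, so project out the field of interest.
  val max = maxBy[Order, Long](_.orderQuantity)
      .andThenPresent(_.orderQuantity)
  val min = minBy[Order, Long](_.orderPrice)
      .andThenPresent(_.orderPrice)
  val combinedMetric = max.join(min)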

  TypedPipe.from(orders)
      .groupBy(_.customerName)
      .aggregate(combinedMetric)

Output:
(Baldwin,(5,500))
(Johnson,(2,190))
(Smith,(4,160))
(Wood,(4,1000))

Composition can also be used to combine two or more aggregators into a new aggregate function, such as an average.

 val sumAggregator = sumBy[Order, Long](_.orderQuantity)
 val sizeAggregator = size
 val avg = sumAggregator.join(sizeAggregator)
            .andThenPresent{ case (sum, count) => sum.toDouble / count.toDouble }

 TypedPipe.from(orders)
      .groupBy(_.customerName)
      .aggregate(avg)

Output:
(Baldwin,3.5)
(Johnson,2.0)
(Smith,2.66)
(Wood,4.0)
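
How `join` and `andThenPresent` fit together can be sketched with a small stand-in type. The `Agg` case class below is hypothetical and only mirrors the shape of the trait at the top of this page; it is not Algebird's implementation, and it assumes a non-empty input.

```scala
object JoinSketch {
  // Hypothetical aggregator: three functions bundled in a case class.
  case class Agg[A, B, C](prepare: A => B, reduce: (B, B) => B, present: B => C) {
    // Run two aggregators side by side over the same input in one pass.
    def join[B2, C2](that: Agg[A, B2, C2]): Agg[A, (B, B2), (C, C2)] =
      Agg[A, (B, B2), (C, C2)](
        a => (prepare(a), that.prepare(a)),
        (l, r) => (reduce(l._1, r._1), that.reduce(l._2, r._2)),
        b => (present(b._1), that.present(b._2)))

    // Post-process the presented value without touching the reduction.
    def andThenPresent[D](f: C => D): Agg[A, B, D] =
      Agg[A, B, D](prepare, reduce, b => f(present(b)))

    // Assumes `inputs` is non-empty.
    def apply(inputs: Iterable[A]): C =
      present(inputs.iterator.map(prepare).reduce(reduce))
  }

  val sum  = Agg[Long, Long, Long](identity, _ + _, identity)
  val size = Agg[Long, Long, Long](_ => 1L, _ + _, identity)
  val avg  = sum.join(size).andThenPresent { case (s, n) => s.toDouble / n.toDouble }

  // Smith's order quantities from the example data: 2, 2 and 4.
  val smithAvg: Double = avg(List(2L, 2L, 4L))
}
```

The joined aggregator carries a pair of intermediate values, so the sum and the count are both updated during a single traversal, and the division happens only once at the end.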

##Additional Aggregators

###Max, Min, Sum, Count, Mean, Standard Deviation in one pass

    val max = maxBy[Order, Long](_.orderPrice)
    val min = minBy[Order, Long](_.orderPrice)
    val sum = sumBy[Order, Long](_.orderPrice)
    val moments = Moments.aggregator.composePrepare[Order](_.orderPrice.toDouble)

    val multiAggregator = GeneratedTupleAggregator
      .from4(max, min, sum, moments)
      .andThenPresent {
        case (mmax, mmin, ssum, moment) =>
          (mmax.orderPrice, mmin.orderPrice, ssum, moment.count, moment.mean, moment.stddev)
      }

    TypedPipe.from(orders)
      .groupBy(_.customerName)
      .aggregate(multiAggregator)

Output:
(Baldwin,(2000,500,2500,2,1250.0,750.0))
(Johnson,(190,190,190,1,190.0,0.0))
(Smith,(820,160,1400,3,466.66,271.46))
(Wood,(1000,1000,1000,1,1000.0,0.0))
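
The count, mean, and standard deviation columns can be computed in one pass because they derive from a small summary that merges associatively. The sketch below uses a naive (count, sum, sum of squares) summary to show the idea; the `Stats` class is hypothetical, and it is an assumption of this sketch rather than a description of how Algebird's `Moments` is implemented.

```scala
object MomentsSketch {
  // Hypothetical running summary: count, sum, and sum of squares.
  case class Stats(count: Long, sum: Double, sumSq: Double) {
    // Merging two summaries is field-wise addition, so it is associative.
    def merge(that: Stats): Stats =
      Stats(count + that.count, sum + that.sum, sumSq + that.sumSq)

    def mean: Double = sum / count

    // Population standard deviation; clamp at zero to absorb rounding error.
    def stddev: Double = math.sqrt(math.max(0.0, sumSq / count - mean * mean))
  }

  // prepare: turn one value into a single-element summary.
  def prepare(x: Double): Stats = Stats(1L, x, x * x)

  // Baldwin's order prices from the example data: 500 and 2000.
  val baldwin: Stats = List(500.0, 2000.0).map(prepare).reduce(_ merge _)
}
```

For Baldwin this yields a count of 2, a mean of 1250.0, and a standard deviation of 750.0, matching the output above.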

###Top K

 val topK = new PriorityQueueToListAggregator[Long](2)
             .composePrepare[Order](_.orderQuantity)

 TypedPipe.from(orders)
      .groupBy(_.customerName)
      .aggregate(topK)

Output:
(Baldwin,List(2, 5))
(Johnson,List(2))
(Smith,List(2, 2))
(Wood,List(4))
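
The output above is consistent with an aggregator that retains the k smallest values under the ordering, returned in ascending order: Smith's quantities 2, 2 and 4 yield List(2, 2). A minimal sketch of that behaviour on plain lists follows; it merges and truncates sorted lists and is not the priority-queue implementation used by the aggregator above.

```scala
object TopKSketch {
  val k = 2

  // prepare: a one-element list.
  def prepare(x: Long): List[Long] = List(x)

  // reduce: merge two lists, sort ascending, and keep at most k elements.
  def merge(l: List[Long], r: List[Long]): List[Long] = (l ++ r).sorted.take(k)

  // Order quantities per customer from the example data.
  val smith: List[Long] = List(2L, 2L, 4L).map(prepare).reduce((l, r) => merge(l, r))
  val baldwin: List[Long] = List(5L, 2L).map(prepare).reduce((l, r) => merge(l, r))
}
```

Because every intermediate list holds at most k elements, the memory needed per key stays bounded no matter how many records a key has.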
