## Advanced Spark - Goldilocks

In [1]:
import pyspark
import numpy as np
import pandas as pd
from pyspark.sql import SQLContext
import itertools
## sc is the SparkContext created by the pyspark notebook kernel
sqlCtx = SQLContext(sc)

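We are working with a pandas dataset! Literally: a small pandas DataFrame describing four pandas, which we then convert into a Spark DataFrame.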

In [2]:
names = ["Mama Panda", "Papa Panda", "Baby Panda", "Baby Panda's toy Panda"]
happiness = [15.0, 2.0, 10.0, 3.0]
niceness = [0.25, 1000, 2.0, 8.5]
softness = [2467.0, 35.4, 50.0, 0.2]
sweetness = [0.0, 0.0, 0.0, 98.0]

df = pd.DataFrame({"happiness": happiness, "niceness": niceness, "softness": softness, "sweetness": sweetness})
df.insert(0, 'panda_name', names)

print(df.head())

df = sqlCtx.createDataFrame(df)

               panda_name  happiness  niceness  softness  sweetness
0              Mama Panda       15.0      0.25    2467.0        0.0
1              Papa Panda        2.0   1000.00      35.4        0.0
2              Baby Panda       10.0      2.00      50.0        0.0
3  Baby Panda's toy Panda        3.0      8.50       0.2       98.0


## Goldilocks Problem
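* **input**: a DataFrame and an arbitrary list of positive integers n1, ..., nk (1-based ranks)
* **return**: for each column, its n1-th, ..., nk-th smallest elements (the rank statistics of that column)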

### example
* input: [2, 4]
* return:
    * happiness: [3.0, 15.0]
    * niceness: [2.0, 1000.0]
    * softness: [35.4, 2467.0]
    * sweetness: [0.0, 98.0]
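The definition can be pinned down with a small local reference. This is a minimal sketch, assuming the data fits in driver memory as plain Python lists; `findRankStatisticsLocally` is a hypothetical helper, not part of the notebook, meant only as a ground truth to compare the Spark versions against.

```python
# Hypothetical helper (not part of the notebook): a pure-Python reference
# for the Goldilocks problem on in-memory columns, with no Spark involved.
def findRankStatisticsLocally(columns, ranks):
    """Return {column name: [n-th smallest value for each requested n]}.

    Ranks are 1-based positions in ascending order; duplicate ranks are
    collapsed and ranks beyond a column's length are skipped, which is how
    the filter-based Spark versions in this notebook behave as well.
    """
    assert all(rank > 0 for rank in ranks)
    results = {}
    for name, values in columns.items():
        ordered = sorted(values)
        results[name] = [ordered[rank - 1] for rank in sorted(set(ranks))
                         if rank <= len(ordered)]
    return results


pandaColumns = {
    "happiness": [15.0, 2.0, 10.0, 3.0],
    "niceness": [0.25, 1000.0, 2.0, 8.5],
    "softness": [2467.0, 35.4, 50.0, 0.2],
    "sweetness": [0.0, 0.0, 0.0, 98.0],
}

print(findRankStatisticsLocally(pandaColumns, [2, 4]))
# {'happiness': [3.0, 15.0], 'niceness': [2.0, 1000.0], 'softness': [35.4, 2467.0], 'sweetness': [0.0, 98.0]}
```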

## 1. Naive Solution
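* Loop through the columns. For each one, map every row to that column's value, sort the values with Spark's `sortBy`, and use `zipWithIndex` to attach each value's position in sorted order, then keep the values whose positions match the requested ranks.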

In [3]:
def findRankStatistics(df, ranks):
    assert all(rank > 0 for rank in ranks)  ## require all ranks to be > 0

    numCols = len(df.schema)
    results = {}
    for i in range(numCols):
        ## bind i as a default argument so each lambda keeps its own column index
        col = df.rdd.map(lambda row, i=i: row[i])
        ## zipWithIndex pairs each sorted value with its 0-based position
        sortedCol = col.sortBy(lambda x: x).zipWithIndex()
        ranksOnly = sortedCol.filter(
            lambda pair: (pair[1] + 1) in ranks  ## pair is (colValue, index)
        ).keys()
        rankedList = ranksOnly.collect()
        results[i + 1] = rankedList  ## note: column keys are 1-based in this version
    return results

findRankStatistics(df, [2, 4])

{1: [u"Baby Panda's toy Panda", u'Papa Panda'],
 2: [3.0, 15.0],
 3: [2.0, 1000.0],
 4: [35.4, 2467.0],
 5: [0.0, 98.0]}

This solution works and is relatively robust, but it is very slow: it sorts the data once per column, one column after another, and each sort is a separate distributed job with its own shuffle. In other words, if we have 8,000 columns we have to do 8,000 sorts!

## 2. groupByKey Solution
* One simple solution to the Goldilocks problem is to key every cell by its column index and use groupByKey to gather the elements of each column. For each key, groupByKey returns an iterable of all the values with that key.
* After converting the iterable to a list, we can sort it and keep the elements whose positions correspond to our rank statistics.
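To make the data flow concrete, here is a plain-Python simulation (no Spark) of the groupByKey approach. The helpers `mapToKeyValuePairsLocal`, `groupByKeyLocal` and `findRankStatisticsGrouped` are hypothetical stand-ins for the Spark steps; the dictionary built by `groupByKeyLocal` plays the role of the shuffle, which is where all of a column's values end up together.

```python
from collections import defaultdict

# Plain-Python simulation (no Spark) of the groupByKey approach.
# All helper names here are hypothetical stand-ins for the Spark steps.

def mapToKeyValuePairsLocal(rows):
    # flatMap step: every cell becomes a (column index, value) pair
    for row in rows:
        for colIndex, value in enumerate(row):
            yield (colIndex, value)

def groupByKeyLocal(pairs):
    # shuffle + groupByKey: all values of one column land under one key,
    # just as in Spark all values for a key end up in a single partition
    groups = defaultdict(list)
    for colIndex, value in pairs:
        groups[colIndex].append(value)
    return groups

def findRankStatisticsGrouped(rows, ranks):
    groups = groupByKeyLocal(mapToKeyValuePairsLocal(rows))
    # mapValues step: sort each group, keep the values at the requested ranks
    return {colIndex: [v for i, v in enumerate(sorted(values)) if (i + 1) in ranks]
            for colIndex, values in groups.items()}

pandaRows = [
    ("Mama Panda", 15.0, 0.25, 2467.0, 0.0),
    ("Papa Panda", 2.0, 1000.0, 35.4, 0.0),
    ("Baby Panda", 10.0, 2.0, 50.0, 0.0),
    ("Baby Panda's toy Panda", 3.0, 8.5, 0.2, 98.0),
]
print(findRankStatisticsGrouped(pandaRows, [2, 4]))
```

The sorted list inside `findRankStatisticsGrouped` holds an entire column at once, which is harmless here but is the memory pressure that hurts at scale.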

In [4]:
def mapToKeyValuePairs(df):
    rowLength = len(df.schema)
    ## emit one (column index, cell value) pair per cell
    return df.rdd.flatMap(lambda row: [(i, row[i]) for i in range(rowLength)])


def findRankStatistics(df, ranks):
    assert all(rank > 0 for rank in ranks)  ## require all ranks to be > 0

    pairRDD = mapToKeyValuePairs(df)
    groupColumns = pairRDD.groupByKey()

    def convertToArrayAndSort(iterable):
        sortedValues = sorted(iterable)  ## materializes the whole column in memory
        return [val for i, val in enumerate(sortedValues) if (i + 1) in ranks]

    ## mapValues doc: http://spark.apache.org/docs/latest/api/python/pyspark.html?highlight=mapvalues#pyspark.RDD.mapValues
    return groupColumns.mapValues(convertToArrayAndSort).collectAsMap()

findRankStatistics(df, [2, 4])

{0: [u"Baby Panda's toy Panda", u'Papa Panda'],
 1: [3.0, 15.0],
 2: [2.0, 1000.0],
 3: [35.4, 2467.0],
 4: [0.0, 98.0]}

This solution has several advantages:
* First, it gives the correct answer.
* Second, it is very short and easy to understand. It relies on out-of-the-box Spark and Python functions, so it introduces few edge cases and is relatively easy to test.
* Third, on small data, particularly if the input data has many columns but few records, it is actually relatively efficient: it requires only one shuffle (in the groupByKey step), and the sorting step runs as a narrow transformation on the executors.

### Why GroupByKey fails
* If you have read Learning Spark or spent much time working with Spark at scale, the outcome of the groupByKey approach shouldn’t surprise you: groupByKey is known to cause memory errors at scale. The reason is that each “group” created by groupByKey is an iterable that can’t be distributed, so every value for a key (here, an entire column) must end up on a single executor.
* This also causes an expensive “shuffle read” step, in which Spark has to read all of the shuffled data from disk into memory.

## 3. Secondary Sort Solution

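To mitigate the problem, instead of grouping by column index and sorting each group in memory, we can do a secondary sort: key each record by (column index, value), partition on the column index alone, and let the shuffle sort the records by the full key.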

The function has four steps:
1. Define a custom partitioner that partitions records according to the first element of the key (the column index).
2. Define an ordering on the (column index, value) keys: first value, then second value. In Scala this requires an implicit ordering because the function is generic; in Python, tuples already compare this way, so no extra code is needed.
3. Use repartitionAndSortWithinPartitions on the input RDD with the custom partitioner defined in step 1.
4. Filter the sorted items using a mapPartitions routine. We can leverage the fact that items with the same first key are on the same partition and that the elements within each partition are sorted first by the first element of the key and then by the second.
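The four steps can be simulated without a cluster. In this plain-Python sketch, `repartitionAndSortLocal` is a hypothetical stand-in for Spark's `repartitionAndSortWithinPartitions` (a list of lists plays the partitions), and `columnIndexPartition` / `filterForTargetRanks` are our own names for steps 1 and 4.

```python
# Plain-Python simulation (no Spark) of the four secondary-sort steps.
# repartitionAndSortLocal is a hypothetical stand-in for Spark's
# repartitionAndSortWithinPartitions.

def columnIndexPartition(numPartitions):
    # step 1: the partitioner looks only at the first element of the key
    assert numPartitions > 0
    return lambda key: abs(key[0]) % numPartitions

def repartitionAndSortLocal(records, numPartitions, partitionFunc):
    # step 3: route each (key, value) record to a partition, then sort each
    # partition by key; step 2 comes for free because Python tuples compare
    # lexicographically (column index first, then cell value)
    partitions = [[] for _ in range(numPartitions)]
    for key, value in records:
        partitions[partitionFunc(key)].append((key, value))
    return [sorted(part, key=lambda record: record[0]) for part in partitions]

def filterForTargetRanks(partition, targetRanks):
    # step 4: a column's records are contiguous and ascending within its
    # partition, so a running count per column yields each record's rank
    currentColumn, runningTotal = None, 0
    for (colIndex, value), _ in partition:
        if colIndex != currentColumn:
            currentColumn, runningTotal = colIndex, 1
        else:
            runningTotal += 1
        if runningTotal in targetRanks:
            yield colIndex, value

pandaRows = [
    ("Mama Panda", 15.0, 0.25, 2467.0, 0.0),
    ("Papa Panda", 2.0, 1000.0, 35.4, 0.0),
    ("Baby Panda", 10.0, 2.0, 50.0, 0.0),
    ("Baby Panda's toy Panda", 3.0, 8.5, 0.2, 98.0),
]
# key every cell by (column index, value); the 1 is a dummy value
records = [((colIndex, value), 1) for row in pandaRows for colIndex, value in enumerate(row)]
partitions = repartitionAndSortLocal(records, 3, columnIndexPartition(3))

results = {}
for partition in partitions:
    for colIndex, value in filterForTargetRanks(partition, [2, 4]):
        results.setdefault(colIndex, []).append(value)
print(results)
```

With three partitions and five columns, columns 0 and 3 (and 1 and 4) share a partition, so a single partition must hold two whole columns.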

In [5]:
## partitioner
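def ColumnIndexPartition(numPartitions):
    """Partition by the first item in the key tuple"""
    assert numPartitions > 0  ## a zero modulus would fail in getPartition
    def getPartition(key):
        ## key is (column index, cell value); only the column index matters
        return abs(key[0]) % numPartitions
    return getPartition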

In [6]:
def findRankStatistics(df, targetRanks, partitions):
    assert all(rank > 0 for rank in targetRanks)  ## require all ranks to be > 0

    ## key each cell by (column index, value); the 1 is a dummy value
    pairRDD = mapToKeyValuePairs(df).map(lambda x: (x, 1))
    partitioner = ColumnIndexPartition(partitions)
    ## shuffle by column index, sorting each partition by the full (column index, value) key
    sortedPairs = pairRDD.repartitionAndSortWithinPartitions(partitions, partitioner)

    def filterForTargetRanksFn(iterable):
        currentColumnIndex = [-1]  ## one-element lists so filterFn can mutate them (Python 2 has no nonlocal)
        runningTotal = [0]
        def filterFn(x):
            ((colIndex, value), _) = x
            if colIndex != currentColumnIndex[0]:
                currentColumnIndex[0] = colIndex  ## reset to the new column index
                runningTotal[0] = 1
            else:
                runningTotal[0] += 1
            return runningTotal[0] in targetRanks
        ## keep only the (column index, value) key of each matching record
        return (x[0] for x in iterable if filterFn(x))

    filterForTargetIndex = sortedPairs.mapPartitions(filterForTargetRanksFn, preservesPartitioning=True)
    results = filterForTargetIndex.collect()

    resultsMap = {}  ## just need to group them now
    for i, val in results:
        resultsMap.setdefault(i, []).append(val)

    return resultsMap


findRankStatistics(df, [2, 4], 3)

{0: [u"Baby Panda's toy Panda", u'Papa Panda'],
 1: [3.0, 15.0],
 2: [2.0, 1000.0],
 3: [35.4, 2467.0],
 4: [0.0, 98.0]}

## 4. Sort on Cell Values Solution 

Our solution is still not perfect. If the columns are relatively long, the repartitionAndSortWithinPartitions step may still lead to failures, since it requires one executor to be able to store all of the values associated with all of the columns that hash to the same partition. Instead, we can key every cell by its value and sort on the cell values themselves:

1. Map the rows of data to pairs of (cell value, column index).
2. Perform a sortByKey operation on all tuples defined in step 1.
3. Using mapPartitions, determine how many elements in each column are on each partition and collect that information to the driver.
4. Perform a local computation on the result of step 3 to determine the location of each desired rank statistic. For example, suppose that we are looking for the 13th element. Suppose also that in step 3 we determined that the first partition had 10 elements from column six. In this case, we can conclude that the 13th element will be the third smallest element of column six on the second partition (assuming the second partition holds at least three of them).
5. Using the result of step 4, use another mapPartitions transformation to filter for the elements that correspond to the desired rank statistics. Collect this information back to the driver.
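Steps 1 to 5 can be checked without a cluster. The sketch below is a plain-Python simulation under stated assumptions: slicing one sorted list into contiguous chunks stands in for the range partitions that sortByKey produces, and `locateRanks` / `simulateSortOnCellValues` are hypothetical names. It uses only the numeric panda columns, because Python 3 cannot sort strings and floats together. For the worked example, "column six" is taken as column index 6, and the second partition's count of 5 is our own assumption.

```python
# Plain-Python simulation (no Spark) of the sort-on-cell-values approach.
# Slicing one sorted list into contiguous chunks is a stand-in for the
# range partitions that sortByKey produces.

def locateRanks(partitionColumnCounts, targetRanks):
    # step 4: turn per-partition column counts (from step 3) into
    # (partition index, column index, rank within that partition) triples
    numColumns = len(partitionColumnCounts[0])
    runningTotal = [0] * numColumns  # column elements in earlier partitions
    locations = []
    for partitionIndex, columnCounts in enumerate(partitionColumnCounts):
        for colIndex, count in enumerate(columnCounts):
            before = runningTotal[colIndex]
            for rank in targetRanks:
                if before < rank <= before + count:
                    locations.append((partitionIndex, colIndex, rank - before))
            runningTotal[colIndex] += count
    return locations

def simulateSortOnCellValues(rows, targetRanks, numPartitions):
    # steps 1-2: (cell value, column index) pairs, sorted by cell value
    pairs = sorted(((value, colIndex) for row in rows
                    for colIndex, value in enumerate(row)), key=lambda p: p[0])
    size = -(-len(pairs) // numPartitions)  # ceiling division
    partitions = [pairs[i:i + size] for i in range(0, len(pairs), size)]

    # step 3: count how many elements of each column sit in each partition
    numColumns = len(rows[0])
    counts = []
    for partition in partitions:
        columnCounts = [0] * numColumns
        for _, colIndex in partition:
            columnCounts[colIndex] += 1
        counts.append(columnCounts)

    # steps 4-5: locate each rank, then pick it out of its partition
    locations = locateRanks(counts, targetRanks)
    results = {}
    for partitionIndex, partition in enumerate(partitions):
        wanted = {(c, r) for p, c, r in locations if p == partitionIndex}
        seen = [0] * numColumns
        for value, colIndex in partition:
            seen[colIndex] += 1
            if (colIndex, seen[colIndex]) in wanted:
                results.setdefault(colIndex, []).append(value)
    return results

# The worked example: 10 column-six elements in the first partition, so the
# 13th element is the 3rd column-six element of the second partition.
exampleCounts = [[0, 0, 0, 0, 0, 0, 10], [0, 0, 0, 0, 0, 0, 5]]
print(locateRanks(exampleCounts, [13]))  # [(1, 6, 3)]

numericRows = [
    (15.0, 0.25, 2467.0, 0.0),
    (2.0, 1000.0, 35.4, 0.0),
    (10.0, 2.0, 50.0, 0.0),
    (3.0, 8.5, 0.2, 98.0),
]
print(simulateSortOnCellValues(numericRows, [2, 4], 3))
```

Only the small count table from step 3 travels to the driver; no partition ever needs to hold a whole column.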

In [7]:
def getValueColumnPairs(dataFrame):
    ## (cell value, column index) per cell; note Python 3 can't sort str and float keys together
    return dataFrame.rdd.flatMap(lambda row: [(val, index) for index, val in enumerate(row)])

In [73]:
def getColumnsFreqPerPartition(sortedValueColumnPairs, numOfColumns):
    def aggregateColumnFrequencies(partitionIndex, valueColumnPairs):
        columnsFreq = [0] * numOfColumns  ## fresh counts for every partition
        for val, index in valueColumnPairs:
            columnsFreq[index] += 1
        return [(partitionIndex, columnsFreq)]
    return sortedValueColumnPairs.mapPartitionsWithIndex(aggregateColumnFrequencies).collect()

In [74]:
def getRanksLocationsWithinEachPart(targetRanks, partitionColumnsFreq, numOfColumns):
    runningTotal = [0] * numOfColumns  ## elements of each column seen in earlier partitions
    ranksLocations = []
    ## walk the partitions in order; each entry is [partitionIndex, [[colIndex, relativeRank], ...]]
    for partitionIndex, columnsFreq in sorted(partitionColumnsFreq, key=lambda x: x[0]):
        relevantIndexList = []
        for colIndex, colCount in enumerate(columnsFreq):
            runningTotalCol = runningTotal[colIndex]
            ## a rank lands in this partition if runningTotalCol < rank <= runningTotalCol + colCount
            relevantIndexList += [[colIndex, rank - runningTotalCol] for rank in targetRanks
                                  if runningTotalCol < rank <= runningTotalCol + colCount]
            runningTotal[colIndex] += colCount
        ranksLocations.append([partitionIndex, relevantIndexList])
    return ranksLocations

In [92]:
def findTargetRanksIteratively(sortedValueColumnPairs, ranksLocations):
    def sortedValueColumnPairsFn(partitionIndex, valueColumnPairs):
        ## ranksLocations has one entry per partition, in partition order
        targetsInThisPart = ranksLocations[partitionIndex][1]
        if not targetsInThisPart:
            return iter([])
        ## column index -> relative ranks to pick from this partition
        columnsRelativeIndex = {}
        for k, g in itertools.groupby(sorted(targetsInThisPart), key=lambda x: x[0]):
            columnsRelativeIndex[k] = [pair[1] for pair in g]
        ## per-column count of elements seen so far in this partition
        runningTotals = dict((colIndex, 0) for colIndex in columnsRelativeIndex)

        def valueColumnPairsFn(value, colIndex):
            if colIndex not in runningTotals:
                return False
            runningTotals[colIndex] += 1
            return runningTotals[colIndex] in columnsRelativeIndex[colIndex]

        ## emit (column index, value) so the driver can group by column
        return ((colIndex, value) for value, colIndex in valueColumnPairs
                if valueColumnPairsFn(value, colIndex))

    return sortedValueColumnPairs.mapPartitionsWithIndex(sortedValueColumnPairsFn)

In [99]:
def findRankStatistics(dataFrame, targetRanks):
    valueColumnPairs = getValueColumnPairs(dataFrame)
    sortedValueColumnPairs = valueColumnPairs.sortByKey()
    sortedValueColumnPairs.persist(pyspark.StorageLevel.MEMORY_AND_DISK)
    numOfColumns = len(dataFrame.schema)
    partitionColumnsFreq = getColumnsFreqPerPartition(sortedValueColumnPairs, numOfColumns)
    ranksLocations = getRanksLocationsWithinEachPart(targetRanks, partitionColumnsFreq, numOfColumns)
    targetRanksValues = findTargetRanksIteratively(sortedValueColumnPairs, ranksLocations)

    results = targetRanksValues.collect()
    resultsMap = {} ## just need to group them now
    for i, val in results:
        if i not in resultsMap: 
            resultsMap[i] = []
        resultsMap[i].append(val)
    return resultsMap
    
findRankStatistics(df, [2, 4])

{0: [u"Baby Panda's toy Panda", u'Papa Panda'],
 1: [3.0, 15.0],
 2: [2.0, 1000.0],
 3: [35.4, 2467.0],
 4: [0.0, 98.0]}

## Goldilocks Postmortem

* Goldilocks Version 1: Iterative Solution. Our first solution iteratively looped through each group and performed a distributed sort, resulting in one stage and one expensive distributed sort per group.

* Goldilocks Version 2: groupByKey Solution. The next solution used groupByKey to shuffle records associated with the same group onto the same partition. Then we were able to sort each group in one stage by using mapValues to sort the values in each group.

* Goldilocks Version 3: Secondary Sort. Using the secondary sort technique, we improved our groupByKey solution by replacing the groupByKey and sorting steps with the repartitionAndSortWithinPartitions function to push the work of sorting each group into the shuffle stage.

* Goldilocks Version 4: Sort on Cell Values. Next, we realized that it was possible to solve the problem using only one sort on the value of each record, rather than the group. We developed a solution that keyed the records by value (rather than by group/column index), sorted all the records, and then performed a series of narrow transformations to collect the results. We expected each new sorting key (a cell value) to have fewer duplicates than there are records in each group, the key we partitioned on in version 3.