# Aggregations

Aggregations are important and should be as simple as possible to implement. Some notable data aggregation are provided by pandas, spark and SQL language.

Characteristic of a good aggregation design.
 - easily perform aggregation on a column or a set of columns
 - easily perform multiple aggregation functions
 - selectively perform differently aggregations on different columns

As an nice to have to this it would be nice to apply aggregation functions by passing the function name as a string. A good aggregation method should allow all the about together, with the minimal amount of code required.

The code here below attempt to produce readable code, engine agnostic for the aggregation method. As in the previous explanation this should only happen via three methods .cols, .rows .data. In particular, the aggregation api is alwasy in the form of `df.cols.get(...).groupby(...).agg(...)` or use `find` instead of `get`


## Getting started

Let's start spark using datafaucet.

In [23]:
import datafaucet as dfc

In [24]:
# let's start the engine
dfc.engine('spark')

<datafaucet.spark.engine.SparkEngine at 0x7fbdb66f2128>

In [25]:
# expose the engine context
spark  = dfc.context()

## Generating Data

In [70]:
df = spark.range(100)

In [97]:
df = (df
    .cols.create('g').randint(0,3)
    .cols.create('n').randchoice(['Stacy', 'Sandra'])
    .cols.create('x').randint(0,100)
    .cols.create('y').randint(0,100)
)

In [98]:
df.data.grid(5)

Unnamed: 0,id,g,n,x,y
0,0,1,Sandra,91,89
1,1,0,Sandra,19,57
2,2,2,Sandra,34,97
3,3,1,Stacy,35,15
4,4,2,Sandra,93,90


## Pandas
Let's start by lloking how Pandas does aggregations. Pandas is quite flexible on the points noted above and uses hierachical indexes on both columns and rows to store the aggregation names and the groupby values. Here below a simple aggregation and a more complex one with groupby and multiple aggregation functions.

In [99]:
pf = df.data.collect()

In [100]:
pf[['n', 'x', 'y']].agg(['max'])

Unnamed: 0,n,x,y
max,Stacy,97,98


In [103]:
agg = (pf[['g','n', 'x', 'y']]
           .groupby(['g', 'n'])
           .agg({
               'n': 'count',
               'x': ['min', max],
               'y':['min', 'max']
           }))
agg

Unnamed: 0_level_0,Unnamed: 1_level_0,n,x,x,y,y
Unnamed: 0_level_1,Unnamed: 1_level_1,count,min,max,min,max
g,n,Unnamed: 2_level_2,Unnamed: 3_level_2,Unnamed: 4_level_2,Unnamed: 5_level_2,Unnamed: 6_level_2
0,Sandra,9,14,75,3,98
0,Stacy,21,10,96,8,92
1,Sandra,20,8,91,9,91
1,Stacy,18,2,89,4,97
2,Sandra,12,4,97,1,98
2,Stacy,20,4,96,0,98


### Stacking 
In pandas, you can stack the multiple column index and move it to a column, as below. The choice of stacking or not after aggregation depends on wht you want to do later with the data. Next to the extra index, stacking also explicitely code NaN / Nulls for evry aggregation which is not shared by each column (in case of dict of aggregation functions.

In [139]:
agg = pf[['g', 'x', 'y']].groupby(['g']).agg(['min', 'max', 'mean'])
agg = agg.stack(0)
agg

Unnamed: 0_level_0,Unnamed: 1_level_0,max,mean,min
g,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1
0,x,96,50.966667,10
0,y,98,47.133333,3
1,x,91,45.026316,2
1,y,97,48.736842,4
2,x,97,58.75,4
2,y,98,53.90625,0


### Index as columns
Index in pandas is not the same as column data, but you can easily move from one to the other, as shown below, by combine the name information of the various index levels with the values of each level.

In [140]:
agg.index.names

FrozenList(['g', None])

In [141]:
# for example these are the value from the first level of the index
agg.index.get_level_values(0)

Int64Index([0, 0, 1, 1, 2, 2], dtype='int64', name='g')

The following script will iterate through all the levels and create a column with the name of the original index level otherwise will use `_<level#>` if no name is available. Remember that pandas allows indexes to be nameless.

In [142]:
levels = agg.index.names
for (name, lvl) in zip(levels, range(len(levels))):
    agg[name or f'_{lvl}'] = agg.index.get_level_values(lvl)

In [143]:
#now the index is standard columns, drop the index
agg.reset_index(inplace=True, drop=True)
agg

Unnamed: 0,max,mean,min,g,_1
0,96,50.966667,10,0,x
1,98,47.133333,3,0,y
2,91,45.026316,2,1,x
3,97,48.736842,4,1,y
4,97,58.75,4,2,x
5,98,53.90625,0,2,y


## Spark (Python)
Spark aggregation is a bit simpler, but definitely very flexible, so we can achieve the same result with a little more work in some cases. Here below a simple example and a more complex one, reproducing the same three cases as above.

In [165]:
df.select('n', 'x', 'y').agg({'n':'max', 'x':'max', 'y':'max'}).toPandas()

Unnamed: 0,max(x),max(y),max(n)
0,97,98,Stacy


Or with a little more work we can exactly reproduce the pandas case:

In [166]:
from pyspark.sql import functions as F

df.select('n', 'x', 'y').agg(
    F.lit('max').alias('_idx'),
    F.max('n').alias('n'), 
    F.max('x').alias('x'), 
    F.max('y').alias('y')).toPandas()

Unnamed: 0,_idx,n,x,y
0,max,Stacy,97,98


More complicated aggregation cannot be called by string and must be provided by functions. Here below a way to reproduce groupby aggregation as in the second pandas example:

In [168]:
(df
    .select('g', 'n', 'x', 'y')
    .groupby('g', 'n')
    .agg(
        F.count('n').alias('n_count'),
        F.min('x').alias('x_min'),
        F.max('x').alias('x_max'),
        F.min('y').alias('y_min'),
        F.max('y').alias('y_max')
    )
).toPandas()
        

Unnamed: 0,g,n,n_count,x_min,x_max,y_min,y_max
0,0,Sandra,10,17,96,8,98
1,0,Stacy,20,10,92,3,86
2,1,Stacy,18,4,89,4,97
3,2,Sandra,14,29,96,1,98
4,1,Sandra,20,2,91,4,97
5,2,Stacy,18,4,97,0,96


### Stacking

Stacking, as in pandas, can be used to expose the column name on a different index column, unfortunatel stack is currently available only in the SQL initerface and not very flexible as in the pandas counterpart (https://spark.apache.org/docs/2.3.0/api/sql/#stack)

You could use pyspark `expr` to call the SQL function as explained here (https://stackoverflow.com/questions/42465568/unpivot-in-spark-sql-pyspark). However, another way would be to union the various results as shown here below.

In [None]:
agg = pf[['g', 'x', 'y']].groupby(['g']).agg(['min', 'max', 'mean'])
a

In [176]:
from pyspark.sql import functions as F

(df
    .select('g', 'x')
    .groupby('g')
    .agg(
        F.lit('x').alias('_idx'),
        F.min('x').alias('min'),
        F.max('x').alias('max'),
        F.mean('x').alias('mean')
    )
).union(
df
    .select('g', 'y')
    .groupby('g')
    .agg(
        F.lit('y').alias('_idx'),
        F.min('y').alias('min'),
        F.max('y').alias('max'),
        F.mean('y').alias('mean')
    )
).toPandas()

Unnamed: 0,g,_idx,min,max,mean
0,1,x,2,91,45.026316
1,2,x,4,97,58.75
2,0,x,10,96,50.966667
3,1,y,4,97,48.736842
4,2,y,0,98,53.90625
5,0,y,3,98,47.133333


### Generatring aggregating code

The code above looks complicated, but is very regular, hence we can generate it! What we need is a to a list of lists for the aggregation functions as shown here below:

In [179]:
dfs = []
for c in ['x','y']:
    print(' '*2, f'col: {c}')
    aggs = []
    for func in [F.min, F.max, F.mean]:
        f = func(c).alias(func.__name__)
        aggs.append(f)
        print(' '*4, f'func: {f}')
        
    dfs.append(df.select('g', c).groupby('g').agg(*aggs))

   col: x
     func: Column<b'min(x) AS `min`'>
     func: Column<b'max(x) AS `max`'>
     func: Column<b'avg(x) AS `mean`'>
   col: y
     func: Column<b'min(y) AS `min`'>
     func: Column<b'max(y) AS `max`'>
     func: Column<b'avg(y) AS `mean`'>


The dataframes in this generator have all the same columns and can be reduced with union calls

In [181]:
from functools import reduce

reduce(lambda a,b: a.union(b), dfs).toPandas()

Unnamed: 0,g,min,max,mean
0,1,2,91,45.026316
1,2,4,97,58.75
2,0,10,96,50.966667
3,1,4,97,48.736842
4,2,0,98,53.90625
5,0,3,98,47.133333


## Meet DataFaucet agg

One of the goal of datafaucet is to simplify analytics, data wrangling and data discovery over a set of engine with an intuitive interface. So the sketched solution above is available, with a few extras. See below the examples

In [183]:
# simple aggregation by name
d = df.cols.get('x').agg('distinct')
d.data.grid()

Unnamed: 0,x
0,64


In [184]:
# simple aggregation (multiple) by name
d = df.cols.get('x').agg(['distinct', 'avg'])
d.data.grid()

Unnamed: 0,x_distinct,x_avg
0,64,51.2


In [185]:
# simple aggregation (multiple) by name (stacked)
d = df.cols.get('x').agg(['distinct', 'avg'], stack=True)
d.data.grid()

Unnamed: 0,_idx,distinct,avg
0,x,64,51.2


In [186]:
# simple aggregation (multiple) by name (stacked, custom index name)
d = df.cols.get('x').agg(['distinct', 'avg'], stack='colname')
d.data.grid()

Unnamed: 0,colname,distinct,avg
0,x,64,51.2


In [190]:
# simple aggregation (multiple) by name and function
d = df.cols.get('x').agg(['distinct', F.min, F.max, 'avg'])
d.data.grid()

Unnamed: 0,x_distinct,x_min,x_max,x_avg
0,64,2,97,51.2


In [191]:
# multiple aggregation by name and function
d = df.cols.get('x', 'y').agg(['distinct', F.min, F.max, 'avg'])
d.data.grid()

Unnamed: 0,x_distinct,x_min,x_max,x_avg,y_distinct,y_min,y_max,y_avg
0,64,2,97,51.2,67,0,98,49.91


In [193]:
# multiple aggregation (multiple) by name and function
d = df.cols.get('x', 'y').agg({
    'x':['distinct', F.min], 
    'y':['distinct', 'max']})

d.data.grid()

Unnamed: 0,x_distinct,x_min,x_max,y_distinct,y_min,y_max
0,64,2,,67,,98


In [194]:
# multiple aggregation (multiple) by name and function (stacked)
d = df.cols.get('x', 'y').agg({
    'x':['distinct', F.min], 
    'y':['distinct', 'max']}, stack=True)
d.data.grid()

Unnamed: 0,_idx,distinct,min,max
0,x,64,2.0,
1,y,67,,98.0


In [195]:
# grouped by, multiple aggregation (multiple) by name and function (stacked)
d = df.cols.get('x', 'y').groupby('g','n').agg({
    'x':['distinct', F.min], 
    'y':['distinct', 'max']}, stack=True)
d.data.grid()

### Extended list of aggregation

An extended list of aggregation is available, both by name and by function in the datafaucet library

In [210]:
from datafaucet.spark import aggregations as A

d = df.cols.get('x', 'y').groupby('g','n').agg([
        'type',
        ('uniq', A.distinct),
        'one',
        'top3',
    ], stack=True)

d.data.grid()

Unnamed: 0,g,n,_idx,type,uniq,one,top3
0,1,Stacy,x,int,23,67,"{32: 2, 25: 2, 39: 2}"
1,0,Stacy,x,int,16,74,"{70: 1, 74: 1, 19: 1}"
2,2,Sandra,x,int,10,40,"{4: 2, 97: 2, 69: 1}"
3,1,Sandra,x,int,13,52,"{56: 1, 8: 1, 2: 1}"
4,0,Sandra,x,int,13,79,"{36: 2, 89: 1, 35: 1}"
5,2,Stacy,x,int,20,45,"{61: 1, 34: 2, 70: 1}"
6,2,Stacy,y,int,13,98,"{30: 2, 66: 2, 35: 2}"
7,0,Stacy,y,int,13,57,"{36: 1, 57: 1, 25: 1}"
8,1,Sandra,y,int,16,79,"{97: 2, 82: 2, 15: 3}"
9,2,Sandra,y,int,14,40,"{1: 1, 98: 1, 7: 1}"
