# Main-Memory Aggregation
A core primitive in data systems is aggregation. For example, we may want to compute the mean value of a set of numbers. Or, we may want to understand if two groups of users have different engagement rates with our mobile app. These types of operations cannot be answered by the select, project, and join, operators described in the last lecture. Aggregation is substantially more complicated to implement, and despite its prevalence, is often a core systems bottleneck.

## An Aggregation Function
An aggregation function is a function $F$ that takes in an iterator of tuples (of the same schema of course!) and returns a single atomic value. Examples include `sum(set_of_numbers)`, `max(set_of_numbers)`, `concat(set_of_strings)`. Why is aggregation so difficult? Let's look at the common structure of the examples above. We've described the code to perform the aggregation in a specific way:

In [1]:
lst = [1,2,3,4]
s = 0
for i in lst:
    s = sum((s,i))
print("Sum:", s)


lst = [1,2,3,4]
m = 0
for i in lst:
    m = max(m,i)
print("Max:", s)


lst = ['a','b','c','d']
s = ''
for i in lst:
    s = ''.join((s,i))
print("Concat:", s)

Sum 10
Max 10
Concat abcd


For these aggregates a single number "summarizes" the state of the aggregation. The aggregation can proceed value by value and fold each new value into the running aggregate. However, we may not get so lucky. Let's see what happens when we do this to compute a mean:

In [3]:
#WRONG!!!!!
lst = [1,2,3,4]
m = 0
for i in lst:
    m = 0.5*m + 0.5*i
print('Result:', m, 'Actual:', 2.5)

Result: 3.0625 Actual: 2.5


A mean value is not incrementable in the same way. One way to think about it is that it requires more persistent state to be incrementable. For example, if we tracked both the sum and the count:

In [4]:
#Correct!!!!!
lst = [1,2,3,4]
s,c = 0,0
for i in lst:
    s = sum((s,i))
    c = sum((c,1))
    
print('Result:', s/c, 'Actual:', 2.5)

Result: 2.5 Actual: 2.5


The basic structure of an aggregator looks as follows:

In [None]:
state = [] #initialize state
for i in lst:
    #update state
    pass

There is no guarantee that this state is "small". Some aggregates essentially require representing the whole set as its aggregation state. 

# Aggregate By Key
We're often interested in comparing different subsets of data. For example, computing the average salary for each branch. Or, computing the treatment effect for a control group v.s. an experimental group. This process is called aggregation by key. For example, if we have a list of key, value pairs:

In [5]:
lst = [('A',1),('B',2),('A',3),('B', 4)]

We might want to calculate the total sum for each key. This means that *we will have to maintain the state of each key's aggregates*.

In [9]:
lst = [('A',1),('B',2),('A',3),('B', 4)]

state = {} # a HashMap from keys to running sums 

for key, value in lst:
    state[key] = sum((state.get(key,0), value))
    
print("Sum:", state)

Sum: {'A': 4, 'B': 6}


Therefore, aggregation requires memory on the order O(|state| * |keys|). This means that even if the aggregate has a very small state (e.g., like a sum), the number of keys might be very large. We will formally address this problem in the next class but let's build some intuition on why we can side-step this problem for the special case when the state is small but the number of keys are large.

Consider the following code with aggregates over a single key at a time:

In [11]:
lst = [('A',1),('B',2),('A',3),('B', 4)]

state = {} # a HashMap from keys to running sums 

for key, value in lst:
    if key == 'A':
        state[key] = sum((state.get(key,0), value))

print("Sum:", state)
state = {}
        
for key, value in lst:
    if key == 'B':
        state[key] = sum((state.get(key,0), value))
        
print("Sum:", state)

Sum: {'A': 4}
Sum: {'B': 6}


So, if we had an iterator over all of the distinct keys (you'll do this for homework!), then we could trade-off memory for additional passes over our dataset. We'll study this in detail in your homework and in the next class.

## Aggregation Operator

Now, let's put this all together and build an iterator interface for aggregation. This code structure will be informative as it will show us why aggregations are so difficult

In [36]:
class Sum:
    
    def __init__(self, inp, key_attr, value_attr):
        self.in1  = inp
        self.key = key_attr
        self.value = value_attr
        
    def __iter__(self):        
        #the aggregation goes here!!!
        
        state = {} # a HashMap from keys to running sums 
        for tup in self.in1:
            state[tup[self.key]] = sum((state.get(tup[self.key],0), int(tup[self.value])))
        
        self.it = iter(self.state.items())
        
        return self
    
    def __next__(self):   
        elemk, elemv = next(self.it)
        return {self.key: elemk,self.value: elemv}
        

Let's see how this integrates with `db.py` from the previous lecture:

In [37]:
from db import *

rooms = Load('rooms.csv')
for r in rooms:
    print(r)

{'building': 'SHFE', 'number': '203', 'capacity': '32', 'board': 'black'}
{'building': 'JCL', 'number': '243', 'capacity': '4', 'board': 'white'}
{'building': 'JCL', 'number': '298', 'capacity': '51', 'board': 'black'}
{'building': 'RWLD', 'number': '161', 'capacity': '52', 'board': 'black'}
{'building': 'RY', 'number': '161a', 'capacity': '4', 'board': 'black'}
{'building': 'RY', 'number': '277', 'capacity': '32', 'board': 'white'}
{'building': 'RY', 'number': '276', 'capacity': '42', 'board': 'black'}


In [38]:
for r in Sum(rooms, 'building', 'capacity'):
    print(r)

TypeError: unsupported operand type(s) for +: 'int' and 'str'