### I0u19a - Data Processing - KU Leuven
![license](https://licensebuttons.net/l/by/3.0/88x31.png)

# **Data processing in Python - the functional approach**

Hello and welcome to the tutorial on data processing using functional programming (FP) tools!

We'll be using [Jupyter](http://jupyter.org/) (you're looking at it) as a tool to walk you through a few examples. Notebooks are an excellent teaching tool as they allow you to experiment with code and data as you work your way through the document.

A few guidelines on the notebook itself:
* A notebook consists of *cells*, which are snippets of either text (markdown) or code (Python in this case).
* Cells can be executed by clicking the `[>]` "play" button, or by hitting shift-enter on the keyboard.
* You can navigate between cells either by clicking or by using the arrow buttons.

### **Objectives**

Many parallel or distributed data processing libraries or frameworks adopt concepts from FP. This notebook provides examples and exercises to familiarize students with functional programming (FP) concepts. We will be using two libraries to illustrate the concepts: [Toolz](http://toolz.readthedocs.io/en/latest/) and [Dask](https://dask.pydata.org/en/latest/). 

> **Careful!** The examples here are intended as didactic material to teach FP concepts; they are not necessarily idiomatic Python, nor the best approach for every use case.

---

* import the necessary Python libraries

In [2]:
import json
import toolz as tz
import dask.bag as db

from operator import add

---

## **1. FP basics**

When programming in a functional style, functions are the main abstraction for composing programs. They can be regarded as the **vocabulary** in which to express computational ideas.

* functions transform data
* functions are reusable
* functions are composable
* functions can accept functions as parameters
* functions can return other functions

### **1.0 Functions are objects**

* functions are (simple) objects
    * immutable
    * 1 required method to make it Callable:
        * Python: `__call__(self, ...)`
        * Scala: `apply()`

In [3]:
class Adder():
    
    def __init__(self, how_much=1):
        self.n = how_much
    
    def __call__(self, x):
        return self.n + x

In [4]:
list(map(Adder(), [1, 2, 3]))

[2, 3, 4]

In [5]:
list(map(Adder(5), [1, 2, 3]))

[6, 7, 8]
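The bullets above say that a plain function is just a callable object. A quick check in plain Python; the `double` function is a hypothetical example, not part of the notebook:

```python
def double(x):
    """Return twice the input."""
    return 2 * x

# a function is an object: it has a type and a __call__ method
print(type(double).__name__)        # function
print(callable(double))             # True
print(double.__call__(21))          # 42, same as double(21)

# like any object, a function can be stored in a data structure and called later
pipeline = [double, abs]
print([fn(-3) for fn in pipeline])  # [-6, 3]
```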

### **1.1 Functions as arguments**
* like objects, functions can be passed to other functions as arguments

In [6]:
def do_twice(my_function, arg):
    """
    Applies my_function twice to arg and returns the result.
    """
    
    once  = my_function(arg)
    twice = my_function(once)
    
    return twice

In [7]:
def inc(x):
    """
    :param x: integer to increment
    :return: incremented integer
    """
    return x + 1

* we pass the function __`inc`__ as the first argument to __`do_twice`__

In [8]:
do_twice(inc, 5)

7

* we can define a function _inline_ using a **`lambda`** expression

In [9]:
do_twice(lambda x: x + 1, 5)

7

* it works with `Callable` objects

In [10]:
do_twice(Adder(5), 3)

13

### Exercise 1: **append log messages**
* in this exercise, replace the logger that prints to the console with a logger that appends log messages to a collection

In [11]:
def log_print(message):
    print(message)

In [12]:
def log_all(logger, messages):
    for msg in messages:
        logger(msg)

In [13]:
logged_messages = []

def log_append():
    pass # FIX THIS FUNCTION AND PASS TO log_all

log_all(log_print, ['hey', 'ho', "let's", 'go'])

assert logged_messages == ['hey', 'ho', "let's", 'go'], "FIX ME: replace 'log_print' by the correct logging function."

hey
ho
let's
go


AssertionError: FIX ME: replace 'log_print' by the correct logging function.

In [14]:
# exercise 1 solution

logged_messages = []

def log_append(msg):
    logged_messages.append(msg)

log_all(log_append, ['hey', 'ho', "let's", 'go'])

assert logged_messages == ['hey', 'ho', "let's", 'go'], "FIX ME: replace 'log_print' by the correct logging function."

### **1.2 Functions as return values**

* functions can be returned as the resulting value of another function

In [15]:
def do_twice_v2(my_function):    
    # function defined within the scope of another function
    def inner(x):
        return my_function(my_function(x))
    
    return inner  # return the inner function

def do_twice_v3(my_function):    
    # same as above, but using a lambda expression
    return lambda x: my_function(my_function(x))

In [16]:
twice_inc = do_twice_v2(inc)

twice_inc(7)

In [17]:
do_twice_v3(inc)(7)

9
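A function returned from another function can also capture state from its enclosing scope (a *closure*). The sketch below, using a hypothetical `make_adder`, behaves like the `Adder` class from section 1.0 without defining a class:

```python
def make_adder(how_much=1):
    # the inner function "closes over" how_much from the enclosing scope
    def add_n(x):
        return how_much + x
    return add_n

add_five = make_adder(5)
print(list(map(add_five, [1, 2, 3])))          # [6, 7, 8]
print(add_five.__closure__[0].cell_contents)   # 5, the captured value
```

The closure and the `Callable` object are two ways to bundle a function with some data; the closure is often the lighter-weight choice.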

### Exercise 2: **make an unsafe function safe**
* In this exercise, you wrap a function with another function that checks for illegal input (None)

In [18]:
def loud(s):
    return s.upper() + "!"

loud('hello')

'HELLO!'

In [19]:
try:
    loud(None)
except AttributeError as e:
    print('Oops, we gave loud an input that makes it throw an error: ' + str(e))

Oops, we gave loud an input that makes it throw an error: 'NoneType' object has no attribute 'upper'


In [20]:
def make_safe(fn, fallback='silence'):
    return fn # FIX THIS FUNCTION

try:
    safe_loud = make_safe(loud)
    
    loud_result = safe_loud(None) 
    
    assert loud_result == 'SILENCE!', "FIX ME: we expect 'SILENCE!' as fallback result, result was: '{}'.".format(loud_result)
except AttributeError as e:
    raise ValueError("FIX ME: wrap `loud` with a function that makes it safe to call with None.")

ValueError: FIX ME: wrap `loud` with a function that makes it safe to call with None.

In [21]:
# exercise 2 solution

def make_safe(fn, fallback='silence'):
    def inner(s):
        if s is not None:
            return fn(s)
        else:
            return fn(fallback)
        
    return inner
        

try:
    safe_loud = make_safe(loud)
    
    loud_result = safe_loud(None) 
    
    assert loud_result == 'SILENCE!', "FIX ME: we expect 'SILENCE!' as fallback result, result was: '{}'.".format(loud_result)
except AttributeError as e:
    raise ValueError("FIX ME: wrap `loud` with a function that makes it safe to call with None.")

**NOTE**

> functions that take or produce functions are called: **higher-order functions**
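Python's decorator syntax is shorthand for passing a function to a higher-order function and rebinding the name to the result. A sketch assuming a hypothetical decorator factory `safe_with`, modelled on `make_safe` from exercise 2; `functools.wraps` copies the wrapped function's name and docstring onto the wrapper:

```python
from functools import wraps

def safe_with(fallback):
    """Decorator factory: replace a None argument by `fallback`."""
    def decorator(fn):
        @wraps(fn)
        def inner(s):
            return fn(fallback if s is None else s)
        return inner
    return decorator

@safe_with('silence')      # same as: whisper = safe_with('silence')(whisper)
def whisper(s):
    """Lower-case the input and add an ellipsis."""
    return s.lower() + '...'

print(whisper('HELLO'))    # hello...
print(whisper(None))       # silence...
print(whisper.__name__)    # whisper
```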

### **1.3 Partial function application (a.k.a. partial evaluation)**

* partial application creates a new function by filling in a subset of a function's arguments, using __`partial`__
* see: https://toolz.readthedocs.io/en/latest/curry.html#partial-evaluation

In [22]:
inc?

Signature: inc(x)
Docstring:
:param x: integer to increment
:return: incremented integer
File:      ~/work/kuleuven/projects/fun_py/notebooks/<ipython-input-7-7eb5cebf16e9>
Type:      function


In [23]:
twice_inc = tz.partial(do_twice, inc)

In [24]:
twice_inc(3)

5
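`tz.partial` appears to be the standard library's `functools.partial` re-exported by Toolz (an assumption you can check with `tz.partial is functools.partial`). Partial application also works with keyword arguments; `power` is a hypothetical helper:

```python
from functools import partial

def power(base, exponent):
    return base ** exponent

square = partial(power, exponent=2)   # fix a keyword argument
two_to_the = partial(power, 2)        # fix the first positional argument

print(square(7))          # 49
print(two_to_the(10))     # 1024

# a partial object remembers what was filled in
print(square.func, square.args, square.keywords)
```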

### **1.4 Function currying**

* curried functions automate partial application
* see: https://toolz.readthedocs.io/en/latest/curry.html#id1

In [25]:
@tz.curry
def do_twice_curried(fn, x):
    return fn(fn(x))

* still works as expected

In [26]:
do_twice_curried(inc, 8)

10

* if not all arguments are provided, a partially applied function is returned

In [27]:
twice_inc = do_twice_curried(inc)

In [28]:
twice_inc(4)

6
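To see what currying automates, here is a hand-rolled sketch for two-argument functions only; `curry2` is hypothetical and far less general than `tz.curry`, which handles any number of positional and keyword arguments:

```python
def curry2(fn):
    """Curry a two-argument function: pass both arguments now, or one now and one later."""
    def curried(first, *rest):
        if rest:                                   # all arguments supplied: call right away
            return fn(first, *rest)
        return lambda second: fn(first, second)    # otherwise wait for the second one
    return curried

@curry2
def apply_twice(fn, x):
    return fn(fn(x))

def inc(x):
    return x + 1

print(apply_twice(inc, 8))   # 10
add_two = apply_twice(inc)   # partially applied
print(add_two(4))            # 6
```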

### **1.5 Function composition**
* careful: `compose` applies the functions from right to left, so `compose(f, g)(x)` equals `f(g(x))`

In [29]:
def is_even(i):
    return i % 2 == 0
    
is_even_length = tz.compose(is_even, len)

In [30]:
is_even_length('nope!')

False

In [31]:
is_even_length('hello, world')

True
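A minimal sketch of right-to-left composition using `functools.reduce`; this `compose` is a hypothetical stand-in for `tz.compose` (it requires at least one function) and shows why `compose(is_even, len)` runs `len` first:

```python
from functools import reduce

def compose(*fns):
    """compose(f, g, h)(x) == f(g(h(x))): the rightmost function runs first."""
    return reduce(lambda f, g: lambda *args: f(g(*args)), fns)

def is_even(i):
    return i % 2 == 0

is_even_length = compose(is_even, len)
print(is_even_length('nope!'))          # False
print(is_even_length('hello, world'))   # True

# str.upper runs first, then the exclamation mark is appended
shout = compose(lambda s: s + '!', str.upper)
print(shout('hi'))                      # HI!
```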

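* Advanced: `thread_last`
* pushes a value through a sequence of functions, passing it as the *last* argument of each one; a tuple such as `(add, 5)` is called as `add(5, value)`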

In [32]:
tz.thread_last('hello, world',
               len,
               inc,
               (add, 5))

18
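The behaviour of `thread_last` can be sketched in a few lines of plain Python; this `thread_last` is a hypothetical re-implementation for illustration, not the Toolz source:

```python
from operator import add

def thread_last(val, *forms):
    """Pass val through each form, inserting it as the last argument."""
    for form in forms:
        if callable(form):
            val = form(val)
        else:
            fn, *args = form          # e.g. (add, 5) -> add(5, val)
            val = fn(*args, val)
    return val

def inc(x):
    return x + 1

print(thread_last('hello, world', len, inc, (add, 5)))        # 18
print(thread_last([3, 1, 2], sorted, (map, str), ''.join))    # 123
```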

### **1.6 Purity**
* see https://toolz.readthedocs.io/en/latest/purity.html
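A pure function's result depends only on its arguments, and calling it has no side effects. A small contrast; both functions are hypothetical illustrations:

```python
def append_impure(item, items):
    # impure: mutates the list that was passed in (a side effect)
    items.append(item)
    return items

def append_pure(item, items):
    # pure: builds and returns a new list, leaving the input untouched
    return items + [item]

original = [1, 2]
new = append_pure(3, original)
print(original, new)   # [1, 2] [1, 2, 3]

append_impure(3, original)
print(original)        # [1, 2, 3]  (the caller's list has changed)
```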

### **1.7 Laziness**

* see https://toolz.readthedocs.io/en/latest/laziness.html
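Lazy evaluation means values are computed only when they are requested. A sketch using a hypothetical `noisy_square` that records every call, showing that the built-in `map` does no work until it is consumed:

```python
calls = []

def noisy_square(x):
    calls.append(x)        # record each evaluation
    return x * x

squares = map(noisy_square, range(1_000_000))
print(calls)               # [] - nothing has been computed yet

first_three = [next(squares) for _ in range(3)]
print(first_three)         # [0, 1, 4]
print(calls)               # [0, 1, 2] - only three calls were made
```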

---

## **2. Functions operating on collections**

In a data processing context, **functions** and **collections** go hand in hand.

* functions to transform collections
* functions to aggregate or summarize collections
* map, filter and reduce are the workhorses: the most common higher-order functions that operate on collections

In [33]:
scientists = [{'first': 'Richard', 'last': 'Feynman',  'gender': 'M'}, 
              {'first': 'Marie',   'last': 'Curie',    'gender': 'F'},
              {'first': 'Paul',    'last': 'Stamets',  'gender': 'M'},
              {'first': 'Ada',     'last': 'Lovelace', 'gender': 'F'},
              {'first': 'Stephen', 'last': 'Hawking',  'gender': 'M'},
              {'first': 'Carolyn', 'last': 'Porco',    'gender': 'F'}]

### **2.1 Mapping**

* we pass a function (here: the length of a scientist's last name) to `map`, which applies it to every entry in the collection
* the length of the output is equal to the length of the input

Let's check out the function signature:

```
map(func, *iterables) --> map object
```

In [34]:
tz.map?

Init signature: tz.map(self, /, *args, **kwargs)
Docstring:
map(func, *iterables) --> map object

Make an iterator that computes the function using arguments from
each of the iterables.  Stops when the shortest iterable is exhausted.
Type:           type


In [35]:
def last_name(scientist):
    return scientist['last']

name_lengths = tz.map(tz.compose(len, last_name), scientists)

* mapping is lazy
* the result is not immediately computed

In [36]:
name_lengths

<map at 0x7fa766c73198>

* we materialize the result by passing the map object to `list`

In [37]:
list(name_lengths)

[7, 5, 7, 8, 7, 5]
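Because `map` returns a one-shot iterator, consuming it a second time yields nothing: in the notebook, evaluating `list(name_lengths)` again would return `[]`. A standalone sketch with a few of the last names:

```python
names = ['Feynman', 'Curie', 'Lovelace']

lengths = map(len, names)
print(list(lengths))   # [7, 5, 8]
print(list(lengths))   # [] - the iterator is already exhausted

# materialize once if the result is needed more than once
lengths = list(map(len, names))
print(sum(lengths), max(lengths))   # 20 8
```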

**NOTE:**

* we can map over multiple collections at once
* the function must then accept one argument per collection (here `x` and `y`)

In [38]:
sums = tz.map(lambda x, y: x + y, [0, 1, 2, 3], [10, 11, 12, 13])

In [39]:
list(sums)

[10, 12, 14, 16]

### Exercise 3: **shout names**
* use **partial evaluation** and **function composition** to construct a function that returns loud first names from a list of scientists
* no need to bother with input validation

In [40]:
def first_name(scientist):
    pass # FIX ME

shout_first_names = # tz.partial(...FIX ME

shouted_first_names = list(shout_first_names(scientists))

assert shouted_first_names == ['RICHARD!', 'MARIE!', 'PAUL!', 'ADA!', 'STEPHEN!', 'CAROLYN!'], "FIX ME"

SyntaxError: invalid syntax (<ipython-input-40-086f9e6b1366>, line 4)

In [41]:
# exercise 3 solution

def first_name(scientist):
    return scientist['first']

shout_first_names = tz.partial(tz.map, tz.compose(loud, first_name))

shouted_first_names = list(shout_first_names(scientists))

assert shouted_first_names == ['RICHARD!', 'MARIE!', 'PAUL!', 'ADA!', 'STEPHEN!', 'CAROLYN!'], "FIX ME"

### **2.2 Filtering**

* we pass a **predicate**, i.e. a function that takes an entry and returns a Boolean value:

```
f: A -> Boolean
```

In [42]:
ladies_only = tz.filter(lambda s: s['gender'] == 'F', scientists)

list(ladies_only)

[{'first': 'Marie', 'gender': 'F', 'last': 'Curie'},
 {'first': 'Ada', 'gender': 'F', 'last': 'Lovelace'},
 {'first': 'Carolyn', 'gender': 'F', 'last': 'Porco'}]
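The standard library also offers the complement of `filter`: `itertools.filterfalse` keeps the entries for which the predicate returns `False`. A sketch on a trimmed-down copy of the `scientists` list:

```python
from itertools import filterfalse

scientists = [{'first': 'Marie', 'gender': 'F'},
              {'first': 'Paul',  'gender': 'M'},
              {'first': 'Ada',   'gender': 'F'}]

def is_female(s):
    return s['gender'] == 'F'

print([s['first'] for s in filter(is_female, scientists)])       # ['Marie', 'Ada']
print([s['first'] for s in filterfalse(is_female, scientists)])  # ['Paul']
```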

### Exercise 4: **filter out short first names**

* return the first names shorter than 5 characters
* style points for using `thread_last` and `partial`

In [43]:
short_first_names = # FIX ME map and filter

SyntaxError: invalid syntax (<ipython-input-43-bdc801d09788>, line 1)

In [44]:
# exercise 4 solution

short_first_names = tz.thread_last(scientists,
                                   tz.partial(tz.map, lambda s: s['first']),
                                   tz.partial(tz.filter, lambda s: len(s) < 5))

In [45]:
assert set(short_first_names) == {'Paul', 'Ada'}

### **2.3 Reducing**

* a **reduction** collapses the collection

Let's check the function signature:

```
reduce(function, sequence[, initial]) -> value
```
* the signature of the function passed to `reduce`:

```
f: Accumulator -> Element -> Accumulator
```

* often the `Accumulator` and `Element` types are equal, e.g. summing numeric values
* the result is a single value
* optional argument: `initial`, the "zero" (identity) value of the operation, for example:
    * 0 for addition
    * 1 for multiplication
    * ...

In [46]:
tz.reduce?

[0;31mDocstring:[0m
reduce(function, sequence[, initial]) -> value

Apply a function of two arguments cumulatively to the items of a sequence,
from left to right, so as to reduce the sequence to a single value.
For example, reduce(lambda x, y: x+y, [1, 2, 3, 4, 5]) calculates
((((1+2)+3)+4)+5).  If initial is present, it is placed before the items
of the sequence in the calculation, and serves as a default when the
sequence is empty.
[0;31mType:[0m      builtin_function_or_method


In [47]:
sum_numbers = tz.reduce(lambda x, y: x + y, range(1, 5))

sum_numbers

10

* or written more elegantly using the `add` operator

In [48]:
tz.reduce(add, range(1, 5))

10

* here the `Accumulator` (int) and `Element` (dict) have different types
* we need to provide an initial value for the accumulator

In [49]:
sum_first_name_lengths = tz.reduce(lambda acc, s: acc + len(s['first']), scientists, 0)

assert sum_first_name_lengths == 33
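A left fold is essentially a loop that threads an accumulator through the sequence. A hypothetical `fold_left` sketch that makes the `Accumulator -> Element -> Accumulator` signature and the role of `initial` explicit:

```python
def fold_left(fn, seq, initial):
    """Apply fn(acc, element) from left to right, starting from initial."""
    acc = initial
    for element in seq:
        acc = fn(acc, element)
    return acc

# Accumulator and Element share a type: start from the identity of the operation
print(fold_left(lambda acc, x: acc + x, range(1, 5), 0))   # 10
print(fold_left(lambda acc, x: acc * x, range(1, 5), 1))   # 24

# Accumulator (int) differs from Element (str): an initial value is required
print(fold_left(lambda acc, name: acc + len(name), ['Ada', 'Paul'], 0))   # 7

# with an empty sequence, the initial value is returned as-is
print(fold_left(lambda acc, x: acc + x, [], 0))   # 0
```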

### Exercise 5: **reducing into a dict of counts**
* count the number of scientists by gender by reducing into a dictionary
* hint: useful functions for updating dictionaries: http://toolz.readthedocs.io/en/latest/api.html#dicttoolz 
* e.g. `update_in`

In [50]:
my_dict = {'foo': 1}

In [51]:
tz.update_in(my_dict, ['foo'], inc, default=0)

{'foo': 2}

In [52]:
tz.update_in(my_dict, ['bar'], inc, default=0)

{'bar': 1, 'foo': 1}

**NOTE:**

* `update_in` creates new dictionary instances!
* the input is not mutated

In [53]:
my_dict

{'foo': 1}
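The same non-mutating update can be sketched for a single key with plain dictionary unpacking; `update_key` is a hypothetical, one-level simplification of `update_in`, which also handles nested key paths:

```python
def update_key(d, key, func, default=None):
    """Return a new dict with d[key] replaced by func(d.get(key, default))."""
    return {**d, key: func(d.get(key, default))}

def inc(x):
    return x + 1

my_dict = {'foo': 1}

print(update_key(my_dict, 'foo', inc, default=0))   # {'foo': 2}
print(update_key(my_dict, 'bar', inc, default=0))   # {'foo': 1, 'bar': 1}
print(my_dict)                                      # {'foo': 1} - unchanged
```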

In [54]:
def exercise5():
    init_accumulator = # FIX ME

    def inc_gender(acc, sci):
        return # FIX ME
    
    nr_scientists_by_gender = tz.reduce(inc_gender, scientists, init_accumulator)
    
    return nr_scientists_by_gender

assert exercise5() == {'M': 3, 'F': 3}, "FIX THIS"

SyntaxError: invalid syntax (<ipython-input-54-20b8c2d1662e>, line 2)

In [56]:
# exercise 5 solution

def exercise5():
    init_accumulator = {}

    def inc_gender(acc, scientist):
        return tz.update_in(acc, keys=[scientist['gender']], func=inc, default=0)
    
    nr_scientists_by_gender = tz.reduce(inc_gender, scientists, init_accumulator)
    
    return nr_scientists_by_gender

assert exercise5() == {'M': 3, 'F': 3}, "FIX THIS"

### Exercise 5b: **same as exercise 5 but using `reduceby`**

* hint: the accumulator is not a dict but an integer

In [57]:
# exercise 5b

def exercise5b():

    init_accumulator # FIX ME
    
    def binop(acc, scientist):
        return # FIX ME

    return tz.reduceby('gender', binop, scientists, init_accumulator)
    
assert exercise5b() == {'M': 3, 'F': 3}, "FIX THIS"

NameError: name 'init_accumulator' is not defined

In [59]:
# exercise 5b solution

def exercise5b():
    
    init_accumulator = 0

    return tz.reduceby('gender', lambda acc, s: acc+1, scientists, init_accumulator)
    
assert exercise5b() == {'M': 3, 'F': 3}, "FIX THIS"
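Conceptually, `reduceby` groups and reduces in a single pass, keeping one accumulator per key. A plain-Python sketch under that assumption; `reduceby_sketch` is hypothetical and only accepts a dict key name, whereas `tz.reduceby` also accepts a key function:

```python
def reduceby_sketch(key, binop, seq, init):
    """Reduce seq into one accumulator per value of item[key]."""
    result = {}
    for item in seq:
        k = item[key]
        result[k] = binop(result.get(k, init), item)
    return result

scientists = [{'first': 'Richard', 'gender': 'M'},
              {'first': 'Marie',   'gender': 'F'},
              {'first': 'Ada',     'gender': 'F'}]

counts = reduceby_sketch('gender', lambda acc, s: acc + 1, scientists, 0)
print(counts)    # {'M': 1, 'F': 2}

# a different binop on the same grouping: total first-name length per gender
lengths = reduceby_sketch('gender', lambda acc, s: acc + len(s['first']), scientists, 0)
print(lengths)   # {'M': 7, 'F': 8}
```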

### **2.4 Parallel computations using Dask**

In the following exercises, we apply the functional concepts explored in the previous exercises to computations that run in parallel. 

We will use the [Bag](https://dask.pydata.org/en/latest/bag.html) data structure from the [Dask](https://dask.pydata.org/en/latest/docs.html) library to implement our solutions. Dask Bags were inspired by Spark's Resilient Distributed Datasets (RDDs) and have a very similar API. They are a convenient way to teach a programming style that is common across multiple "big data" libraries without the burden of setting up complex infrastructure (e.g. a Spark cluster).

* let's create a Dask Bag with 4 partitions (parallel subsets)

In [64]:
bag_of_ints = db.from_sequence(list(range(0, 1000)), npartitions=4)

In [65]:
bag_of_ints.take(3)

(0, 1, 2)

In [66]:
sum_by_even = bag_of_ints.foldby(key=lambda x: x % 2 == 0, binop=lambda x,y: x+y)

In [67]:
list(sum_by_even)

[(False, 250000), (True, 249500)]
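`foldby` accepts both a `binop` and a `combine` function because each partition is folded independently (with `binop`) and the per-partition results are then merged (with `combine`). The following is a sequential, plain-Python model of that idea, not how Dask actually schedules the work; the partitioning scheme and the names `partition` and `foldby_sketch` are assumptions for illustration:

```python
from functools import reduce
from operator import add

def partition(seq, n):
    """Split seq into at most n contiguous chunks of (nearly) equal size."""
    size = max(1, -(-len(seq) // n))    # ceiling division
    return [seq[i:i + size] for i in range(0, len(seq), size)]

def foldby_sketch(key, binop, initial, combine, seq, npartitions=4):
    # step 1: fold every partition on its own, one accumulator per key
    partials = []
    for part in partition(seq, npartitions):
        acc = {}
        for x in part:
            k = key(x)
            acc[k] = binop(acc.get(k, initial), x)
        partials.append(acc)

    # step 2: merge the per-partition dicts key by key with combine
    def merge(left, right):
        merged = dict(left)
        for k, v in right.items():
            merged[k] = combine(merged[k], v) if k in merged else v
        return merged

    return reduce(merge, partials, {})

sums = foldby_sketch(lambda x: x % 2 == 0, add, 0, add, list(range(1000)))
print(sorted(sums.items()))   # [(False, 250000), (True, 249500)]

# with a (sum, count) accumulator, combine must differ from binop
def binop(acc, x):
    return (acc[0] + x, acc[1] + 1)

def combine(a, b):
    return (a[0] + b[0], a[1] + b[1])

stats = foldby_sketch(lambda x: x % 2 == 0, binop, (0, 0), combine, list(range(1000)))
print(stats[True])   # (249500, 500)
```

When the accumulator and the elements have the same type (as with the plain sum), `binop` can double as `combine`; with a tuple accumulator, as in exercise 8 below, it cannot.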

## **3. Dos cervezas, por favor! (Two beers, please!)**

* the following exercises use a dataset of beers loaded from a `json` file
* we'll be using a **Dask bag** to compute some answers

In [68]:
def is_valid(raw_dict):
    return raw_dict['Percentagealcohol'] != 'NA'

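def parse_beer(raw_dict):
    # the source keys are Dutch: Brouwerij = brewery, Merk = brand, Soort = type
    return {'brewery': raw_dict['Brouwerij'],
            'brand': raw_dict['Merk'],
            'type': raw_dict['Soort'],
            'alcohol_pct': float(raw_dict['Percentagealcohol'])}

# use a context manager so the file is closed once the records are loaded
with open('../data/beers.json') as f:
    beers_unfiltered = db.from_sequence(json.load(f), npartitions=4)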

* count the number of invalid beer entries (these have the value `'NA'` for `Percentagealcohol`, i.e. a missing alcohol percentage)
* Bags are lazy data structures: operations such as `count` do not immediately return a result!
* call `.compute()` to obtain the actual result

In [69]:
beers_unfiltered.filter(predicate=lambda x: not is_valid(x)).count().compute()

12

* let's now get rid of the invalid entries and parse the remaining ones into a more convenient representation

In [70]:
beers = beers_unfiltered.filter(is_valid).map(parse_beer).persist()

* quiz: why have we used the `persist` function above?

In [71]:
beers.take(2)

({'alcohol_pct': 6.0,
  'brand': '3 Schténg',
  'brewery': "Brasserie Grain d'Orge",
  'type': 'hoge gisting'},
 {'alcohol_pct': 5.6,
  'brand': '400',
  'brewery': "'t Hofbrouwerijke voor Brouwerij Montaigu",
  'type': 'blond'})

---

### Exercise 5: **find the number of breweries**

* based on brewery name 
* treat brewery names that differ only in upper or lower case as the same brewery!
* hint: to get unique results, use `distinct`

In [72]:
nr_breweries = # FIX ME

SyntaxError: invalid syntax (<ipython-input-72-5fc699dd5abb>, line 1)

In [73]:
# exercise 5 solution
nr_breweries = beers.map(lambda b: b['brewery'].upper()).distinct().count().compute()

In [74]:
assert nr_breweries == 350

### Exercise 6: **find the strongest beer**
* use a reduction
* hint: the dask bag function for reducing is called: `fold`

In [75]:
strongest_beer = beers.fold # FIX ME

In [76]:
# exercise 6 solution

strongest_beer = beers.fold(lambda b1, b2: b1 if b1['alcohol_pct'] > b2['alcohol_pct'] else b2).compute()

In [77]:
assert strongest_beer['alcohol_pct'] == 26.0

### Exercise 7: **find the 3 most common alcohol percentages**

* use `frequencies`
* sorting all frequencies is wasteful if we are only interested in the top 3
* hint: use the `topk` function
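In plain Python, the same "count, then keep only the top k" idea can be sketched with `collections.Counter` and `heapq.nlargest`, which keeps a bounded heap instead of sorting every distinct value. The percentages below are made-up sample values, not the notebook's dataset:

```python
import heapq
from collections import Counter

alcohol_pcts = [4.5, 6.0, 6.0, 7.5, 4.5, 6.0, 9.0, 7.5, 6.0, 4.5, 5.2]

freqs = Counter(alcohol_pcts)                                     # like Bag.frequencies()
top_3 = heapq.nlargest(3, freqs.items(), key=lambda kv: kv[1])    # like Bag.topk
print(top_3)                        # [(6.0, 4), (4.5, 3), (7.5, 2)]
print([pct for pct, _ in top_3])    # [6.0, 4.5, 7.5]
```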

In [78]:
beers.map # FIX ME

<bound method Bag.map of dask.bag<map-par..., npartitions=4>>

In [80]:
# exercise 7 solution

top_3_common_percentages = \
    beers \
        .map(lambda x: x['alcohol_pct']) \
        .frequencies() \
        .topk(3, lambda x: x[1]) \
        .map(lambda x: x[0]) \
        .compute()

In [81]:
assert top_3_common_percentages == [8.0, 6.5, 5.0]

### Exercise 8: **find the brewery that makes the strongest beer on average**

* a nice trick to avoid naming collisions is to wrap functions in an outer function
* aggregations by key are done with `foldby`
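* remember why you need both a `binop` and a `combine` function: `binop` folds beers into the accumulator within a partition, `combine` merges the accumulators of different partitions!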

In [82]:
# exercise 8 solution

def find_bad_ass_brewery(beers):
    
    def binop(sum_count, beer):
        sum_, count_ = sum_count # unpack tuple
        return (sum_ + beer['alcohol_pct'], count_ + 1)
    
    def combine(sum_count_1, sum_count_2):
        sum_1, count_1 = sum_count_1
        sum_2, count_2 = sum_count_2
        return (sum_1 + sum_2, count_1 + count_2)
    
    def to_percentage(pair):
        brewery, sum_count = pair
        sum_, count_ = sum_count
        percentage = sum_ / count_
        
        return brewery, percentage
    
    return beers \
            .foldby('brewery', binop=binop, initial=(0,0), combine=combine) \
            .map(to_percentage) \
            .topk(1, key=lambda x: x[1]) \
            .compute()

In [83]:
bad_ass_brewery, strongest_beer_pct = find_bad_ass_brewery(beers)[0]

In [84]:
assert bad_ass_brewery == 'Staminee De Garre (Brouwerij Van Steenberge)'

### Exercise 9: **find the top 3 breweries with the most diverse range of beer types**

* think of which initial value to provide to the `binop`
* your result should also provide the beer types per brewery

In [85]:
# exercise 9 solution

def find_diverse_breweries(beers, top=3):
    
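    def binop(set_of_types, beer):
        # start from an empty set while the accumulator is still None,
        # and return a new set rather than mutating the accumulator
        types_so_far = set_of_types if set_of_types is not None else set()
        return types_so_far | {beer['type']}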
    
    def combine(set_1, set_2):
        return set.union(set_1, set_2)
    
    def count_types(brewery_set_of_types):
        brewery, set_of_types = brewery_set_of_types
        return len(set_of_types)
    
    return beers \
            .foldby('brewery', binop=binop, initial=None, combine=combine) \
            .topk(top, key=count_types) \
            .compute()

In [86]:
diverse_brewery, beer_types = find_diverse_breweries(beers, top=1)[0]

In [87]:
assert diverse_brewery == 'Brouwerij Huyghe'