# Functional programming - Parallel Processing

Based on the Real Python course "[Functional Programming in Python](https://realpython.com/courses/functional-programming-python/)"

While Python is not a dedicated language for functional programming, we can apply some fundamentals to make sure our functions have no side effects:

1. Good Practice: Start with a solid data structure: use immutable data types
2. Use the functional programming primitives filter(), map(), functools.reduce()

The goal? To make your code fit for parallel processing. (see Appendix for more background info)

If you don't want to parallelize, you're probably better off using generator expressions: "Generator expressions are as efficient as map() in terms of memory consumption because both of them return iterators that yield items on demand. However, generator expressions will almost always improve your code’s readability."

**Functional vs. Procedural programming. Why and What?**

<img src="resources/pic_1.jpg" width="50%" />

Check out this [video](https://www.youtube.com/watch?time_continue=99&v=ZTbFxpcvmSk&feature=emb_logo) from Udacity's Data Engineering Nanodegree.

For some notebooks on the topic see first nbs in `z_pysparc_folder` in `res_engineering` repo (probably local only)

<h1>Table of Contents<span class="tocSkip"></span></h1>
<div class="toc"><ul class="toc-item"><li><span><a href="#Use-immutable-Data-Structures:-(Named)-Tuples" data-toc-modified-id="Use-immutable-Data-Structures:-(Named)-Tuples-1"><span class="toc-item-num">1&nbsp;&nbsp;</span>Use immutable Data Structures: (Named) Tuples</a></span></li><li><span><a href="#Functional-Programming-Primitives" data-toc-modified-id="Functional-Programming-Primitives-2"><span class="toc-item-num">2&nbsp;&nbsp;</span>Functional Programming Primitives</a></span><ul class="toc-item"><li><span><a href="#filter()" data-toc-modified-id="filter()-2.1"><span class="toc-item-num">2.1&nbsp;&nbsp;</span>filter()</a></span></li><li><span><a href="#map()" data-toc-modified-id="map()-2.2"><span class="toc-item-num">2.2&nbsp;&nbsp;</span>map()</a></span></li><li><span><a href="#reduce()" data-toc-modified-id="reduce()-2.3"><span class="toc-item-num">2.3&nbsp;&nbsp;</span>reduce()</a></span></li></ul></li><li><span><a href="#Parallel-Processing" data-toc-modified-id="Parallel-Processing-3"><span class="toc-item-num">3&nbsp;&nbsp;</span>Parallel Processing</a></span><ul class="toc-item"><li><span><a href="#multiprocessing" data-toc-modified-id="multiprocessing-3.1"><span class="toc-item-num">3.1&nbsp;&nbsp;</span>multiprocessing</a></span></li><li><span><a href="#concurrent.futures" data-toc-modified-id="concurrent.futures-3.2"><span class="toc-item-num">3.2&nbsp;&nbsp;</span>concurrent.futures</a></span></li><li><span><a href="#GIL-...-and-when-to-use-which-approach" data-toc-modified-id="GIL-...-and-when-to-use-which-approach-3.3"><span class="toc-item-num">3.3&nbsp;&nbsp;</span>GIL ... and when to use which approach</a></span></li></ul></li></ul></div>

In [1]:
import collections
import shelve
import sys
from pprint import pprint

In [2]:
print(sys.executable)
print(sys.version)

C:\Users\r2d4\miniconda3\envs\py3\python.exe
3.8.3 (default, May 19 2020, 06:50:17) [MSC v.1916 64 bit (AMD64)]


In [3]:
# from csv import reader
# with open('lps_2020-09-25.csv',mode='r') as infile:
#     d = dict(reader(infile, dialect="excel"))

## Use immutable Data Structures: (Named) Tuples

If you want to make your data structure more robust, you can work with (named) tuples instead of dictionaries or dataframes. They:

- are immutable
- make sure the keys are consistent for all instances



In [5]:
# We create a new class `Record` using collections.namedtuple
Record = collections.namedtuple("Record", [
    "quantity",
    "artist",
    "album",
    "genre",
    "preis",
    "monat",
])

print(Record)

<class '__main__.Record'>


In [6]:
# Create new instances
rec_1 = Record(quantity=1, 
               artist="Year Of The Knife",
               album="Ultimate Aggression",
               genre="Hardcore",
               preis=30.0,
               monat="Sep 19"
               )

rec_2 = Record(quantity=1, 
               artist="Undeath",
               album="Lesions Of A Different Kind",
               genre="Death Metal",
               preis=20.0,
               monat="Nov 19"
               )

rec_3 = Record(quantity=1, 
               artist="Outer Heaven",
               album="Realms Of Eternal Decay",
               genre="Death Metal",
               preis=60.0,
               monat="Oct 19"
               )

rec_4 = Record(quantity=1, 
               artist="Ossuary",
               album="Supreme Degradation",
               genre="Death Metal",
               preis=20.0,
               monat="Oct 19"
                )

rec_5 = Record(quantity=1, 
               artist="Internal Rot",
               album="Griefing Birth",
               genre="Death Metal",
               preis=20.0,
               monat="Oct 19"
               )

rec_6 = Record(quantity=1, 
               artist="Disembowel",
               album="Echoes of Terror",
               genre="Death Metal",
               preis=20.0,
               monat="Oct 19"
               )

In [7]:
print(rec_1.artist)
print(rec_2.preis)

Year Of The Knife
20.0


**Note**: If we collected our instances in a list of Records, the individual instances would be immutable, but the list itself would be mutable (e.g. we could delete an instance with `del records[0]` or append a new one). _Mixing mutable and immutable data structures is dangerous!_
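
A small sketch of that difference, assuming a hypothetical one-field `Item` tuple instead of the full `Record`:

```python
import collections

Item = collections.namedtuple("Item", ["album"])  # hypothetical minimal record
a, b = Item("Ultimate Aggression"), Item("Echoes of Terror")

as_list = [a, b]
del as_list[0]        # allowed: the list container is mutable
as_list.append(a)     # also allowed

as_tuple = (a, b)
try:
    del as_tuple[0]   # tuples do not support item deletion
    tuple_changed = True
except TypeError:
    tuple_changed = False

print(as_list, tuple_changed)
```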

In [8]:
# Make a tuple of Records 
records = (rec_1, rec_2, rec_3, rec_4, rec_5, rec_6)

pprint(records)

(Record(quantity=1, artist='Year Of The Knife', album='Ultimate Aggression', genre='Hardcore', preis=30.0, monat='Sep 19'),
 Record(quantity=1, artist='Undeath', album='Lesions Of A Different Kind', genre='Death Metal', preis=20.0, monat='Nov 19'),
 Record(quantity=1, artist='Outer Heaven', album='Realms Of Eternal Decay', genre='Death Metal', preis=60.0, monat='Oct 19'),
 Record(quantity=1, artist='Ossuary', album='Supreme Degradation', genre='Death Metal', preis=20.0, monat='Oct 19'),
 Record(quantity=1, artist='Internal Rot', album='Griefing Birth', genre='Death Metal', preis=20.0, monat='Oct 19'),
 Record(quantity=1, artist='Disembowel', album='Echoes of Terror', genre='Death Metal', preis=20.0, monat='Oct 19'))


In [20]:
# For later: Make data structure available in multiprocessing scripts
# Note: It's very hard to serialize named tuples ... so we store a tuple of dicts
with shelve.open('data/record_shelf', 'c') as shelf:
    records_export = tuple(dict(rec._asdict()) for rec in records)
    shelf["records"] = records_export
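
For the reading side, a sketch of how the stored dicts could be turned back into named tuples. It assumes a shortened two-field `Record` and writes to a temporary directory rather than the `data/` folder used above.

```python
import collections
import os
import shelve
import tempfile

Record = collections.namedtuple("Record", ["artist", "preis"])  # shortened for illustration
records = (Record("Undeath", 20.0), Record("Outer Heaven", 60.0))

with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "record_shelf")  # temporary path, an assumption for this sketch

    # Write: store plain dicts
    with shelve.open(path, "c") as shelf:
        shelf["records"] = tuple(dict(rec._asdict()) for rec in records)

    # Read: rebuild named tuples from the stored dicts
    with shelve.open(path, "r") as shelf:
        restored = tuple(Record(**d) for d in shelf["records"])

print(restored == records)
```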

In [22]:
x

({'quantity': 1,
  'artist': 'Year Of The Knife',
  'album': 'Ultimate Aggression',
  'genre': 'Hardcore',
  'preis': 30.0,
  'monat': 'Sep 19'},
 {'quantity': 1,
  'artist': 'Undeath',
  'album': 'Lesions Of A Different Kind',
  'genre': 'Death Metal',
  'preis': 20.0,
  'monat': 'Nov 19'},
 {'quantity': 1,
  'artist': 'Outer Heaven',
  'album': 'Realms Of Eternal Decay',
  'genre': 'Death Metal',
  'preis': 60.0,
  'monat': 'Oct 19'},
 {'quantity': 1,
  'artist': 'Ossuary',
  'album': 'Supreme Degradation',
  'genre': 'Death Metal',
  'preis': 20.0,
  'monat': 'Oct 19'},
 {'quantity': 1,
  'artist': 'Internal Rot',
  'album': 'Griefing Birth',
  'genre': 'Death Metal',
  'preis': 20.0,
  'monat': 'Oct 19'},
 {'quantity': 1,
  'artist': 'Disembowel',
  'album': 'Echoes of Terror',
  'genre': 'Death Metal',
  'preis': 20.0,
  'monat': 'Oct 19'})

## Functional Programming Primitives
### filter()

The built-in filter() takes a function (or None) and an iterable and returns an iterator (a filter object) that yields those items of the iterable for which function(item) returns True. With None, it yields the items that are themselves truthy.

In [10]:
# filter() returns a filter object
filter(lambda x: x.monat == "Oct 19", records)

<filter at 0x296f033be80>

In [11]:
# Iterator basics: How to step through the items one at a time
oct_recs = filter(lambda x: x.monat == "Oct 19", records)
next(oct_recs)

Record(quantity=1, artist='Outer Heaven', album='Realms Of Eternal Decay', genre='Death Metal', preis=60.0, monat='Oct 19')

In [12]:
# Iterator basics: To get all items at once, wrap the call in list() or tuple() ...
tuple(filter(lambda x: x.monat == "Oct 19", records))

(Record(quantity=1, artist='Outer Heaven', album='Realms Of Eternal Decay', genre='Death Metal', preis=60.0, monat='Oct 19'),
 Record(quantity=1, artist='Ossuary', album='Supreme Degradation', genre='Death Metal', preis=20.0, monat='Oct 19'),
 Record(quantity=1, artist='Internal Rot', album='Griefing Birth', genre='Death Metal', preis=20.0, monat='Oct 19'),
 Record(quantity=1, artist='Disembowel', album='Echoes of Terror', genre='Death Metal', preis=20.0, monat='Oct 19'))

Note: Compared to a classic for loop
- there are no side effects such as printing or mutating a result list inside the loop
- the code is declarative: we state what to keep instead of spelling out the loop
- the steps chain easily (the code above is already a chain of simple, reusable, declarative building blocks)

Even better would be to write:
    
```
def oct_filter(x):
    return x.monat == "Oct 19"

tuple(filter(oct_filter, records))
```

But take care: in everyday usage, list comprehensions (or even better: generator expressions) are certainly the more pythonic way to write code. Functional programming should be applied for a reason, e.g. parallelization.
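
As a quick comparison, the following sketch runs the same predicate through filter() and through a generator expression and also shows filter(None, ...). The three-record dataset is shortened and only for illustration.

```python
import collections

Record = collections.namedtuple("Record", ["album", "monat"])  # shortened for illustration
records = (
    Record("Ultimate Aggression", "Sep 19"),
    Record("Realms Of Eternal Decay", "Oct 19"),
    Record("Supreme Degradation", "Oct 19"),
)

def oct_filter(rec):
    return rec.monat == "Oct 19"

via_filter = tuple(filter(oct_filter, records))
via_genexp = tuple(rec for rec in records if oct_filter(rec))

# filter(None, ...) keeps only the truthy items
truthy = tuple(filter(None, (0, 1, "", "a", None)))

print(via_filter == via_genexp, truthy)
```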

### map()

The built-in map() takes a function and one or more iterables and returns an iterator that computes the function using arguments from each of the passed iterables. (It stops when the shortest iterable is exhausted.) -> It maps a function onto each of the original items.
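
A minimal sketch of the multi-iterable case; the `prices` and `quantities` tuples are made-up values:

```python
import operator

prices = (30.0, 20.0, 60.0)
quantities = (1, 2)  # deliberately shorter than prices

# With two iterables the function receives one argument from each
totals = tuple(map(operator.mul, prices, quantities))

# Only two results: map() stops at the shortest iterable
print(totals)
```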

In [13]:
# Example: Return new collection of named tuples with doubled prices

Doubled = collections.namedtuple("Doubled", ["album", "d_price"])

doubled = tuple(map(
    lambda x: Doubled(album = x.album.upper(), d_price = x.preis * 2),
    records
))

doubled

(Doubled(album='ULTIMATE AGGRESSION', d_price=60.0),
 Doubled(album='LESIONS OF A DIFFERENT KIND', d_price=40.0),
 Doubled(album='REALMS OF ETERNAL DECAY', d_price=120.0),
 Doubled(album='SUPREME DEGRADATION', d_price=40.0),
 Doubled(album='GRIEFING BIRTH', d_price=40.0),
 Doubled(album='ECHOES OF TERROR', d_price=40.0))

Again: The comprehension code is of course much more pythonic and easier to read than using map():

```
tuple(Doubled(album=x.album.upper(), d_price=x.preis * 2) for x in records)
```

But to get to the point / mindset of functional programming:
When we first apply the filter function and then the map function, we have a clearly spelled-out series of steps that we can chain together. Also note that we have transformed the 'album' entries without touching the original (immutable) data.
We could reuse all these single building blocks and data steps and have no side effects whatsoever.
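
The chaining idea can be sketched as follows, assuming a shortened `Record` and two hypothetical helper functions, `is_october` and `shout_title`:

```python
import collections

Record = collections.namedtuple("Record", ["album", "preis", "monat"])  # shortened
records = (
    Record("Ultimate Aggression", 30.0, "Sep 19"),
    Record("Realms Of Eternal Decay", 60.0, "Oct 19"),
    Record("Supreme Degradation", 20.0, "Oct 19"),
)

def is_october(rec):
    return rec.monat == "Oct 19"

def shout_title(rec):
    return rec.album.upper()

# Step 1: filter, step 2: map. Each step is a small, reusable function.
october_titles = tuple(map(shout_title, filter(is_october, records)))

print(october_titles)
print(records[1].album)  # the original data is unchanged
```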

### reduce()

Not a built-in! Has to be imported

```
from functools import reduce
```

reduce() applies a function of two arguments cumulatively to the items of a sequence (--> value), so as to reduce the sequence to a single value (--> accumulator).

Example:
```
reduce(lambda x, y: x + y, [1, 2, 3])
```
calculates ((1+2)+3)
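
To see the accumulator at work, this sketch pairs reduce() with itertools.accumulate(), which yields the intermediate values that reduce() only keeps internally:

```python
from functools import reduce
from itertools import accumulate
import operator

values = [1, 2, 3]

total = reduce(operator.add, values)            # ((1 + 2) + 3)
steps = list(accumulate(values, operator.add))  # intermediate accumulator values

# With an initial value the accumulator starts there, so empty input is safe too
empty_total = reduce(operator.add, [], 0)

print(total, steps, empty_total)
```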


(Note: For simplicity's sake, the accumulators in the following code examples are (mutable) dicts rather than named tuples.)

In [14]:
from functools import reduce

# Very simple example: Calculate the total price
reduce(lambda total_preis, record: total_preis + record.preis, records, 0)  # The final arg is the optional initial value

170.0

In [15]:
# Better use case: populate a dict that groups the album titles by genre
genre_dict = {"Hardcore": [], "Death Metal": [], "Crossover": []}

def reducer(acc, val):
    acc[val.genre].append(val.album)
    return acc

records_by_genre = reduce(
    reducer,
    records,
    genre_dict
)

records_by_genre

{'Hardcore': ['Ultimate Aggression'],
 'Death Metal': ['Lesions Of A Different Kind',
  'Realms Of Eternal Decay',
  'Supreme Degradation',
  'Griefing Birth',
  'Echoes of Terror'],
 'Crossover': []}

In [16]:
# On a side note: A safer way would be to generate the genre_dict directly from the entries

from collections import defaultdict

records_by_genre = reduce(
    reducer,
    records,
    defaultdict(list)
)

dict(records_by_genre)

{'Hardcore': ['Ultimate Aggression'],
 'Death Metal': ['Lesions Of A Different Kind',
  'Realms Of Eternal Decay',
  'Supreme Degradation',
  'Griefing Birth',
  'Echoes of Terror']}

Again, a more pythonic way _could_ be ... (but now things are not so clear anymore ...):

```
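import itertools
from operator import attrgetter

# groupby() only groups consecutive items, so sort by the same key first
by_genre = attrgetter("genre")
records_by_genre = {
    genre: list(group)
    for genre, group in itertools.groupby(sorted(records, key=by_genre), key=by_genre)
}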
```
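
One reason things get less clear with itertools.groupby() is that it only groups consecutive items with equal keys. A small sketch with made-up genre strings shows the effect of unsorted input:

```python
import itertools

genres = ["Death Metal", "Hardcore", "Death Metal"]

# Unsorted input: "Death Metal" shows up as two separate groups
unsorted_groups = [(key, len(list(group))) for key, group in itertools.groupby(genres)]

# Sorted input: one group per distinct key
sorted_groups = [(key, len(list(group))) for key, group in itertools.groupby(sorted(genres))]

print(unsorted_groups)
print(sorted_groups)
```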

## Parallel Processing
### multiprocessing

1. import multiprocessing
2. Create a multiprocessing.Pool() object as interface for parallel processing
3. On Windows: add an `if __name__ == '__main__':` guard to the main module to avoid creating subprocesses recursively (see [here](https://stackoverflow.com/questions/18204782/runtimeerror-on-windows-trying-python-multiprocessing)).



In [17]:
# As baseline: Preparing some sequential processing using map()

import time

def double_the_price_slowly(x):
    print("Fetching and processing item ...")
    time.sleep(1)
    result = {"album": x.album, "d_price": x.preis * 2}
    print(f"Finished processing of album {x.album}")
    return result

start = time.time()

_ = tuple(map(double_the_price_slowly, records))

end = time.time()

print(f"\nTotal time: {end-start:.2f}")

Fetching and processing item ...
Finished processing of album Ultimate Aggression
Fetching and processing item ...
Finished processing of album Lesions Of A Different Kind
Fetching and processing item ...
Finished processing of album Realms Of Eternal Decay
Fetching and processing item ...
Finished processing of album Supreme Degradation
Fetching and processing item ...
Finished processing of album Griefing Birth
Fetching and processing item ...
Finished processing of album Echoes of Terror

Total time: 6.03


<b><font color='red'>Multiprocessing code does not run in IPython; see the separate multiprocessing.py script for executable code.</font></b>

The following code runs in about 2.6 secs on 4 cores:
<br>

```
if __name__ == "__main__":
    multiprocessing.freeze_support()

    start = time.time()

    pool = multiprocessing.Pool()
    _ = pool.map(double_the_price_slowly, records)

    end = time.time()

    print(f"\nTotal time: {end-start:.2f}")
```


Passing the following argument (one worker process per record) speeds it up to 1.8 secs:
<br>

```
    pool = multiprocessing.Pool(processes=len(records))
```

(Note: In the script's log we check which process handled each item with `os.getpid()`.)

### concurrent.futures

The new and shiny interface for working with thread pools and process pools.
Fewer issues on Windows machines.

1. import concurrent.futures
2. pass your function, iterable to an executor.map()

Note: executor.map() returns an iterator (a generator object), not a list as the multiprocessing `pool.map()` call from above does.

**Process Pooling:**

```
import concurrent.futures

with concurrent.futures.ProcessPoolExecutor() as executor:
    result = executor.map(foo, iterable)
```

ProcessPoolExecutor (PPE) uses all available CPU cores by default. (You can get that number with os.cpu_count(), by the way.)

This is roughly equivalent to what happens in multiprocessing with my 4 cores and runs in about the same time.

**Thread Pooling:**

```
with concurrent.futures.ThreadPoolExecutor(max_workers=len(iterable)) as executor:
    result = executor.map(foo, iterable)
```

ThreadPoolExecutor (TPE) accepts a max_workers argument that sets the number of threads in the pool, e.g. min(MAX_W, len(iterable)) with some upper bound MAX_W of your choosing.

Here it processes all 6 records in my toy dataset in the same process but spread out over 6 threads. It finishes in 1 sec, as fast as can be. (See below why.)
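
A runnable sketch of that thread-pool behaviour. The `slow_double` function and the 0.2 second sleep are assumptions standing in for the one-second `double_the_price_slowly` job, so timings will differ from the figures quoted above.

```python
import concurrent.futures
import os
import time

def slow_double(price):
    time.sleep(0.2)  # stands in for waiting on I/O
    return price * 2, os.getpid()

prices = (30.0, 20.0, 60.0, 20.0, 20.0, 20.0)

start = time.perf_counter()
with concurrent.futures.ThreadPoolExecutor(max_workers=len(prices)) as executor:
    results = list(executor.map(slow_double, prices))
elapsed = time.perf_counter() - start

doubled = [value for value, _ in results]
pids = {pid for _, pid in results}  # all threads live in the same process

print(doubled, len(pids), f"{elapsed:.2f}s")
```

Because every item gets its own thread and all threads sleep at the same time, the total time should be close to a single sleep rather than six of them.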



### GIL ... and when to use which approach

First you have to know about Python's "dark secret", the _Global Interpreter Lock (GIL)_. Because of the GIL, no two threads can execute Python code at the same time: even if your program runs multiple threads, only one of them executes Python bytecode at any moment. However, whenever a thread is waiting for I/O (e.g. during the sleep command above), it releases the GIL and the other threads can run in the meantime. So:

- **For CPU-intensive jobs without I/O operations, the best way to get around this is to use _process-based parallel programming_ with as many cores as are available.**
- **For I/O-bound jobs, on the other hand, it can be a great advantage to work with many more workers than the number of cores you have. So try _thread pooling_ and experiment with the number of workers.**

(The exception to the GIL limitation is code that is very I/O-heavy: whenever a thread is waiting for I/O (e.g. the sleep command above), the other threads can execute in the meantime.)

The cool thing about the concurrent.futures interface is that you can swap the execution strategies very easily. But if you need more leeway to customize your parallelization, especially on how data is passed on, then multiprocessing might have to be used. 
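
One way to picture that swap is a small helper that takes the executor class as a parameter. `run_with` and `double` are hypothetical names for this sketch, not part of concurrent.futures.

```python
import concurrent.futures

def double(price):
    return price * 2

def run_with(executor_cls, func, items, **kwargs):
    """Run func over items with the given executor class (hypothetical helper)."""
    with executor_cls(**kwargs) as executor:
        return list(executor.map(func, items))

prices = (30.0, 20.0, 60.0)
threaded = run_with(concurrent.futures.ThreadPoolExecutor, double, prices, max_workers=3)
print(threaded)

# Swapping the strategy is a one-argument change. For processes, call it from a
# script under an `if __name__ == "__main__":` guard:
# run_with(concurrent.futures.ProcessPoolExecutor, double, prices)
```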

But first: see Fluent Python p. 539ff. for some advanced use of concurrent.futures. (One problem with executor.map() is that it returns the results in exactly the order in which the calls were started, so one slow early call can hold back results that are already finished.)
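
A sketch of the ordering issue, using `executor.submit()` together with `concurrent.futures.as_completed()` as one possible alternative. This is not necessarily the approach taken in the book, and `slow_echo` and the delays are made up.

```python
import concurrent.futures
import time

def slow_echo(delay):
    time.sleep(delay)
    return delay

delays = (0.3, 0.1, 0.2)

with concurrent.futures.ThreadPoolExecutor(max_workers=len(delays)) as executor:
    # map(): results come back in call order, so the slowest first call blocks the rest
    in_call_order = list(executor.map(slow_echo, delays))

    # submit() + as_completed(): results are handed over as each call finishes
    futures = [executor.submit(slow_echo, d) for d in delays]
    in_finish_order = [f.result() for f in concurrent.futures.as_completed(futures)]

print(in_call_order)
print(in_finish_order)  # typically shortest delay first; the exact order depends on timing
```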

---