# Python: Functional Programming for Big Data

## Why Functional Programming?
* Mathematics
    * Transformations
    * Operators
        * Calculus: Integrals, Derivatives
        * Probability: Predicates (over Sets)
        * Linear Algebra: Vectorization
        * Statistics: Expectations, ...
        
        
* Analytics
    * Spark
    * Hadoop

## Prelude: Functional Programming

* functions
* higher-order functions
* combinators


* working with functions
    * using Python's object model
    * lifting    
    * partial application
    * currying 
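
The three techniques listed above can be sketched in a few lines. This is only an illustrative sketch: `power`, `curry2`, and `lift` are hypothetical helper names invented here, and only `functools.partial` comes from the standard library.

```python
from functools import partial

def power(base, exponent):
    return base ** exponent

# Partial application: fix some arguments now, supply the rest later.
square = partial(power, exponent=2)

# Currying: turn a two-argument function into a chain of one-argument functions.
def curry2(f):
    return lambda a: lambda b: f(a, b)

two_to_the = curry2(power)(2)

# Lifting: turn a function on single values into a function on lists of values.
def lift(f):
    return lambda xs: [f(x) for x in xs]

square_all = lift(square)

print(square(7))              # 49
print(two_to_the(10))         # 1024
print(square_all([1, 2, 3]))  # [1, 4, 9]
```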


## Functions are Values

In [1]:
from functools import *

In [20]:
f

<function __main__.f(x)>

In [19]:
f(10)

20

In [2]:
def f(x):
    return x * 2


g = f

g(5), f(10)

(10, 20)

In [17]:
def calc(f):
    print(f([1, 2, 3]))
    
calc(sum)

6


* combinators:
    - a function that takes another function and uses it to "recombine" data

In [2]:
# List[String] -> List[Int] via f: String -> Int
# C[A] -> C[B] via f: A -> B
def _map(f, data):
    return [ f(d) for d in data ]

def double(x):
    return x * 2

In [4]:
print( _map(double, [2, 4, 6]) )
print( _map(str.split, ["Michael Burgess", "Kunal Haria"]) )

[4, 8, 12]
[['Michael', 'Burgess'], ['Kunal', 'Haria']]


In [29]:
# idea: f returns a container for each element; de-nest (flatten) the results
# List[String] -> List[String], via .split : String -> List[String]
# C[A] -> C[B] via f: A -> C[B]
def _flatMap(f, data):
    old = _map(f, data)
    new = []
    for e in old :
        for piece in e:
            new.append(piece)
    
    return new

In [None]:
print( _flatMap(str.split, ["Michael Burgess", "Kunal Haria"]) )
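
The same flattening can be expressed with the standard library. The sketch below is one possible alternative, not a replacement for `_flatMap`; the name `flat_map` is hypothetical, while `itertools.chain.from_iterable` is the real standard-library call doing the de-nesting.

```python
from itertools import chain

def flat_map(f, data):
    # Apply f to every element, then concatenate the resulting iterables.
    return list(chain.from_iterable(map(f, data)))

words = flat_map(str.split, ["Michael Burgess", "Kunal Haria"])
print(words)  # ['Michael', 'Burgess', 'Kunal', 'Haria']
```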

In [30]:
# C[O] -> N  via f: (total: N, e: O) -> N
def _reduce(f, start, data):
    total = start
    for e in data:
        total = f(total, e)

    return total

ages = [18, 19]
prices = [1, 2, 3]

print(_reduce(int.__add__,   0, [1, 2, 3]))
print(_reduce(str.__add__,   "", ["Michael", "Kunal"]))
print(_reduce(bool.__or__,   False, [ages[0] == 18, ages[1] > 20]))
print(_reduce(bool.__and__,  True, [ages[0] == 18, ages[1] > 20]))

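print(_reduce(int.__mul__, 0, prices))  # 0 is the wrong start for a product: the result is always 0
print(_reduce(int.__mul__, 1, prices))  # 1 is the identity for multiplication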

6
MichaelKunal
True
False
0
6
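
For comparison, here is a sketch of the same reductions using the built-in `functools.reduce` and the `operator` module. Note that `functools.reduce` takes its arguments as `(function, iterable, initial)`, which differs from the `(f, start, data)` order used by `_reduce`.

```python
from functools import reduce
import operator

prices = [1, 2, 3]

# The start value should be the identity of the operation:
# 0 for addition, 1 for multiplication, False for "or".
total = reduce(operator.add, prices, 0)
product = reduce(operator.mul, prices, 1)
any_true = reduce(operator.or_, [True, False], False)

print(total, product, any_true)  # 6 6 True
```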


In [9]:
def _filter(test, data):
    new = []
    for e in data:
        if test(e):
            new.append(e)
            
    return new
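
A short usage sketch for a filter combinator follows. It redefines `_filter` as a list comprehension so the block runs on its own, and `is_even` is a hypothetical predicate chosen for illustration. The built-in `filter` should give the same result once wrapped in `list`.

```python
def _filter(test, data):
    # Keep only the elements for which test(e) is truthy.
    return [e for e in data if test(e)]

def is_even(n):
    return n % 2 == 0

numbers = [1, 2, 3, 4, 5, 6]
evens = _filter(is_even, numbers)

print(evens)                          # [2, 4, 6]
print(list(filter(is_even, numbers))) # [2, 4, 6]
```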


## Exercise Questions

* Using Python's built-in combinators, start with a list of strings and produce:

    * a list of bools showing whether each string has data or not
    * a list of string lengths
    * a list of the string lengths that are more than 5
    * a list of the words in each initial string (.split)
    * a list of the words in each initial string, keeping only the word lists with more than 2 words

In [23]:
names = ["Michael B", "Here World Is agooads", ""]
# solution to first question 
print(list(map(bool, names)))

# towards the third question: keep the strings longer than 5 characters
# (map len over the result to get their lengths)
def is_morethan_5(string):
    return len(string) > 5

print(list(filter(is_morethan_5, names)))

[True, True, False]
['Michael B', 'Here World Is agooads']
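
One possible set of answers to the remaining questions is sketched below. It assumes a particular reading of the exercise wording (for example, that "more than 5" filters the lengths themselves), so treat it as a starting point rather than the definitive solution.

```python
names = ["Michael B", "Here World Is agooads", ""]

# second question: a list of string lengths
lengths = list(map(len, names))

# third question: only the lengths greater than 5
long_lengths = list(filter(lambda n: n > 5, lengths))

# fourth question: the words in each string
words = list(map(str.split, names))

# fifth question: only the word lists with more than 2 words
many_words = list(filter(lambda ws: len(ws) > 2, words))

print(lengths)       # [9, 21, 0]
print(long_lengths)  # [9, 21]
print(words)         # [['Michael', 'B'], ['Here', 'World', 'Is', 'agooads'], []]
print(many_words)    # [['Here', 'World', 'Is', 'agooads']]
```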


## Python Tips

* Layout

In [26]:
list(
    map( 
        lambda s: s.upper(),  
        ["Michael", "John", "Burgess"]
    )
)

['MICHAEL', 'JOHN', 'BURGESS']

* Lambda

In [25]:
# a named predicate...
def f(s):
    return len(s) > 2

# ...and the same predicate written inline as a lambda
list(map(lambda s: len(s) > 2, names))

[True, True, False]

In [21]:
# each "method" simply returns self, so calls can be chained across lines
class API:
    map = reduce = filter = lambda s: s
 
eg = API()

( eg
   .map()
   .reduce()
   .filter()
   .map()
   .reduce()
)

<__main__.API at 0x10d7e3860>

In [75]:
(1 + 
1)

2

In [27]:
class Person:
    def say(self, name):
        return name.upper()
    
me = Person()
list(map(me.say, ["Michael", "Kunal"]))

['MICHAEL', 'KUNAL']

## Functional Programming for Mathematics
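
The mathematical operators mentioned at the start (derivatives, expectations) can be modelled as higher-order functions. The sketch below is purely illustrative: `derivative` and `expectation` are hypothetical helpers, the derivative is a central-difference approximation, and the fair die is invented example data.

```python
def derivative(f, h=1e-6):
    # Operator: takes a function, returns (an approximation of) its derivative.
    return lambda x: (f(x + h) - f(x - h)) / (2 * h)

def expectation(f, outcomes):
    # outcomes is a list of (value, probability) pairs.
    return sum(f(x) * p for x, p in outcomes)

def square(x):
    return x * x

d_square = derivative(square)
die = [(face, 1 / 6) for face in range(1, 7)]

print(round(d_square(3.0), 3))                  # 6.0
print(round(expectation(lambda x: x, die), 3))  # 3.5
```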

## Key Elements of Spark's API

### Transformations
* map
* filter
* flatMap

### Actions
* reduce
* fold
* take
* collect
* show

### Uses
* projection maps
* aggregating reduces
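
The two uses above can be illustrated without a cluster. The sketch below uses plain Python's `map` and `functools.reduce` as a stand-in for Spark; it does not call any Spark API, and the `people` records are hypothetical data invented for the example.

```python
from functools import reduce

# Hypothetical records, invented for illustration.
people = [
    {"name": "Michael", "age": 18},
    {"name": "Kunal", "age": 19},
]

# Projection map: keep only one field of each record.
ages = list(map(lambda p: p["age"], people))

# Aggregating reduce: collapse the projected values into a single number.
total_age = reduce(lambda total, age: total + age, ages, 0)

print(ages)       # [18, 19]
print(total_age)  # 37
```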

### Approximating Hadoop's MapReduce

* Our Hadoop MapReduce query (or "job")

In [None]:
def mapper(line):
    for word in line.split(" "):
        yield (word.lower(), 1)

def reducer(pair):
    key, value = pair
    yield (key, sum(value))

* Bridging step
    * i.e., things needed to simulate Hadoop

In [1]:
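from collections import defaultdict
from functools import reduce

# group the (key, value) pairs emitted by one mapper call under their keys
def _combine(data, values):
    for (k, v) in values:
        data[k].append(v)
    return data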

* The Hadoop job running...

In [None]:

data = ["hello world", "goodbye world"]

# map-side
mapped   = map(mapper, data)
combined = reduce(_combine, mapped, defaultdict(list))

# reduce-side
reduced  = map(reducer, combined.items())

# report
print([next(result) for result in reduced])
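
A word-count job like this should report one `(word, count)` pair per distinct word. As a quick cross-check, the sketch below computes the same counts with `collections.Counter`; it is an independent stand-in for the simulated job, not part of it.

```python
from collections import Counter

data = ["hello world", "goodbye world"]

# Count lower-cased words across all lines, splitting on single spaces
# to match the mapper's line.split(" ").
counts = Counter(
    word.lower()
    for line in data
    for word in line.split(" ")
)

print(sorted(counts.items()))
# [('goodbye', 1), ('hello', 1), ('world', 2)]
```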