
# Functional programming in Python with toolz and fn.py

*Juan Nunez-Iglesias*  
*Victorian Life Sciences Computation Initiative*  
*University of Melbourne*  

This talk is inspired by the work of Matt Rocklin


# Functional(-style) programming in Python with toolz and fn.py

*Juan Nunez-Iglesias*  
*Victorian Life Sciences Computation Initiative*  
*University of Melbourne*  

This talk is inspired by the work of Matt Rocklin

# What functional programming is to me

- y = f(x)

- ys = map(f, xs)

- z = summary(some_enormous_sequence)

# What functional programming is not to me (yet?)

- x += 1

![Monads forbidden](no-monad.png)

![The SciPy Ecosystem Core](scipy-ecosystem.png)

In [None]:
%matplotlib inline
import numpy as np
from matplotlib import pyplot as plt
np.set_printoptions(precision=3, suppress=True)

# batch programming with NumPy

Take the mean of log(r + 1) over all rows r in a dataset

In [None]:
import numpy as np

fin = 'data/expr.tsv'
counts = np.loadtxt(fin)
logcounts = np.log(counts + 1)
lcmean = np.mean(logcounts, axis=0)

print(lcmean)

# Simple streaming: grep

In [None]:
def simplegrep(pattern, filename):
    with open(filename) as fin:
        for linenum, line in enumerate(fin):
            if pattern in line:
                print(linenum, ':', line)

In [None]:
%ls -l data

In [None]:
%%timeit -n 1 -r 1
simplegrep('chrUn_DS484746v1', 'data/dm6.fa')

# A bit harder: multi-function streaming with `yield`

Revisiting log-mean

In [None]:
import numpy as np

def line2array(line):
    return np.array(line.rstrip().split(), dtype=float)

def readtsv(filename):
    with open(filename) as fin:
        for i, line in enumerate(fin):
            yield line2array(line)

def log(arrays_iter):
    for i, arr in enumerate(arrays_iter):
        yield np.log(arr + 1)

def mean(arrays_iter):
    for i, arr in enumerate(arrays_iter):
        if i == 0:
            mean = arr
        mean += (arr - mean) / (i + 1)
    return mean

print(mean(log(readtsv(fin))))

In [None]:
import numpy as np

def line2array(line):
    return np.array(line.rstrip().split(), dtype=float)

def readtsvv(filename):
    print('starting readtsv')
    with open(filename) as fin:
        for i, line in enumerate(fin):
            print('reading line {}'.format(i))
            yield line2array(line)
    print('finished readtsv')

def logv(arrays_iter):
    print('starting log')
    for i, arr in enumerate(arrays_iter):
        print('taking log of line {}'.format(i))
        yield np.log(arr + 1)
    print('finished log')

def meanv(arrays_iter):
    print('starting running mean')
    for i, arr in enumerate(arrays_iter):
        if i == 0:
            mean = arr
        mean += (arr - mean) / (i + 1)
        print('adding line {} to the running mean'.format(i))
    print('returning mean')
    return mean

In [None]:
print(meanv(logv(readtsvv(fin))))

# Toolz: making streaming beautiful

In [None]:
from toolz import pipe

pipe(fin, readtsv, log, mean)

In [None]:
from toolz.curried import map  # overriding built-in map

def log1(arr):
    return np.log(arr + 1)

pipe(fin, open,
     map(line2array),
     map(log1),
     mean)

# Fn.py: even more functional fun

(Though we'll just use nice lambdas here.)

In [None]:
from fn import _ as x

pipe(fin, open,
     map(line2array),
     map(x + 1), map(np.log),
     mean)

# Curry all the things!

In [None]:
from toolz import curry

array = curry(np.array)

pipe(fin, open,
     map(str.rstrip), map(str.split), map(array(dtype=float)),
     map(x + 1), map(np.log),
     mean)

In [None]:
from toolz import compose

line2array = compose(array(dtype=float), str.split, str.rstrip)

# Streaming PCA

In [None]:
from toolz import last
from toolz.curried import partition
from sklearn import decomposition

def streaming_pca(samples, n_components=2, batch_size=50):
    ipca = decomposition.IncrementalPCA(n_components=n_components,
                                        batch_size=batch_size)
    last(pipe(samples,                 # iterator of 1D arrays
              partition(batch_size),   # iterator of tuples of 1D arrays
              map(np.array),           # iterator of 2D arrays
              map(ipca.partial_fit)))  # partial_fit on each
    return ipca

In [None]:
pca = pipe('data/iris_data.txt', open,
           map(line2array),
           streaming_pca)

In [None]:
Xt = pipe('data/iris_data.txt', open,
          map(line2array),
          map(np.atleast_2d),
          map(pca.transform),
          list, np.squeeze)

In [None]:
from matplotlib import pyplot as plt

def plot_embedding(X, y):
    x_min, x_max = np.min(X, axis=0), np.max(X, axis=0)
    X = (X - x_min) / (x_max - x_min)

    plt.figure()
    ax = plt.subplot(111)
    for i in range(X.shape[0]):
        plt.scatter(X[i, 0], X[i, 1],
                    color=plt.cm.Set1(y[i] / 10.))
    plt.xticks([]), plt.yticks([])

In [None]:
y = np.loadtxt('data/iris_target.txt')
plot_embedding(Xt, y)

# k-mer counting

15-mer  
"to be or nob to”  
much less frequent than  
“to be or not to"

In [None]:
from toolz import concat, frequencies
from toolz.curried import sliding_window, filter

def is_sequence(line):
    return not line.startswith('>')


k = 7
counts = tz.pipe('data/sample.fasta', open,
                 filter(is_sequence),  # discard names
                 map(str.rstrip),
                 map(sliding_window(k)),  # apply sliding to each
                 concat,  # k-mers as char tuples
                 map(''.join),  # k-mers
                 frequencies)

counts = list(counts.values())

In [None]:
def integer_histogram(counts, *args, **kwargs):
    hist = np.bincount(counts)
    hist = hist / np.sum(hist)
    return plt.plot(1 + np.arange(len(hist)), hist, *args, **kwargs)

In [None]:
integer_histogram(counts, lw=2)
plt.xlim(-1, 250)

# Genome markov model

In [None]:
import itertools as it

LDICT = dict(zip('ACGTacgt', range(8)))
PDICT = {(a, b): (LDICT[a], LDICT[b])
         for a, b in it.product(LDICT, LDICT)}

def is_nucleotide(letter):
    return letter in LDICT  # ignore 'N'


@curry
def increment_model(model, index):
    model[index] += 1

In [None]:
def genome(filename):
    """Stream a genome, letter by letter, from a fasta file."""
    return pipe(filename, open,
                filter(is_sequence),    # drop header from each sequence
                concat,                 # concatenate chars from all lines
                filter(is_nucleotide))  # discard newlines and 'N'

In [None]:
def markov(seq):
    """Get a 1st-order Markov model from a sequence of nucleotides."""
    model = np.zeros((8, 8))
    last(pipe(seq, sliding_window(2),        # each successive tuple
              map(PDICT.__getitem__),        # location in matrix of tuple
              map(increment_model(model))))  # increment matrix
    # convert counts to transition probability matrix
    model /= np.sum(model, axis=1)[:, np.newaxis]
    return model

In [None]:
from toolz.curried import take

dm = 'data/dm6.fa'
import timeit

t0 = timeit.default_timer()
m = tz.pipe(dm, genome, take(int(1e6)), markov)
t1 = timeit.default_timer()

sec = t1 - t0
print("processed 1MB in %.1fmin" % (sec / 60))
print("throughput: %.2fMBps" % (1 / sec))

In [None]:
print('    ', '      '.join('ACGTacgt'), '\n')
print(m)

In [None]:
plt.imshow(m, cmap='inferno', interpolation='nearest');
plt.colorbar();
ax = plt.gca()
ax.set_xticklabels(' ACGTacgt');
ax.set_yticklabels(' ACGTacgt');

<center><img src='giovanni.jpg' width="400"></center>

(CC) Manu Cornet http://www.bonkersworld.net/all-engineers-are-the-same/