# Distributed processing

## Definitions

In a multiprocessing or parallel computing environment, there are multiple workers that handle different aspects of a shared task. The shared task may involve multiple steps, which may (or may not) be separable from one another. Depending on the nature of the task and the bottlenecks (slow steps) it contains, it can be useful to implement **task parallelism** and/or **data parallelism**. 

### Task parallelism

Think of cooking, especially in a restaurant kitchen. Multiple workers perform different parts of the workflow. These tasks are generally independent of one another, though some of them may need to be executed in sequence. The example below, which runs three different fits over the same data, is an instance of task parallelism.
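
A minimal sketch of the same idea on toy data, using only the standard library. Three unrelated summary functions (hypothetical names, not part of the notebook) are submitted to a process pool at the same time, and each receives the same input list. `concurrent.futures.ProcessPoolExecutor` is used here, rather than the `mp.Process` objects in the notebook code, only because it hands back return values.

```python
from concurrent.futures import ProcessPoolExecutor
from statistics import mean, median


# Hypothetical stand-ins for three independent "fits" over the same data.
def fit_mean(values):
    return mean(values)


def fit_median(values):
    return median(values)


def fit_range(values):
    return max(values) - min(values)


TASKS = [fit_mean, fit_median, fit_range]


def run_tasks(data, executor):
    # Submit every task at once, then collect the results keyed by task name.
    futures = {task.__name__: executor.submit(task, data) for task in TASKS}
    return {name: future.result() for name, future in futures.items()}


if __name__ == "__main__":
    data = [3, 1, 4, 1, 5, 9, 2, 6]
    with ProcessPoolExecutor(max_workers=len(TASKS)) as executor:
        print(run_tasks(data, executor))
```

Each task is a different function, so the work is divided by *what* is done rather than by which slice of the data is processed.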

### Data parallelism

Think of multiple people eating a single dish. Each worker operates on a different part of a large dataset. The results from each subset are then combined to produce a single result: "split-apply-combine."
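
Split-apply-combine can be sketched in a few lines. This is an illustration under simple assumptions, not code from the notebook: the helper names (`split`, `sum_of_squares`, `split_apply_combine`) are hypothetical, the "dataset" is a list of integers, and the pool size of 4 is arbitrary. The mapping function is passed in, so the same code runs serially with the built-in `map` or in parallel with `Pool.map`.

```python
import multiprocessing as mp


def split(items, n_chunks):
    # Split: cut items into n_chunks contiguous, roughly equal slices.
    size, extra = divmod(len(items), n_chunks)
    chunks, start = [], 0
    for i in range(n_chunks):
        end = start + size + (1 if i < extra else 0)
        chunks.append(items[start:end])
        start = end
    return chunks


def sum_of_squares(chunk):
    # Apply: the work each worker performs on its own chunk.
    return sum(x * x for x in chunk)


def split_apply_combine(items, n_chunks, map_fn=map):
    # Combine: add the per-chunk partial results into one answer.
    return sum(map_fn(sum_of_squares, split(items, n_chunks)))


if __name__ == "__main__":
    data = list(range(1000))
    with mp.Pool(processes=4) as pool:
        total = split_apply_combine(data, 4, map_fn=pool.map)
    # The parallel answer should match a plain serial computation.
    print(total == sum(x * x for x in data))
```

Here every worker runs the same function; only the slice of data differs.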

### Should you do it?

Note that parallel processing is not always worthwhile. Consider:

1. It is generally more difficult to write and debug parallel algorithms.
2. Parallel processing always introduces some management overhead (starting workers, moving data between them, collecting results). In some cases, this overhead can be substantial.
3. The computational task itself may not be the slow step in a larger research process. Does it matter if your code takes a day to run? Maybe, but maybe not.

## One machine vs. many

Multiprocessing generally works on one machine at a time, i.e., the one on which the Python interpreter is running. Cluster computing involves (again, generally) many different machines networked together. You can scale **a lot** further using a cluster, but this also introduces significant complexity and overhead. Spark is one approach to truly distributed cluster computing.
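
On a single machine, the core count limits how many CPU-bound worker processes can actually run at once. The sketch below shows one rough way to size a pool. The helper `suggested_workers` is hypothetical, and reserving one core for the main process is a convention chosen for illustration, not a rule.

```python
import os


def suggested_workers(reserve=1):
    # os.cpu_count() may return None when the count cannot be determined.
    cores = os.cpu_count() or 1
    # Never suggest fewer than one worker.
    return max(1, cores - reserve)


if __name__ == "__main__":
    print("logical cores:", os.cpu_count())
    print("suggested worker processes:", suggested_workers())
```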

## Multiprocessing

In [2]:
import logging
import multiprocessing as mp
import nltk
import os
import sys
import time
import unicodedata

from functools import wraps
from nltk.corpus import wordnet as wn
from nltk.stem.wordnet import WordNetLemmatizer
from sklearn.base import BaseEstimator, TransformerMixin
import joblib  # sklearn.externals.joblib was removed in scikit-learn 0.23
from sklearn.pipeline import Pipeline
from sklearn.naive_bayes import MultinomialNB
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import cross_val_score
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.neural_network import MLPClassifier

# Import our libraries
sys.path.append(os.path.join('..', 'libraries'))
from TMN import PickledCorpusReader

# Set path to pickled corpus
pickle_dir = os.path.join('..', 'data', 'pickled')

In [5]:
# Textbook code
class TextNormalizer(BaseEstimator, TransformerMixin):

    def __init__(self, language='english'):
        self.stopwords  = set(nltk.corpus.stopwords.words(language))
        self.lemmatizer = WordNetLemmatizer()

    def is_punct(self, token):
        return all(
            unicodedata.category(char).startswith('P') for char in token
        )

    def is_stopword(self, token):
        return token.lower() in self.stopwords

    def normalize(self, document):
        return [
            self.lemmatize(token, tag).lower()
            for paragraph in document
            for sentence in paragraph
            for (token, tag) in sentence
            if not self.is_punct(token) and not self.is_stopword(token)
        ]

    def lemmatize(self, token, pos_tag):
        tag = {
            'N': wn.NOUN,
            'V': wn.VERB,
            'R': wn.ADV,
            'J': wn.ADJ
        }.get(pos_tag[0], wn.NOUN)

        return self.lemmatizer.lemmatize(token, tag)

    def fit(self, X, y=None):
        return self

    def transform(self, documents):
        for document in documents:
            yield self.normalize(document[0])


def identity(doc):
    return doc

# Logging configuration
logging.basicConfig(
    level=logging.INFO,
    format="%(processName)-10s %(asctime)s %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S"
)
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)


def timeit(func):
    @wraps(func)
    def wrapper(*args, **kwargs):
        start = time.time()
        result = func(*args, **kwargs)
        return result, time.time() - start
    return wrapper


def documents(corpus):
    return [
        list(corpus.docs(fileids=[fileid]))
        for fileid in corpus.fileids()
    ]


def labels(corpus):
    return [
        corpus.categories(fileids=[fileid])[0]
        for fileid in corpus.fileids()
    ]


@timeit
def train_model(path, model, saveto=None, cv=12):
    """
    Trains model from corpus at specified path; constructing cross-validation
    scores using the cv parameter, then fitting the model on the full data and
    writing it to disk at the saveto path if specified. Returns the scores.
    """
    # Load the corpus data and labels for classification
    corpus = PickledCorpusReader(path)
    X = documents(corpus)
    y = labels(corpus)

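    # Compute cross validation scores
    # NOTE: the fold count is hard-coded to 3, so the cv argument is ignored;
    # pass cv=cv to honor it (the timings shown below were produced with 3 folds)
    scores = cross_val_score(model, X, y, cv=3)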

    # Fit the model on entire data set
    model.fit(X, y)

    # Write to disk if specified (disabled for this run)
    #if saveto:
    #    joblib.dump(model, saveto)

    # Return scores as well as training time via decorator
    return scores


def fit_naive_bayes(path, saveto=None, cv=12):

    model = Pipeline([
        ('norm', TextNormalizer()),
        ('tfidf', TfidfVectorizer(tokenizer=identity, lowercase=False)),
        ('clf', MultinomialNB())
    ])

    if saveto is None:
        saveto = "naive_bayes_{}.pkl".format(time.time())

    scores, delta = train_model(path, model, saveto, cv)
    logger.info((
        "naive bayes training took {:0.2f} seconds "
        "with an average score of {:0.3f}"
    ).format(delta, scores.mean()))


def fit_logistic_regression(path, saveto=None, cv=12):
    model = Pipeline([
        ('norm', TextNormalizer()),
        ('tfidf', TfidfVectorizer(tokenizer=identity, lowercase=False)),
        ('clf', LogisticRegression(solver='lbfgs'))
    ])

    if saveto is None:
        saveto = "logistic_regression_{}.pkl".format(time.time())

    scores, delta = train_model(path, model, saveto, cv)
    logger.info((
        "logistic regression training took {:0.2f} seconds "
        "with an average score of {:0.3f}"
    ).format(delta, scores.mean()))


def fit_multilayer_perceptron(path, saveto=None, cv=12):
    model = Pipeline([
        ('norm', TextNormalizer()),
        ('tfidf', TfidfVectorizer(tokenizer=identity, lowercase=False)),
        ('clf', MLPClassifier(hidden_layer_sizes=(10,10), early_stopping=True))
    ])

    if saveto is None:
        saveto = "multilayer_perceptron_{}.pkl".format(time.time())

    scores, delta = train_model(path, model, saveto, cv)
    logger.info((
        "multilayer perceptron training took {:0.2f} seconds "
        "with an average score of {:0.3f}"
    ).format(delta, scores.mean()))


@timeit
def sequential(path):
    # Run each fit one after the other
    fit_naive_bayes(path)
    fit_logistic_regression(path)
    fit_multilayer_perceptron(path)


@timeit
def parallel(path):
    tasks = [
        fit_naive_bayes, fit_logistic_regression, fit_multilayer_perceptron,
    ]

    procs = []
    for task in tasks:
        proc = mp.Process(name=task.__name__, target=task, args=(path,))
        procs.append(proc)
        proc.start()

    for proc in procs:
        proc.join()  # Block until this worker process has finished
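
Because `timeit` returns a `(result, elapsed_seconds)` tuple rather than the bare result, every caller of a decorated function has to unpack it, which is why the fit functions write `scores, delta = train_model(...)`. A small self-contained sketch of that behavior follows. The decorator is repeated from the cell above so the block runs on its own, and `slow_add` is a hypothetical function used only for illustration.

```python
import time
from functools import wraps


def timeit(func):
    @wraps(func)
    def wrapper(*args, **kwargs):
        start = time.time()
        result = func(*args, **kwargs)
        # Return the original result together with the elapsed wall time.
        return result, time.time() - start
    return wrapper


@timeit
def slow_add(a, b):
    time.sleep(0.01)  # stand-in for real work
    return a + b


if __name__ == "__main__":
    total, delta = slow_add(2, 3)
    print("result={} elapsed={:0.3f}s".format(total, delta))
```

Functions that return nothing, such as `sequential` and `parallel`, come back as `(None, elapsed)`, hence the `_, delta = ...` pattern in the cells that follow.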

In [6]:
logger.info("Begin parallel tasks")
_, delta = parallel(pickle_dir)
logger.info("Total parallel fit time: {:0.2f} seconds".format(delta))

MainProcess 2019-04-15 10:26:47 Begin parallel tasks
fit_naive_bayes 2019-04-15 10:29:17 naive bayes training took 149.91 seconds with an average score of 0.524
fit_logistic_regression 2019-04-15 10:29:17 logistic regression training took 150.03 seconds with an average score of 0.853
fit_multilayer_perceptron 2019-04-15 10:29:18 multilayer perceptron training took 151.47 seconds with an average score of 0.500
MainProcess 2019-04-15 10:29:18 Total parallel fit time: 151.55 seconds


In [7]:
print("Begin sequential tasks")
_, delta = sequential(pickle_dir)
print("Total sequential fit time: {:0.2f} seconds".format(delta))

Begin sequential tasks


MainProcess 2019-04-15 10:31:38 naive bayes training took 93.97 seconds with an average score of 0.524
MainProcess 2019-04-15 10:33:09 logistic regression training took 91.02 seconds with an average score of 0.853
MainProcess 2019-04-15 10:34:46 multilayer perceptron training took 97.07 seconds with an average score of 0.500


Total sequential fit time: 282.09 seconds
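
The two runs can be compared directly from the logged totals. The sketch below computes the speedup and the parallel efficiency (speedup divided by the number of processes), and also how much longer each fit took when the three ran side by side. All numbers are copied from the log output above; the variable names are illustrative.

```python
# Wall-clock totals, in seconds, copied from the log output above.
sequential_total = 282.09
parallel_total = 151.55
n_processes = 3  # one process per fit

speedup = sequential_total / parallel_total
efficiency = speedup / n_processes

# Per-fit durations (seconds) when run one at a time vs. side by side.
alone = {
    "naive bayes": 93.97,
    "logistic regression": 91.02,
    "multilayer perceptron": 97.07,
}
together = {
    "naive bayes": 149.91,
    "logistic regression": 150.03,
    "multilayer perceptron": 151.47,
}
slowdown = {name: together[name] / alone[name] for name in alone}

print("speedup: {:0.2f}x".format(speedup))
print("efficiency: {:0.0%}".format(efficiency))
for name, factor in slowdown.items():
    print("{} ran {:0.2f}x slower in parallel".format(name, factor))
```

The speedup comes out a little under 2x rather than 3x because each individual fit ran noticeably slower in parallel. Contention for cores, memory, or disk is a plausible cause, but the logs do not say which.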
