## The `multiprocessing` library
The `multiprocessing` module creates additional child processes that run the same code as the parent process, either by *forking* the parent on Unix systems (the OS snapshots the currently running program into a new process) or by *spawning* a fresh interpreter on systems such as Windows. Each process runs its own Python interpreter and has its own GIL, so each can use up to 100% of one CPU core. Therefore, if you have a quad-core processor and run four processes, it is possible to take advantage of 400% of your CPU.

### Architecture
<img src='figures/multiprocessing.png' width=500>

It consists of a parent (main) program and multiple child processes (usually one per core). The parent program schedules work (provides input) for the children and consumes results (gathers output). Data is passed to and from children and the parent using the `pickle` module. When the parent process terminates, the child processes generally also terminate, though they can become orphaned and continue running on their own.
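Because data crosses the process boundary as pickled bytes, anything handed to or returned from a child has to survive a pickle round trip. The following is a minimal sketch of that constraint using only the standard library; the `payload` dictionary is a made-up example, not part of the original code.

```python
import pickle

# A made-up payload standing in for task input sent to a child process.
payload = {"path": "corpus/", "cv": 12, "scores": [0.81, 0.79]}

# Serialize to bytes (what travels between processes) and restore a copy.
wire_bytes = pickle.dumps(payload)
restored = pickle.loads(wire_bytes)

# The receiver gets an equal but distinct object, not a shared reference.
same_value = restored == payload
same_object = restored is payload

# Lambdas cannot be pickled by the standard pickle module.
try:
    pickle.dumps(lambda x: x + 1)
    lambda_ok = True
except (pickle.PicklingError, AttributeError, TypeError):
    lambda_ok = False
```

Here `same_value` is true while `same_object` is false, which is why changes a child makes to its arguments are not visible in the parent unless they are sent back explicitly.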

*Forking* causes child processes to be instantiated, whereas *joining* blocks the parent until the child processes have finished, after which control passes back to the primary process.
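The fork and join steps can be sketched with a single child process. This is an illustrative example rather than the code used later in this section: `STATE`, `child_task`, and `fork_and_join` are hypothetical names, and the sketch assumes the `fork` start method, which is only available on Unix.

```python
import multiprocessing as mp
import os

# Module-level state; after a fork each process holds its own copy.
STATE = {"counter": 0}


def child_task(queue):
    """Runs in the child: change local state and report it to the parent."""
    STATE["counter"] += 1
    queue.put((os.getpid(), STATE["counter"]))


def fork_and_join():
    # Assumption: the Unix-only "fork" start method.
    ctx = mp.get_context("fork")
    queue = ctx.Queue()
    proc = ctx.Process(target=child_task, args=(queue,))
    proc.start()                            # the child starts running here
    child_pid, child_counter = queue.get()  # read the result before joining
    proc.join()                             # block until the child has exited
    return child_pid, child_counter, STATE["counter"], proc.exitcode
```

Calling `fork_and_join()` returns a child PID that differs from the parent's, a child counter of 1, and a parent counter that is still 0, since the child only modified its own copy of `STATE`.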

## Running Tasks In Parallel
In this section we fit multiple models, cross-validate them, and save them to disk. We begin by writing three functions that generate a naive Bayes model, a logistic regression, and a multilayer perceptron. Each function defines its model as a `Pipeline` that normalizes and vectorizes text from a corpus located at a specified path. Each task also determines a location to write the model to and reports results using the `logging` module.

In [1]:
from transformers import TextNormalizer, identity

from sklearn.pipeline import Pipeline
from sklearn.naive_bayes import MultinomialNB
from sklearn.linear_model import LogisticRegression
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.neural_network import MLPClassifier

In [4]:
def fit_naive_bayes(path, saveto=None, cv=12):
    model = Pipeline([
        ('norm', TextNormalizer()),
        ('tfidf', TfidfVectorizer(tokenizer=identity, lowercase=False)),
        ('clf', MultinomialNB())
    ])
    
    if saveto is None:
        saveto = 'naive_bayes_{}.pkl'.format(time.time())
        
    scores, delta = train_model(path, model, saveto, cv)
    logger.info((
        "naive bayes training took {:0.2f} seconds "
        "with an average score of {:0.3f}"
    ).format(delta, scores.mean()))
    
    
def fit_logistic_regression(path, saveto=None, cv=12):
    model = Pipeline([
        ('norm', TextNormalizer()),
        ('tfidf', TfidfVectorizer(tokenizer=identity, lowercase=False)),
        ('clf', LogisticRegression())
    ])

    if saveto is None:
        saveto = "logistic_regression_{}.pkl".format(time.time())
    
    scores, delta = train_model(path, model, saveto, cv)
    logger.info((
        "logistic regression training took {:0.2f} seconds "
        "with an average score of {:0.3f}"
    ).format(delta, scores.mean()))
    
def fit_multilayer_perceptron(path, saveto=None, cv=12):
    model = Pipeline([
        ('norm', TextNormalizer()),
        ('tfidf', TfidfVectorizer(tokenizer=identity, lowercase=False)),
        ('clf', MLPClassifier(hidden_layer_sizes=(10,10), early_stopping=True))
    ])
    if saveto is None:
        saveto = "multilayer_perceptron_{}.pkl".format(time.time())
    scores, delta = train_model(path, model, saveto, cv)
    logger.info((
        "multilayer perceptron training took {:0.2f} seconds "
        "with an average score of {:0.3f}"
    ).format(delta, scores.mean()))

In [5]:
from reader import PickledCorpusReader

import joblib  # sklearn.externals.joblib has been removed from recent scikit-learn releases
from sklearn.model_selection import cross_val_score

In [9]:
def documents(corpus):
    return [
        list(corpus.docs(fileids=fileid))
        for fileid in corpus.fileids()
    ]

def labels(corpus):
    return [
        corpus.categories(fileids=fileid)[0]
        for fileid in corpus.fileids()
    ]
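To see what these two helpers produce, here is a small sketch run against a stand-in corpus. `FakeCorpus` is hypothetical and heavily simplified: it only mimics the three methods the helpers call (`fileids`, `docs`, `categories`) and does not reflect how `PickledCorpusReader` stores its data.

```python
class FakeCorpus:
    """Hypothetical stand-in for PickledCorpusReader, for illustration only."""

    def __init__(self):
        # Maps fileid -> (category, list of documents stored in that file)
        self._files = {
            "sports/a.pickle": ("sports", [["the", "game"]]),
            "news/b.pickle": ("news", [["the", "vote"]]),
        }

    def fileids(self):
        return list(self._files)

    def docs(self, fileids=None):
        # Yield the documents for a single fileid
        yield from self._files[fileids][1]

    def categories(self, fileids=None):
        return [self._files[fileids][0]]


def documents(corpus):
    return [list(corpus.docs(fileids=fileid)) for fileid in corpus.fileids()]


def labels(corpus):
    return [corpus.categories(fileids=fileid)[0] for fileid in corpus.fileids()]


corpus = FakeCorpus()
X = documents(corpus)  # one entry per file
y = labels(corpus)     # one label per file, aligned with X
```

The point of the sketch is that `X` and `y` have one entry per file and stay aligned by position, which is what `cross_val_score` expects.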

In [7]:
import time
from functools import wraps

def timeit(func):
    @wraps(func)
    def wrapper(*args, **kwargs):
        start = time.time()
        result = func(*args, **kwargs)
        return result, time.time() - start
    return wrapper
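A short usage sketch shows what the decorator changes about a function's return value. The decorator is repeated so the example runs on its own; `slow_sum` is a hypothetical workload, not part of the original code.

```python
import time
from functools import wraps


def timeit(func):
    @wraps(func)
    def wrapper(*args, **kwargs):
        start = time.time()
        result = func(*args, **kwargs)
        # Return the original result together with the elapsed seconds
        return result, time.time() - start
    return wrapper


@timeit
def slow_sum(n):
    """Hypothetical workload: sleep briefly, then sum the first n integers."""
    time.sleep(0.05)
    return sum(range(n))


# The decorated function now returns a (result, seconds) tuple.
total, delta = slow_sum(10)
```

Any caller of a decorated function has to unpack two values, and the wrapped function must actually return its result for the first element to be useful.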

In [8]:
@timeit
def train_model(path, model, saveto=None, cv=12):
    # Load the corpus data and labels for classification
    corpus = PickledCorpusReader(path)
    X = documents(corpus)
    y = labels(corpus)
    
    # Compute cross validation scores
    scores = cross_val_score(model, X, y, cv=cv)
    
    # Fit the model on entire dataset
    model.fit(X, y)
    
    # Write to disk if specified
    if saveto:
        joblib.dump(model, saveto)

    # Return the scores so callers can report them (timeit adds the elapsed time)
    return scores

In [10]:
import logging

# logging configuration
logging.basicConfig(
    level=logging.INFO,
    format="%(processName)-10s %(asctime)s %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S"
)

logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
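The `%(processName)` field is what makes interleaved output from several processes readable. As a rough sketch, the same format can be applied to a hand-built log record to inspect the resulting line; the record contents and the process name assigned below are made up for illustration.

```python
import logging

formatter = logging.Formatter(
    fmt="%(processName)-10s %(asctime)s %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
)

# Build a record by hand so the formatted line can be inspected directly.
record = logging.LogRecord(
    name="example",
    level=logging.INFO,
    pathname="example.py",
    lineno=1,
    msg="training took %0.2f seconds",
    args=(1.5,),
    exc_info=None,
)

# Pretend the record was emitted by a child process with this name.
record.processName = "fit_naive_bayes"

line = formatter.format(record)
```

The formatted line starts with the process name, followed by the timestamp and the message, so each log entry can be traced back to the task that produced it.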

In [11]:
import multiprocessing as mp

In [12]:
def run_parallel(path):
    tasks = [
        fit_naive_bayes, fit_logistic_regression, fit_multilayer_perceptron,
    ]
    logger.info('beginning parallel tasks')
    start = time.time()
    
    procs = []
    for task in tasks:
        proc = mp.Process(name=task.__name__, target=task, args=(path,))
        procs.append(proc)
        proc.start()
        
    for proc in procs:
        proc.join()
        
    delta = time.time() - start
    logger.info('total parallel fit time: {:0.2f} seconds'.format(delta))
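Note that `join` only waits for each child; it does not report whether the task succeeded. One possible extension, sketched below with stand-in tasks, is to inspect each process's `exitcode` after joining. `quick_task`, `failing_task`, and `run_and_check` are hypothetical names, and the sketch assumes the Unix-only `fork` start method.

```python
import multiprocessing as mp
import sys
import time


def quick_task():
    """Hypothetical stand-in for a model-fitting task that succeeds."""
    time.sleep(0.05)


def failing_task():
    """Hypothetical stand-in for a task that exits with an error status."""
    sys.exit(3)


def run_and_check(tasks):
    # Assumption: the Unix-only "fork" start method.
    ctx = mp.get_context("fork")
    procs = [ctx.Process(name=task.__name__, target=task) for task in tasks]

    for proc in procs:
        proc.start()

    for proc in procs:
        proc.join()

    # exitcode is 0 on success and nonzero when the child exited with an error
    return {proc.name: proc.exitcode for proc in procs}
```

Calling `run_and_check([quick_task, failing_task])` yields an exit code of 0 for the first task and 3 for the second, which a caller could log or use to decide whether a saved model can be trusted.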