# Pescador demo

This notebook illustrates some of the basic functionality of [pescador](https://github.com/bmcfee/pescador), a package that facilitates iterative learning from data streams (implemented as Python generators).

In [1]:
import pescador

import numpy as np
np.set_printoptions(precision=4)
import sklearn
import sklearn.datasets
import sklearn.linear_model
import sklearn.cross_validation
import sklearn.metrics

In [2]:
def data_generator(X, Y, m=20, scale=1e-1):
    '''A Gaussian noise generator for data
    
    Parameters
    ----------
    X : ndarray
        features, n_samples by dimensions
        
    Y : ndarray
        labels, n_samples
        
    m : int
        size of the minibatches to generate
        
    scale : float > 0
        scale of the noise to add
        
    Generates
    ---------
    batch
        An infinite stream of batch dictionaries
        batch = dict(X=X[i], Y=Y[i])
    '''
    
    X = np.atleast_2d(X)
    Y = np.atleast_1d(Y)

    n, d = X.shape
    
    while True:
        i = np.random.randint(0, n, size=m)
        
        noise = scale * np.random.randn(m, d)
        
        yield {'X': X[i] + noise, 'Y': Y[i]}

In [3]:
# Load up the iris dataset for the demo
data = sklearn.datasets.load_iris()
X, Y = data.data, data.target
classes = np.unique(Y)

In [4]:
# What does the data stream look like?

# First, we'll wrap the generator function in a Streamer object.
# This is necessary for a few reasons, notably so that we can re-instantiate
# the generator multiple times (e.g., once per epoch).

stream = pescador.Streamer(data_generator, X, Y)

# The buffer_batch() function takes a batch stream as input and
# carves it into batches of up to buffer_size (3, in this case) samples.
# The buffer size can be larger or smaller than the native size of the input batches.
for q in pescador.buffer_batch(stream.generate(max_items=1), 3):
    print(q)

{'Y': array([1, 1, 2]), 'X': array([[ 5.3542,  2.1554,  4.1925,  1.2063],
       [ 6.948 ,  3.0258,  4.8126,  1.4288],
       [ 6.8807,  3.0223,  5.1551,  2.3169]])}
{'Y': array([0, 2, 1]), 'X': array([[ 4.6828,  3.1631,  1.5653,  0.06  ],
       [ 7.294 ,  3.0888,  6.5113,  2.17  ],
       [ 6.2679,  2.6062,  5.0636,  1.5904]])}
{'Y': array([0, 2, 0]), 'X': array([[ 4.7073,  3.5406,  1.052 ,  0.3011],
       [ 6.6355,  3.2431,  4.945 ,  1.9572],
       [ 5.126 ,  3.6004,  1.4447,  0.1314]])}
{'Y': array([1, 2, 0]), 'X': array([[ 6.8064,  3.0182,  4.4771,  1.273 ],
       [ 7.2639,  3.0231,  5.6306,  1.5871],
       [ 5.7686,  3.8879,  1.7202,  0.3382]])}
{'Y': array([0, 1, 1]), 'X': array([[ 5.2804,  3.5421,  1.3413,  0.5314],
       [ 5.9817,  2.7778,  4.5351,  1.7357],
       [ 5.9339,  2.7379,  4.0306,  1.0401]])}
{'Y': array([2, 1, 2]), 'X': array([[ 7.9579,  3.9336,  6.5118,  2.1807],
       [ 6.9516,  2.738 ,  4.8386,  1.4237],
       [ 6.8627,  3.2009,  4.9941,  2.3365]])}
{'Y'
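To make the two ideas in the cell above concrete, here is a minimal standard-library sketch of (a) a wrapper that can re-instantiate a generator on demand and (b) a re-buffering step that carves batch dictionaries into fixed-size pieces. This is not pescador's implementation: `SimpleStreamer`, `rebuffer`, and `toy_generator` are hypothetical names, plain Python lists stand in for NumPy arrays, and the sketch assumes that leftover samples are emitted as a smaller final batch.

```python
import itertools


class SimpleStreamer(object):
    """Hypothetical stand-in for a streamer: it stores a generator function
    and its arguments so a fresh generator can be created on every call."""

    def __init__(self, func, *args, **kwargs):
        self.func = func
        self.args = args
        self.kwargs = kwargs

    def generate(self, max_items=None):
        # Re-instantiate the underlying generator from scratch.
        gen = self.func(*self.args, **self.kwargs)
        # islice with stop=None imposes no limit on the number of items.
        return itertools.islice(gen, max_items)


def rebuffer(batches, buffer_size):
    """Carve a stream of batch dicts into batches of up to buffer_size samples."""
    pending = {}
    for batch in batches:
        # Append the incoming samples to whatever is left over.
        for key, values in batch.items():
            pending.setdefault(key, []).extend(values)
        # Emit full batches while enough samples are available.
        while pending and min(len(v) for v in pending.values()) >= buffer_size:
            yield {k: v[:buffer_size] for k, v in pending.items()}
            pending = {k: v[buffer_size:] for k, v in pending.items()}
    # Assumption: the remainder is emitted as a smaller final batch.
    if pending and any(len(v) > 0 for v in pending.values()):
        yield dict(pending)


def toy_generator(n_batches, m):
    """Hypothetical generator: yields n_batches batches of m samples each."""
    count = 0
    for _ in range(n_batches):
        yield {'X': [[float(i)] for i in range(count, count + m)],
               'Y': list(range(count, count + m))}
        count += m


stream = SimpleStreamer(toy_generator, 2, 4)

# One native batch of 4 samples is carved into pieces of at most 3 samples.
out = list(rebuffer(stream.generate(max_items=1), 3))
sizes = [len(b['Y']) for b in out]

# Calling generate() again starts over from the first batch.
first_again = list(stream.generate(max_items=1))[0]['Y']
```

Because `generate()` builds a new generator each time, the same `stream` object can be replayed, which is the property the comments above describe as useful for running one pass per epoch.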

# Benchmarking
We can benchmark our learner's efficiency by running a couple of experiments on the Iris dataset.

Our classifier will be L1-regularized logistic regression.
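
For reference, the binary form of this objective can be written as follows, with labels $y_i \in \{-1, +1\}$, weights $w$, intercept $b$, and regularization strength $\alpha$. This is the standard textbook form rather than anything specific to this notebook; scikit-learn's `SGDClassifier` handles the three Iris classes by fitting one such binary problem per class (one-vs-rest).

$$
\min_{w,\, b} \; \frac{1}{n} \sum_{i=1}^{n} \log\!\left(1 + \exp\!\left(-y_i \left(w^\top x_i + b\right)\right)\right) + \alpha \lVert w \rVert_1
$$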

In [24]:
%%time
for train, test in sklearn.cross_validation.ShuffleSplit(len(X),
                                                         n_iter=2,
                                                         test_size=0.2):
    
    # Make an SGD learner, nothing fancy here
    classifier = sklearn.linear_model.SGDClassifier(verbose=0, 
                                                    loss='log',
                                                    penalty='l1', 
                                                    n_iter=1)
    
    # Make a streamable wrapper
    model = pescador.StreamLearner(classifier)
    
    # Again, build a streamer object
    stream = pescador.Streamer(data_generator, X[train], Y[train])
    
    # we'll buffer into batches of 16 samples each
    samples = pescador.buffer_batch(stream.generate(max_items=5e3),
                                   16)
    
    # And train the model on the stream.
    # iter_fit() works just like partial_fit(), except that the input is a generator.
    model.iter_fit(samples, classes=classes)
    
    # How does it do on the test set?
    print('Test-set accuracy: %.3f' % sklearn.metrics.accuracy_score(Y[test], model.predict(X[test])))

Test-set accuracy: 0.900
Test-set accuracy: 0.900
CPU times: user 8.35 s, sys: 39.9 ms, total: 8.39 s
Wall time: 8.44 s


In [25]:
# How many steps did the model run?
model.estimator.t_

100001.0
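
This step count is consistent with a simple tally, sketched below. It assumes (as scikit-learn documents for its SGD estimators) that `t_` starts at 1 and advances once per training sample, and it assumes that `max_items` counts batches drawn from `data_generator`. The variable names are illustrative only.

```python
# Values taken from the benchmarking cell above.
max_items = 5000        # batches requested from the stream (5e3)
native_batch_size = 20  # default m in data_generator
buffer_size = 16        # size passed to buffer_batch

# Total samples produced by the generator.
samples_streamed = max_items * native_batch_size

# Number of full re-buffered batches handed to the learner.
rebuffered_batches = samples_streamed // buffer_size

# Assumption: t_ is one more than the number of samples seen.
expected_t = samples_streamed + 1
```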

# Parallelism

The learner and the data generator may run at different speeds.  If the data generator has higher latency than the learner (here, SGDClassifier), the learner spends time waiting for data, which slows down training.

Pescador uses ZeroMQ to run data stream generation in parallel, effectively decoupling it from the learner.
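
The decoupling idea can be illustrated with the Python 3 standard library alone. The sketch below is an analogy, not pescador's mechanism: it uses a background thread and a bounded queue where pescador uses ZeroMQ and a separate process, and `background_stream` and `toy_batches` are hypothetical names.

```python
import queue
import threading


def background_stream(generator, max_queue=8):
    """Run `generator` in a background thread and yield its items here."""
    q = queue.Queue(maxsize=max_queue)
    done = object()  # sentinel marking the end of the stream

    def produce():
        # The producer fills the queue while the consumer is busy elsewhere.
        for item in generator:
            q.put(item)
        q.put(done)

    worker = threading.Thread(target=produce)
    worker.daemon = True
    worker.start()

    while True:
        item = q.get()
        if item is done:
            break
        yield item


def toy_batches(n):
    """Hypothetical data source yielding n single-sample batch dicts."""
    for i in range(n):
        yield {'X': [[i, i + 1]], 'Y': [i % 2]}


received = list(background_stream(toy_batches(5)))
labels = [batch['Y'][0] for batch in received]
```

The bounded queue lets the producer work ahead of the consumer by up to `max_queue` batches, so a slow generator and a slow learner overlap instead of taking turns. A production version would also need to forward exceptions from the producer and handle a consumer that stops early.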

In [26]:
%%time
for train, test in sklearn.cross_validation.ShuffleSplit(len(X), n_iter=2, test_size=0.2):
    
    # Make an SGD learner, nothing fancy here
    classifier = sklearn.linear_model.SGDClassifier(verbose=0, 
                                                    loss='log',
                                                    penalty='l1', 
                                                    n_iter=1)
    
    # Make a streamable wrapper
    model = pescador.StreamLearner(classifier)
    
    # First, turn the data_generator function into a Streamer object
    stream = pescador.Streamer(data_generator, X[train], Y[train])
    
    # Then, send this stream to a second process
    zmq_stream = pescador.zmq_stream(5156, stream, max_items=5e3)
    
    # Run the output through a second buffer for mini-batch training
    samples = pescador.buffer_batch(zmq_stream, 16)
    
    # And fit on the stream
    model.iter_fit(samples, classes=classes)
    
    # How does it do on the test set?
    print('Test-set accuracy: %.3f' % sklearn.metrics.accuracy_score(Y[test], model.predict(X[test])))

Test-set accuracy: 1.000
Test-set accuracy: 1.000
CPU times: user 4.2 s, sys: 68.1 ms, total: 4.27 s
Wall time: 4.23 s


In [27]:
# How many steps did the model run?
model.estimator.t_

70081.0