# Large Scale Matrix Computations

In this notebook we walk through some of the more advanced things you can do with PyWren. Specifically, using S3 as a backing store, we implement a nearest neighbor classifier.

In [213]:
%pylab inline
import boto3
import cloudpickle
import itertools
import concurrent.futures as fs
import io
import numpy as np
import time
from importlib import reload
from sklearn import metrics
import pywren
import pywren.wrenconfig as wc
from operator import itemgetter
import matrix

Populating the interactive namespace from numpy and matplotlib


`%matplotlib` prevents importing * from pylab and numpy
  "\n`%matplotlib` prevents importing * from pylab and numpy"


In [177]:
DEFAULT_BUCKET = wc.default()['s3']['bucket']

## 1. Matrix Multiplication

One nice thing about PyWren is that it lets users integrate existing Python libraries easily.
For the following exercise we use a popular Python library, NumPy, to run many independent matrix-vector multiplications in parallel.

In [5]:
import numpy as np

def my_function(b):
    x = np.random.normal(0, b, 1024)
    A = np.random.normal(0, b, (1024, 1024))
    return np.dot(A, x)

pwex = pywren.default_executor()
res = pwex.map(my_function, np.linspace(0.1, 10, 100))
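
The call above only launches the jobs; the products still have to be collected from the returned futures (later cells do this with `f.result()`). As a rough local illustration of the same map-then-collect pattern, the sketch below uses a `concurrent.futures` thread pool as a stand-in for the PyWren executor. The pool, `scales`, and `stacked` are assumptions made for this example and are not part of PyWren.

```python
import concurrent.futures as fs
import numpy as np

def my_function(b):
    # same computation as the notebook cell: a random 1024x1024 matrix times a random vector
    rng = np.random.default_rng()
    x = rng.normal(0, b, 1024)
    A = rng.normal(0, b, (1024, 1024))
    return A.dot(x)

scales = np.linspace(0.1, 10, 100)

# local stand-in for pwex.map: one task per scale value
with fs.ThreadPoolExecutor(max_workers=8) as pool:
    results = list(pool.map(my_function, scales))

# each task returns a length-1024 vector; stack them into one array
stacked = np.vstack(results)
print(stacked.shape)
```

Each task is independent of the others, which is what makes this workload a natural fit for a map over many workers.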


## 2. Distributed Nearest Neighbor with Large Scale Matrix Multiplication 

A problem with the above method is that we are limited to "small" matrices that fit in the memory of a single Lambda instance. With a little work we can write a "ShardedMatrix" wrapper that shards NumPy matrices across S3 objects (the source code for this can be found in matrix.py). This lets us use PyWren's map functionality to access different parts of the matrix, and in turn to compute a large scale matrix multiplication.
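
To make the sharding idea concrete, here is a minimal sketch of a block store that keeps its blocks in a Python dict rather than in S3. `InMemoryShardedMatrix` is a hypothetical class written for this illustration; it borrows the method names used in this notebook (`get_block`, `put_block`, `block_idxs`, `shard_matrix`), but it is not the code in matrix.py and its key layout is an assumption.

```python
import itertools
import numpy as np

class InMemoryShardedMatrix:
    """Hypothetical stand-in: blocks live in a dict instead of S3 objects."""

    def __init__(self, key, shape, shard_sizes):
        self.key = key
        self.shape = tuple(shape)
        self.shard_sizes = list(shard_sizes)
        self._store = {}  # maps "key/i_j" -> ndarray, playing the role of S3

    @property
    def block_idxs(self):
        # ceiling division gives the number of blocks along each axis
        counts = [-(-dim // size) for dim, size in zip(self.shape, self.shard_sizes)]
        return list(itertools.product(*[range(c) for c in counts]))

    def _block_key(self, *idx):
        return self.key + "/" + "_".join(str(i) for i in idx)

    def put_block(self, *args):
        *idx, block = args  # last argument is the block, the rest are block indices
        self._store[self._block_key(*idx)] = np.asarray(block)

    def get_block(self, *idx):
        return self._store[self._block_key(*idx)]

    def shard_matrix(self, X):
        # cut X into blocks; the last block along an axis may be smaller
        for idx in self.block_idxs:
            slices = tuple(slice(i * s, min((i + 1) * s, d))
                           for i, s, d in zip(idx, self.shard_sizes, self.shape))
            self.put_block(*idx, X[slices])

X = np.arange(40, dtype="float32").reshape(10, 4)
M = InMemoryShardedMatrix("x", X.shape, [4, 4])
M.shard_matrix(X)
rebuilt = np.vstack([M.get_block(i, 0) for i in range(3)])
```

Because every block is addressed by its own key, a worker only needs to read the blocks it operates on, never the whole matrix.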


In the example below we implement distributed nearest neighbor on top of PyWren and this ShardedMatrix abstraction. Note that nearest neighbor is often hard to implement on BSP systems such as Apache Spark because of its high communication cost.

In [19]:
from sklearn.datasets import fetch_mldata
import matrix
reload(matrix)

<module 'matrix' from '/Users/vaishaal/research/risecamp/pywren/matrix.py'>

Let's download the MNIST dataset. This cell should take about 3 minutes to complete.

In [20]:
mnist = fetch_mldata('MNIST original', data_home="/tmp/")  # fetch once, reuse for data and labels
X = mnist['data'].astype('float32')
y = mnist['target']
X_train = X[:60000, :]
y_train = y[:60000, np.newaxis]

X_test = X[60000:, :]
y_test = y[60000:, np.newaxis]

We will now "shard" the MNIST matrix with a shard size of 4000 rows. This means we convert the 60000 x 784 training matrix into 15 separate 4000 x 784 NumPy matrices stored under different S3 keys. The first argument is the S3 folder where these submatrices can be found. The upload itself is done by `shard_matrix` (commented out below) and should take about 3 minutes to complete.
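
The shard count follows from ceiling division of the row count by the shard size. The sketch below works this out for the training and test matrices; `row_shards` is a hypothetical helper for this illustration, and it assumes that a ragged final shard simply holds the leftover rows.

```python
import math

def row_shards(n_rows, shard_rows):
    # (start, end) row ranges for each shard; the last one may be short
    n = math.ceil(n_rows / shard_rows)
    return [(i * shard_rows, min((i + 1) * shard_rows, n_rows)) for i in range(n)]

train = row_shards(60000, 4000)
test = row_shards(10000, 4000)

print(len(train), train[-1])  # 15 (56000, 60000)
print(len(test), test[-1])    # 3 (8000, 10000)
```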

In [21]:
%time X_train_sharded = matrix.ShardedMatrix("x_train", shape=X_train.shape, bucket=DEFAULT_BUCKET, shard_sizes=[4000,784])
#%time X_train_sharded.shard_matrix(X_train, n_jobs=16)

%time X_test_sharded = matrix.ShardedMatrix("x_test", shape=X_test.shape, bucket=DEFAULT_BUCKET, shard_sizes=[4000,784])
#%time X_test_sharded.shard_matrix(X_test, n_jobs=16)

CPU times: user 82.4 ms, sys: 6.61 ms, total: 89 ms
Wall time: 1.35 s
CPU times: user 82.7 ms, sys: 5.65 ms, total: 88.3 ms
Wall time: 1.3 s


Now that we have our sharded matrices, we can compute a nearest neighbor classifier locally and compare it with one computed with PyWren. If we do everything correctly, the PyWren implementation should produce the same predictions as the local one while scaling better with dataset size.

In [11]:
def compute_local_nearest_neighbor_labels(X_train, X_test, y_test, y_train):
    # squared distances via ||a - b||^2 = ||a||^2 - 2 a.b + ||b||^2 (y_test is unused)
    train_norms = np.linalg.norm(X_train, axis=1)[:, np.newaxis] ** 2
    test_norms = np.linalg.norm(X_test, axis=1)[np.newaxis, :] ** 2
    D = train_norms - 2 * X_train.dot(X_test.T) + test_norms
    # for each test point (column), take the label of the closest training point (row)
    return y_train[np.argmin(D, axis=0)]
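
The function above relies on the expansion ||a - b||^2 = ||a||^2 - 2 a.b + ||b||^2, which turns the pairwise distance computation into one matrix multiplication plus two broadcasted norm terms. As a small sanity check on made-up data (the arrays `A` and `B` are stand-ins, not MNIST), the sketch below compares the expanded form with a direct computation.

```python
import numpy as np

rng = np.random.default_rng(0)
A = rng.normal(size=(6, 5))  # stand-in "training" rows
B = rng.normal(size=(4, 5))  # stand-in "test" rows

# expanded form: column of row norms + row of row norms - 2 * Gram matrix
a_norms = np.sum(A ** 2, axis=1)[:, np.newaxis]
b_norms = np.sum(B ** 2, axis=1)[np.newaxis, :]
D_expanded = a_norms - 2 * A.dot(B.T) + b_norms

# direct form: subtract every pair of rows, then sum the squares
D_direct = np.sum((A[:, np.newaxis, :] - B[np.newaxis, :, :]) ** 2, axis=2)

# index of the nearest "training" row for each "test" row
nearest = np.argmin(D_expanded, axis=0)
```

The direct form materializes a 3-D array of differences, so the expanded form is the one that stays practical as the matrices grow.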

In [191]:
%time y_test_pred = compute_local_nearest_neighbor_labels(X_train, X_test, y_test, y_train)
print("Accuracy is ", metrics.accuracy_score(y_test_pred, y_test))

CPU times: user 34.8 s, sys: 5.68 s, total: 40.5 s
Wall time: 38 s
Accuracy is  0.9691


#### Let's try using PyWren.
Our strategy is to use PyWren to map over (training block, test block) pairs and generate the distance matrix in sharded form, one block per pair. Then we launch another PyWren job to extract the nearest neighbors.
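
The blockwise strategy works because each block of the distance matrix depends only on one block of training rows and one block of test rows. The sketch below checks this locally on small random data: it computes every block separately and confirms that the assembled result matches the full distance matrix. The helpers `sq_dist` and `row_blocks` and the shard size of 4 are assumptions for this illustration, and a plain loop stands in for the PyWren map.

```python
import itertools
import numpy as np

def sq_dist(A, B):
    # squared Euclidean distances between rows of A and rows of B
    a = np.sum(A ** 2, axis=1)[:, np.newaxis]
    b = np.sum(B ** 2, axis=1)[np.newaxis, :]
    return a - 2 * A.dot(B.T) + b

def row_blocks(X, size):
    # split X into consecutive row blocks; the last one may be smaller
    return [X[i:i + size] for i in range(0, X.shape[0], size)]

rng = np.random.default_rng(1)
X_train_small = rng.normal(size=(10, 3))
X_test_small = rng.normal(size=(7, 3))

train_blocks = row_blocks(X_train_small, 4)  # row counts 4, 4, 2
test_blocks = row_blocks(X_test_small, 4)    # row counts 4, 3

# one independent task per (training block, test block) pair
pairs = itertools.product(range(len(train_blocks)), range(len(test_blocks)))
D_blocks = {(i, j): sq_dist(train_blocks[i], test_blocks[j]) for i, j in pairs}

# stitch the blocks back together and compare with the unsharded computation
D_assembled = np.block([[D_blocks[(i, j)] for j in range(len(test_blocks))]
                        for i in range(len(train_blocks))])
D_full = sq_dist(X_train_small, X_test_small)
```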

In [193]:
# create an "empty" ShardedMatrix on S3 (we will fill this Matrix in with pywren)
D_sharded = matrix.ShardedMatrix("D", shape=(X_train.shape[0], X_test.shape[0]), shard_sizes=[4000,4000], bucket=DEFAULT_BUCKET)

def compute_pywren_nearest_neighbor_distance_matrix(block_pair, X_train_sharded, X_test_sharded, D_sharded):
    block0,block1 = block_pair
    # compute a distance matrix block
    X_train_block = X_train_sharded.get_block(block0, 0)
    X_test_block = X_test_sharded.get_block(block1, 0)
    train_norms = np.linalg.norm(X_train_block, axis=1)[:, np.newaxis] ** 2
    test_norms = np.linalg.norm(X_test_block, axis=1)[np.newaxis, :] ** 2
    D_block = train_norms + -2*X_train_block.dot(X_test_block.T)+ test_norms
    D_sharded.put_block(block0, block1, D_block)
    return 0 

In [194]:
pywren_distance_function = lambda bidx: compute_pywren_nearest_neighbor_distance_matrix(bidx, X_train_sharded, X_test_sharded, D_sharded)

In [195]:
%time futures = pwex.map(pywren_distance_function, D_sharded.block_idxs)

CPU times: user 1.29 s, sys: 146 ms, total: 1.44 s
Wall time: 2.17 s


In [196]:
%time pywren.wait(futures)

CPU times: user 1.91 s, sys: 204 ms, total: 2.12 s
Wall time: 54.1 s



In [197]:
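def find_argmin(block_pair, D_sharded):
    D_block = D_sharded.get_block(*block_pair)
    # shift block-local row indices to global training-row indices
    offset = block_pair[0]*D_sharded.shard_sizes[0]
    # (column block id, global argmin per column, min distance per column)
    return (block_pair[1], offset + np.argmin(D_block, axis=0), np.min(D_block, axis=0))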
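
The offset matters because `np.argmin` on a block returns row indices relative to that block, not to the full matrix. The sketch below illustrates this on a small random matrix split into row blocks: each block reports its offset argmin and its minimum, and picking the block with the smallest minimum per column recovers the global argmin. The matrix `D_small` and the shard size of 4 are made up for this example.

```python
import numpy as np

rng = np.random.default_rng(2)
D_small = rng.random((10, 7))  # stand-in distance matrix
shard_rows = 4

partial = []
for i, start in enumerate(range(0, D_small.shape[0], shard_rows)):
    D_block = D_small[start:start + shard_rows]
    offset = i * shard_rows  # first global row covered by this block
    partial.append((offset + np.argmin(D_block, axis=0), np.min(D_block, axis=0)))

argmins = np.vstack([p[0] for p in partial])  # shape (n_blocks, n_cols)
mins = np.vstack([p[1] for p in partial])

# for each column, keep the argmin reported by the block with the smallest minimum
winner = np.argmin(mins, axis=0)
global_argmin = argmins[winner, np.arange(D_small.shape[1])]
```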

In [198]:
%time futures = pwex.map(lambda x: find_argmin(x, D_sharded), sorted(D_sharded.block_idxs))

CPU times: user 1.22 s, sys: 146 ms, total: 1.37 s
Wall time: 2.05 s


In [199]:
%time pywren.wait(futures)

CPU times: user 1.37 s, sys: 146 ms, total: 1.51 s
Wall time: 19.4 s



In [200]:
results = [f.result() for f in futures]

In [201]:
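def compute_results_from_pywren(results):
    mins = []
    # group the per-block results by column block (test points), in column order
    for _, group in itertools.groupby(sorted(results, key=itemgetter(0)), key=itemgetter(0)):
        group = list(group)
        # one row per training-row block: candidate global argmins for each test point
        argmins = np.vstack([g[1] for g in group])
        # which row block holds the smallest distance for each test point
        argminmin = np.argmin(np.vstack([g[2] for g in group]), axis=0)
        mins.append(argmins[argminmin, np.arange(argmins.shape[1])])
    return np.hstack(mins)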
y_test_pred_pywren = y_train[compute_results_from_pywren(results)]
print("Accuracy is ", metrics.accuracy_score(y_test, y_test_pred_pywren))



Accuracy is  0.9691
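
The reduction in `compute_results_from_pywren` can be checked locally without S3. The sketch below builds the same `(column block, offset argmin, min)` tuples from a small random matrix sharded in both directions, applies the same group-by-column-block reduction, and compares the result with a plain `np.argmin` over the whole matrix. The matrix `D_small` and the shard sizes are hypothetical values chosen for this example.

```python
import itertools
from operator import itemgetter
import numpy as np

rng = np.random.default_rng(3)
D_small = rng.random((10, 7))  # stand-in distance matrix
rows, cols = 4, 3              # hypothetical shard sizes

# emulate the per-block output: (column block id, global argmins, minima)
results = []
for i, r in enumerate(range(0, D_small.shape[0], rows)):
    for j, c in enumerate(range(0, D_small.shape[1], cols)):
        blk = D_small[r:r + rows, c:c + cols]
        results.append((j, i * rows + np.argmin(blk, axis=0), np.min(blk, axis=0)))

# reduce across row blocks within each column block, then concatenate the columns
pieces = []
for _, group in itertools.groupby(sorted(results, key=itemgetter(0)), key=itemgetter(0)):
    group = list(group)
    argmins = np.vstack([g[1] for g in group])
    mins = np.vstack([g[2] for g in group])
    best = np.argmin(mins, axis=0)
    pieces.append(argmins[best, np.arange(argmins.shape[1])])
combined = np.hstack(pieces)
```

Sorting by column block before grouping is what keeps the concatenated output in the original test-point order.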


The advantage of this PyWren implementation is that every block of the distance matrix is computed in parallel, so the implementation scales gracefully as we add more training and test points.

### Computing Training 1-NN

Another advantage of this PyWren implementation is that we can compute the training accuracy of our NN classifier.
Normally this is hard because the training distance matrix is 60000 x 60000, which is about 14.4 GB in float32 and can be cumbersome to compute (it may not even fit in your local RAM!).

(We know a priori that the training accuracy of 1-NN will be 100%, since every training point is its own nearest neighbor, but we can verify that quickly with PyWren.)
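
As a back-of-the-envelope check on the sizes involved, the sketch below computes the memory footprint of the full training distance matrix and of a single 4000 x 4000 block. It assumes float32 entries, matching the `astype('float32')` cast applied to the data earlier; the variable names are made up for this example.

```python
import numpy as np

n = 60000
shard = 4000
itemsize = np.dtype("float32").itemsize  # 4 bytes per entry

full_bytes = n * n * itemsize           # the whole 60000 x 60000 matrix
block_bytes = shard * shard * itemsize  # one 4000 x 4000 block
n_blocks = (n // shard) ** 2            # blocks in the sharded matrix

print(full_bytes / 1e9, "GB in total")      # 14.4 GB in total
print(block_bytes / 1e6, "MB per block")    # 64.0 MB per block
print(n_blocks, "blocks")                   # 225 blocks
```

Each worker therefore only ever holds one 64 MB block of distances, even though the full matrix is far larger.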

In [217]:
# create an "empty" ShardedMatrix on S3 (we will fill this Matrix in with pywren)
D_train_sharded = matrix.ShardedMatrix("D", shape=(X_train.shape[0], X_train.shape[0]), shard_sizes=[4000,4000], bucket=DEFAULT_BUCKET)

In [224]:
pywren_train_distance_function = lambda bidx: compute_pywren_nearest_neighbor_distance_matrix(bidx, X_train_sharded, X_train_sharded, D_train_sharded)
%time futures = pwex.map(pywren_train_distance_function, D_train_sharded.block_idxs)

CPU times: user 5.24 s, sys: 804 ms, total: 6.05 s
Wall time: 7.9 s


In [225]:
%time pywren.wait(futures)

CPU times: user 9.09 s, sys: 925 ms, total: 10 s
Wall time: 3min 11s



In [226]:
results = [f.result() for f in futures]

In [227]:
%time futures = pwex.map(lambda x: find_argmin(x, D_train_sharded), sorted(D_train_sharded.block_idxs))

CPU times: user 5.79 s, sys: 1.03 s, total: 6.83 s
Wall time: 4.49 s


In [228]:
%time pywren.wait(futures)

CPU times: user 4.76 s, sys: 626 ms, total: 5.38 s
Wall time: 25.6 s



In [229]:
results = [f.result() for f in futures]

In [232]:
print("Training Accuracy is ", metrics.accuracy_score(y_train, y_train[compute_results_from_pywren(results)]))

Training Accuracy is  1.0
