In [248]:
"""
This function implements OverSketch from 
https://people.eecs.berkeley.edu/~vipul_gupta/oversketch.pdf
It calculates A*B approximately using sketching on AWS Lambda.
Takes arguments: 
BigMatrix 'A' 
BigMatrix 'B' 
sketch dimension 'd', 
straggler threshold 'thres', default 0.95 

BigMatrix objects A and B should satisfy:
-A.shard_sizes[0] = B.shard_sizes[1] = b, where b is the block-size
-b divides d
-Columns and rows of A and B, respectively, are unsharded

Returns: A numpywren BigMatrix that contains the sketched product of A and B in S3
"""

import numpy as np
import pywren
import time
import numpywren
from numpywren import matrix, matrix_utils
from numpywren import binops
from numpywren.matrix_init import shard_matrix

In [320]:
m = 2000
n = 10000
l = m
b = 1000
d = int(4*b)
thres = 0.95

In [321]:
A_loc = np.asarray(range(m*n))
A_loc = A_loc.reshape(m,n)

In [322]:
A = matrix.BigMatrix("oversketch_A_{0}_{1}_{2}".format(m, n, b), shape=(m, n), shard_sizes=(b, n), write_header=True)
shard_matrix(A, A_loc)

<numpywren.matrix.BigMatrix at 0x7f51916582b0>

In [323]:
A.block_idxs_not_exist

[]

In [324]:
B = A.T

In [325]:
m = A.shape[0]
n = A.shape[1]
l = B.shape[1]
b = A.shard_sizes[0]

In [326]:
assert (d % b == 0)
assert (b == B.shard_sizes[1])

In [327]:
N = int(d/b)

In [328]:
sketch_A = matrix.BigMatrix("sketch_A_{0}_{1}".format(m, d), shape=(m, d), shard_sizes=(b, b))
sketch_BT = matrix.BigMatrix("sketch_B_{0}_{1}".format(l, d), shape=(l, d), shard_sizes=(b, b))

In [329]:
hashes = np.random.randint(0, b, size=(N, n))
flips = np.random.choice([-1,1], size=(N, n))

In [330]:
def OverSketchMatrix(id, X, hashes, flips, b, sketch):
    """
    Calculates OverSketch AS for a row-block of a fat matrix A with block-size b
    """
    x = id[0]
    y = id[1]
    A = X.get_block(x,0)
    m,n = A.shape
    hash_local = hashes[y,:]
    flip_local = flips[y,:]
    sketch_block = np.zeros((m, b))
    for i in range(n):
        sketch_block[:, hash_local[i]] += flip_local[i]*A[:,i]
    sketch.put_block(sketch_block/np.sqrt(N), x, y)
    return 0

In [331]:
pwex = pywren.lambda_executor()
t1 = time.time()
futuresA = pwex.map(lambda x: OverSketchMatrix(x, A, hashes, flips, b, sketch_A), sketch_A.block_idxs)
futuresB = pwex.map(lambda x: OverSketchMatrix(x, B.T, hashes, flips, b, sketch_BT), sketch_BT.block_idxs)

In [332]:
fs_donesA = pywren.wait(futuresA, 2)[0]
fs_donesB = pywren.wait(futuresB, 2)[0]
while len(fs_donesA)<thres*len(futuresA) and len(fs_donesB)<thres*len(futuresB):
    fs_donesA = pywren.wait(futuresA, 2)[0]
    fs_donesB = pywren.wait(futuresB, 2)[0]
    # print (len(fs_donesA), len(fs_donesB))
print("Sketching time", time.time() - t1)

Sketching time 7.129902362823486


In [333]:
## Computation phase
def blockMatMul(A, B, tensorAB, id):
    """
    Multiplies A and B.T in a blocked fashion
    """
    i = id[0]
    j = id[1]
    k = id[2]
    print(i,j,k)
    X = A.get_block(i,k)
    Y = B.get_block(j,k)
    tensorAB[k].put_block(X.dot(Y.T), i, j)
    return 0

In [334]:
tensorAB = []
for x in range(N):
    tensorAB.append(matrix.BigMatrix("AxB_outer_{0}_{1}_{2}".format(m, l, x), shape=(m, l), shard_sizes=(b, b)))

In [335]:
computeArr = [(i,j,k) for (i,k) in sketch_A.block_idxs for j in sketch_BT._block_idxs(0)]

In [336]:
t1 = time.time()
futures = pwex.map(lambda x: blockMatMul(sketch_A, sketch_BT, tensorAB, x), computeArr)
fs_dones = pywren.wait(futures, 2)[0]
while len(fs_dones)<thres*len(futures):
    fs_dones = pywren.wait(futures, 2)[0]
print("Computation time", time.time() - t1)

Computation time 46.33478307723999


In [337]:
blockMatMul(sketch_A, sketch_BT, tensorAB, (0,0,0))

0 0 0


0

In [338]:
AB = matrix.BigMatrix("AxB_{0}_{1}".format(m, l), shape=(m, l), shard_sizes=(b, b))

In [339]:
reduceArr = [(i,j) for i in sketch_A._block_idxs(0) for j in sketch_BT._block_idxs(0)]

In [340]:
## Reduction phase
def blockMatMulReduction(tensorAB, AB, id):
    """
    Reduces the output from computation phase to get A*B
    """
    i = id[0]
    j = id[1]
    X = None
    for k in range(N):
        if X is None:
            try:
                X = tensorAB[k].get_block(i,j)
            except Exception as e:
                print(e)
                pass
        else:
            try:
                X = X + tensorAB[k].get_block(i,j)
            except Exception as e:
                print(e)
                pass
    AB.put_block(X, i, j)  
    return 0

In [341]:
t1 = time.time()
futures_red = pwex.map(lambda x: blockMatMulReduction(tensorAB, AB, x), reduceArr)
fs_dones = pywren.wait(futures_red)[0]
print("Reduction time", time.time() - t1)

Reduction time 6.067621469497681


In [342]:
c = AB.numpy()

In [343]:
aaT = A_loc.dot(A_loc.T)

In [344]:
np.linalg.norm(aaT-c, ord='fro')/np.linalg.norm(aaT,ord='fro')

0.5774055612404048

In [345]:
sketch_A.block_idxs_not_exist

[]

In [346]:
e = sketch_A.numpy()

In [347]:
e.shape

(2000, 4000)

In [348]:
np.linalg.norm(aaT-e.dot(e.T), ord='fro')/np.linalg.norm(aaT,ord='fro')

0.002796590690059316

In [349]:
ee = sketch_BT.numpy()

In [350]:
np.linalg.norm(aaT-e.dot(ee.T), ord='fro')/np.linalg.norm(aaT,ord='fro')

0.002796590690059316

In [351]:
def count_sketch(A, d):
    """
    Given a np matrix A of dimension nxm, and dimension d, 
    performs the count sketch algorithm on A and 
    returns S*A, a matrix of dimension dxm
    """
    n, m = np.shape(A)
    sketched_mtx = np.zeros((d, m))
    hashes = np.random.randint(0, d, size=(1, n))
    flips = np.random.choice([-1,1], size=(1, n))
    
    for i in range(n):
        sketched_mtx[hashes[0, i], :] += flips[0, i]*A[i, :]
    return sketched_mtx

In [352]:
SAT = count_sketch(A_loc.T, d)

In [353]:
np.linalg.norm(aaT-SAT.T.dot(SAT))/(np.linalg.norm(aaT))

0.005207767905716421

In [319]:
!jupyter nbconvert --to script OverSketch.ipynb

[NbConvertApp] Converting notebook OverSketch.ipynb to script
[NbConvertApp] Writing 5693 bytes to OverSketch.py
