# Distributed Linear Algebra on Apache Spark

## Part 1
### - Brief Apache Spark Intro

## Part 2
### - Linear regression problem formulation
### - Complexity of the closed form solution at scale
### - Big n Small d
### - Big n Big d
### - Iterative approach for Big n Big d
### - Parallel gradient descent

## Part 3 
### - PCA problem formulation
### - Big n Small d
### - Big n Big d

### Apache Spark is a fast and general engine for large-scale data processing
![Spark Libs](files/spark-libs.png)

### Spark runs on Hadoop, Mesos, standalone, or in the cloud. It can access diverse data sources including HDFS, Cassandra, HBase, and S3.
![Spark Compatabilities](files/spark-cmp.png)

## Programming Model:

### Spark provides two main abstractions for parallel programming: 
### - resilient distributed datasets (RDD) 
### - parallel operations on these datasets 

In [43]:
rdd = sc.parallelize([1,2,3,4,5,6,7,8,9])

# Map a function on different nodes localy
squaredRDD = rdd.map(lambda i: i*i)

# Send data to one node for reduce operation
squaredRDD.reduce(lambda a,b: a + b)

285

## Linear regression problem formulation
###$n$ - training observation, $d$ - number of features

###$\mathbf{X} \in \mathbb{R}^{n\times d}$: dataset matrix 

###$\mathbf{y} \in \mathbb{R}^{n}$: labels

###$\mathbf{\hat{y}} \in \mathbb{R}^{n}$: predictions, where $\hat{y} = Xw$

###$\mathbf{w} \in \mathbb{R}^{d}$: regression parameters we want to learn

###$\mathbf{\min_w ||Xw - y||^{2}_{2}} $: objective function to learn the parameters

## Closed form solution 

###$\mathit{\frac{df}{dw}(w)} = \mathbf{wX^{T}X - X^{T}y} = 0$

###$\mathbf{w} = \mathbf{(X^{T}X)^{-1}X^{T}y}$

### CPU bottlenecks:
### Matrix multiplication $\mathbf{X^{T}X}$: $O(nd^2)$ operations

### Matrix inverse $^{-1}$: $O(d^{3})$ operations

### Space bottlenecks:
### $\mathbf{X}$: $O(nd)$ float64
### $\mathbf{X^{T}X}$ and inverse $^{-1}$: $O(d^{2})$ float64

## What if X is overdetermined? Big n Small d

### So that $O(nd^{2})$ to long to compute on single node
### $O(nd)$ to big to store on single node

### But $O(d^{3})$ and $O(d^{2})$ is OK

### Solution: find the way to distribute space and CPU utilization
### - Split observations (rows of the matrix $\mathbf{X}$) between many nodes
### - Use outer product instead of inner product for matrix multiplication
![Matrix multiplication using inner products](files/matmul.png)
Wikipedia: https://en.wikipedia.org/wiki/Matrix_multiplication

###$\begin{vmatrix} 1 & -3 & 2 \\ 4 & 5 & -2 \end{vmatrix} \begin{vmatrix} 1 & 3 \\ 2 & 4 \\ 5 & 6 \end{vmatrix}  = \begin{vmatrix} 1-6+10 & 3-12+12 \\ 4+10-10 & 12+20-12 \end{vmatrix} = \begin{vmatrix} 5 & 3 \\ 4 & 20 \end{vmatrix}$

### Sum of outer products
### $\begin{vmatrix} 1 & -3 & 2 \\ 4 & 5 & -2 \end{vmatrix} \begin{vmatrix} 1 & 3 \\ 2 & 4 \\ 5 & 6 \end{vmatrix} = \begin{vmatrix} 1 & 3 \\ 4 & 12 \end{vmatrix} + \begin{vmatrix} -6 & -12 \\ 10 & 20 \end{vmatrix} + \begin{vmatrix} 10 & 12 \\ -10 & -12 \end{vmatrix} = \begin{vmatrix} 5 & 3 \\ 4 & 20 \end{vmatrix}$

### Let's implement $\mathbf{(X^{T}X)^{-1}}$

In [413]:
import numpy

n = 1000
d = 3
x = numpy.random.random((n, d))
rdd = sc.parallelize(x) # Create RDD

# Compute outer product for each row
outerProductsRDD = rdd.map(lambda row: numpy.outer(row, row))

# Reduce, sum up and compute inverse
matrixSum = outerProductsRDD.reduce(lambda a,b: a + b)
numpy.linalg.inv(matrixSum)

array([[ 0.00825242, -0.00344546, -0.00370888],
       [-0.00344546,  0.00842575, -0.00362669],
       [-0.00370888, -0.00362669,  0.00854035]])

## Big n Big d

### $\mathbf{w} = \mathbf{(X^{T}X)^{-1}X^{T}y}$: now all operations and storage are bottlenecks

### Closed form solution can not be computed here

### But we can try to find approximate solution iteratively

![Gradient Descent](files/gd.png)
Wikipedia: https://en.wikipedia.org/wiki/Gradient_descent

### Gradient descent update rule
### $\mathbf{w}_{i+1} = \mathbf{w}_i + \alpha \mathit{\frac{df}{dw}(\mathbf{w}_i)}$
### $\mathbf{w}_{i+1} = \mathbf{w}_i + \alpha \sum_{j=1}^{n}{(\mathbf{w}_i^T \mathbf{x}^{(j)} - y^{(j)}) \mathbf{x}^{(j)}}$

### Note that: 
### - $(\mathbf{w}_i^T \mathbf{x}^{(j)} - y^{(j)}) \mathbf{x}^{(j)}$ is just $O(d)$ in CPU and Space complexity
### - Sum is commutative and associative operation

### Solution: compute gradient in parallel!

In [528]:
# Generate some data for our toy example
n = 1000
d = 1000
trueW = numpy.tile(0.5, d)
x = numpy.arange(0, 1, 1/float(n*d)).reshape(n,d)
f = lambda x: x.dot(trueW) + numpy.random.normal(size=(d))
rdd = sc.parallelize(x).map(lambda x: (x, f(x))) # Create RDD

# Gradient Descent
maxiter = 50       # Stop when reached
alpha = 1.0e-3     # Initital step 
w = numpy.zeros(d) # start with all weights equal zero

# Compute gradient element
def grad(w, data):
    x, y = data
    return (w.dot(x) - y) * x

for i in range(maxiter):
    # Reduce step size by number of iterations
    alph = alpha / (n * numpy.sqrt(i+1))
    
    # Compute gradient 
    gradient = (rdd.map(lambda row: grad(w, row))
                   .sum()) 
    
    # Update weights
    w -= alph * gradient
    print(w.mean())

0.166665904917
0.245233372993
0.294263227894
0.32855280764
0.354110499495
0.373963356251
0.389842263108
0.402823885251
0.413621045657
0.422726749084
0.430492696973
0.437180752912
0.442988664943
0.448067332219
0.452536552263
0.456491528197
0.460009065903
0.463151054441
0.465968861532
0.468505467739
0.470796628863
0.472872018713
0.474757757116
0.476475357058
0.478043810283
0.479479237352
0.480795686271
0.482005616391
0.483119576826
0.48414670271
0.48509597663
0.485974458083
0.486788394482
0.487543829373
0.488245394338
0.488898478007
0.489506754282
0.490074011514
0.490603870042
0.491098948711
0.491562508027
0.491996181532
0.492403120326
0.492784729335
0.493143145862
0.493480324053
0.493797323764
0.494095600867
0.494376609539
0.494641619162


### Wait, but $w$ is out of scope! It's $O(d)$ in network communication 

### Can we do more work localy?

In [527]:
fewIters = 5        # Do less iterations
miniBatchSize = 32  # Use 20 vectors on each iteration
alpha = 1.0e-4      # Initital step
w = numpy.zeros(d)  # start with all weights equal zero
frac = rdd.getNumPartitions() * miniBatchSize / float(n)

# Compute stochastic gradient using mini batches
for i in range(fewIters):
    # Reduce step size on each iteration
    alph = alpha / (n * numpy.sqrt(i+1))
    
    # Randomly sample rows from our dataset
    sampleRDD = rdd.sample(True, frac)
    
    # Calculate updates locally
    localUpdate = (sampleRDD.map(lambda row: grad(w, row))
                            .mapPartitions(lambda itr: reduce(numpy.add, itr))
                            .map(lambda gradient: alph * gradient)
                            .sum())
    
    # Average update
    meanUpdate = localUpdate / sampleRDD.getNumPartitions()
    w -= meanUpdate
    print(w.mean())

0.595293285308
0.519771003519
0.505719136095
0.502972349047
0.501243780913


## Principal Component Analysis (PCA)

![Principal Component Analysis](files/pca.png)
Wikipedia: https://en.wikipedia.org/wiki/Principal_component_analysis

### Solution of the PCA 
### $\mathbf{C} = \mathbf{U \Lambda U^T}$, where 
### $\mathbf{C} = \frac{1}{n} \mathbf{X^TX}$: $d \times d$ Covarience Matrix, features have zero mean
### $\mathbf{\Lambda}$:  $d \times d$ diagonal matrix with eigenvalues
### $\mathbf{U}$: $d \times d$ matrix with eigenvectors in columns
### - all eigenvectors point to the directon of max varience
### - eigenvalues equal variance in these directions

## Step by Step algorithm
### 1. Subtract mean
### 2. Compute Covarience Matrix
### 3. Eigendecomposition
### 4. Choose $k$ top components

## Big n Small d

### CPU bottlenecks:
### $\mathbf{X^{T}X}$: $O(nd^2)$ operations

### Eigendecomposition: $O(d^{3})$ operations

### Space bottlenecks:
### $\mathbf{X}$: $O(nd)$ float64
### $\mathbf{X^{T}X}$: $O(d^{2})$ float64

## Solution: distributed PCA

In [621]:
# Generate data
n = 1000
d = 3
x = numpy.random.normal(size=(n, d))
x[numpy.arange(1000),0] += numpy.arange(1000)
x[numpy.arange(1000),1] -= numpy.arange(1000)
rdd = sc.parallelize(x) # Create RDD

# 1. Subtract mean
m = rdd.mean()
centeredRDD = rdd.map(lambda row: row - m)

# 2. Compute Covariance Matrix
covMatrix = (centeredRDD.map(lambda row: numpy.outer(row, row))
                        .sum()) / n

# 3. Eigendecomposition
eigVal, eigVec = numpy.linalg.eigh(covMatrix)

# 4. Choose k components
idx = numpy.argsort(eigVal)
varienceExplained = eigVal / sum(eigVal)
topCmp = eigVec[:, idx[-1]]

print 'Percent of varience explained: \n {0}'.format(varienceExplained[::-1])
print '\nTop Component:\n {0}'.format(topCmp)

Percent of varience explained: 
 [  9.99988394e-01   6.20704328e-06   5.39913451e-06]

Top Component:
 [ -7.07059761e-01   7.07153796e-01  -5.84231638e-05]


## Reduced representation of original Matrix $\mathbf{X}$

In [623]:
reducedMatrix = rdd.map(lambda row: row.dot(topCmp))
reducedMatrix.take(10)

[0.13634950775205154,
 0.36994388929561672,
 -4.6625475716937155,
 -4.593954468693827,
 -6.4234341466658167,
 -7.3575389339955244,
 -7.9183379259488857,
 -8.8875331416642123,
 -9.5034889877149791,
 -13.552056294618763]