* [Distributed Matrix Representation](#DistributedMatrixRepresentation)
	* [Collection of Matrix Blocks (and keys)](#CollectionofMatrixBlocks)
	* [Partitioning](#Partitioning)
	* [Common Matrix Block Representations](#CommonMatrixBlockRepresentations)
* [Buffer Pool Overview](#BufferPoolOverview)
* [Spark-specific Optimizations](#SparkspecificOptimizations)
	* [Partitioning-preserving ops](#PartitioningPreservingOps)
	* [Partitioning-exploiting ops](#PartitioningExploitingOps)
	* [Spark-specific rewrites](#SparkspecificRewrites)


# Distributed Matrix Representation <a id="DistributedMatrixRepresentation"></a>

## Collection of “Matrix Blocks” (and keys)  <a id="CollectionofMatrixBlocks"></a>
- “tiles”, a.k.a. “chunks”
- Bag semantics (duplicates, unordered)
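- Logical (Fixed-Size) Blocking
  - Pros: simple join processing (blocks of equally sized matrices align by key) and independent per-block processing
  - Cons: sparsity skew (the number of nonzeros can vary widely across blocks)
- E.g., SystemML on Spark: `JavaPairRDD<MatrixIndexes,MatrixBlock>`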
- Blocks encoded independently (dense/sparse)
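A minimal pure-Python sketch of logical fixed-size blocking, assuming a small dense matrix given as a list of lists. The helper `to_blocks` is hypothetical: keys are 1-based `(row_block, col_block)` tuples standing in for `MatrixIndexes`, and each block independently picks a dense or sparse encoding using an assumed sparsity threshold of 0.4 (an illustrative value, not a statement of SystemML's exact rule).

```python
SPARSITY_THRESHOLD = 0.4  # assumed cutoff for choosing a sparse encoding


def to_blocks(A, blen):
    """Split a dense row-major matrix into blen x blen blocks keyed by (row_block, col_block).

    Boundary blocks are smaller when the dimensions are not multiples of blen.
    Each block is encoded independently as ("dense", rows) or ("sparse", {(i, j): v}).
    """
    nrows, ncols = len(A), len(A[0])
    blocks = {}
    for bi in range(-(-nrows // blen)):        # ceil(nrows / blen)
        for bj in range(-(-ncols // blen)):    # ceil(ncols / blen)
            r0, c0 = bi * blen, bj * blen
            block = [row[c0:c0 + blen] for row in A[r0:r0 + blen]]
            cells = len(block) * len(block[0])
            nnz = sum(1 for row in block for v in row if v != 0)
            if nnz < SPARSITY_THRESHOLD * cells:
                # Sparse: keep only the nonzeros, with block-local coordinates.
                enc = ("sparse", {(i, j): v for i, row in enumerate(block)
                                  for j, v in enumerate(row) if v != 0})
            else:
                enc = ("dense", block)
            blocks[(bi + 1, bj + 1)] = enc     # 1-based block indexes (assumption)
    return blocks


A = [[1, 0, 0],
     [0, 0, 0],
     [0, 0, 5],
     [2, 3, 4],
     [0, 0, 0]]
blocks = to_blocks(A, 2)
print(len(blocks))  # 6 blocks: 3 block-rows x 2 block-columns
```

Because every block carries its own key, blocks can be stored as an unordered bag and still be matched up again by key, for example when joining two matrices with the same blocking.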

## Partitioning  <a id="Partitioning"></a>
- Logical Partitioning (e.g., row-/column-wise)
- Physical Partitioning (e.g., Hash / Grid)

![Partitioning](images/MBPartitioning.png)
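The difference between logical grouping and physical placement can be sketched as functions from a block key to a partition index. All three helpers below are hypothetical illustrations, not Spark's or SystemML's partitioner classes; keys are assumed to be 1-based `(row_block, col_block)` tuples, and `nrb`/`ncb` denote the number of block-rows and block-columns.

```python
def hash_partition(key, num_parts):
    """Physical hash partitioning on the full (row_block, col_block) key."""
    return hash(key) % num_parts


def row_partition(key, num_parts):
    """Row-wise partitioning: every block of a block-row lands in the same partition."""
    return (key[0] - 1) % num_parts


def grid_partition(key, nrb, ncb, grid_r, grid_c):
    """Grid partitioning: split the nrb x ncb block grid into grid_r x grid_c rectangular tiles."""
    tile_r = (key[0] - 1) * grid_r // nrb
    tile_c = (key[1] - 1) * grid_c // ncb
    return tile_r * grid_c + tile_c


# A 4 x 3 grid of blocks (e.g., 3400 x 2700 cells with 1000 x 1000 blocks).
keys = [(i, j) for i in range(1, 5) for j in range(1, 4)]
print(sorted({grid_partition(k, 4, 3, 2, 2) for k in keys}))  # [0, 1, 2, 3]
```

Row-wise placement keeps whole block-rows together, so row-oriented operations stay local, while hash placement spreads blocks evenly without regard to matrix structure.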


## Common Matrix Block Representations <a id="CommonMatrixBlockRepresentations"></a>
- Dense (row-major arrays)
- MCSR (modified CSR)
- CSR (compressed sparse rows), CSC
- COO (Coordinate matrix)

![Common Matrix Block Representation](images/MBRepresentation.png)
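The four layouts can be compared on one tiny block. This pure-Python sketch uses hypothetical helper names and plain lists instead of SystemML's block classes; representing an empty MCSR row as `None` is an illustrative assumption.

```python
def to_dense(A):
    """Dense: one flat row-major array; cell (i, j) lives at i * ncols + j."""
    return [v for row in A for v in row]


def to_csr(A):
    """CSR: row pointers into shared column-index and value arrays."""
    row_ptr, col_idx, vals = [0], [], []
    for row in A:
        for j, v in enumerate(row):
            if v != 0:
                col_idx.append(j)
                vals.append(v)
        row_ptr.append(len(vals))  # end offset of this row's nonzeros
    return row_ptr, col_idx, vals


def to_mcsr(A):
    """MCSR: an independent (col_idx, vals) pair per row, None for empty rows,
    so one row can grow without shifting the data of the other rows."""
    rows = []
    for row in A:
        nz = [(j, v) for j, v in enumerate(row) if v != 0]
        rows.append(([j for j, _ in nz], [v for _, v in nz]) if nz else None)
    return rows


def to_coo(A):
    """COO: one (row, col, value) triple per nonzero."""
    return [(i, j, v) for i, row in enumerate(A) for j, v in enumerate(row) if v != 0]


A = [[0, 2, 0, 0],
     [1, 0, 0, 3],
     [0, 0, 0, 0]]
print(to_csr(A))  # ([0, 1, 3, 3], [1, 0, 3], [2, 1, 3])
```

CSC is CSR applied to the transpose (column pointers plus row indexes). MCSR trades some per-row overhead for cheap incremental updates, which CSR's shared arrays do not allow.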

```python
from systemml import MLContext, dml
ml = MLContext(sc)  # sc: the SparkContext provided by the PySpark notebook

print("SystemML Version: " + ml.version())
print("SystemML Built-Time: " + ml.buildTime())
```

```
SystemML Version: 1.0.0-SNAPSHOT
SystemML Built-Time: 2017-08-14 01:28:05 UTC
```


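The example below demonstrates the logical blocking of a 3400 x 2700 matrix with a block size of 1000 x 1000, which yields 4 x 3 = 12 blocks (the last block-row has 400 rows and the last block-column has 700 columns).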

```python
def printMatrixBlockInfo(X):
    """
    Print the format (sparse or dense), number of nonzeros, and dimensions
    of each matrix block contained in X.

    Parameters
    ----------
    X: Matrix
        SystemML matrix returned by MLContext (e.g., via .get("X"))
    """
    # Convert X into its blocked RDD (JavaPairRDD<MatrixIndexes,MatrixBlock>)
    # and collect the matrix blocks (values only) to the driver.
    mbs = X._java_matrix.toBinaryBlocks().values().collect()
    for i in range(mbs.size()):
        mb = mbs.get(i)
        sparse = str(mb.isInSparseFormat())
        nnz = str(mb.getNonZeros())
        size = str(mb.getNumRows()) + " X " + str(mb.getNumColumns())
        print("sparse? = " + sparse + ", nonzeros = " + nnz + ", size: " + size)

prog = """
X = rand(rows=3400, cols=2700, sparsity=0.4)     # generate a random matrix with sparsity 0.4
"""
X = ml.execute(dml(prog).output("X")).get("X")
printMatrixBlockInfo(X)
```
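The cell above has no recorded output, but the block shapes it should report follow from simple arithmetic. This sketch (the helper `block_dims` is hypothetical) computes the block sizes along each dimension for a 3400 x 2700 matrix with 1000 x 1000 blocks.

```python
def block_dims(n, blen):
    """Sizes of the blocks along one dimension of length n with block size blen."""
    full, rem = divmod(n, blen)
    return [blen] * full + ([rem] if rem else [])


row_dims = block_dims(3400, 1000)  # [1000, 1000, 1000, 400]
col_dims = block_dims(2700, 1000)  # [1000, 1000, 700]
shapes = [(r, c) for r in row_dims for c in col_dims]
print(len(shapes))  # 12
print(shapes[-1])   # (400, 700)
```

With `sparsity=0.4`, each block should hold roughly 40% nonzeros (on the order of 400,000 in a full 1000 x 1000 block), though the exact counts vary per block.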

# Buffer Pool Overview <a id="BufferPoolOverview"></a>

- Motivation
  - Exchange of intermediates between local and remote operations (HDFS, RDDs, GPU device memory)
  - Eviction of in-memory objects (integrated with garbage collector)
- Primitives: `acquireRead`, `acquireModify`, `release`, `exportData`, `getRdd`, `getBroadcast`
- Spark Specifics
  - Lineage tracking
  - RDDs/broadcasts
  - Guarded RDD collect/parallelize (only when the data fits in driver memory)
  - Partitioned broadcast variables
  
![Buffer Pool](images/BufferPool.png)
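The pinning and eviction idea behind these primitives can be illustrated with a toy, single-process model. `ToyBufferPool`, its methods, and the size units are hypothetical; it only mimics pinning through `acquire_read`/`release` and least-recently-used eviction of unpinned objects to a stand-in "spill" store, and it ignores `acquireModify`, export, RDDs, broadcasts, and GPU memory.

```python
from collections import OrderedDict


class ToyBufferPool:
    """Hypothetical in-memory buffer pool with pinning and LRU eviction."""

    def __init__(self, capacity):
        self.capacity = capacity      # memory budget (arbitrary size units)
        self.cache = OrderedDict()    # name -> (data, size), least recently used first
        self.pins = {}                # name -> number of active acquires
        self.spilled = {}             # name -> (data, size) evicted to "disk"
        self.used = 0

    def put(self, name, data, size):
        # Register a new (unpinned) object, then evict if over budget.
        self.cache[name] = (data, size)
        self.pins[name] = 0
        self.used += size
        self._evict()

    def acquire_read(self, name):
        if name not in self.cache:
            # Restore a previously evicted object into memory.
            data, size = self.spilled.pop(name)
            self.cache[name] = (data, size)
            self.used += size
        self.cache.move_to_end(name)  # mark as most recently used
        self.pins[name] += 1          # pinned objects are never evicted
        self._evict()
        return self.cache[name][0]

    def release(self, name):
        self.pins[name] -= 1
        self._evict()

    def _evict(self):
        # Evict least recently used, unpinned objects until within budget.
        for name in list(self.cache):
            if self.used <= self.capacity:
                break
            if self.pins[name] == 0:
                self.spilled[name] = self.cache.pop(name)
                self.used -= self.spilled[name][1]


pool = ToyBufferPool(100)
pool.put("A", "a", 60)
pool.put("B", "b", 60)       # over budget: unpinned "A" is evicted
print(pool.acquire_read("A"))  # restores "A" and evicts "B" instead
pool.release("A")
```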

# Spark-specific Optimizations <a id="SparkspecificOptimizations"></a>

## Partitioning-preserving ops  <a id="PartitioningPreservingOps"></a>
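- An op is partitioning-preserving if it is guaranteed not to change the keys
- Implicit: use restrictive APIs, e.g., `mapValues()` (cannot change keys, so the partitioner is kept) instead of `mapToPair()` (may change keys, so the partitioner is dropped)
- Explicit: partition-wise computation (e.g., `mapPartitionsToPair()`) that declares its output partitioning-preserving; memory-efficient via “lazy iterators” that emit output records one at a time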
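A toy, pure-Python model of these rules, so it runs without Spark. `ToyPairRDD`, its snake_case methods, and the partitioner `by_row_block` are hypothetical stand-ins; they mirror the Spark behavior that `mapValues()` keeps the partitioner, `mapToPair()` drops it, and a partition-wise map keeps it only when the caller declares it.

```python
class ToyPairRDD:
    """Toy stand-in for a Spark pair RDD: a list of partitions of (key, value)
    records plus an optional partitioner function (key -> partition index)."""

    def __init__(self, partitions, partitioner=None):
        self.partitions = partitions
        self.partitioner = partitioner

    def map_values(self, f):
        # Keys are never touched, so the existing partitioning is guaranteed to hold.
        parts = [[(k, f(v)) for k, v in p] for p in self.partitions]
        return ToyPairRDD(parts, self.partitioner)

    def map_to_pair(self, f):
        # f may return new keys, so the partitioner has to be dropped.
        parts = [[f(k, v) for k, v in p] for p in self.partitions]
        return ToyPairRDD(parts, None)

    def map_partitions(self, f, preserves_partitioning=False):
        # f consumes an iterator over one partition and may yield records lazily;
        # the caller declares whether f keeps the keys (and thus the partitioning).
        parts = [list(f(iter(p))) for p in self.partitions]
        return ToyPairRDD(parts, self.partitioner if preserves_partitioning else None)


def by_row_block(key):
    """Hypothetical partitioner: 2 partitions by row-block index."""
    return (key[0] - 1) % 2


rdd = ToyPairRDD([[((1, 1), 1.0), ((3, 1), 2.0)], [((2, 1), 3.0)]], by_row_block)
scaled = rdd.map_partitions(lambda it: ((k, v * 10) for k, v in it),
                            preserves_partitioning=True)
print(scaled.partitioner is by_row_block)  # True
```

Keeping the partitioner matters because a later join or cogroup on the same keys can then skip the shuffle.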

## Partitioning-exploiting ops <a id="PartitioningExploitingOps"></a>
- Implicit: operations based on join, cogroup, etc.
- Explicit: custom physical operators on the original keys (e.g., zipmm)
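When two inputs share the same partitioner and number of partitions, equal keys are guaranteed to sit at the same partition index, so a join can run partition by partition without a shuffle. This is a minimal pure-Python sketch with a hypothetical helper `copartitioned_join`, where partitions are plain lists of `(key, value)` pairs.

```python
def copartitioned_join(left_parts, right_parts):
    """Inner join of two co-partitioned inputs, one local join per partition pair."""
    assert len(left_parts) == len(right_parts), "inputs must have the same partitioning"
    out = []
    for lp, rp in zip(left_parts, right_parts):
        # Matching keys can only occur in the partition with the same index,
        # so each pair of partitions is joined locally (no data movement).
        lookup = {}
        for k, v in rp:
            lookup.setdefault(k, []).append(v)
        out.append([(k, (lv, rv)) for k, lv in lp for rv in lookup.get(k, [])])
    return out


# Both inputs assumed partitioned by key % 2.
left = [[(0, "a"), (2, "c")], [(1, "b")]]
right = [[(2, "z"), (0, "x")], [(1, "y")]]
print(copartitioned_join(left, right))
# [[(0, ('a', 'x')), (2, ('c', 'z'))], [(1, ('b', 'y'))]]
```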

Example: **Partitioning-Exploiting ZIPMM**

![Zipmm](images/Zipmm.png)
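Our reading of the figure is that zipmm computes `t(X) %*% y` for inputs blocked identically by rows: since t(X) y = Σᵢ t(Xᵢ) yᵢ over row blocks i, each pair of blocks sharing a row-block key can be multiplied locally and the partial results summed. The pure-Python sketch below (helpers `matmul`, `transpose`, `add`, and `zipmm` are hypothetical) only checks that arithmetic, with row blocks held in dicts keyed by row-block index.

```python
def matmul(A, B):
    return [[sum(a * b for a, b in zip(row, col)) for col in zip(*B)] for row in A]


def transpose(A):
    return [list(r) for r in zip(*A)]


def add(A, B):
    return [[a + b for a, b in zip(ra, rb)] for ra, rb in zip(A, B)]


def zipmm(X_blocks, y_blocks):
    """t(X) %*% y from row blocks keyed by row-block index."""
    result = None
    for i in X_blocks:
        # Keys align, so X_i and y_i can be paired without a shuffle.
        partial = matmul(transpose(X_blocks[i]), y_blocks[i])
        result = partial if result is None else add(result, partial)
    return result


# X is 3 x 2 and y is 3 x 1, both split into row blocks of 2 rows.
X_blocks = {1: [[1, 2], [3, 4]], 2: [[5, 6]]}
y_blocks = {1: [[1], [1]], 2: [[1]]}
print(zipmm(X_blocks, y_blocks))  # [[9], [12]]
```

The result has only ncol(X) x ncol(y) cells, so summing the small partial products is cheap compared with shuffling the row blocks of X.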

## Spark-specific rewrites <a id="SparkspecificRewrites"></a>

![Spark specific optimization](images/Spark-specific-optimization.png)

