[SPARK-11826][MLLIB] Refactor add() and subtract() methods
srowen Could you please check this when you have time?

Author: Ehsan M.Kermani <ehsanmo1367@gmail.com>

Closes apache#9916 from ehsanmok/JIRA-11826.
ehsanmok authored and roygao94 committed Mar 22, 2016
1 parent 9bb5f83 commit 58b9c8e
Showing 2 changed files with 88 additions and 13 deletions.
mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrix.scala
@@ -19,7 +19,7 @@ package org.apache.spark.mllib.linalg.distributed
 
 import scala.collection.mutable.ArrayBuffer
 
-import breeze.linalg.{DenseMatrix => BDM}
+import breeze.linalg.{DenseMatrix => BDM, Matrix => BM}
 
 import org.apache.spark.{Logging, Partitioner, SparkException}
 import org.apache.spark.annotation.Since
@@ -317,40 +317,72 @@ class BlockMatrix @Since("1.3.0") (
   }
 
   /**
-   * Adds two block matrices together. The matrices must have the same size and matching
-   * `rowsPerBlock` and `colsPerBlock` values. If one of the blocks that are being added are
-   * instances of [[SparseMatrix]], the resulting sub matrix will also be a [[SparseMatrix]], even
-   * if it is being added to a [[DenseMatrix]]. If two dense matrices are added, the output will
-   * also be a [[DenseMatrix]].
+   * For given matrices `this` and `other` of compatible dimensions and compatible block dimensions,
+   * it applies a binary function on their corresponding blocks.
+   *
+   * @param other The second BlockMatrix argument for the operator specified by `binMap`
+   * @param binMap A function taking two breeze matrices and returning a breeze matrix
+   * @return A [[BlockMatrix]] whose blocks are the results of a specified binary map on blocks
+   *         of `this` and `other`.
+   * Note: `blockMap` ONLY works for `add` and `subtract` methods and it does not support
+   * operators such as (a, b) => -a + b
+   * TODO: Make the use of zero matrices more storage efficient.
    */
-  @Since("1.3.0")
-  def add(other: BlockMatrix): BlockMatrix = {
+  private[mllib] def blockMap(
+      other: BlockMatrix,
+      binMap: (BM[Double], BM[Double]) => BM[Double]): BlockMatrix = {
     require(numRows() == other.numRows(), "Both matrices must have the same number of rows. " +
       s"A.numRows: ${numRows()}, B.numRows: ${other.numRows()}")
     require(numCols() == other.numCols(), "Both matrices must have the same number of columns. " +
       s"A.numCols: ${numCols()}, B.numCols: ${other.numCols()}")
     if (rowsPerBlock == other.rowsPerBlock && colsPerBlock == other.colsPerBlock) {
-      val addedBlocks = blocks.cogroup(other.blocks, createPartitioner())
+      val newBlocks = blocks.cogroup(other.blocks, createPartitioner())
         .map { case ((blockRowIndex, blockColIndex), (a, b)) =>
           if (a.size > 1 || b.size > 1) {
             throw new SparkException("There are multiple MatrixBlocks with indices: " +
               s"($blockRowIndex, $blockColIndex). Please remove them.")
           }
           if (a.isEmpty) {
-            new MatrixBlock((blockRowIndex, blockColIndex), b.head)
+            val zeroBlock = BM.zeros[Double](b.head.numRows, b.head.numCols)
+            val result = binMap(zeroBlock, b.head.toBreeze)
+            new MatrixBlock((blockRowIndex, blockColIndex), Matrices.fromBreeze(result))
          } else if (b.isEmpty) {
            new MatrixBlock((blockRowIndex, blockColIndex), a.head)
          } else {
-            val result = a.head.toBreeze + b.head.toBreeze
+            val result = binMap(a.head.toBreeze, b.head.toBreeze)
            new MatrixBlock((blockRowIndex, blockColIndex), Matrices.fromBreeze(result))
          }
        }
-      new BlockMatrix(addedBlocks, rowsPerBlock, colsPerBlock, numRows(), numCols())
+      new BlockMatrix(newBlocks, rowsPerBlock, colsPerBlock, numRows(), numCols())
    } else {
-      throw new SparkException("Cannot add matrices with different block dimensions")
+      throw new SparkException("Cannot perform on matrices with different block dimensions")
    }
  }
 
+  /**
+   * Adds the given block matrix `other` to `this` block matrix: `this + other`.
+   * The matrices must have the same size and matching `rowsPerBlock` and `colsPerBlock`
+   * values. If one of the blocks that are being added are instances of [[SparseMatrix]],
+   * the resulting sub matrix will also be a [[SparseMatrix]], even if it is being added
+   * to a [[DenseMatrix]]. If two dense matrices are added, the output will also be a
+   * [[DenseMatrix]].
+   */
+  @Since("1.3.0")
+  def add(other: BlockMatrix): BlockMatrix =
+    blockMap(other, (x: BM[Double], y: BM[Double]) => x + y)
+
+  /**
+   * Subtracts the given block matrix `other` from `this` block matrix: `this - other`.
+   * The matrices must have the same size and matching `rowsPerBlock` and `colsPerBlock`
+   * values. If one of the blocks that are being subtracted are instances of [[SparseMatrix]],
+   * the resulting sub matrix will also be a [[SparseMatrix]], even if it is being subtracted
+   * from a [[DenseMatrix]]. If two dense matrices are subtracted, the output will also be a
+   * [[DenseMatrix]].
+   */
+  @Since("2.0.0")
+  def subtract(other: BlockMatrix): BlockMatrix =
+    blockMap(other, (x: BM[Double], y: BM[Double]) => x - y)
+
   /** Block (i,j) --> Set of destination partitions */
   private type BlockDestinations = Map[(Int, Int), Set[Int]]
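
The Note in the `blockMap` doc above can be made concrete with a small sketch. It is not part of this commit and only assumes breeze on the classpath (e.g. a Scala REPL). When `other` has no block at a given index, `blockMap` keeps the corresponding block of `this` unchanged, so the binary function must satisfy binMap(a, 0) == a. Addition and subtraction do; an operator such as (a, b) => -a + b does not, which is why only `add` and `subtract` are built on it.

import breeze.linalg.{DenseMatrix => BDM}

// A block that is present in `this` but missing from `other`.
val a = BDM((1.0, 2.0), (3.0, 4.0))
// The zero block that the missing entry of `other` stands for.
val zero = BDM.zeros[Double](2, 2)

val add = (x: BDM[Double], y: BDM[Double]) => x + y
val sub = (x: BDM[Double], y: BDM[Double]) => x - y
val bad = (x: BDM[Double], y: BDM[Double]) => -x + y  // the unsupported operator from the Note

// blockMap returns the block of `this` unchanged in this case, which matches add and subtract ...
assert(add(a, zero) == a)
assert(sub(a, zero) == a)
// ... but not `bad`, whose correct result here would be -a rather than a.
assert(bad(a, zero) != a)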

mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrixSuite.scala
@@ -192,6 +192,49 @@ class BlockMatrixSuite extends SparkFunSuite with MLlibTestSparkContext {
     assert(sparseBM.add(sparseBM).toBreeze() === sparseBM.add(denseBM).toBreeze())
   }
 
+  test("subtract") {
+    val blocks: Seq[((Int, Int), Matrix)] = Seq(
+      ((0, 0), new DenseMatrix(2, 2, Array(1.0, 0.0, 0.0, 2.0))),
+      ((0, 1), new DenseMatrix(2, 2, Array(0.0, 1.0, 0.0, 0.0))),
+      ((1, 0), new DenseMatrix(2, 2, Array(3.0, 0.0, 1.0, 1.0))),
+      ((1, 1), new DenseMatrix(2, 2, Array(1.0, 2.0, 0.0, 1.0))),
+      ((2, 0), new DenseMatrix(1, 2, Array(1.0, 0.0))), // Added block that doesn't exist in A
+      ((2, 1), new DenseMatrix(1, 2, Array(1.0, 5.0))))
+    val rdd = sc.parallelize(blocks, numPartitions)
+    val B = new BlockMatrix(rdd, rowPerPart, colPerPart)
+
+    val expected = BDM(
+      (0.0, 0.0, 0.0, 0.0),
+      (0.0, 0.0, 0.0, 0.0),
+      (0.0, 0.0, 0.0, 0.0),
+      (0.0, 0.0, 0.0, 0.0),
+      (-1.0, 0.0, 0.0, 0.0))
+
+    val AsubtractB = gridBasedMat.subtract(B)
+    assert(AsubtractB.numRows() === m)
+    assert(AsubtractB.numCols() === B.numCols())
+    assert(AsubtractB.toBreeze() === expected)
+
+    val C = new BlockMatrix(rdd, rowPerPart, colPerPart, m, n + 1) // columns don't match
+    intercept[IllegalArgumentException] {
+      gridBasedMat.subtract(C)
+    }
+    val largerBlocks: Seq[((Int, Int), Matrix)] = Seq(
+      ((0, 0), new DenseMatrix(4, 4, new Array[Double](16))),
+      ((1, 0), new DenseMatrix(1, 4, Array(1.0, 0.0, 1.0, 5.0))))
+    val C2 = new BlockMatrix(sc.parallelize(largerBlocks, numPartitions), 4, 4, m, n)
+    intercept[SparkException] { // partitioning doesn't match
+      gridBasedMat.subtract(C2)
+    }
+    // subtracting BlockMatrices composed of SparseMatrices
+    val sparseBlocks = for (i <- 0 until 4) yield ((i / 2, i % 2), SparseMatrix.speye(4))
+    val denseBlocks = for (i <- 0 until 4) yield ((i / 2, i % 2), DenseMatrix.eye(4))
+    val sparseBM = new BlockMatrix(sc.makeRDD(sparseBlocks, 4), 4, 4, 8, 8)
+    val denseBM = new BlockMatrix(sc.makeRDD(denseBlocks, 4), 4, 4, 8, 8)
+
+    assert(sparseBM.subtract(sparseBM).toBreeze() === sparseBM.subtract(denseBM).toBreeze())
+  }
+
   test("multiply") {
     // identity matrix
     val blocks: Seq[((Int, Int), Matrix)] = Seq(
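
For reference, a minimal end-to-end usage sketch of the new `add`/`subtract` API. It is not part of this commit; it assumes a running SparkContext named `sc` (as in spark-shell) on Spark 2.0.0 or later, since `subtract` is annotated @Since("2.0.0").

import org.apache.spark.mllib.linalg.Matrices
import org.apache.spark.mllib.linalg.distributed.BlockMatrix

// Two 2x2 matrices, each stored as a single 2x2 block (values are column-major).
val aBlocks = sc.parallelize(Seq(((0, 0), Matrices.dense(2, 2, Array(1.0, 2.0, 3.0, 4.0)))))
val bBlocks = sc.parallelize(Seq(((0, 0), Matrices.dense(2, 2, Array(1.0, 1.0, 1.0, 1.0)))))
val A = new BlockMatrix(aBlocks, 2, 2)
val B = new BlockMatrix(bBlocks, 2, 2)

// Both methods require equal sizes and matching rowsPerBlock/colsPerBlock,
// otherwise they fail with the exceptions exercised in the test above.
val sum = A.add(B)        // blockMap with (x, y) => x + y
val diff = A.subtract(B)  // blockMap with (x, y) => x - y

println(sum.toLocalMatrix())   // element-wise A + B
println(diff.toLocalMatrix())  // element-wise A - B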
