Skip to content

Commit

Permalink
Merge pull request #169 from johnynek/feature/diagonal-matrix-product
Browse files Browse the repository at this point in the history
Feature/diagonal matrix product
  • Loading branch information
azymnis committed Aug 31, 2012
2 parents 0d23cd7 + 671fd54 commit fa98fb4
Show file tree
Hide file tree
Showing 3 changed files with 153 additions and 13 deletions.
24 changes: 20 additions & 4 deletions src/main/scala/com/twitter/scalding/mathematics/Matrix.scala
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,11 @@ import scala.annotation.tailrec
* each row/col/value type is generic, with the constraint that ValT is a Ring[T]
* In practice, RowT and ColT are going to be Strings, Integers or Longs in the usual case.
*
* WARNING:
* It is NOT OKAY to use the same instance of Matrix/Row/Col with DIFFERENT Monoids/Rings/Fields.
* If you want to change, midstream, the Monoid on your ValT, you have to construct a new Matrix.
* This is due to caching of internal computation graphs.
*
* RowVector - handles matrices of row dimension one. It is the result of some of the matrix methods and has methods
* that return ColVector and diagonal matrix
*
Expand Down Expand Up @@ -414,16 +419,18 @@ class Matrix[RowT, ColT, ValT]
new Matrix[ColT,RowT,ValT](colSym, rowSym, valSym, inPipe, sizeHint.transpose)
}

// This method will only work if the row type and column type are the same
// the type constraint below means there is evidence that RowT and ColT are
// the same type
def diagonal(implicit ev : =:=[RowT,ColT]) : DiagonalMatrix[RowT,ValT] = {
// This should only be called by def diagonal, which verifies that RowT == ColT
protected lazy val mainDiagonal : DiagonalMatrix[RowT,ValT] = {
val diagPipe = pipe.filter(rowSym, colSym) { input : (RowT, RowT) =>
(input._1 == input._2)
}
.project(rowSym, valSym)
new DiagonalMatrix[RowT,ValT](rowSym, valSym, diagPipe, sizeHint)
}
// This method will only work if the row type and column type are the same
// the type constraint below means there is evidence that RowT and ColT are
// the same type
def diagonal(implicit ev : =:=[RowT,ColT]) = mainDiagonal

/*
* This just removes zeros after the join inside a zip
Expand Down Expand Up @@ -565,6 +572,9 @@ class DiagonalMatrix[IdxT,ValT](val idxSym : Symbol,
val valSym : Symbol, inPipe : Pipe, val sizeHint : SizeHint)
extends WrappedPipe {

def *[That,Res](that : That)(implicit prod : MatrixProduct[DiagonalMatrix[IdxT,ValT],That,Res]) : Res
= { prod(this, that) }

def pipe = inPipe
def fields = (idxSym, valSym)
def trace(implicit mon : Monoid[ValT]) : Scalar[ValT] = {
Expand All @@ -575,6 +585,12 @@ class DiagonalMatrix[IdxT,ValT](val idxSym : Symbol,
}
new Scalar[ValT](valSym, scalarPipe)
}
def toCol : ColVector[IdxT,ValT] = {
new ColVector[IdxT,ValT](idxSym, valSym, inPipe, sizeHint.setRows(1L))
}
def toRow : RowVector[IdxT,ValT] = {
new RowVector[IdxT,ValT](idxSym, valSym, inPipe, sizeHint.setCols(1L))
}
// Inverse of this matrix *IGNORING ZEROS*
def inverse(implicit field : Field[ValT]) : DiagonalMatrix[IdxT, ValT] = {
val diagPipe = inPipe.flatMap(valSym -> valSym) { element : ValT =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,18 +92,21 @@ trait MatrixProduct[Left,Right,Result] extends java.io.Serializable {
* This object holds the implicits to handle matrix products between various types
*/
object MatrixProduct extends java.io.Serializable {
var MAX_TINY_JOIN = 100000L // Bigger than this, and we use joinWithSmaller
// These are VARS, so you can set them before you start:
var maxTinyJoin = 100000L // Bigger than this, and we use joinWithSmaller
var maxReducers = 200

def getJoiner(leftSize : SizeHint, rightSize : SizeHint) : MatrixJoiner = {
if (SizeHintOrdering.lteq(leftSize, rightSize)) {
// If leftsize is definite:
leftSize.total.map { t => if (t < MAX_TINY_JOIN) TinyToAny else SmallToBig }
leftSize.total.map { t => if (t < maxTinyJoin) TinyToAny else SmallToBig }
// Else just assume the right is smaller, but both are unknown:
.getOrElse(BigToSmall)
}
else {
// left > right
rightSize.total.map { rs =>
if (rs < MAX_TINY_JOIN) AnyToTiny else BigToSmall
if (rs < maxTinyJoin) AnyToTiny else BigToSmall
}.getOrElse(BigToSmall)
}
}
Expand Down Expand Up @@ -184,6 +187,12 @@ object MatrixProduct extends java.io.Serializable {
right.pipe
)
val newHint = left.sizeHint * right.sizeHint
// Hint of groupBy reducer size
val grpReds = newHint.total.map { tot =>
// + 1L is to make sure there is at least once reducer
(tot / MatrixProduct.maxTinyJoin + 1L).toInt min MatrixProduct.maxReducers
}.getOrElse(-1) //-1 means use the default number

val productPipe = Matrix.filterOutZeros(left.valSym, ring) {
getJoiner(left.sizeHint, right.sizeHint)
// TODO: we should use the size hints to set the number of reducers:
Expand All @@ -195,6 +204,10 @@ object MatrixProduct extends java.io.Serializable {
.groupBy(left.rowSym.append(getField(newRightFields, 1))) {
// We should use the size hints to set the number of reducers here
_.reduce(left.valSym) { (x: Tuple1[ValT], y: Tuple1[ValT]) => Tuple1(ring.plus(x._1, y._1)) }
// There is a low chance that many (row,col) keys are co-located, and the keyspace
// is likely huge, just push to reducers
.forceToReducers
.reducers(grpReds)
}
}
// Keep the names from the left:
Expand All @@ -203,21 +216,79 @@ object MatrixProduct extends java.io.Serializable {
}
}

// TODO: optimize this. We don't need to do the groupBy if the matrix is already diagonal
implicit def diagMatrixProduct[RowT,ColT,ValT](implicit ring : Ring[ValT]) :
MatrixProduct[DiagonalMatrix[RowT,ValT],Matrix[RowT,ColT,ValT],Matrix[RowT,ColT,ValT]] =
new MatrixProduct[DiagonalMatrix[RowT,ValT],Matrix[RowT,ColT,ValT],Matrix[RowT,ColT,ValT]] {
def apply(left : DiagonalMatrix[RowT,ValT], right : Matrix[RowT,ColT,ValT]) = {
Matrix.diagonalToMatrix(left) * right
val (newRightFields, newRightPipe) = ensureUniqueFields(
(left.idxSym, left.valSym),
(right.rowSym, right.colSym, right.valSym),
right.pipe
)
val newHint = left.sizeHint * right.sizeHint
val productPipe = Matrix.filterOutZeros(right.valSym, ring) {
getJoiner(left.sizeHint, right.sizeHint)
// TODO: we should use the size hints to set the number of reducers:
.apply(left.pipe, (left.idxSym -> getField(newRightFields, 0)), newRightPipe)
// Do the product:
.map((left.valSym.append(getField(newRightFields, 2))) -> getField(newRightFields,2)) { pair : (ValT,ValT) =>
ring.times(pair._1, pair._2)
}
// Keep the names from the right:
.project(newRightFields)
.rename(newRightFields -> (right.rowSym, right.colSym, right.valSym))
}
new Matrix[RowT,ColT,ValT](right.rowSym, right.colSym, right.valSym, productPipe, newHint)
}
}

// TODO: optimize this. We don't need to do the groupBy if the matrix is already diagonal
implicit def matrixDiagProduct[RowT,ColT,ValT](implicit ring : Ring[ValT]) :
MatrixProduct[Matrix[RowT,ColT,ValT],DiagonalMatrix[ColT,ValT],Matrix[RowT,ColT,ValT]] =
new MatrixProduct[Matrix[RowT,ColT,ValT],DiagonalMatrix[ColT,ValT],Matrix[RowT,ColT,ValT]] {
def apply(left : Matrix[RowT,ColT,ValT], right : DiagonalMatrix[ColT,ValT]) = {
left * Matrix.diagonalToMatrix(right)
// (A * B) = (B^T * A^T)^T
// note diagonal^T = diagonal
(right * (left.transpose)).transpose
}
}

implicit def diagDiagProduct[IdxT,ValT](implicit ring : Ring[ValT]) :
MatrixProduct[DiagonalMatrix[IdxT,ValT],DiagonalMatrix[IdxT,ValT],DiagonalMatrix[IdxT,ValT]] =
new MatrixProduct[DiagonalMatrix[IdxT,ValT],DiagonalMatrix[IdxT,ValT],DiagonalMatrix[IdxT,ValT]] {
def apply(left : DiagonalMatrix[IdxT,ValT], right : DiagonalMatrix[IdxT,ValT]) = {
val (newRightFields, newRightPipe) = ensureUniqueFields(
(left.idxSym, left.valSym),
(right.idxSym, right.valSym),
right.pipe
)
val newHint = left.sizeHint * right.sizeHint
val productPipe = Matrix.filterOutZeros(left.valSym, ring) {
getJoiner(left.sizeHint, right.sizeHint)
// TODO: we should use the size hints to set the number of reducers:
.apply(left.pipe, (left.idxSym -> getField(newRightFields, 0)), newRightPipe)
// Do the product:
.map((left.valSym.append(getField(newRightFields, 1))) -> left.valSym) { pair : (ValT,ValT) =>
ring.times(pair._1, pair._2)
}
}
// Keep the names from the left:
.project(left.idxSym, left.valSym)
new DiagonalMatrix[IdxT,ValT](left.idxSym, left.valSym, productPipe, newHint)
}
}

implicit def diagColProduct[IdxT,ValT](implicit ring : Ring[ValT]) :
MatrixProduct[DiagonalMatrix[IdxT,ValT],ColVector[IdxT,ValT],ColVector[IdxT,ValT]] =
new MatrixProduct[DiagonalMatrix[IdxT,ValT],ColVector[IdxT,ValT],ColVector[IdxT,ValT]] {
def apply(left : DiagonalMatrix[IdxT,ValT], right : ColVector[IdxT,ValT]) = {
(left * (right.diag)).toCol
}
}
implicit def rowDiagProduct[IdxT,ValT](implicit ring : Ring[ValT]) :
MatrixProduct[RowVector[IdxT,ValT],DiagonalMatrix[IdxT,ValT],RowVector[IdxT,ValT]] =
new MatrixProduct[RowVector[IdxT,ValT],DiagonalMatrix[IdxT,ValT],RowVector[IdxT,ValT]] {
def apply(left : RowVector[IdxT,ValT], right : DiagonalMatrix[IdxT,ValT]) = {
((left.diag) * right).toRow
}
}
}
57 changes: 55 additions & 2 deletions src/test/scala/com/twitter/scalding/mathematics/MatrixTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,17 @@ class ScalarOps(args: Args) extends Job(args) {
(mat1 / mat1.trace).pipe.write(Tsv("divtrace"))
}

class DiagonalOps(args : Args) extends Job(args) {
import Matrix._
val mat = Tsv("mat1",('x1,'y1,'v1))
.read
.toMatrix[Int,Int,Double]('x1,'y1,'v1)
(mat * mat.diagonal).write(Tsv("mat-diag"))
(mat.diagonal * mat).write(Tsv("diag-mat"))
(mat.diagonal * mat.diagonal).write(Tsv("diag-diag"))
(mat.diagonal * mat.getCol(1)).write(Tsv("diag-col"))
(mat.getRow(1) * mat.diagonal).write(Tsv("row-diag"))
}

class MatrixTest extends Specification {
noDetailedDiffs() // For scala 2.9
Expand All @@ -130,6 +141,9 @@ class MatrixTest extends Specification {
def toSparseMat[Row,Col,V](iter : Iterable[(Row,Col,V)]) : Map[(Row,Col),V] = {
iter.map { it => ((it._1, it._2),it._3) }.toMap
}
def oneDtoSparseMat[Idx,V](iter : Iterable[(Idx,V)]) : Map[(Idx,Idx),V] = {
iter.map { it => ((it._1, it._1), it._2) }.toMap
}

"A MatrixProd job" should {
TUtil.printStack {
Expand Down Expand Up @@ -252,9 +266,9 @@ class MatrixTest extends Specification {
TUtil.printStack {
JobTest("com.twitter.scalding.mathematics.VctDiv")
.source(Tsv("mat1",('x1,'y1,'v1)), List((1,1,1.0),(2,2,3.0),(1,2,4.0)))
.sink[(Int,Int,Double)](Tsv("vctDiv")) { ob =>
.sink[(Int,Double)](Tsv("vctDiv")) { ob =>
"correctly compute vector element-wise division" in {
val pMap = toSparseMat(ob)
val pMap = oneDtoSparseMat(ob)
pMap must be_==( Map((2,2)->1.3333333333333333) )
}
}
Expand Down Expand Up @@ -300,4 +314,43 @@ class MatrixTest extends Specification {
.finish
}
}
"A Matrix Diagonal job" should {
TUtil.printStack {
JobTest(new DiagonalOps(_))
/* [[1.0 4.0]
* [0.0 3.0]]
*/
.source(Tsv("mat1",('x1,'y1,'v1)), List((1,1,1.0),(2,2,3.0),(1,2,4.0)))
.sink[(Int,Int,Double)](Tsv("diag-mat")) { ob =>
"correctly compute diag * matrix" in {
val pMap = toSparseMat(ob)
pMap must be_==( Map((1,1)->1.0, (1,2)->4.0, (2,2)->9.0) )
}
}
.sink[(Int,Double)](Tsv("diag-diag")) { ob =>
"correctly compute diag * diag" in {
val pMap = oneDtoSparseMat(ob)
pMap must be_==( Map((1,1)->1.0, (2,2)->9.0) )
}
}
.sink[(Int,Int,Double)](Tsv("mat-diag")) { ob =>
"correctly compute matrix * diag" in {
val pMap = toSparseMat(ob)
pMap must be_==( Map((1,1)->1.0, (1,2)->12.0, (2,2)->9.0) )
}
}
.sink[(Int,Double)](Tsv("diag-col")) { ob =>
"correctly compute diag * col" in {
ob.toMap must be_==( Map(1->1.0))
}
}
.sink[(Int,Double)](Tsv("row-diag")) { ob =>
"correctly compute row * diag" in {
ob.toMap must be_==( Map(1->1.0, 2 -> 12.0))
}
}
.run
.finish
}
}
}

0 comments on commit fa98fb4

Please sign in to comment.