Skip to content

Commit

Permalink
[SPARK-3382] Compare with the diff of solution vector
Browse files Browse the repository at this point in the history
  • Loading branch information
Lewuathe committed Jan 4, 2015
1 parent 4b125d2 commit e6c9cd2
Showing 1 changed file with 50 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ package org.apache.spark.mllib.optimization
import scala.collection.mutable.ArrayBuffer
import scala.util.control.Breaks

import breeze.linalg.{DenseVector => BDV}
import breeze.linalg.{DenseVector => BDV, sum}

import org.apache.spark.annotation.{Experimental, DeveloperApi}
import org.apache.spark.Logging
Expand Down Expand Up @@ -85,7 +85,7 @@ class GradientDescent private[mllib] (private var gradient: Gradient, private va
* If the difference between the most recent loss and the previous loss is less than
* convergenceTol, minibatch iteration will end at that point.
*/
def setconvergenceTol(tolerance: Double): this.type = {
/**
 * Sets the convergence tolerance used to decide when minibatch SGD stops early.
 * When the difference between successive solution vectors falls below this
 * threshold, iteration ends before reaching numIterations.
 */
def setConvergenceTol(tolerance: Double): this.type = {
  convergenceTol = tolerance
  this
}
Expand Down Expand Up @@ -158,8 +158,8 @@ object GradientDescent extends Logging {
* @param miniBatchFraction - fraction of the input data set that should be used for
* one iteration of SGD. Default value 1.0.
* @param convergenceTol - Minibatch iteration will end before numIterations
* if the difference between last loss and last before loss
* is less than this value. Default value 0.001.
if the difference between the last solution vector and the
previous solution vector is less than this value. Default value 0.001.
* @return A tuple containing two elements. The first element is a column matrix containing
* weights for every feature, and the second element is an array containing the
* stochastic loss computed for every iteration.
Expand All @@ -182,6 +182,8 @@ object GradientDescent extends Logging {
}

val stochasticLossHistory = new ArrayBuffer[Double](numIterations)
// Record history to calculate solution vector difference
val weightsHistory = new ArrayBuffer[Vector](numIterations)

val numExamples = data.count()

Expand All @@ -206,44 +208,44 @@ object GradientDescent extends Logging {
var regVal = updater.compute(
weights, Vectors.dense(new Array[Double](weights.size)), 0, 1, regParam)._2

val b = new Breaks
b.breakable {
for (i <- 1 to numIterations) {
val bcWeights = data.context.broadcast(weights)
// Sample a subset (fraction miniBatchFraction) of the total data
// compute and sum up the subgradients on this subset (this is one map-reduce)
val (gradientSum, lossSum, miniBatchSize) = data.sample(false, miniBatchFraction, 42 + i)
.treeAggregate((BDV.zeros[Double](n), 0.0, 0L))(
seqOp = (c, v) => {
// c: (grad, loss, count), v: (label, features)
val l = gradient.compute(v._2, v._1, bcWeights.value, Vectors.fromBreeze(c._1))
(c._1, c._2 + l, c._3 + 1)
},
combOp = (c1, c2) => {
// c: (grad, loss, count)
(c1._1 += c2._1, c1._2 + c2._2, c1._3 + c2._3)
})

if (miniBatchSize > 0) {
/**
* NOTE(Xinghao): lossSum is computed using the weights from the previous iteration
* and regVal is the regularization value computed in the previous iteration as well.
*/
stochasticLossHistory.append(lossSum / miniBatchSize + regVal)
val update = updater.compute(
weights, Vectors.fromBreeze(gradientSum / miniBatchSize.toDouble),
stepSize, i, regParam)
weights = update._1
regVal = update._2
if (stochasticLossHistory.length > 1) {
val lastLoss = stochasticLossHistory.last
val lastBeforeLoss = stochasticLossHistory(stochasticLossHistory.length - 2)
if (Math.abs(lastLoss - lastBeforeLoss) < convergenceTol) b.break
}
} else {
logWarning(s"Iteration ($i/$numIterations). The size of sampled batch is zero")
// Flag indicating whether the convergence tolerance has been reached
var tolerate = false
var i = 1
while (!tolerate && i <= numIterations) {
val bcWeights = data.context.broadcast(weights)
// Sample a subset (fraction miniBatchFraction) of the total data
// compute and sum up the subgradients on this subset (this is one map-reduce)
val (gradientSum, lossSum, miniBatchSize) = data.sample(false, miniBatchFraction, 42 + i)
.treeAggregate((BDV.zeros[Double](n), 0.0, 0L))(
seqOp = (c, v) => {
// c: (grad, loss, count), v: (label, features)
val l = gradient.compute(v._2, v._1, bcWeights.value, Vectors.fromBreeze(c._1))
(c._1, c._2 + l, c._3 + 1)
},
combOp = (c1, c2) => {
// c: (grad, loss, count)
(c1._1 += c2._1, c1._2 + c2._2, c1._3 + c2._3)
})

if (miniBatchSize > 0) {
/**
* NOTE(Xinghao): lossSum is computed using the weights from the previous iteration
* and regVal is the regularization value computed in the previous iteration as well.
*/
stochasticLossHistory.append(lossSum / miniBatchSize + regVal)
val update = updater.compute(
weights, Vectors.fromBreeze(gradientSum / miniBatchSize.toDouble),
stepSize, i, regParam)
weights = update._1
weightsHistory.append(weights)
regVal = update._2
if (weightsHistory.length > 1) {
if (solutionVecDiff(weightsHistory) < convergenceTol) tolerate = true
}
} else {
logWarning(s"Iteration ($i/$numIterations). The size of sampled batch is zero")
}
i += 1
}

logInfo("GradientDescent.runMiniBatchSGD finished. Last 10 stochastic losses %s".format(
Expand All @@ -265,4 +267,12 @@ object GradientDescent extends Logging {
GradientDescent.runMiniBatchSGD(data, gradient, updater, stepSize, numIterations,
regParam, miniBatchFraction, initialWeights, 0.001)

/**
 * Measures how far the solution moved between the two most recent iterations,
 * for comparison against the convergence tolerance.
 *
 * Computes the squared Euclidean distance between the last two recorded weight
 * vectors, divided by the number of recorded iterations.
 *
 * NOTE(review): dividing by `weightsHistory.length` makes this metric shrink as
 * more iterations accumulate, so convergence becomes progressively easier to
 * declare over time — confirm this is intended (as opposed to, e.g., dividing
 * by the vector dimension or not scaling at all).
 *
 * @param weightsHistory history of solution vectors; must hold at least two entries
 * @return scaled squared distance between the last two weight vectors
 */
def solutionVecDiff(weightsHistory: ArrayBuffer[Vector]): Double = {
  require(weightsHistory.length > 1)
  val current = weightsHistory(weightsHistory.length - 1).toBreeze
  val previous = weightsHistory(weightsHistory.length - 2).toBreeze
  val delta = previous - current
  sum(delta :* delta) / weightsHistory.length
}

}

0 comments on commit e6c9cd2

Please sign in to comment.