Skip to content

Commit

Permalink
added comments and fixed style as per rb
Browse files Browse the repository at this point in the history
  • Loading branch information
leahmcguire committed Mar 5, 2015
1 parent b61b5e2 commit 7622b0c
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,16 +25,17 @@ import org.json4s.jackson.JsonMethods._
import org.json4s.{DefaultFormats, JValue}

import org.apache.spark.{Logging, SparkContext, SparkException}
import org.apache.spark.mllib.classification.NaiveBayesModels.NaiveBayesModels
import org.apache.spark.mllib.linalg.{DenseVector, SparseVector, Vector}
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.classification.NaiveBayesModels.NaiveBayesModels
import org.apache.spark.mllib.util.{Loader, Saveable}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SQLContext}


/**
*
* Model types supported in Naive Bayes:
* multinomial and Bernoulli currently supported
*/
object NaiveBayesModels extends Enumeration {
type NaiveBayesModels = Value
Expand All @@ -45,6 +46,8 @@ object NaiveBayesModels extends Enumeration {
}
}



/**
* Model for Naive Bayes Classifiers.
*
Expand All @@ -55,7 +58,6 @@ object NaiveBayesModels extends Enumeration {
* @param modelType The type of NB model to fit from the enumeration NaiveBayesModels, can be
* Multinomial or Bernoulli
*/

class NaiveBayesModel private[mllib] (
val labels: Array[Double],
val pi: Array[Double],
Expand All @@ -68,11 +70,14 @@ class NaiveBayesModel private[mllib] (
private val brzPi = new BDV[Double](pi)
private val brzTheta = new BDM(theta(0).length, theta.length, theta.flatten).t

private val brzNegTheta: Option[BDM[Double]] = modelType match {
case NaiveBayesModels.Multinomial => None
//Bernoulli scoring requires log(condprob) if 1 log(1-condprob) if 0
//precomputing log(1.0 - exp(theta)) and its sum for linear algebra application
//of this condition in predict function
private val (brzNegTheta, brzNegThetaSum) = modelType match {
case NaiveBayesModels.Multinomial => (None, None)
case NaiveBayesModels.Bernoulli =>
val negTheta = brzLog((brzExp(brzTheta.copy) :*= (-1.0)) :+= 1.0) // log(1.0 - exp(x))
Option(negTheta)
(Option(negTheta), Option(brzSum(brzNegTheta, Axis._1)))
}

override def predict(testData: RDD[Vector]): RDD[Double] = {
Expand All @@ -89,8 +94,7 @@ class NaiveBayesModel private[mllib] (
labels (brzArgmax (brzPi + brzTheta * testData.toBreeze) )
case NaiveBayesModels.Bernoulli =>
labels (brzArgmax (brzPi +
(brzTheta - brzNegTheta.get) * testData.toBreeze +
brzSum(brzNegTheta.get, Axis._1)))
(brzTheta - brzNegTheta.get) * testData.toBreeze + brzNegThetaSum.get))
}
}

Expand All @@ -114,10 +118,11 @@ object NaiveBayesModel extends Loader[NaiveBayesModel] {
def thisClassName = "org.apache.spark.mllib.classification.NaiveBayesModel"

/** Model data for model import/export */
case class Data(labels: Array[Double],
pi: Array[Double],
theta: Array[Array[Double]],
modelType: String)
case class Data(
labels: Array[Double],
pi: Array[Double],
theta: Array[Array[Double]],
modelType: String)

def save(sc: SparkContext, path: String, data: Data): Unit = {
val sqlContext = new SQLContext(sc)
Expand Down Expand Up @@ -192,7 +197,7 @@ object NaiveBayesModel extends Loader[NaiveBayesModel] {
* Bernoulli NB ([[http://tinyurl.com/p7c96j6]]). The input feature values must be nonnegative.
*/
class NaiveBayes private (private var lambda: Double,
var modelType: NaiveBayesModels) extends Serializable with Logging {
private var modelType: NaiveBayesModels) extends Serializable with Logging {

def this(lambda: Double) = this(lambda, NaiveBayesModels.Multinomial)

Expand Down Expand Up @@ -284,7 +289,7 @@ object NaiveBayes {
/**
* Trains a Naive Bayes model given an RDD of `(label, features)` pairs.
*
* This is the Multinomial NB ([[http://tinyurl.com/lsdw6p]]) which can handle all kinds of
* This is the default Multinomial NB ([[http://tinyurl.com/lsdw6p]]) which can handle all kinds of
* discrete data. For example, by converting documents into TF-IDF vectors, it can be used for
* document classification.
*
Expand All @@ -300,7 +305,7 @@ object NaiveBayes {
/**
* Trains a Naive Bayes model given an RDD of `(label, features)` pairs.
*
* This is the Multinomial NB ([[http://tinyurl.com/lsdw6p]]) which can handle all kinds of
* This is the default Multinomial NB ([[http://tinyurl.com/lsdw6p]]) which can handle all kinds of
* discrete data. For example, by converting documents into TF-IDF vectors, it can be used for
* document classification.
*
Expand All @@ -316,11 +321,13 @@ object NaiveBayes {
/**
* Trains a Naive Bayes model given an RDD of `(label, features)` pairs.
*
* This is by default the Multinomial NB ([[http://tinyurl.com/lsdw6p]]) which can handle
* all kinds of discrete data. For example, by converting documents into TF-IDF vectors,
* it can be used for document classification. By making every vector a 0-1 vector and
* setting the model type to NaiveBayesModels.Bernoulli, it fits and predicts as
* Bernoulli NB ([[http://tinyurl.com/p7c96j6]]).
* The model type can be set to either Multinomial NB ([[http://tinyurl.com/lsdw6p]])
* or Bernoulli NB ([[http://tinyurl.com/p7c96j6]]). The Multinomial NB can handle
* discrete count data and can be called by setting the model type to "Multinomial".
* For example, it can be used with word counts or TF_IDF vectors of documents.
* The Bernoulli model fits presence or absence (0-1) counts. By making every vector a
* 0-1 vector and setting the model type to "Bernoulli", the fits and predicts as
* Bernoulli NB.
*
* @param input RDD of `(label, array of features)` pairs. Every vector should be a frequency
* vector or a count vector.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,13 @@ package org.apache.spark.mllib.classification

import breeze.linalg.{DenseMatrix => BDM, DenseVector => BDV, argmax => brzArgmax, sum => brzSum, Axis}
import breeze.stats.distributions.Multinomial
import org.apache.spark.mllib.classification.NaiveBayesModels.NaiveBayesModels

import scala.util.Random

import org.scalatest.FunSuite

import org.apache.spark.SparkException
import org.apache.spark.mllib.classification.NaiveBayesModels.NaiveBayesModels
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.util.{LocalClusterSparkContext, MLlibTestSparkContext}
Expand All @@ -49,7 +49,7 @@ object NaiveBayesSuite {
theta: Array[Array[Double]], // CXD
nPoints: Int,
seed: Int,
dataModel: NaiveBayesModels = NaiveBayesModels.Multinomial,
dataModel: NaiveBayesModels= NaiveBayesModels.Multinomial,
sample: Int = 10): Seq[LabeledPoint] = {
val D = theta(0).length
val rnd = new Random(seed)
Expand Down Expand Up @@ -92,7 +92,10 @@ class NaiveBayesSuite extends FunSuite with MLlibTestSparkContext {
assert(numOfPredictions < input.length / 5)
}

def validateModelFit(piData: Array[Double], thetaData: Array[Array[Double]], model: NaiveBayesModel) = {
def validateModelFit(
piData: Array[Double],
thetaData: Array[Array[Double]],
model: NaiveBayesModel) = {
def closeFit(d1: Double, d2: Double, precision: Double): Boolean = {
(d1 - d2).abs <= precision
}
Expand All @@ -117,14 +120,20 @@ class NaiveBayesSuite extends FunSuite with MLlibTestSparkContext {
Array(0.10, 0.10, 0.70, 0.10) // label 2
).map(_.map(math.log))

val testData = NaiveBayesSuite.generateNaiveBayesInput(pi, theta, nPoints, 42, NaiveBayesModels.Multinomial)
val testData = NaiveBayesSuite.generateNaiveBayesInput(
pi, theta, nPoints, 42, NaiveBayesModels.Multinomial)
val testRDD = sc.parallelize(testData, 2)
testRDD.cache()

val model = NaiveBayes.train(testRDD, 1.0, "Multinomial")
validateModelFit(pi, theta, model)

val validationData = NaiveBayesSuite.generateNaiveBayesInput(pi, theta, nPoints, 17, NaiveBayesModels.Multinomial)
val validationData = NaiveBayesSuite.generateNaiveBayesInput(
pi,
theta,
nPoints,
17,
NaiveBayesModels.Multinomial)
val validationRDD = sc.parallelize(validationData, 2)

// Test prediction on RDD.
Expand All @@ -144,14 +153,24 @@ class NaiveBayesSuite extends FunSuite with MLlibTestSparkContext {
Array(0.02, 0.02, 0.60, 0.02, 0.02, 0.02, 0.02, 0.02, 0.02, 0.02, 0.02, 0.30) // label 2
).map(_.map(math.log))

val testData = NaiveBayesSuite.generateNaiveBayesInput(pi, theta, nPoints, 45, NaiveBayesModels.Bernoulli)
val testData = NaiveBayesSuite.generateNaiveBayesInput(
pi,
theta,
nPoints,
45,
NaiveBayesModels.Bernoulli)
val testRDD = sc.parallelize(testData, 2)
testRDD.cache()

val model = NaiveBayes.train(testRDD, 1.0, "Bernoulli")
validateModelFit(pi, theta, model)

val validationData = NaiveBayesSuite.generateNaiveBayesInput(pi, theta, nPoints, 20, NaiveBayesModels.Bernoulli)
val validationData = NaiveBayesSuite.generateNaiveBayesInput(
pi,
theta,
nPoints,
20,
NaiveBayesModels.Bernoulli)
val validationRDD = sc.parallelize(validationData, 2)

// Test prediction on RDD.
Expand Down Expand Up @@ -218,8 +237,8 @@ class NaiveBayesClusterSuite extends FunSuite with LocalClusterSparkContext {
LabeledPoint(random.nextInt(2), Vectors.dense(Array.fill(n)(random.nextDouble())))
}
}
// If we serialize data directly in the task closure, the size of the serialized task would be
// greater than 1MB and hence Spark would throw an error.
// If we serialize data directly in the task closure, the size of the serialized task
// would be greater than 1MB and hence Spark would throw an error.
val model = NaiveBayes.train(examples)
val predictions = model.predict(examples.map(_.features))
}
Expand Down

0 comments on commit 7622b0c

Please sign in to comment.