In [18]:
import com.intel.analytics.bigdl.dataset.SparseTensorMiniBatch
import com.intel.analytics.bigdl.nn.{CrossEntropyCriterion, Module}
import com.intel.analytics.bigdl.numeric.NumericFloat
import com.intel.analytics.bigdl.models.widedeep_tutorial.SparseWideDeep
import com.intel.analytics.bigdl.optim._
import com.intel.analytics.bigdl.tensor.Tensor
import com.intel.analytics.bigdl.utils.{Engine, LoggerFilter}
import com.intel.analytics.bigdl.dataset.{Sample, TensorSample}
import com.intel.analytics.bigdl.tensor.{Storage, Tensor}
import com.intel.analytics.bigdl.utils.{File, T}
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkContext
import scopt.OptionParser
import java.nio.file.Paths

In [6]:
// Spark configuration for this notebook, built on the conf returned by
// Engine.createSparkConf(). Two extra settings are applied:
//   - spark.rpc.message.maxSize = "1024"
//   - spark.driver.allowMultipleContexts = "true"
val conf = Engine.createSparkConf()
  .setAppName("Wide and Deep Learning on Census")
  .set("spark.rpc.message.maxSize", "1024")
  .set("spark.driver.allowMultipleContexts", "true")

val sc = new SparkContext(conf)

// Initialise BigDL's Engine once the SparkContext has been created.
Engine.init

In [7]:
// Zero-based position of each field within a comma-separated Census record.
val AGE = 0
val WORKCLASS = 1
val FNLWGT = 2
val EDUCATION = 3
val EDUCATION_NUM = 4
val MARITAL_STATUS = 5
val OCCUPATION = 6
val RELATIONSHIP = 7
val RACE = 8
val GENDER = 9
val CAPITAL_GAIN = 10
val CAPITAL_LOSS = 11
val HOURS_PER_WEEK = 12
val NATIVE_COUNTRY = 13
val LABEL = 14

val LABEL_COLUMN = "label"

// Header names of the 15 CSV fields, in file order (index == position constant above).
val CSV_COLUMNS = Array(
  "age", "workclass", "fnlwgt", "education", "education_num",
  "marital_status", "occupation", "relationship", "race", "gender",
  "capital_gain", "capital_loss", "hours_per_week", "native_country",
  "income_bracket")

// Column groups (8 categorical, 5 continuous).
val CATEGORICAL_COLUMNS = Array(
  "workclass", "education", "marital_status", "occupation",
  "relationship", "race", "gender", "native_country")
val CONTINUOUS_COLUMNS = Array(
  "age", "education_num", "capital_gain", "capital_loss", "hours_per_week")

// Vocabularies used by categoricalFromVocabList; a value's position in its
// array is the categorical index it is encoded as.
val education_vocab = Array( // 16 entries
  "Bachelors", "HS-grad", "11th", "Masters",
  "9th", "Some-college", "Assoc-acdm", "Assoc-voc",
  "7th-8th", "Doctorate", "Prof-school", "5th-6th",
  "10th", "1st-4th", "Preschool", "12th")
val marital_status_vocab = Array( // 7 entries
  "Married-civ-spouse", "Divorced", "Married-spouse-absent",
  "Never-married", "Separated", "Married-AF-spouse", "Widowed")
val relationship_vocab = Array( // 6 entries
  "Husband", "Not-in-family", "Wife",
  "Own-child", "Unmarried", "Other-relative")
val workclass_vocab = Array( // 9 entries
  "Self-emp-not-inc", "Private", "State-gov",
  "Federal-gov", "Local-gov", "?",
  "Self-emp-inc", "Without-pay", "Never-worked")
val gender_vocab = Array( // 2 entries
  "Female", "Male")


In [8]:
/**
 * Buckets an age string into one of 11 age ranges, offset by `start`.
 *
 * Bucket boundaries are 18, 25, 30, 35, 40, 45, 50, 55, 60 and 65. Ages below 18
 * map to bucket 0, [18, 25) to 1, ..., [60, 65) to 9, and 65 or older to 10.
 * The missing-value marker "?" maps to bucket 0, the same bucket as under-18.
 *
 * @param age   raw age field: "?" or a base-10 integer
 * @param start offset added to the bucket index (applied to every result,
 *              like `hashbucket` and `categoricalFromVocabList`)
 * @return `start` plus the bucket index, i.e. a value in [start, start + 10]
 * @throws NumberFormatException if `age` is neither "?" nor an integer
 */
def getAgeboundaries(age: String, start: Int = 0): Int = {
  // Lower bound (inclusive) of buckets 1..10.
  val boundaries = Array(18, 25, 30, 35, 40, 45, 50, 55, 60, 65)
  val bucket =
    if (age == "?") 0
    else {
      val numage = age.toInt
      val firstAbove = boundaries.indexWhere(numage < _)
      if (firstAbove < 0) boundaries.length else firstAbove
    }
  // The offset used to be applied only to the "?" case; apply it uniformly.
  bucket + start
}

In [9]:
/**
 * Hashes a string into one of `bucketsize` buckets, offset by `start`.
 *
 * @param sth        value to hash (via `String.hashCode`)
 * @param bucketsize number of buckets; must be positive
 * @param start      offset added to the bucket index
 * @return a value in [start, start + bucketsize)
 * @throws IllegalArgumentException if `bucketsize` is not positive
 */
def hashbucket(sth: String, bucketsize: Int = 1000, start: Int = 0): Int = {
  require(bucketsize > 0, s"bucketsize must be positive, got $bucketsize")
  // floorMod gives a non-negative remainder directly; the former
  // `(h % n + n) % n` overflowed once bucketsize exceeded Int.MaxValue / 2.
  Math.floorMod(sth.hashCode, bucketsize) + start
}

In [10]:
/**
 * Encodes a categorical value as its position in a vocabulary, shifted by `start`.
 *
 * @param sth        raw categorical value
 * @param vocab_list vocabulary to look `sth` up in
 * @param default    position used when `sth` is not in `vocab_list`
 * @param start      offset added to the resulting position
 * @return `start + position of sth`, or `start + default` when it is absent
 */
def categoricalFromVocabList(sth: String,
    vocab_list: Array[String], default: Int = 1, start: Int = 0): Int = {
  // indexOf reports absence as -1, so a single scan covers both cases.
  val position = vocab_list.indexOf(sth) match {
    case -1    => default
    case found => found
  }
  start + position
}

In [14]:
/**
 * Load data of Census dataset.
 *
 * @param src RDD of file line
 * @param tag "Train" or "Test", represents train data or test data
 * @return
 */
/**
 * Load data of Census dataset.
 *
 * Every non-empty line is split on "," and each field is trimmed. Fields are then
 * read by the column-position constants (AGE, WORKCLASS, ...). Each record becomes
 * a TensorSample with two features and one label:
 *  - a sparse tensor over a 5006-wide index space with 11 stored entries (base
 *    categorical columns, hashed columns, the age bucket and three hashed crosses);
 *  - a dense 40-slot tensor, resized to (1, 40), holding one-hot categorical
 *    indicators, two hashed ids and five raw numeric columns;
 *  - a label tensor, resized to (1, 1): 2.0 when the label field contains ">50K",
 *    otherwise 1.0.
 *
 * @param src RDD of file line
 * @param tag "Train" or "Test", represents train data or test data. Any value other
 *            than "Train" also drops lines containing "|1x3 Cross validator".
 * @return one sample per retained line (evaluated lazily by Spark)
 * @throws NumberFormatException (when the RDD is evaluated) if a numeric column does
 *         not parse; ArrayIndexOutOfBoundsException if a line has too few fields
 */
def load(src: RDD[String], tag: String = "Train"): RDD[Sample[Float]] = {
  val nonEmpty = src.filter(_.length > 0)
  val kept =
    if (tag == "Train") nonEmpty
    else nonEmpty.filter(s => !s.contains("|1x3 Cross validator"))
  val records = kept.map(_.trim().split(","))

  records.map { line =>
    val lis = line.map(_.trim())

    // ---- Sparse feature: 11 (index, value) pairs in a 5006-wide space ----
    // The value buffer is allocated per record. It used to be a single Storage
    // created outside this closure and overwritten for every record. If
    // Tensor.sparse / TensorSample keep a reference to it rather than copying it,
    // all samples alive at once (e.g. batched or cached) would alias one buffer.
    val indices = new Array[Int](11)
    val storage = Storage[Float](11)
    val storageArray = storage.array()

    // Index layout:
    //   0-4        base categorical columns (values below)
    //   5-1004     hashed occupation
    //   1005-2004  hashed native country
    //   2005       age bucket
    //   2006-3005  education x occupation
    //   3006-4005  age bucket x education x occupation
    //   4006-5005  native country x occupation
    for (k <- 0 until 5) indices(k) = k
    indices(5) = hashbucket(lis(OCCUPATION), 1000, start = 0) + 5
    indices(6) = hashbucket(lis(NATIVE_COUNTRY), 1000, start = 0) + 1005
    indices(7) = 2005
    indices(8) = hashbucket(lis(EDUCATION) + lis(OCCUPATION), 1000, start = 0) + 2006
    indices(9) = hashbucket(
      getAgeboundaries(lis(AGE)).toString + lis(EDUCATION) + lis(OCCUPATION), 1000) + 3006
    indices(10) = hashbucket(lis(NATIVE_COUNTRY) + lis(OCCUPATION), 1000) + 4006

    // Base columns store their vocabulary position; unknown values become -1.
    storageArray(0) = categoricalFromVocabList(lis(GENDER), gender_vocab, default = -1, start = 0)
    storageArray(1) = categoricalFromVocabList(
      lis(EDUCATION), education_vocab, default = -1, start = 0)
    storageArray(2) = categoricalFromVocabList(
      lis(MARITAL_STATUS), marital_status_vocab, default = -1, start = 0)
    storageArray(3) = categoricalFromVocabList(
      lis(RELATIONSHIP), relationship_vocab, default = -1, start = 0)
    storageArray(4) = categoricalFromVocabList(
      lis(WORKCLASS), workclass_vocab, default = -1, start = 0)
    // Hashed columns and crosses store 1; slot 7 stores the age bucket itself.
    storageArray(5) = 1
    storageArray(6) = 1
    storageArray(7) = getAgeboundaries(lis(AGE), 0)
    for (k <- 8 until 11) storageArray(k) = 1

    // NOTE(review): the last argument here is 11, while the mini-batch templates
    // use Tensor.sparse(Array(5006), 1). Confirm against the Tensor.sparse
    // signature whether this parameter is the entry count or the dimensionality.
    val sps = Tensor.sparse(Array(indices), storage, Array(5006), 11)

    // ---- Dense feature: 40 slots, 1-based positions ----
    //   1-9    workclass one-hot      10-25  education one-hot
    //   26-27  gender one-hot         28-33  relationship one-hot
    //   34-35  hashed native country / occupation ids, each in [1, 1000]
    //   36-40  age, education_num, capital_gain, capital_loss, hours_per_week
    // Unknown categorical values fall back to categoricalFromVocabList's default
    // (1), so they share a slot with the vocabulary's second entry.
    val den = Tensor[Float](40).fill(0)
    den.setValue(categoricalFromVocabList(lis(WORKCLASS), workclass_vocab, start = 1), 1)
    den.setValue(categoricalFromVocabList(lis(EDUCATION), education_vocab, start = 10), 1)
    den.setValue(categoricalFromVocabList(lis(GENDER), gender_vocab, start = 26), 1)
    den.setValue(
      categoricalFromVocabList(lis(RELATIONSHIP), relationship_vocab, start = 28), 1)
    den.setValue(34, hashbucket(lis(NATIVE_COUNTRY), 1000, 1).toFloat)
    den.setValue(35, hashbucket(lis(OCCUPATION), 1000, 1).toFloat)
    den.setValue(36, lis(AGE).toFloat)
    den.setValue(37, lis(EDUCATION_NUM).toFloat)
    den.setValue(38, lis(CAPITAL_GAIN).toFloat)
    den.setValue(39, lis(CAPITAL_LOSS).toFloat)
    den.setValue(40, lis(HOURS_PER_WEEK).toFloat)
    den.resize(1, 40)

    // Class labels are 1-based: 2 for ">50K", 1 otherwise (`contains`, not equality).
    val train_label =
      if (lis(LABEL).contains(">50K")) Tensor[Float](T(2.0f)) else Tensor[Float](T(1.0f))
    train_label.resize(1, 1)

    TensorSample[Float](Array(sps, den), Array(train_label))
  }
}

In [46]:
// Mini-batch size used by the optimizer and its validation pass.
val batchSize = 600

// Census CSV files, relative to the notebook's working directory.
val trainData = "./census/train.data"
val testData = "./census/test.data"

// Reads one Census file through Spark and parses it with `load`.
def readCensus(path: String, tag: String): RDD[Sample[Float]] =
  load(sc.textFile(Paths.get(path).toString), tag)

val trainDataSet = readCensus(trainData, "Train")
val validateSet = readCensus(testData, "Test")

// Two-class model of type "wide_n_deep".
val model = SparseWideDeep[Float](modelType = "wide_n_deep", classNum = 2)
// println(model)

Name: Syntax Error.
Message: 
StackTrace: 

In [34]:
// Adam optimiser: base learning rate 0.001, learning-rate decay 0.0005.
val optimMethod = new Adam[Float](learningRate = 0.001, learningRateDecay = 0.0005)

In [35]:
// Template describing how samples from `load` are packed into mini-batches:
// a sparse feature of size 5006, a dense (1, 40) feature and a (1, 1) label.
val trainMiniBatch = new SparseTensorMiniBatch[Float](
  Array(Tensor.sparse(Array(5006), 1), Tensor(1, 40)),
  Array(Tensor(1, 1)))

// Training driver: cross-entropy loss over `trainDataSet`.
val optimizer = Optimizer(
  model = model,
  sampleRDD = trainDataSet,
  criterion = new CrossEntropyCriterion[Float](),
  batchSize = batchSize,
  miniBatch = trainMiniBatch)

In [39]:
// Same sample packing as for training, used by the per-epoch validation pass.
val validationMiniBatch = new SparseTensorMiniBatch[Float](
  Array(Tensor.sparse(Array(5006), 1), Tensor(1, 40)),
  Array(Tensor(1, 1)))

// Metrics computed on `validateSet` after every epoch.
val validationMethods: Array[ValidationMethod[Float]] = Array(
  new Top1Accuracy[Float],
  new Loss[Float](new CrossEntropyCriterion[Float]()))

// Train for 20 epochs with Adam, validating at the end of each epoch.
val trainedModel = optimizer
  .setOptimMethod(optimMethod)
  .setValidation(Trigger.everyEpoch, validateSet, validationMethods,
    batchSize = batchSize, miniBatch = validationMiniBatch)
  .setEndWhen(Trigger.maxEpoch(20))
  .optimize()

can't find locality partition for partition 0 Partition locations are (ArrayBuffer(172.168.0.22)) Candidate partition locations are
(0,List()).


In [47]:
// Display the first tensor returned by getParameters() (presumably the
// flattened weights; the second element is discarded here).
val (trainedWeights, _) = trainedModel.getParameters()
trainedWeights

-1.2057106
-1.7911849
1.303036
-0.12848523
0.66809064
0.60296136
0.5408805
-1.1922419
-0.52261984
0.97402906
-1.0512428
-0.35746256
-0.28114575
-0.19829594
-0.5377282
0.22858697
4.353018
-0.6874456
0.9123058
-3.27013
1.8044297
-1.741428
-1.9338946
-0.5336254
0.14718446
1.6903374
-0.1187511
-1.2775198
2.0769305
-0.68084395
-0.68264616
-0.23208107
0.62686116
0.2869092
1.359752
-0.74884737
0.3248731
-0.16416597
2.2178771
1.0740796
0.38896376
-0.10943797
-1.4376335
-0.97883636
0.5645823
0.6205644
0.7705192
0.13986462
0.69354033
-1.3272737
-0.22880793
-0.16947827
-1.8390166
0.86883116
-0.2093836
-0.5267181
1.9036882
0.85405916
-0.30341175
-1.2195041
1.1937811
-0.9920934
-1.0179375
0.85659236
-1.6623333
0.6087423
2.2310846
-0.75958776
-0...

In [48]:
// Evaluate Top-1 accuracy of the trained model on the validation samples
// (Some(100) is presumably the batch size — confirm against the evaluate signature).
// NOTE(review): this call failed with ArrayIndexOutOfBoundsException raised from
// ArrayTensorMiniBatch.set / MiniBatch.resizeData. `load` builds samples whose first
// feature is sparse, and the training/validation calls pass a SparseTensorMiniBatch
// template, but this overload takes no mini-batch template. It appears to fall back
// to the dense ArrayTensorMiniBatch. Evaluate through an API that accepts the
// SparseTensorMiniBatch template instead; check which overloads this BigDL version has.
val result = trainedModel.evaluate(validateSet,
   Array(new Top1Accuracy[Float]), Some(100))

Name: org.apache.spark.SparkException
Message: Job aborted due to stage failure: Task 1 in stage 4718.0 failed 1 times, most recent failure: Lost task 1.0 in stage 4718.0 (TID 2368, localhost, executor driver): java.lang.ArrayIndexOutOfBoundsException: 1
	at com.intel.analytics.bigdl.dataset.MiniBatch$.resizeData(MiniBatch.scala:299)
	at com.intel.analytics.bigdl.dataset.MiniBatch$.resize(MiniBatch.scala:326)
	at com.intel.analytics.bigdl.dataset.ArrayTensorMiniBatch.set(MiniBatch.scala:203)
	at com.intel.analytics.bigdl.dataset.ArrayTensorMiniBatch.set(MiniBatch.scala:110)
	at com.intel.analytics.bigdl.dataset.SampleToMiniBatch$$anon$2.next(Transformer.scala:344)
	at com.intel.analytics.bigdl.dataset.SampleToMiniBatch$$anon$2.next(Transformer.scala:322)
	at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
	at scala.collection.Iterator$class.foreach(Iterator.scala:893)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
	at scala.collection.TraversableOnce$cla