# Wide and Deep Learning on BigDL

Wide and Deep Learning Model, proposed by Google in 2016, is a DNN-Linear mixed model. Wide and deep learning has been used for Google App Store for their app recommendation.

In this tutorial, we'll introduce how to use the new SparseTensor API to train a wide linear model and a deep neural network, which is called wide and deep model, on BigDL. Wide and deep model combines the strength of memorization and generalization. It's useful for generic large-scale regression and classification problems with sparse input features(e.g., categorical features with a large number of possible feature values).

If you are interested in learning more about how Wide and Deep Learning model works, you can check out:
[Google Research Paper](https://arxiv.org/abs/1606.07792) or [Tensorflow tutorials](https://www.tensorflow.org/tutorials/wide_and_deep).

You can refer to Tensorflow tutorial to get the detailed description of wide and deep model.

On BigDL, there are some points different from Tensorflow:

1. Our SparseTensor data structure is different from Tensorflow.
2. We use only one Optimizer Adam to optimize both wide part and deep part of the model.
3. Our model building is different from Tensorflow.

And that's it! Let's go through a simple example.

The example is to train a wide and deep model to predict the probability that the individual has an annual income of over 50,000 dollars using the [Census Income Dataset](https://archive.ics.uci.edu/ml/datasets/Census+Income).

### Setup

To try the code for this tutorial:

1. [Install BigDL](https://bigdl-project.github.io/master/#ScalaUserGuide/install-build-src/) if you haven't already.
2. Download [the tutorial code](Code link).
3. Execute the tutorial code with the following command to train a wide and deep model.

```
BigDL_HOME=...
BigDL_VERSION=0.3.0-SNAPSHOT
PYTHON_API_PATH=${BigDL_HOME}/dist/lib/bigdl-${BigDL_VERSION}-python-api.zip
BigDL_JAR_PATH=${BigDL_HOME}/dist/lib/bigdl-${BigDL_VERSION}-jar-with-dependencies.jar
PYTHONPATH=${PYTHON_API_PATH}:$PYTHONPATH

MASTER=local[24]

spark-submit \
    --master ${MASTER} \
    --driver-cores 24 \
    --driver-memory 80g \
    --executor-cores 24  \
    --executor-memory 180g \
    --total-executor-cores 24 \
    --conf spark.rpc.message.maxSize=1024 \
    --properties-file ${BigDL_HOME}/dist/conf/spark-bigdl.conf \
    --jars ${BigDL_JAR_PATH} \
    --conf spark.driver.extraClassPath=${BigDL_JAR_PATH} \
    --conf spark.executor.extraClassPath=${BigDL_JAR_PATH} \
    --class com.intel.analytics.bigdl.models.widedeep_tutorial.Train \
    ${BigDL_JAR_PATH} \
    -f ${BigDL_HOME}/census \
    -b 1200 \
    -e 100 \
    -r 0.001
```

`census` directory stores your train data and test data.

```
./census$ tree .
.
├── train.data
├── test.data
```

* ```--f``` option can be used to set data folder, which contains train and test data.

* ```--b``` option can be used to set batch size, the default value is 128.

* ```--e``` option can be used to control how to end the training process.

* ```--r``` option can be used to control the initial learning rate for training.


To verify the accuracy, search "accuracy" from log:

```
INFO  DistriOptimizer$:247 - [Epoch 1 0/32561][Iteration 1][Wall Clock 0.0s] Train 1280 in xx seconds. Throughput is xx records/second.

INFO  DistriOptimizer$:629 - Top1Accuracy is Accuracy(correct: 13843, count: 16281, accuracy: 0.8502548983477674)
```

Read on to find out how this code builds its wide and deep model.

### Import the necessary classes and objects

Firstly we import the necessary classes and objects.

In [2]:
import com.intel.analytics.bigdl.dataset.SparseTensorMiniBatch
import com.intel.analytics.bigdl.nn.{ClassNLLCriterion, Module}
import com.intel.analytics.bigdl.numeric.NumericFloat
import com.intel.analytics.bigdl.models.widedeep_tutorial.SparseWideDeep
import com.intel.analytics.bigdl.optim._
import com.intel.analytics.bigdl.tensor.Tensor
import com.intel.analytics.bigdl.utils.{Engine, LoggerFilter}
import com.intel.analytics.bigdl.dataset.{Sample, TensorSample}
import com.intel.analytics.bigdl.tensor.{Storage, Tensor}
import com.intel.analytics.bigdl.utils.{File, T}
import com.intel.analytics.bigdl.visualization.{TrainSummary, ValidationSummary}
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkContext
import scopt.OptionParser
import java.nio.file.Paths

### New SparkContext to run the training process

Secondly, we new a SparkContext for the training process.

In [3]:
val conf = Engine.createSparkConf().setAppName("Wide and Deep Learning on Census").set("spark.rpc.message.maxSize", "1024").set("spark.driver.allowMultipleContexts", "true")
val sc = new SparkContext(conf)
Engine.init

### Define Base Feature Columns and Columns' Vocabulary List

In this part, we define the base categorical and continuous feature columns that we'll use. These base columns will be the building blocks used by both the wide part and the deep part of the model.

In [4]:
val AGE = 0
val WORKCLASS = 1
val FNLWGT = 2
val EDUCATION = 3
val EDUCATION_NUM = 4
val MARITAL_STATUS = 5
val OCCUPATION = 6
val RELATIONSHIP = 7
val RACE = 8
val GENDER = 9
val CAPITAL_GAIN = 10
val CAPITAL_LOSS = 11
val HOURS_PER_WEEK = 12
val NATIVE_COUNTRY = 13
val LABEL = 14

val LABEL_COLUMN = "label"
val CSV_COLUMNS = Array("age", "workclass", "fnlwgt", "education", "education_num",
    "marital_status", "occupation", "relationship", "race", "gender",
    "capital_gain", "capital_loss", "hours_per_week", "native_country",
    "income_bracket")
val CATEGORICAL_COLUMNS = Array("workclass", "education", "marital_status", "occupation",
    "relationship", "race", "gender", "native_country")
val CONTINUOUS_COLUMNS = Array("age", "education_num", "capital_gain", "capital_loss",
    "hours_per_week")
val education_vocab = Array("Bachelors", "HS-grad", "11th", "Masters", "9th",
    "Some-college", "Assoc-acdm", "Assoc-voc", "7th-8th",
    "Doctorate", "Prof-school", "5th-6th", "10th", "1st-4th",
    "Preschool", "12th") // 16
val marital_status_vocab = Array("Married-civ-spouse", "Divorced", "Married-spouse-absent",
    "Never-married", "Separated", "Married-AF-spouse", "Widowed")
val relationship_vocab = Array("Husband", "Not-in-family", "Wife", "Own-child", "Unmarried",
    "Other-relative")  // 6
val workclass_vocab = Array("Self-emp-not-inc", "Private", "State-gov", "Federal-gov",
    "Local-gov", "?", "Self-emp-inc", "Without-pay", "Never-worked") // 9
val gender_vocab = Array("Female", "Male")


### Define Methods For Feature Columns

Now we define some methods for feature columns.
`getAgeboundaries(age, start)` accepts age of one person, and returns the age buckets he/she falls in. Age boundaries is [18, 25, 30, 35, 40, 45, 50, 55, 60, 65].
`hashbucket(sth, bucketsize, start)` accepts a string, gets its hash code, and then puts it into a bucket of a lots of buckets, whose number is controlled by bucketsize.
`categoricalFromVocabList(sth, vocab_list, default, start)` accepts a string and a vocabulary list of feature column, returns the category of the feature value with respect to the feature column of the example.

In [5]:
def getAgeboundaries(age: String, start: Int = 0): Int = {
    if (age == "?") 0 + start
    else {
      val numage = age.toInt
      if (numage < 18) 0 else if (numage < 25) 1 else if (numage < 30) 2 else if (numage < 35) 3
      else if (numage < 40) 4 else if (numage < 45) 5 else if (numage < 50) 6
      else if (numage < 55) 7 else if (numage < 60) 8 else if (numage < 65) 9 else 10
    }
  }

def hashbucket(sth: String, bucketsize: Int = 1000, start: Int = 0): Int = {
    (sth.hashCode() % bucketsize + bucketsize) % bucketsize + start
  }

def categoricalFromVocabList(sth: String,
    vocab_list: Array[String], default: Int = 1, start: Int = 0): Int = {
    start + (if (vocab_list.contains(sth)) vocab_list.indexOf(sth) else default)
  }

### Define Preprocessing Procedure

Next we'll define our preprocessing procedure, which is included by a function `load()`.
`load(src, tag)` reads the train data or test data (controlled by tag, which range from "Train" to "Test"), and return a RDD of TensorSample[Float], which consists of a feature array and a label array. Feature array contains a SparseTensor and a DenseTensor, represents the input of the wide part and the deep part of the model respectively

In [6]:
/**
 * Load data of Census dataset.
 *
 * @param src RDD of file line
 * @param tag "Train" or "Test", represents train data or test data
 * @return
 */
def load(src: RDD[String], tag: String = "Train"): RDD[Sample[Float]] = {

    val iter = if (tag == "Train") src.filter(s => (s.length > 0)).map(_.trim().split(","))
    else src.filter(s => (!s.contains("|1x3 Cross validator") && s.length > 0))
      .map(_.trim().split(","))

    val storage = Storage[Float](11)
    val storageArray = storage.array()
    val results = iter.map(line => {
      val indices = new Array[Int](11)
      val lis = line.map(_.trim()).toSeq
      for (k <- 0 until 5) indices(k) = k
      indices(5) = hashbucket(lis(OCCUPATION), 1000, start = 0) + 5
      indices(6) = hashbucket(lis(NATIVE_COUNTRY), 1000, start = 0) + 1005
      indices(7) = 2005
      indices(8) = hashbucket(lis(EDUCATION) + lis(OCCUPATION), 1000, start = 0) + 2006 // 2006
      indices(9) = hashbucket(
        getAgeboundaries(lis(AGE)).toString + lis(EDUCATION) + lis(OCCUPATION), 1000) + 3006 // 2006
      indices(10) = hashbucket(lis(NATIVE_COUNTRY) + lis(OCCUPATION), 1000) + 4006 // 4006

      // 5006
      storageArray(0) = categoricalFromVocabList(lis(GENDER), gender_vocab, default = -1, start = 0)
      storageArray(1) = categoricalFromVocabList(
        lis(EDUCATION), education_vocab, default = -1, start = 0)
      storageArray(2) = categoricalFromVocabList(
        lis(MARITAL_STATUS), marital_status_vocab, default = -1, start = 0)
      storageArray(3) = categoricalFromVocabList(
        lis(RELATIONSHIP), relationship_vocab, default = -1, start = 0)
      storageArray(4) = categoricalFromVocabList(
        lis(WORKCLASS), workclass_vocab, default = -1, start = 0)

      storageArray(5) = 1
      storageArray(6) = 1
      storageArray(7) = getAgeboundaries(lis(AGE), 0)

      for (k <- 8 until 11) storageArray(k) = 1
      // wide model input
      val sps = Tensor.sparse(Array(indices), storage, Array(5006), 11)
      // deep model input
      val den = Tensor[Float](40).fill(0)
      den.setValue(
        categoricalFromVocabList(lis(WORKCLASS), workclass_vocab, start = 1), 1
      ) // 9
      den.setValue(
        categoricalFromVocabList(lis(EDUCATION), education_vocab, start = 10), 1
      ) // 16
      den.setValue(
        categoricalFromVocabList(lis(GENDER), gender_vocab, start = 26), 1
      ) // 2
      den.setValue(
        categoricalFromVocabList(lis(RELATIONSHIP), relationship_vocab,
          start = 28), 1
      ) // 6
      // total : 33
      den.setValue(34, hashbucket(lis(NATIVE_COUNTRY), 1000, 1).toFloat)
      den.setValue(35, hashbucket(lis(OCCUPATION), 1000, 1).toFloat)
      den.setValue(36, lis(AGE).toFloat)
      den.setValue(37, lis(EDUCATION_NUM).toFloat)
      den.setValue(38, lis(CAPITAL_GAIN).toFloat)
      den.setValue(39, lis(CAPITAL_LOSS).toFloat)
      den.setValue(40, lis(HOURS_PER_WEEK).toFloat)
      den.resize(1, 40)
      val train_label = if (lis(LABEL).contains(">50K")) Tensor[Float](T(2.0f))
                        else Tensor[Float](T(1.0f))
      train_label.resize(1, 1)

      TensorSample[Float](Array(sps, den), Array(train_label))
    })
    results
  }

### Define the Dataset and the Model

Next we define the loaded train dataset and test dataset, as well as the model.

In [7]:
val batchSize = 1200
val trainData = "./census/train.data"
val testData = "./census/test.data"
val trainDataSet = load(sc.textFile(Paths.get(trainData).toString), "Train")
val validateSet = load(sc.textFile(Paths.get(testData).toString), "Test")

val model = SparseWideDeep[Float](modelType = "wide_n_deep", classNum = 2)
println(model)

Sequential[acc0f503]{
  [input -> (1) -> (2) -> (3) -> (4) -> output]
  (1): nn.ParallelTable {
	input
	  |`-> (1): Identity[861afa45]
	   `-> (2): Sequential[7fca3bfc]{
	         [input -> (1) -> (2) -> (3) -> (4) -> (5) -> (6) -> output]
	         (1): Concat[ff0bdcec]{
	           input
	             |`-> (1): Sequential[94b0e990]{
	             |      [input -> (1) -> (2) -> output]
	             |      (1): Narrow[109d8acd](2, 1, 33)
	             |      (2): Reshape[indicator](33)
	             |    }
	             |`-> (2): Sequential[76011249]{
	             |      [input -> (1) -> (2) -> output]
	             |      (1): nn.Select
	             |      (2): LookupTable[embedding_1](1000, 8, 0.0, 1.7976931348623157E308, 2.0)
	             |    }
	             |`-> (3): Sequential[e1a8f3a8]{
	             |      [input -> (1) -> (2) -> output]
	             |      (1): nn.Select
	             |      (2): LookupTable[embedding_2](1000, 8, 0.0, 1.7976931348623157E308, 2.0)
	       

### Define the Optimizer

In this part, we'll define the optimize method and the optimizer, which is one of the key part of model training.
We set the Adam optimizer to optimize our wide and deep model, and set learning rate 0.001, as well as learning rate decay 0.0005.
Then we set the model we want to train, and set the trainDataset we loaded above, then we set criterion, batch size and miniBatch form.

In [8]:
val optimMethod = new Adam[Float](
          learningRate = 0.001,
          learningRateDecay = 0.0005
        )

val optimizer = Optimizer(
        model = model,
        sampleRDD = trainDataSet,
        criterion = ClassNLLCriterion[Float](),
        batchSize = batchSize,
        miniBatch = new SparseTensorMiniBatch[Float](Array(
          Tensor.sparse(Array(5006), 1),
          Tensor(1, 40)),
          Array(Tensor(1, 1)))
      )

### Enable visualization

In this part, we will enable tensorboard visualization in optimizer.
The log files will be write to '/tmp/widedeep/${sc.applicationId}', you can use 'tensorboard --logdir /tmp/widedeep' to start the tensorboard to see the Loss, Top1Accuracy and Parameters' distribution during training.

In [9]:
val logdir = "/tmp/widedeep"
val appName = s"${sc.applicationId}"
val trainSummary = TrainSummary(logdir, appName)
trainSummary.setSummaryTrigger("Parameters", Trigger.severalIteration(10))
val validationSummary = ValidationSummary(logdir, appName)
optimizer.setTrainSummary(trainSummary).setValidationSummary(validationSummary)


com.intel.analytics.bigdl.optim.DistriOptimizer@4faa530d

### Training the Model

After building the model and reading in the dataset, we can train the model using `optimize()` interface.

In [10]:
val trainedModel = optimizer.setOptimMethod(optimMethod).setValidation(Trigger.everyEpoch,
      validateSet, Array(new Top1Accuracy[Float],
        new Loss[Float]()),
      batchSize = batchSize, 
      miniBatch = new SparseTensorMiniBatch[Float](Array(
        Tensor.sparse(Array(5006), 1),
        Tensor(1, 40)),
        Array(Tensor(1, 1)))).setEndWhen(Trigger.maxEpoch(100)).optimize()

can't find locality partition for partition 0 Partition locations are (ArrayBuffer(localhost)) Candidate partition locations are
(0,List()).


### Evaluating the Model

Finally we evaluate the model using the pre-trained model. We set the evaluation information we want to see, like Top1Accuracy and Loss, and set the batch size and miniBatch example.

The output is as follows.

We can see that the accuracy was improved from about `83.6%` using a wide-only linear model to about `85.1%` using a Wide and Deep model. If you'd like to see a working end-to-end example, you can download our example code.

In [11]:
val result = trainedModel.evaluate(validateSet,
   Array(new Top1Accuracy[Float], new Loss[Float]), Some(100), miniBatch = new SparseTensorMiniBatch[Float](Array(Tensor.sparse(Array(5006), 1), Tensor(1, 40)), Array(Tensor(1, 1))))

result.foreach(r => println(s"${r._2} is ${r._1}"))

Top1Accuracy is Accuracy(correct: 13815, count: 16281, accuracy: 0.8485351022664456)
Loss is (Loss: 108.03325, count: 326, Average Loss: 0.33139032)


Note that this tutorial is just a quick example on a small dataset to get you familiar with the API. Wide and Deep Learning will be even more powerful if you try it on a large dataset with many sparse feature columns that have a large number of possible feature values. Again, feel free to take a look at our research paper for more ideas about how to apply Wide and Deep Learning in real-world large-scale machine learning problems.