First, set the environment variables and initialize the Spark context:

In [None]:
%env SPARK_DRIVER_MEMORY=8g
%env PYSPARK_PYTHON=/usr/bin/python3.5
%env PYSPARK_DRIVER_PYTHON=/usr/bin/python3.5

from zoo.common.nncontext import *
sc = init_nncontext(init_spark_conf().setMaster("local[4]"))

# Regression
We will be attempting to predict the median price of homes in a given Boston suburb in the mid-1970s, given a few data points about the suburb at the time, such as the crime rate, the local property tax rate, etc.

The dataset we will be using has another interesting difference from our two previous examples: it has very few data points, only 506 in total, split between 404 training samples and 102 test samples. In addition, each "feature" in the input data (e.g. the crime rate) has a different scale. For instance, some values are proportions, which take values between 0 and 1, others take values between 1 and 12, others between 0 and 100, and so on.

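This dataset is packaged in Keras 2.0.8 but not in Keras 1.2.2, so we use the following code to download and split the data. We then normalize it feature-wise: for each feature we subtract the training-set mean and divide by the training-set standard deviation, and we scale the test data with those same training-set statistics: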

In [None]:
import numpy as np
from keras.utils.data_utils import get_file
def load_data(path='boston_housing.npz', test_split=0.2, seed=113):
    """Loads the Boston Housing dataset.
    # Arguments
        path: path where to cache the dataset locally
            (relative to ~/.keras/datasets).
        test_split: fraction of the data to reserve as test set.
        seed: Random seed for shuffling the data
            before computing the test split.
    # Returns
        Tuple of Numpy arrays: `(x_train, y_train), (x_test, y_test)`.
    """
    assert 0 <= test_split < 1
    path = get_file(
        path,
        origin='https://s3.amazonaws.com/keras-datasets/boston_housing.npz'
        )
    with np.load(path) as f:
        x = f['x']
        y = f['y']

    np.random.seed(seed)
    indices = np.arange(len(x))
    np.random.shuffle(indices)
    x = x[indices]
    y = y[indices]

    # Index of the first test sample after shuffling.
    split = int(len(x) * (1 - test_split))
    x_train, y_train = np.array(x[:split]), np.array(y[:split])
    x_test, y_test = np.array(x[split:]), np.array(y[split:])
    return (x_train, y_train), (x_test, y_test)

(train_data, train_targets), (test_data, test_targets) = load_data()

mean = train_data.mean(axis=0)
train_data -= mean
std = train_data.std(axis=0)
train_data /= std

test_data -= mean
test_data /= std
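
The normalization step can be checked on a tiny example. The sketch below uses made-up values and a hypothetical `standardize` helper (neither comes from the dataset or the Zoo API) to show that each column ends up with mean 0 and standard deviation 1 on the training set, while the test set is scaled with the training statistics:

```python
import numpy as np

def standardize(train, test):
    """Scale each column using the mean and std of the training set only."""
    mean = train.mean(axis=0)
    std = train.std(axis=0)
    return (train - mean) / std, (test - mean) / std

# Made-up data: column 0 is a proportion, column 1 lies in [0, 100].
toy_train = np.array([[0.1, 10.0], [0.5, 50.0], [0.9, 90.0]])
toy_test = np.array([[0.3, 70.0]])

toy_train_scaled, toy_test_scaled = standardize(toy_train, toy_test)
print(toy_train_scaled.mean(axis=0))  # close to 0 for every column
print(toy_train_scaled.std(axis=0))   # close to 1 for every column
print(toy_test_scaled)                # test values on the same scale
```

Computing the statistics on the training data only keeps information about the test set out of the preprocessing.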

Because we have so few data points in this example, a validation set would end up being very small (about 100 examples). As a consequence, our validation scores may change a lot depending on which data points we use for validation and which we use for training, i.e. the validation scores may have a high variance with regard to the validation split. This would prevent us from reliably evaluating our model.

The best practice in such situations is to use K-fold cross-validation. It consists of splitting the available data into K partitions (typically K=4 or 5), then instantiating K identical models, and training each one on K-1 partitions while evaluating on the remaining partition. The validation score for the model used would then be the average of the K validation scores obtained.
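
As a rough illustration of the partitioning described above, the following sketch yields the training and validation indices for each fold. The `kfold_slices` helper and the sample count of 10 are hypothetical and chosen only to keep the output small:

```python
import numpy as np

def kfold_slices(num_samples, k):
    """Yield (train_idx, val_idx) index arrays for k contiguous folds."""
    fold_size = num_samples // k
    indices = np.arange(num_samples)
    for i in range(k):
        # Fold i is held out for validation.
        val_idx = indices[i * fold_size:(i + 1) * fold_size]
        # Everything before and after fold i is used for training.
        train_idx = np.concatenate(
            [indices[:i * fold_size], indices[(i + 1) * fold_size:]])
        yield train_idx, val_idx

folds = list(kfold_slices(num_samples=10, k=4))
for fold, (train_idx, val_idx) in enumerate(folds):
    print(fold, val_idx, len(train_idx))
```

When the sample count is not divisible by K, this simple scheme never validates on the leftover samples at the end; they are only ever used for training. With 404 training samples and K=4 there is no remainder.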

Because K-fold validation requires building the model several times, we define a function that constructs it:

In [None]:
from zoo.pipeline.api.keras import models
from zoo.pipeline.api.keras import layers

def build_model():
    # Because we will need to instantiate
    # the same model multiple times,
    # we use a function to construct it.
    model = models.Sequential()
    model.add(layers.Dense(64, activation='relu',
                           input_shape=(train_data.shape[1],)))
    model.add(layers.Dense(64, activation='relu'))
    model.add(layers.Dense(1))
    model.compile(optimizer='rmsprop', loss='mse', metrics=['mae'])
    return model
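
The model is compiled with mean squared error (`mse`) as the loss and mean absolute error (`mae`) as a metric. The sketch below computes both quantities with plain NumPy on made-up targets and predictions, to show how they differ; the helper functions are illustrative and are not the Zoo implementations:

```python
import numpy as np

def mse(y_true, y_pred):
    """Mean of the squared differences between targets and predictions."""
    return float(np.mean((y_true - y_pred) ** 2))

def mae(y_true, y_pred):
    """Mean of the absolute differences between targets and predictions."""
    return float(np.mean(np.abs(y_true - y_pred)))

# Made-up targets and predictions, for illustration only.
y_true = np.array([20.0, 30.0, 25.0])
y_pred = np.array([22.0, 27.0, 25.0])

print(mse(y_true, y_pred))
print(mae(y_true, y_pred))
```

MSE penalizes large errors more heavily because the differences are squared, while MAE is expressed in the same units as the targets and is therefore easier to interpret.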

Now we can start training:

In [None]:
import numpy as np

k = 4
num_val_samples = len(train_data) // k
num_nb_epoch = 50
all_scores = []
for i in range(k):
    print('processing fold #', i)
    # Prepare the validation data: data from partition # i
    val_data = train_data[i * num_val_samples: (i + 1) * num_val_samples]
    val_targets = train_targets[i * num_val_samples: (i + 1) * num_val_samples]

    # Prepare the training data: data from all other partitions
    partial_train_data = np.concatenate(
        [train_data[:i * num_val_samples],
         train_data[(i + 1) * num_val_samples:]],
        axis=0)
    partial_train_targets = np.concatenate(
        [train_targets[:i * num_val_samples],
         train_targets[(i + 1) * num_val_samples:]],
        axis=0)

    # Build the Keras model (already compiled)
    model = build_model()
    # Train the model. The commented-out call below is the silent
    # variant (verbose=0) with batch_size=1.
    #model.fit(partial_train_data, partial_train_targets,
    #          nb_epoch=num_nb_epoch, batch_size=1, verbose=0)
    model.fit(partial_train_data, partial_train_targets,
              nb_epoch=num_nb_epoch, batch_size=16)

    # Evaluate the model on the validation data
    #val_mse, val_mae = model.evaluate(val_data, val_targets, verbose=0)
    # evaluate returns a list of results; keep the value of the first one
    val_results = model.evaluate(val_data, val_targets)
    all_scores.append(val_results[0].result)

processing fold # 0
Trained 16 records in 0.011235845 seconds. Throughput is 1424.0139 records/second. Loss is 8.708786.
processing fold # 1
Trained 16 records in 0.009535034 seconds. Throughput is 1678.0223 records/second. Loss is 5.3613434.
processing fold # 2
Trained 16 records in 0.008636178 seconds. Throughput is 1852.6713 records/second. Loss is 18.106756.
processing fold # 3
Trained 16 records in 0.009207628 seconds. Throughput is 1737.6897 records/second. Loss is 7.0931993.

We can then check the results of our K-fold training:

In [None]:
print(all_scores)

[20.572654724121094, 19.606250762939453, 21.224998474121094, 22.60078239440918]
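
As noted earlier, the K-fold validation score is the average of the per-fold scores. A minimal sketch, with the four values copied from the output above (which metric they represent depends on what `model.evaluate` returns first, so none is assumed here):

```python
import numpy as np

# Per-fold scores copied from the printed output above.
fold_scores = [20.572654724121094, 19.606250762939453,
               21.224998474121094, 22.60078239440918]

mean_score = float(np.mean(fold_scores))  # the K-fold validation score
spread = float(np.std(fold_scores))       # variation between folds
print(mean_score, spread)
```

The spread between folds gives a sense of how much a single validation split could have misled us, which is the reason for averaging over K folds.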