<img src="./mbit-logo.png" align="right" style="float" width="55">
<font color="#CA3532"><h1 align="left">Spark BigDL Exercise</h1></font>
<font color="#CA3532"><h2 align="left">Master's in Big Data, Cloud & Analytics 2019-2020</h2></font>
<font color="#6E6E6E"><h4 align="left">Carlos Alfonsel <a> carlos.alfonsel@mbitschool.com </a> </h4></font> 

# Boston house price prediction

We will try to predict the median home price in a Boston neighborhood in the mid-1970s from a variety of data about that neighborhood at the time, such as the crime rate and the property tax rate.

The dataset we will use is small: only 506 samples in total (404 for training and 102 for testing). We need to be careful, because it mixes discrete and continuous values (such as the crime rate), and the features have very different scales.
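Because the features live on very different scales (in the sample rows printed further down, the tax rate is 666 while the nitric oxides concentration is 0.631), a common remedy, not applied in this notebook, is to standardize each feature with the mean and standard deviation computed on the training set only. A minimal pure-Python sketch, assuming that approach; `fit_standardizer` and `apply_standardizer` are hypothetical helper names:

```python
import math


def fit_standardizer(rows):
    """Compute per-column mean and (population) standard deviation."""
    n = len(rows)
    n_cols = len(rows[0])
    means = [sum(r[j] for r in rows) / n for j in range(n_cols)]
    stds = []
    for j in range(n_cols):
        var = sum((r[j] - means[j]) ** 2 for r in rows) / n
        # Guard against constant columns to avoid dividing by zero.
        stds.append(math.sqrt(var) if var > 0 else 1.0)
    return means, stds


def apply_standardizer(rows, means, stds):
    """Shift and scale each column using previously fitted statistics."""
    return [[(x - m) / s for x, m, s in zip(r, means, stds)] for r in rows]


# Toy data: two features on very different scales.
train_rows = [[0.5, 200.0], [1.5, 400.0], [1.0, 600.0]]
test_rows = [[1.0, 300.0]]

means, stds = fit_standardizer(train_rows)
train_scaled = apply_standardizer(train_rows, means, stds)
# The test rows reuse the training statistics, so no test information leaks in.
test_scaled = apply_standardizer(test_rows, means, stds)
print(means)        # [1.0, 400.0]
print(test_scaled)
```

With the NumPy arrays returned by `load_data()`, the same idea is `mean = train_data.mean(axis=0)` and `std = train_data.std(axis=0)`, then `(train_data - mean) / std` and `(test_data - mean) / std`.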

**Although the solution presented here uses Analytics Zoo, BigDL can be used as well.**

The data contain 13 features from which we should be able to predict the price of each home. The features are:

1. Per capita crime rate.
2. Proportion of residential land zoned for lots over 25,000 square feet.
3. Proportion of non-retail business acres per town.
4. Charles River dummy variable (= 1 if it bounds the river; 0 otherwise).
5. Nitric oxides concentration (parts per 10 million).
6. Average number of rooms per dwelling.
7. Proportion of owner-occupied units built before 1940.
8. Weighted distances to five Boston employment centers.
9. Index of accessibility to radial highways.
10. Full-value property tax rate per $10,000.
11. Pupil-teacher ratio by town.
12. `1000 * (Bk - 0.63) ** 2`, where Bk is the proportion of Black residents by town.
13. % lower-status population.
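Feature 12 is a transformed quantity rather than a raw proportion. Evaluating the formula shows that it peaks at 396.9 when Bk = 0 (the same 396.9 that appears in some of the sample rows printed further down), drops to 0 at Bk = 0.63 and is 136.9 at Bk = 1, so two different Bk values can map to the same feature value. A small sketch; `b_feature` and `bk_candidates` are hypothetical helper names:

```python
import math


def b_feature(bk):
    """Feature 12 as defined in the list: 1000 * (Bk - 0.63) ** 2."""
    return 1000 * (bk - 0.63) ** 2


def bk_candidates(b):
    """Invert the formula: return the Bk values in [0, 1] that produce feature value b."""
    root = math.sqrt(b / 1000)
    candidates = {0.63 - root, 0.63 + root}
    return sorted(c for c in candidates if 0.0 <= c <= 1.0)


print(b_feature(0.0))        # approximately 396.9, the maximum for Bk in [0, 1]
print(b_feature(0.63))       # 0.0
print(bk_candidates(100.0))  # two valid Bk values: the feature is ambiguous here
print(bk_candidates(200.0))  # one valid Bk value
```

In other words, values below 136.9 cannot be traced back to a single Bk, which is worth keeping in mind when interpreting this feature.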

In [1]:
#! pip install analytics-zoo
#! sudo apt install openjdk-8-jdk
#! echo "2" | sudo update-alternatives --config java

In [2]:
from zoo.common.nncontext import *
sc_zoo = init_nncontext(init_spark_conf().setMaster("local[4]"))

Prepending /home/osboxes/anaconda3/envs/env_bigdl/lib/python2.7/site-packages/bigdl/share/conf/spark-bigdl.conf to sys.path
Adding /home/osboxes/anaconda3/envs/env_bigdl/lib/python2.7/site-packages/zoo/share/lib/analytics-zoo-bigdl_0.10.0-spark_2.4.3-0.8.1-jar-with-dependencies.jar to BIGDL_JARS
Prepending /home/osboxes/anaconda3/envs/env_bigdl/lib/python2.7/site-packages/zoo/share/conf/spark-analytics-zoo.conf to sys.path
Adding /home/osboxes/anaconda3/envs/env_bigdl/lib/python2.7/site-packages/zoo/share/lib/analytics-zoo-bigdl_0.10.0-spark_2.4.3-0.8.1-jar-with-dependencies.jar to SPARK_CLASSPATH


In [3]:
from zoo.pipeline.api.keras.datasets import boston_housing
(train_data, train_targets), (test_data, test_targets) =  boston_housing.load_data()

In [4]:
#! pip install BigDL
#! echo "y" | pip uninstall pyspark
#! pip install pyspark==2.1.2

In [5]:
import matplotlib
matplotlib.use('Agg')
%pylab inline
import pandas
import datetime as dt

from bigdl.nn.layer            import *
from bigdl.nn.criterion        import *
from bigdl.optim.optimizer     import *
from bigdl.util.common         import *
from bigdl.util.common         import Sample
from bigdl.dataset.transformer import *

from matplotlib.pyplot         import imshow
import matplotlib.pyplot       as plt

from pyspark.conf              import SparkConf
from pyspark.sql               import SparkSession

conf = SparkConf().setMaster("local[4]") \
                  .set("spark.driver.memory","2g") \
                  .set("spark.shuffle.reduceLocality.enabled", "false") \
                  .set("spark.shuffle.blockTransferService", "nio") \
                  .set("spark.scheduler.minRegisteredResourcesRatio", "1.0") \
                  .set("spark.speculation", "false")

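# getOrCreate() reuses the SparkContext already started by init_nncontext(),
# so static settings in `conf` (e.g. spark.driver.memory) may not take effect.
sc = SparkSession.builder.config(conf = conf).getOrCreate().sparkContext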

init_engine()

Populating the interactive namespace from numpy and matplotlib


In [6]:
print(train_data.shape)
print(test_data.shape)

(404, 13)
(102, 13)


In [7]:
train = Sample.from_ndarray(train_data, train_targets)
test  = Sample.from_ndarray(test_data , test_targets)

In [8]:
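#rdd_train = sc.parallelize(range(train_data.shape[0])).map(lambda x: train[x])
#for row in rdd_train.collect(): print(row) #DOES NOT WORK WITH THIS TYPE OF VARIABLE (PipelineRDD)
# NOTE: `train` from the previous cell is a single Sample wrapping the whole
# 404 x 13 matrix, so `train[x]` does not select a row; each row is wrapped
# in its own Sample below instead.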

rdd_train = sc.parallelize(range(train_data.shape[0])).map(lambda x: 
                                                           Sample.from_ndarray(train_data   [x], 
                                                                               train_targets[x]))

rdd_test  = sc.parallelize(range(test_data.shape [0])).map(lambda x: 
                                                          Sample.from_ndarray(test_data    [x], 
                                                                               test_targets[x]))


In [9]:
rdd_train

PythonRDD[2] at RDD at PythonRDD.scala:48

In [10]:
print rdd_train.take(5)

[Sample: features: [JTensor: storage: [6.7240e-02 0.0000e+00 3.2400e+00 0.0000e+00 4.6000e-01 6.3330e+00
 1.7200e+01 5.2146e+00 4.0000e+00 4.3000e+02 1.6900e+01 3.7521e+02
 7.3400e+00], shape: [13], float], labels: [JTensor: storage: [22.6], shape: [1], float], Sample: features: [JTensor: storage: [9.2323e+00 0.0000e+00 1.8100e+01 0.0000e+00 6.3100e-01 6.2160e+00
 1.0000e+02 1.1691e+00 2.4000e+01 6.6600e+02 2.0200e+01 3.6615e+02
 9.5300e+00], shape: [13], float], labels: [JTensor: storage: [50.], shape: [1], float], Sample: features: [JTensor: storage: [1.1425e-01 0.0000e+00 1.3890e+01 1.0000e+00 5.5000e-01 6.3730e+00
 9.2400e+01 3.3633e+00 5.0000e+00 2.7600e+02 1.6400e+01 3.9374e+02
 1.0500e+01], shape: [13], float], labels: [JTensor: storage: [23.], shape: [1], float], Sample: features: [JTensor: storage: [ 24.8017   0.      18.1      0.       0.693    5.349   96.       1.7028
  24.     666.      20.2    396.9     19.77  ], shape: [13], float], labels: [JTensor: storage: [8.3], shape

In [11]:
# Hyperparameters:
learning_rate   = 0.2
training_epochs = 15
batch_size      = 8   # BigDL expects a multiple of the total core count (local[4] -> 4)
n_input         = train_data.shape[1]
n_output        = 1

# Network Parameters:
n_hidden_1 = n_input * 2 # 1st layer number of features
n_hidden_2 = n_input * 2 # 2nd layer number of features
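With these sizes the network is small. A quick count of its trainable parameters, assuming each `Linear` layer has a weight matrix plus one bias per output unit (the usual default); `linear_params` and `mlp_params` are hypothetical helper names:

```python
def linear_params(n_in, n_out):
    """Weights (n_in * n_out) plus one bias per output unit."""
    return n_in * n_out + n_out


def mlp_params(layer_sizes):
    """Total parameters of a stack of fully connected layers, e.g. [13, 26, 26, 1]."""
    return sum(linear_params(a, b) for a, b in zip(layer_sizes, layer_sizes[1:]))


n_input = 13
n_hidden_1 = n_hidden_2 = n_input * 2
n_output = 1

print(mlp_params([n_input, n_hidden_1, n_hidden_2, n_output]))  # 1093 (MLP)
print(mlp_params([n_input, n_output]))                          # 14 (linear regression)
```

That is 364 + 702 + 27 = 1093 parameters for the MLP, more than the 404 training samples, versus 14 for the linear regression model.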

In [12]:
# Linear Regression Model:
def linear_regression(n_input, n_output):
    model = Sequential()  
    model.add(Linear(n_input, n_output))
    return model

#model = linear_regression(n_input, n_output)

# MLP Model:
def multilayer_perceptron(n_hidden_1, n_hidden_2, n_input, n_output):
    # Initialize a sequential container:
    model = Sequential()
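    # Hidden layer with ReLU activation:
    model.add(Linear(n_input   , n_hidden_1).set_name('mlp_fc1'))
    model.add(ReLU())
    # Hidden layer with ReLU activation:
    model.add(Linear(n_hidden_1, n_hidden_2).set_name('mlp_fc2'))
    model.add(ReLU())
    # Output layer:
    model.add(Linear(n_hidden_2, n_output  ).set_name('mlp_fc3'))
    # NOTE: LogSoftMax over a single output unit always returns 0 (log 1),
    # which is why every prediction below is [0.]. A regression model should
    # end at the Linear layer above, without this activation.
    model.add(LogSoftMax())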
    return model

model = multilayer_perceptron(n_hidden_1, n_hidden_2, n_input, n_output)

creating: createSequential
creating: createLinear
creating: createReLU
creating: createLinear
creating: createReLU
creating: createLinear
creating: createLogSoftMax
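The final `LogSoftMax` layer is what forces every prediction to zero further down. Log-softmax normalizes over the output units; with a single unit the normalizer is that unit's own value, so the result is always log(1) = 0 whatever the input, and its gradient is zero too, so training cannot move it. A pure-Python sketch of a numerically stable log-softmax (the hypothetical `log_softmax` helper below, not BigDL's implementation) makes this visible:

```python
import math


def log_softmax(zs):
    """Numerically stable log-softmax: z_i - log(sum_j exp(z_j))."""
    m = max(zs)
    log_sum_exp = m + math.log(sum(math.exp(z - m) for z in zs))
    return [z - log_sum_exp for z in zs]


# With one output unit, every input collapses to 0.0.
for z in [-5.0, 0.0, 22.6, 1000.0]:
    print(log_softmax([z]))  # [0.0] each time

# With several units, exponentiating the outputs gives probabilities summing to 1.
print(sum(math.exp(v) for v in log_softmax([1.0, 2.0, 3.0])))  # approximately 1.0
```

Dropping that last layer leaves a linear output, which is the usual choice for an unbounded regression target such as a price.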


In [13]:
# Optimizer:

criterion_1 = MSECriterion()
criterion_2 = AbsCriterion()
criterion_3 = SmoothL1Criterion()
criterion_4 = BCECriterion()

optimizer   = Optimizer(model        = model,
                        training_rdd = rdd_train,
                        criterion    = criterion_1,  # MSE; BCECriterion expects targets in [0, 1], not prices
                        optim_method = SGD(learningrate = learning_rate),
                        end_trigger  = MaxEpoch(training_epochs),
                        batch_size   = batch_size)

# bigdl.nn.criterion:
# https://bigdl-project.github.io/0.2.0/APIdocs/python-api-doc/_modules/bigdl/nn/criterion.html


creating: createMSECriterion
creating: createAbsCriterion
creating: createSmoothL1Criterion
creating: createBCECriterion
creating: createDefault
creating: createSGD
creating: createMaxEpoch
creating: createDistriOptimizer
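The four criteria created above measure error differently, and only the first three suit real-valued targets: `BCECriterion` is binary cross-entropy, which expects targets in [0, 1], whereas the targets here are house prices in thousands of dollars (e.g. 22.6). A rough pure-Python sketch of what the three regression criteria compute, assuming the per-element losses are averaged over the batch; `mse`, `mae` and `smooth_l1` are hypothetical helpers, not BigDL functions:

```python
def mse(preds, targets):
    """Mean squared error (corresponds to MSECriterion)."""
    return sum((p - t) ** 2 for p, t in zip(preds, targets)) / len(preds)


def mae(preds, targets):
    """Mean absolute error (corresponds to AbsCriterion)."""
    return sum(abs(p - t) for p, t in zip(preds, targets)) / len(preds)


def smooth_l1(preds, targets):
    """Smooth L1 (corresponds to SmoothL1Criterion): quadratic for |d| < 1, linear beyond."""
    total = 0.0
    for p, t in zip(preds, targets):
        d = abs(p - t)
        total += (0.5 * d ** 2) if d < 1 else (d - 0.5)
    return total / len(preds)


preds = [1.0, 2.0, 3.0]
targets = [1.0, 2.0, 5.0]
print(mse(preds, targets))        # 1.3333333333333333
print(mae(preds, targets))        # 0.6666666666666666
print(smooth_l1(preds, targets))  # 0.5
```

MSE penalizes large errors quadratically, MAE linearly, and Smooth L1 behaves like MSE for small errors and like MAE for large ones, which makes it less sensitive to outliers.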


In [14]:
# Training model:
trained_model = optimizer.optimize()

In [15]:
# Printing the first 5 predicted results of training data:
predict_result = trained_model.predict(rdd_train)

p = predict_result.take(5)
print("predict: \n")
for i in p: print(str(i) + "\n")

predict: 

[0.]

[0.]

[0.]

[0.]

[0.]



In [16]:
predict_data = sc.parallelize(range(test_data.shape[0])).map(lambda x: 
                                                             Sample.from_ndarray(test_data   [x],
                                                                                 test_targets[x]))
print predict_data.take(5)

[Sample: features: [JTensor: storage: [3.7380e-02 0.0000e+00 5.1900e+00 0.0000e+00 5.1500e-01 6.3100e+00
 3.8500e+01 6.4584e+00 5.0000e+00 2.2400e+02 2.0200e+01 3.8940e+02
 6.7500e+00], shape: [13], float], labels: [JTensor: storage: [20.7], shape: [1], float], Sample: features: [JTensor: storage: [6.5880e-02 0.0000e+00 2.4600e+00 0.0000e+00 4.8800e-01 7.7650e+00
 8.3300e+01 2.7410e+00 3.0000e+00 1.9300e+02 1.7800e+01 3.9556e+02
 7.5600e+00], shape: [13], float], labels: [JTensor: storage: [39.8], shape: [1], float], Sample: features: [JTensor: storage: [  8.98296   0.       18.1       1.        0.77      6.212    97.4
   2.1222   24.      666.       20.2     377.73     17.6    ], shape: [13], float], labels: [JTensor: storage: [17.8], shape: [1], float], Sample: features: [JTensor: storage: [  1.19294   0.       21.89      0.        0.624     6.326    97.7
   2.271     4.      437.       21.2     396.9      12.26   ], shape: [13], float], labels: [JTensor: storage: [19.6], shape: [1],

In [17]:
# Testing the model based on MSE (Mean Squared Error):
def test_predict(trained_model):
    total_length = test_data.shape[0]
    features     = test_data
    labels       = test_targets
    predict_data = sc.parallelize(range(total_length)).map(lambda x: 
                                                           Sample.from_ndarray(features[x], 
                                                                               labels  [x]))
    
    predict_result = trained_model.predict(predict_data)
    
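    # NOTE: this compares the *sum* of the predictions with the *sum* of the
    # labels, so `mse` is (total error)**2 / n rather than the mean of the
    # per-sample squared errors, and `mae` is the absolute mean error (bias).
    p   = sum(predict_result.collect())
    mse = ((p - sum(labels)) ** 2) / total_length
    mae = abs((p - sum(labels)) / total_length)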
    print("Mean Squared Error: ", mse)
    print("Mean Absolute Error:", mae)

In [18]:
test_predict(trained_model)

('Mean Squared Error: ', 52736.789117647044)
('Mean Absolute Error:', 22.738235294117644)
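These numbers are consistent with every prediction being 0: `test_predict` squares the difference of the *sums*, so with zero predictions it returns n × (mean label)², and indeed 102 × 22.738235² ≈ 52736.79, while the "Mean Absolute Error" line is simply the mean test price. A pure-Python sketch contrasting that formula with per-sample metrics; `aggregate_mse`, `per_sample_mse` and `per_sample_mae` are hypothetical helper names:

```python
def aggregate_mse(preds, labels):
    """The notebook's formula: (sum(preds) - sum(labels)) ** 2 / n."""
    return (sum(preds) - sum(labels)) ** 2 / len(labels)


def per_sample_mse(preds, labels):
    """Mean of the per-sample squared errors."""
    return sum((p - y) ** 2 for p, y in zip(preds, labels)) / len(labels)


def per_sample_mae(preds, labels):
    """Mean of the per-sample absolute errors."""
    return sum(abs(p - y) for p, y in zip(preds, labels)) / len(labels)


labels = [20.0, 30.0]
zeros = [0.0, 0.0]
print(aggregate_mse(zeros, labels))   # 1250.0 = 2 * 25.0 ** 2
print(per_sample_mse(zeros, labels))  # 650.0 = (400 + 900) / 2
print(per_sample_mae(zeros, labels))  # 25.0

# Errors of opposite sign cancel in the aggregate formula but not per sample.
preds = [30.0, 20.0]
print(aggregate_mse(preds, labels))   # 0.0
print(per_sample_mse(preds, labels))  # 100.0
```

In the notebook, `predict_result.collect()` returns one prediction array per test sample, so the per-sample formulas can be applied element-wise against `test_targets`.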
