In [1]:
from pyspark.sql import SparkSession

from keras.models import Sequential
from keras.layers.core import Dense, Dropout, Activation
from keras.optimizers import SGD
from keras.wrappers.scikit_learn import KerasRegressor

from elephas.spark_model import SparkModel
from elephas.utils.rdd_utils import to_simple_rdd
from elephas import optimizers as elephas_optimizers
from sklearn.model_selection import cross_val_score
from sklearn.model_selection import KFold
from sklearn.preprocessing import StandardScaler
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import MinMaxScaler

import pandas as pd

Using Theano backend.


In [2]:
transformer = MinMaxScaler()

train = pd.read_pickle('train_df.p')
test = pd.read_pickle('test_df.p')
test = test.dropna(how='any')

y_train = transformer.fit_transform(train['Target'].values.reshape(-1, 1))
del train['Target']
y_test = transformer.transform(test['Target'].values.reshape(-1, 1))
del test['Target']

In [3]:
spark = SparkSession.builder.appName('ElephasTest').getOrCreate()
rdd = to_simple_rdd(spark.sparkContext, train, y_train)

In [4]:
model = Sequential()
model.add(Dense(18, input_dim=26, init='normal'))
model.add(Activation('relu'))
model.add(Dense(6))
model.add(Activation('relu'))
model.add(Dense(1))
model.add(Activation('sigmoid'))

batch_size = 64
nb_epoch = 10
sgd = SGD(lr=0.1)

adam = elephas_optimizers.adam()
spark_model = SparkModel(spark.sparkContext,
                         model,
                         optimizer=adam,
                         frequency='epoch',
                         mode='asynchronous',
                         master_loss='mse',
                         num_workers=2)

spark_model.train(rdd, nb_epoch=nb_epoch, batch_size=batch_size, verbose=2, validation_split=0.1)

In [9]:
print dir(spark_model.server)
spark_model.server.terminate()

['_Popen', '__class__', '__delattr__', '__dict__', '__doc__', '__format__', '__getattribute__', '__hash__', '__init__', '__module__', '__new__', '__reduce__', '__reduce_ex__', '__repr__', '__setattr__', '__sizeof__', '__str__', '__subclasshook__', '__weakref__', '_args', '_authkey', '_bootstrap', '_daemonic', '_identity', '_kwargs', '_name', '_parent_pid', '_popen', '_target', '_tempdir', 'authkey', 'daemon', 'exitcode', 'ident', 'is_alive', 'join', 'name', 'pid', 'run', 'start', 'terminate']


In [10]:
spark_model.train(rdd, nb_epoch=nb_epoch, batch_size=batch_size, verbose=2, validation_split=0.1)

In [44]:
# Evaluate Spark model by evaluating the underlying model
score = spark_model.master_network.evaluate(test.values, y_test, verbose=0)
print('Test accuracy:', score[1])
import numpy as np
result = spark_model.predict(test.values)
# origin_result = y_test
new_array = np.zeros((result.shape[0], 2))
new_array[:, 0] = result[:, 0]
new_array[:, 1] = y_test[:, 0]
new_array

('Test accuracy:', 0.0)


array([[ 0.55673677,  0.82974469],
       [ 0.55506194,  0.82823453],
       [ 0.55502319,  0.85843771],
       [ 0.55358803,  0.84937676],
       [ 0.55682486,  0.81615326],
       [ 0.55743468,  0.78292977],
       [ 0.55042219,  0.73913516],
       [ 0.54554498,  0.69081008],
       [ 0.53971952,  0.62964865],
       [ 0.54708683,  0.6862796 ],
       [ 0.54015797,  0.65607643],
       [ 0.54569781,  0.71044214],
       [ 0.54565787,  0.72101326],
       [ 0.54578596,  0.70138119],
       [ 0.55815291,  0.66966786],
       [ 0.55506915,  0.69081008],
       [ 0.55326217,  0.68174913],
       [ 0.56334382,  0.66664754],
       [ 0.55854541,  0.70591167],
       [ 0.55496639,  0.80105167],
       [ 0.54474694,  0.77386882],
       [ 0.56274587,  0.72554373],
       [ 0.56161028,  0.73007421],
       [ 0.56356007,  0.71497262],
       [ 0.56563115,  0.74215548],
       [ 0.56254303,  0.70742183],
       [ 0.55879158,  0.70591167],
       [ 0.56110966,  0.70440151],
       [ 0.56176692,

In [46]:
spark.stop()

('Test accuracy:', 0.0)


array([[ -7.15427697e-02,   8.29744693e-01],
       [ -7.05455542e-02,   8.28234534e-01],
       [ -7.37894475e-02,   8.58437710e-01],
       [ -6.95443526e-02,   8.49376757e-01],
       [ -7.32552931e-02,   8.16153263e-01],
       [ -6.11683726e-02,   7.82929769e-01],
       [ -5.55935875e-02,   7.39135162e-01],
       [ -4.32104245e-02,   6.90810079e-01],
       [ -4.10445817e-02,   6.29648646e-01],
       [ -2.06356272e-02,   6.86279603e-01],
       [  1.57817136e-02,   6.56076426e-01],
       [  1.20039098e-02,   7.10442144e-01],
       [  3.83645971e-03,   7.21013256e-01],
       [  3.93430609e-03,   7.01381191e-01],
       [ -1.74880940e-02,   6.69667856e-01],
       [ -9.38700512e-03,   6.90810079e-01],
       [ -8.05263408e-04,   6.81749126e-01],
       [  2.09808908e-02,   6.66647538e-01],
       [ -2.52166018e-03,   7.05911668e-01],
       [  2.88960990e-03,   8.01051675e-01],
       [  3.14509938e-03,   7.73868816e-01],
       [  1.31401084e-02,   7.25543733e-01],
       [  

In [15]:
result.shape

(245, 1)

In [7]:
df = pd.read_csv('/Users/warn/Documents/temp/housing.data', delim_whitespace=True, header=None)
dataset = df.values
X = dataset[:,0:13]
Y = dataset[:,13]

In [5]:
# define base mode
def baseline_model():
	# create model
	model = Sequential()
	model.add(Dense(13, input_dim=13, init='normal', activation='relu'))
	model.add(Dense(1, init='normal'))
	# Compile model
	model.compile(loss='mean_squared_error', optimizer='adam')
	return model

In [8]:
estimator = KerasRegressor(build_fn=baseline_model, nb_epoch=100, batch_size=5, verbose=0)
kfold = KFold(n_splits=10, random_state=7)
results = cross_val_score(estimator, X, Y, cv=kfold)
print("Results: %.2f (%.2f) MSE" % (results.mean(), results.std()))

Results: 34.22 (35.40) MSE


In [50]:
df = spark.read.load('/Users/warn/PycharmProjects/Dissertation/src/stockforecaster/test.parquet')

In [None]:
from pyspark.ml.classification import LogisticRegression, RandomForestClassifier, MultilayerPerceptronClassifier

In [13]:
spark.stop()