In [1]:
from keras.layers import Input, Dense, Embedding, Flatten, Dropout, Activation, Reshape
from keras.layers.merge import Concatenate
from keras.models import Model, Sequential
from sklearn.preprocessing import OneHotEncoder 
from keras.callbacks import ModelCheckpoint, EarlyStopping
from keras.models import model_from_json
from keras.utils import np_utils, generic_utils
#from keras.utils.generic_utils import get_from_module


In [2]:
from pyspark.sql.functions import countDistinct
from pyspark.ml import Pipeline
from pyspark.ml.feature import StandardScaler
from pyspark.mllib.evaluation import MulticlassMetrics

In [3]:
import elephas

In [4]:
from elephas import optimizers as elephas_optimizers
#from elephas.ml_model import ElephasEstimator

In [5]:
from elephas.ml_model import ElephasEstimator
from elephas.spark_model import SparkModel
from pyspark.sql.types import IntegerType, FloatType

In [6]:
# Names of the data sources used by this notebook. `encoded_database` is the
# one queried through spark.sql below as the training table.
flatten_database = "sub_boi_events_2018_flatten"
attribute_database = "sub_boi_events_2018_attribute"
encoded_database = "sub_boi_events_2018_flatten_encoded"

# The validation and test names are the encoded name plus a fixed suffix.
encoded_database_val = "{}_val".format(encoded_database)
encoded_database_test = "{}_test".format(encoded_database)

In [7]:
# Pull the `event` and `verb` columns from the encoded table.
eventData = spark.sql("select event, verb from {}".format(encoded_database))

In [8]:
# Expose the `event` column under the additional name `features`; the
# StandardScaler and create_model read the data through that column.
eventData = eventData.withColumn(
    colName="features",
    col=eventData["event"],
)

In [9]:
def create_model(hidden_units=512, dropout_rate=0.5, num_hidden_layers=3):
  """Build and compile the dense softmax classifier for ``eventData``.

  The input width is the length of the ``features`` value in the first row of
  the module-level ``eventData`` DataFrame, and the output width is the number
  of distinct ``verb`` values in it. Both are obtained through Spark actions,
  so calling this function triggers Spark jobs.

  Args:
    hidden_units: Width of every hidden Dense layer.
    dropout_rate: Dropout fraction applied after every hidden layer.
    num_hidden_layers: Number of Dense -> ReLU -> Dropout stacks.

  Returns:
    A ``(model, nb_classes)`` tuple: the compiled Keras ``Sequential`` model
    and the number of output classes it was built for.

  Raises:
    ValueError: If ``num_hidden_layers`` is smaller than 1.
  """
  if num_hidden_layers < 1:
    raise ValueError("num_hidden_layers must be at least 1")

  input_dim = len(eventData.select("features").first()[0])
  nb_classes = eventData.select("verb").distinct().count()

  model = Sequential()
  for layer_index in range(num_hidden_layers):
    if layer_index == 0:
      # Only the first layer declares the input width.
      model.add(Dense(hidden_units, input_shape=(input_dim,)))
    else:
      model.add(Dense(hidden_units))
    model.add(Activation('relu'))
    model.add(Dropout(dropout_rate))

  # One softmax unit per distinct verb.
  model.add(Dense(nb_classes))
  model.add(Activation('softmax'))
  model.compile(loss='categorical_crossentropy', optimizer='adadelta', metrics=['accuracy'])

  # Return the class count computed above; the name `outputDim` is not defined
  # inside this function, so returning it fails on a fresh kernel.
  return model, nb_classes

In [10]:
from keras import optimizers
# SGD with Nesterov momentum and a small per-update learning-rate decay.
sgd = optimizers.SGD(
    lr=0.01,
    momentum=0.9,
    decay=1e-6,
    nesterov=True,
)
# Serialized optimizer; this is what the estimator receives as its optimizer config.
sgd_conf = optimizers.serialize(sgd)

In [11]:
# Standardize `features` (centered and scaled) into `scaled_features`.
scaler = StandardScaler(
    inputCol="features",
    outputCol="scaled_features",
    withMean=True,
    withStd=True,
)

# Fit the scaler on the event data and apply it to that same frame.
fitted_scaler = scaler.fit(eventData)
scaled_df = fitted_scaler.transform(eventData)

In [12]:
# Unpack the Keras model and the second value returned by create_model; the
# latter is handed to set_nb_classes below.
model, outputDim = create_model()

# NOTE(review): this Elephas optimizer object is created here but is not passed
# to the estimator, which is configured from `sgd_conf` instead.
adadelta = elephas_optimizers.Adadelta(lr=1.0, rho=0.95, epsilon=1e-06)

estimator = ElephasEstimator()

# Input and label columns.
# NOTE(review): the estimator reads the raw "event" column, so the
# "scaled_features" column produced by the pipeline's scaler stage is not
# consumed here - confirm whether that is intended.
estimator.setFeaturesCol("event")
estimator.setLabelCol("verb")

# Labels are treated as categorical with `outputDim` classes.
estimator.set_categorical_labels(True)
estimator.set_nb_classes(outputDim)

# Model definition, optimizer and loss.
estimator.set_keras_model_config(model.to_yaml())  # serialized Keras model
estimator.set_optimizer_config(sgd_conf)           # serialized Keras SGD config
estimator.set_loss('categorical_crossentropy')

# Training schedule.
estimator.set_epochs(20)
estimator.set_batch_size(128)
estimator.set_validation_split(0.15)
estimator.set_verbosity(1)

# Distribution settings: a single worker, synchronous updates.
estimator.set_num_workers(1)
estimator.set_mode("synchronous")

In [13]:
# Two-stage Spark ML pipeline: feature scaling, then the Elephas estimator.
pipeline = Pipeline(
    stages=[
        scaler,
        estimator,
    ]
)

In [14]:
# Fit every pipeline stage on the event data.
fit_model = pipeline.fit(dataset=eventData)

In [15]:
# Evaluate on the training data: the fitted pipeline is applied to the same
# frame it was fitted on.
prediction = fit_model.transform(dataset=eventData)

In [16]:
#display(prediction.select("features"))

In [17]:
# Keep only the true label and the model output for scoring.
pnl = prediction.select("verb", "prediction")

# MulticlassMetrics expects (prediction, label) pairs, in that order; passing
# them reversed transposes the confusion matrix and swaps per-class precision
# and recall. Both values are cast to float so the pairs are doubles whatever
# the column types are.
prediction_and_label = pnl.rdd.map(
    lambda row: (float(row.prediction), float(row.verb))
)
metrics = MulticlassMetrics(prediction_and_label)

# `accuracy` is the overall fraction of correct predictions; it replaces the
# deprecated no-argument precision() call, which reported the same quantity.
print(metrics.accuracy)