# Installing necessary libraries

In [1]:
# !pip install tensorflow
# !pip install pandas
# !pip install elephas 

# Getting data from HDFS

In [2]:
import pyspark 
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType

import numpy as np
import pandas as pd 

spark = SparkSession.builder.master("local").appName("hdfs_test").getOrCreate()
core_site = "hdfs://localhost:9000/"
data_fold = "aabs-output/data_train.csv"
MODEL_PATH = "/home/hdoop/kaggle_prod/prod_quality_predict_model.h5"
TRAIN_CSV_PATH = "/home/hdoop/kaggle_prod/prod_train.csv"
TEST_CSV_PATH = "/home/hdoop/kaggle_prod/prod_test.csv"

prod_schema = ["T_data_1_1", "T_data_1_2","T_data_1_3", 
               "T_data_2_1", "T_data_2_2", "T_data_2_3", 
               "T_data_3_1", "T_data_3_2", "T_data_3_3", 
               "T_data_4_1", "T_data_4_2", "T_data_4_3", 
               "T_data_5_1", "T_data_5_2", "T_data_5_3",
               "H_data", "AH_data", "quality" ]


prod_data=spark.read.csv(core_site+data_fold).toPandas()
prod_data = pd.DataFrame( prod_data._c0.str.rstrip().str.split(" ").to_list(), columns=prod_schema ).astype(float)
x = prod_data.drop(["quality"],axis=1)
y = prod_data["quality"]

spark.stop()

# Preprocessing the data to standardize & test-train split

In [3]:
from sklearn.preprocessing import StandardScaler 
from sklearn.model_selection import train_test_split
from sklearn.metrics import r2_score 

x_train,x_test,y_train,y_test = train_test_split(x,y,train_size=0.8,random_state=42)

scaler = StandardScaler() 
scaler.fit(x_train) 
x_std_train = scaler.transform(x_train) 
x_test_std = scaler.transform(x_test)

# Training the model using Elephas api

In [None]:
import tensorflow as tf
import tensorflow.keras as keras
from tensorflow.keras.layers import Input,Dense, Dropout
from tensorflow.keras.models import Sequential
from tensorflow.keras.optimizers import RMSprop
    
from elephas.utils.rdd_utils import to_simple_rdd
from elephas.spark_model import SparkModel
from pyspark import SparkContext, SparkConf
    
def create_model():
    layer_neurons = [17,24,34]
    model = Sequential() 
    model.add( Input(shape=(17)) )

    for n in layer_neurons:
        model.add( Dense(n, activation='relu') )

    model.add( Dense(1, activation='relu') )

    model.compile(loss='mean_squared_error',
                  optimizer='adam')

    return model

model = create_model()

conf = SparkConf().setAppName('Elephas_App').setMaster('local[8]')
sc = SparkContext(conf=conf)

rdd = to_simple_rdd(sc, x_train, y_train)


spark_model = SparkModel(model, frequency='epoch', mode='asynchronous')
spark_model.fit(rdd, epochs=20, batch_size=32, verbose=0, validation_split=0.1)
spark_model.save(MODEL_PATH)

sc.stop()

# Predicting on the test data

In [5]:
saved_model = tf.keras.models.load_model(MODEL_PATH)
saved_model.evaluate(x_test_std,y_test) 
y_pred_test = saved_model.predict(x_test_std) 
print('R2 score:',r2_score( y_test, y_pred_test ) )

R2 score: 0.8649340801331773


# Saving the train & test data for predictions

In [6]:
prod_train_df = pd.concat( [x_train, y_train],axis=1 )
prod_test_df = pd.concat( [x_test.reset_index(drop=True), y_test.reset_index(drop=True), pd.DataFrame(y_pred_test,columns=['quality_preds'])],axis=1 )
prod_train_df.to_csv(TRAIN_CSV_PATH)
prod_test_df.to_csv(TEST_CSV_PATH)