Document: [PySpark API](https://spark.apache.org/docs/latest/api/python/index.html)

In [None]:
# Render matplotlib figures inline in the notebook output.
%matplotlib inline

In [None]:
import pypio

from pyspark.sql.functions import col
from pyspark.sql.functions import explode
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import IndexToString
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.regression import LinearRegression
from pyspark.ml.regression import GBTRegressor
from pyspark.ml.regression import GeneralizedLinearRegression
from pyspark.ml.feature import StandardScaler
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import MulticlassClassificationEvaluator


## Initialize PIO

In [None]:
pypio.init()
# init(event_server='localhost:7070', app_name=None) performs the following tasks:
# 1. Check whether the Event Server is running (localhost:7070).
#    1a. If it is not running: pio eventserver
# 2. Check whether the app has been created.
#    2a. If the app does not exist: pio app new ...
# 3. Create the SparkSession and other related objects.

## Import Data from File

In [None]:
# Import the Boston house prices CSV through pypio. The URL is pinned to the
# scikit-learn 0.19.0 tag, so the downloaded file does not change over time.
pypio.import_file('https://raw.githubusercontent.com/scikit-learn/scikit-learn/0.19.0/sklearn/datasets/data/boston_house_prices.csv')

## Load Data from PIO

In [None]:
# Load the events stored for the app named 'BHPApp'.
# NOTE(review): `p_event_store` is not assigned in any cell of this notebook;
# presumably it is provided by pypio.init() or by the shell that launches the
# notebook -- confirm, otherwise this raises NameError on a fresh kernel.
event_df = p_event_store.find('BHPApp')

In [None]:
# Preview the first five rows of the loaded events.
event_df.show(5)

## Preprocessing

In [None]:
def get_field_type(name):
    """Return the Spark SQL type name that field ``name`` should be cast to.

    Every field is currently treated as numeric, so the result is always
    ``'double'`` and ``name`` is not inspected.
    """
    spark_type = 'double'
    return spark_type

# Discover the distinct keys present in the `fields` column: exploding it
# produces a `key` column, whose distinct values are collected to the driver
# and sorted so the resulting column order is stable.
field_names = sorted(
    event_df.select(explode("fields"))
    .select("key")
    .distinct()
    .rdd.flatMap(lambda row: row)
    .collect()
)

# Build one projected column per key: look the key up in `fields`, cast it to
# the type chosen by get_field_type, and name the column after the key.
exprs = [
    col("fields").getItem(field).cast(get_field_type(field)).alias(field)
    for field in field_names
]

# Flatten the events into one column per field, then rename MEDV to `label`,
# the column name the estimators and evaluator below are configured to use.
data_df = event_df.select(*exprs).withColumnRenamed("MEDV", "label")

In [None]:
# Preview the first five rows of the flattened table (one column per field).
data_df.show(5)

## Exploratory Data Analysis (EDA) with pandas

In [None]:
# Convert the Spark DataFrame to a pandas DataFrame for plotting. toPandas()
# collects every row to the driver, so this is only suitable for small data.
p_data_df = data_df.toPandas()

In [None]:
import matplotlib.pyplot as plt
from pandas.plotting import scatter_matrix

# Plot every column of the pandas frame against every other column, with a
# kernel density estimate on the diagonal; black, semi-transparent markers.
scatter_matrix(
    p_data_df,
    alpha=0.3,
    diagonal='kde',
    color='k',
)
plt.show()

## Train

In [None]:
# Hold out roughly 10% of the rows for evaluation (randomSplit weights are
# approximate proportions, not exact counts). A fixed seed keeps the split --
# and therefore the reported RMSE -- repeatable across "Restart & Run All",
# provided the input DataFrame is produced the same way each time.
(train_df, test_df) = data_df.randomSplit([0.9, 0.1], seed=42)


In [None]:
# Stage 1: combine every field except the target into one vector column.
# `field_names` still contains the original key 'MEDV' (only the DataFrame
# column was renamed to `label`), so the target is excluded by that name.
featureAssembler = VectorAssembler(inputCols=[x for x in field_names if x != 'MEDV'],
                                   outputCol="rawFeatures")
# Stage 2: scale the assembled vector using StandardScaler's default settings.
scaler = StandardScaler(inputCol="rawFeatures", outputCol="features")

# Stage 3: the regressor. Alternatives that were tried are kept below as
# comments so they can be swapped in.
# NOTE(review): "TODO NPE" presumably marks estimators that raised a
# NullPointerException in this setup -- confirm before re-enabling them.
# TODO NPE
# clf = DecisionTreeRegressor(featuresCol="features", labelCol="label", predictionCol="prediction",
#                             maxDepth=5, maxBins=32, minInstancesPerNode=1, minInfoGain=0.0,
#                             maxMemoryInMB=256, cacheNodeIds=False, checkpointInterval=10,
#                             impurity="variance", seed=None, varianceCol=None)
# clf = DecisionTreeRegressor()
# Active model: a random forest. The seed is fixed (it was None) so that the
# forest's random sampling, and with it the fitted model, is reproducible.
clf = RandomForestRegressor(featuresCol="features", labelCol="label", predictionCol="prediction",
                            maxDepth=5, maxBins=32, minInstancesPerNode=1, minInfoGain=0.0,
                            maxMemoryInMB=256, cacheNodeIds=False, checkpointInterval=10,
                            impurity="variance", subsamplingRate=1.0, seed=42, numTrees=20,
                            featureSubsetStrategy="auto")
# TODO NPE
# clf = LinearRegression(featuresCol="features", labelCol="label", predictionCol="prediction",
#                        maxIter=100, regParam=0.0, elasticNetParam=0.0, tol=1e-6, fitIntercept=True,
#                        standardization=True, solver="auto", weightCol=None, aggregationDepth=2)
# clf = LinearRegression()
# clf = GBTRegressor(featuresCol="features", labelCol="label", predictionCol="prediction",
#                    maxDepth=5, maxBins=32, minInstancesPerNode=1, minInfoGain=0.0, maxMemoryInMB=256,
#                    cacheNodeIds=False, subsamplingRate=1.0, checkpointInterval=10, lossType="squared",
#                    maxIter=20, stepSize=0.1, seed=None)
# TODO NPE
# clf = GeneralizedLinearRegression(labelCol="label", featuresCol="features", predictionCol="prediction",
#                                   family="gaussian", link=None, fitIntercept=True, maxIter=25, tol=1e-6,
#                                   regParam=0.0, weightCol=None, solver="irls", linkPredictionCol=None)
# clf = GeneralizedLinearRegression()

# Chain the stages: assemble -> scale -> regress.
pipeline = Pipeline(stages=[featureAssembler, scaler, clf])


In [None]:
# Fit every pipeline stage (assembler, scaler, regressor) on the training split.
model = pipeline.fit(train_df)


## Evaluation

In [None]:
# Run the fitted pipeline on the held-out split; the regressor writes its
# output to the `prediction` column configured above.
predict_df = model.transform(test_df)


In [None]:
# Show predicted and actual values side by side for the first five test rows.
predict_df.select("prediction", "label").show(5)


In [None]:
# Score the held-out predictions: compare the `prediction` column against the
# `label` column using the root-mean-squared-error metric.
evaluator = RegressionEvaluator(
    metricName="rmse",
    labelCol="label",
    predictionCol="prediction",
)
rmse = evaluator.evaluate(predict_df)
print("Root Mean Squared Error (RMSE) on test data = %g" % (rmse,))


## Save

In [None]:
# Hand the fitted pipeline model to pypio for persistence.
# NOTE(review): where and in what format the model is stored is decided by
# pypio.save_model and is not visible from this notebook.
pypio.save_model(model)