In [1]:
from pyspark.sql.functions import col
from pyspark.sql.functions import explode
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import IndexToString
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.regression import LinearRegression
from pyspark.ml.regression import GBTRegressor
from pyspark.ml.regression import GeneralizedLinearRegression
from pyspark.ml.feature import StandardScaler
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [2]:
from pypio import pypio

In [3]:
# Initialise the PyPIO client before any event-store access below.
# NOTE(review): presumably sets up the Spark session / PredictionIO connection
# that `pypio.find` relies on — confirm against the pypio docs.
pypio.init()

In [4]:
# Load every event recorded for the PredictionIO app into a Spark DataFrame
# (one row per event; the per-entity attributes live in the `fields` map).
APP_NAME = 'BHPApp'
event_df = pypio.find(APP_NAME)

In [5]:
# Peek at the raw events to confirm the load worked.
event_df.show(n=5)

+--------------------+-----+----------+--------+----------------+--------------+--------------------+----+----+--------------------+--------------------+
|             eventId|event|entityType|entityId|targetEntityType|targetEntityId|           eventTime|tags|prId|        creationTime|              fields|
+--------------------+-----+----------+--------+----------------+--------------+--------------------+----+----+--------------------+--------------------+
|AB_QOXJybhjjJCYSL...| $set|      user|     316|            null|          null|2018-08-30 07:34:...|    |null|2018-08-30 07:34:...|Map(MEDV -> 17.8,...|
|AEtI9wjvz8pi5DM8U...| $set|      user|     459|            null|          null|2018-08-30 07:34:...|    |null|2018-08-30 07:34:...|Map(MEDV -> 20.0,...|
|AH2_kZ-XQS5xnm6a0...| $set|      user|     357|            null|          null|2018-08-30 07:34:...|    |null|2018-08-30 07:34:...|Map(MEDV -> 21.7,...|
|AsI81-tbLNnsuRqyk...| $set|      user|     164|            null|          n

In [6]:
def get_field_type(name):
    """Return the Spark SQL type a raw event field should be cast to.

    Every field is treated as numeric, so ``name`` is currently ignored and
    the answer is always ``"double"``. The parameter exists so per-field
    types can be introduced later without touching the callers.
    """
    field_type = "double"
    return field_type

# Discover every key that appears in any event's `fields` map. Sorting gives
# a deterministic column order for the flattened frame below.
distinct_field_keys = event_df.select(explode("fields")).select("key").distinct()
field_names = sorted(distinct_field_keys.rdd.flatMap(lambda row: row).collect())

# One typed column per map key, named after the key itself.
exprs = [
    col("fields").getItem(name).cast(get_field_type(name)).alias(name)
    for name in field_names
]

# MEDV is the regression target; downstream cells expect it as "label".
data_df = event_df.select(*exprs).withColumnRenamed("MEDV", "label")

In [7]:
# Check the flattened, typed frame (note MEDV now appears as `label`).
data_df.show(n=5)

+----+------+----+------------------+------------------+-----+-----+-----+-----+-------+----+-----------------+-----+---+
| AGE|     B|CHAS|              CRIM|               DIS|INDUS|LSTAT|label|  NOX|PTRATIO| RAD|               RM|  TAX| ZN|
+----+------+----+------------------+------------------+-----+-----+-----+-----+-------+----+-----------------+-----+---+
|83.2| 390.7| 0.0|           0.31827|            3.9986|  9.9|18.33| 17.8|0.544|   18.4| 4.0|            5.914|304.0|0.0|
|84.4| 396.9| 0.0|           6.80117|            2.7175| 18.1| 14.7| 20.0|0.713|   20.2|24.0|            6.081|666.0|0.0|
|91.0|391.34| 1.0|            3.8497|            2.5052| 18.1|13.27| 21.7| 0.77|   20.2|24.0|            6.395|666.0|0.0|
|91.8|395.11| 0.0|2.2423599999999997|2.4219999999999997|19.58|11.64| 22.7|0.605|   14.7| 5.0|            5.854|403.0|0.0|
|56.7| 396.9| 0.0|           13.0751|            2.8237| 18.1|14.76| 20.1| 0.58|   20.2|24.0|5.712999999999999|666.0|0.0|
+----+------+----+------

In [8]:
# Hold out 10% of the rows for evaluation. Without a seed, randomSplit draws a
# different test set on every run, so the RMSE reported at the end would not
# be reproducible under Restart & Run All.
SPLIT_SEED = 42
(train_df, test_df) = data_df.randomSplit([0.9, 0.1], seed=SPLIT_SEED)

In [9]:
# Model pipeline: assemble every predictor into one vector, standardise it,
# then fit a random-forest regressor against the `label` column.
LABEL_SOURCE_FIELD = 'MEDV'  # raw field that data_df renames to "label"; never a feature
RF_SEED = 42                 # fixed so repeated runs grow the same forest

# Fail early with a clear message instead of an opaque "label not found" at fit time.
assert LABEL_SOURCE_FIELD in field_names, (
    "target field %r missing from event fields %r" % (LABEL_SOURCE_FIELD, field_names))

featureAssembler = VectorAssembler(
    inputCols=[name for name in field_names if name != LABEL_SOURCE_FIELD],
    outputCol="rawFeatures")

# StandardScaler defaults to unit-variance scaling without centring. Tree splits
# are essentially insensitive to positive per-feature rescaling, so this stage is
# harmless for the forest. It keeps the pipeline drop-in compatible with
# scale-sensitive regressors such as LinearRegression.
scaler = StandardScaler(inputCol="rawFeatures", outputCol="features")

# Hyper-parameters other than `seed` are Spark's defaults, spelled out for visibility.
# seed=None made every run train a different forest; pin it for reproducibility.
clf = RandomForestRegressor(featuresCol="features", labelCol="label", predictionCol="prediction",
                            maxDepth=5, maxBins=32, minInstancesPerNode=1, minInfoGain=0.0,
                            maxMemoryInMB=256, cacheNodeIds=False, checkpointInterval=10,
                            impurity="variance", subsamplingRate=1.0, seed=RF_SEED, numTrees=20,
                            featureSubsetStrategy="auto")
pipeline = Pipeline(stages=[featureAssembler, scaler, clf])

In [10]:
# Fit all pipeline stages (assembler, scaler, forest) on the training split only.
model = pipeline.fit(dataset=train_df)

In [11]:
# Score the held-out rows; the fitted pipeline adds a `prediction` column.
predict_df = model.transform(dataset=test_df)

In [12]:
# Eyeball a few predictions next to the true values.
predict_df.select(["prediction", "label"]).show(n=5)

+------------------+-----+
|        prediction|label|
+------------------+-----+
| 24.65159820107046| 25.3|
|25.123165807614374| 23.7|
|25.525293402054672| 26.2|
| 33.11138915788179| 33.1|
| 21.70422839592778| 22.6|
+------------------+-----+
only showing top 5 rows



In [13]:
# Summarise test-set accuracy as root mean squared error (in the label's units).
evaluator = RegressionEvaluator(metricName="rmse",
                                predictionCol="prediction",
                                labelCol="label")
rmse = evaluator.evaluate(predict_df)
print("Root Mean Squared Error (RMSE) on test data = %g" % (rmse,))

Root Mean Squared Error (RMSE) on test data = 2.81152
