In [1]:
from pyspark.ml import Pipeline
from pyspark.ml.regression import LinearRegression, DecisionTreeRegressor, GBTRegressor, RandomForestRegressor
from pyspark.ml.feature import OneHotEncoder, StringIndexer, StandardScaler, VectorAssembler
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql import SQLContext

In [2]:
# Location of the measurements CSV read below.
# NOTE(review): absolute, machine-specific path — consider a configurable data directory.
path = "file:///home/cloudera/Documents/cw/measures_v2.csv"

# `sc` is not defined in this notebook; presumably the SparkContext pre-created
# by the pyspark kernel/shell — confirm before running elsewhere.
sqlContext = SQLContext(sc)

# Read through the spark-csv data source: the first row holds column names and
# Spark infers each column's type from the data.
df = (
    sqlContext.read
    .format('com.databricks.spark.csv')
    .option('header', 'true')
    .option('inferSchema', 'true')
    .load(path)
)

In [3]:
# Hold out 20% of the rows for evaluation; the fixed seed makes the split repeatable.
# NOTE(review): rows are split individually at random, so rows sharing a
# profile_id appear in both train and test, while profile_id is also one-hot
# encoded as a model feature in the stage-definition cell. If rows within a
# profile are strongly correlated (presumably consecutive samples of one
# recording — confirm), the reported test metrics are optimistic. A
# profile-level split (split the distinct profile_id values, then join back)
# would also require dropping the profile_id feature, because StringIndexer's
# default handleInvalid='error' fails on profiles unseen during fit.
train, test = df.randomSplit(weights=[0.8, 0.2], seed=2022)

In [4]:
# Regression target column.
label = 'pm'

# Numeric input columns, in the order the VectorAssembler will concatenate them.
# (An earlier variant used this same list without 'u_d' and 'torque'.)
num_features = [
    'u_q', 'coolant', 'stator_winding', 'u_d', 'stator_tooth',
    'motor_speed', 'i_d', 'i_q', 'stator_yoke', 'ambient', 'torque',
]

In [5]:
# Intermediate column names produced along the feature chain.
profile_index_col = 'profile_strIndex'
profile_onehot_col = 'profile_oneHot'
assembled_col = "features"
scaled_col = "features_norm"

# profile_id -> integer category index -> one-hot vector; dropLast=False keeps
# an explicit slot for every profile instead of omitting the last category.
stringIndexer = StringIndexer(outputCol=profile_index_col, inputCol='profile_id')
oneHotEncoder = OneHotEncoder(inputCol=profile_index_col, outputCol=profile_onehot_col, dropLast=False)

# Numeric inputs followed by the profile one-hot, concatenated into one vector.
vectorAssembler = VectorAssembler(outputCol=assembled_col, inputCols=num_features + [profile_onehot_col])

# Rescale the assembled vector (StandardScaler defaults: unit std, no centering).
# NOTE(review): tree ensembles split on feature order, which positive per-feature
# scaling preserves, so this stage likely leaves the GBT model unchanged while
# costing an extra pass over the training data — consider dropping it.
standardScaler = StandardScaler(outputCol=scaled_col, inputCol=assembled_col)

# Gradient-boosted trees regressor trained on the scaled feature vector.
regression = GBTRegressor(labelCol=label, featuresCol=scaled_col)

In [6]:
# Stages run in list order: index -> one-hot -> assemble -> scale -> regress.
pipeline_stages = [
    stringIndexer,
    oneHotEncoder,
    vectorAssembler,
    standardScaler,
    regression,
]
pipeline = Pipeline(stages=pipeline_stages)

In [7]:
# Fit every pipeline stage on the training split only; returns a fitted PipelineModel.
model = pipeline.fit(dataset=train)

In [8]:
# Run the fitted stages over the held-out rows. This is lazy: it adds a
# 'prediction' column, and nothing is computed until an action runs.
preds = model.transform(dataset=test)

In [9]:
# Score held-out predictions with R^2, RMSE and MAE.
# `preds` is the lazy output of the whole fitted pipeline, and every evaluate()
# call is a separate Spark job. Without caching, feature assembly and GBT
# inference would rerun once per metric. Keep only the two columns the
# evaluator reads and cache them once.
scored = preds.select(label, 'prediction').cache()

evaluator = RegressionEvaluator(labelCol=label, predictionCol='prediction')

# metricName is overridden per call through a param map, so the evaluator's own
# default metric is left untouched and one instance serves all three metrics.
r2, rmse, mae = [
    evaluator.evaluate(scored, {evaluator.metricName: metric_name})
    for metric_name in ("r2", "rmse", "mae")
]

# Release the cached columns; later uses of `preds` recompute as before.
scored.unpersist()

print("RSquare: %.4f \nRMSE: %.4f \nMAE: %.4f" % (r2, rmse, mae))

RSquare: 0.9056 
RMSE: 5.8359 
MAE: 4.3507
