In [1]:
import warnings
warnings.filterwarnings("ignore")

In [4]:
import pandas as pd
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.feature import StandardScaler, VectorAssembler
from pyspark.ml.regression import (
    DecisionTreeRegressor,
    GBTRegressor,
    GeneralizedLinearRegression,
    LinearRegression,
    RandomForestRegressor,
)
from pyspark.sql import SparkSession
from sklearn.datasets import fetch_california_housing

# Create a SparkSession (getOrCreate reuses one already running in this kernel).
spark = (
    SparkSession.builder
    .appName("RegressionPipelineExample")
    .getOrCreate()
)
spark.sparkContext.setLogLevel("WARN")

# Compatibility shim: presumably needed because older PySpark releases call
# pandas' DataFrame.iteritems inside createDataFrame(pandas_df), and pandas 2.0
# removed iteritems in favour of items. Aliasing keeps the conversion working.
pd.DataFrame.iteritems = pd.DataFrame.items

# Load the California housing data into pandas with the regression target in a
# "label" column, then convert it to a Spark DataFrame.
housing = fetch_california_housing()
df = pd.DataFrame(data=housing.data, columns=housing.feature_names)
df["label"] = housing.target
df_data = spark.createDataFrame(df)
df_data.show(5)

+------+--------+------------------+------------------+----------+------------------+--------+---------+-----+
|MedInc|HouseAge|          AveRooms|         AveBedrms|Population|          AveOccup|Latitude|Longitude|label|
+------+--------+------------------+------------------+----------+------------------+--------+---------+-----+
|8.3252|    41.0| 6.984126984126984|1.0238095238095237|     322.0|2.5555555555555554|   37.88|  -122.23|4.526|
|8.3014|    21.0| 6.238137082601054|0.9718804920913884|    2401.0| 2.109841827768014|   37.86|  -122.22|3.585|
|7.2574|    52.0| 8.288135593220339| 1.073446327683616|     496.0|2.8022598870056497|   37.85|  -122.24|3.521|
|5.6431|    52.0|5.8173515981735155|1.0730593607305936|     558.0| 2.547945205479452|   37.85|  -122.25|3.413|
|3.8462|    52.0| 6.281853281853282|1.0810810810810811|     565.0|2.1814671814671813|   37.85|  -122.25|3.422|
+------+--------+------------------+------------------+----------+------------------+--------+---------+-----+
o

In [5]:
# Assemble every feature column into a single vector column named "features".
# Columns are selected by name instead of position (columns[:-1]) so that
# re-running this cell is safe. Once df_data already carries a "features"
# column, a positional slice would pull "label" into the feature vector, and
# VectorAssembler would refuse to overwrite the existing output column.
# drop() is a no-op when "features" is absent (first run).
feature_cols = [c for c in df_data.columns if c not in ("label", "features")]
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
df_data = assembler.transform(df_data.drop("features"))
df_data.show(5, 100)

+------+--------+------------------+------------------+----------+------------------+--------+---------+-----+-----------------------------------------------------------------------------------------+
|MedInc|HouseAge|          AveRooms|         AveBedrms|Population|          AveOccup|Latitude|Longitude|label|                                                                                 features|
+------+--------+------------------+------------------+----------+------------------+--------+---------+-----+-----------------------------------------------------------------------------------------+
|8.3252|    41.0| 6.984126984126984|1.0238095238095237|     322.0|2.5555555555555554|   37.88|  -122.23|4.526|[8.3252,41.0,6.984126984126984,1.0238095238095237,322.0,2.5555555555555554,37.88,-122.23]|
|8.3014|    21.0| 6.238137082601054|0.9718804920913884|    2401.0| 2.109841827768014|   37.86|  -122.22|3.585|[8.3014,21.0,6.238137082601054,0.9718804920913884,2401.0,2.109841827768014,37.86,-122.

In [6]:
# Standardize the assembled feature vector: center each component to zero mean
# (withMean) and scale it to unit standard deviation (withStd).
# The input must be the "features" column produced by the VectorAssembler; the
# DataFrame has no "raw_features" column.
# NOTE(review): the regression pipelines in the model cell are built from the
# estimators alone, so this stage is not currently applied to the models.
scaler = StandardScaler(inputCol="features", outputCol="scaled_features", withStd=True, withMean=True)

In [7]:
# Split the dataset into 80% training / 20% test. A fixed seed makes the split
# (and therefore the reported test metrics) reproducible across kernel restarts.
(trainingData, testData) = df_data.randomSplit([0.8, 0.2], seed=42)

In [8]:
# Initialize regression models. Every estimator reads the assembled (unscaled)
# "features" vector and predicts "label".
lr = LinearRegression(featuresCol="features", labelCol="label")
glr = GeneralizedLinearRegression(featuresCol="features", labelCol="label")
dt = DecisionTreeRegressor(featuresCol="features", labelCol="label")
rf = RandomForestRegressor(featuresCol="features", labelCol="label")
gbt = GBTRegressor(featuresCol="features", labelCol="label")

# Every model is compared by RMSE on the held-out test split.
evaluator = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="rmse")


def fit_and_evaluate(estimator, train_df, test_df, evaluator):
    """Fit `estimator` in a single-stage Pipeline and score it on `test_df`.

    Args:
        estimator: an unfitted pyspark.ml estimator.
        train_df: DataFrame the pipeline is fitted on.
        test_df: DataFrame the fitted pipeline is applied to.
        evaluator: evaluator applied to the test-set predictions.

    Returns:
        (pipeline, fitted_model, predictions, metric) where `metric` is the
        value `evaluator.evaluate` returns for the predictions.
    """
    pipeline = Pipeline(stages=[estimator])
    model = pipeline.fit(train_df)
    predictions = model.transform(test_df)
    return pipeline, model, predictions, evaluator.evaluate(predictions)


# Keep the per-model names so later cells (e.g. predictions_lr.show) still work.
pipeline_lr, model_lr, predictions_lr, rmse_lr = fit_and_evaluate(lr, trainingData, testData, evaluator)
pipeline_glr, model_glr, predictions_glr, rmse_glr = fit_and_evaluate(glr, trainingData, testData, evaluator)
pipeline_dt, model_dt, predictions_dt, rmse_dt = fit_and_evaluate(dt, trainingData, testData, evaluator)
pipeline_rf, model_rf, predictions_rf, rmse_rf = fit_and_evaluate(rf, trainingData, testData, evaluator)
pipeline_gbt, model_gbt, predictions_gbt, rmse_gbt = fit_and_evaluate(gbt, trainingData, testData, evaluator)

for model_name, rmse in [
    ("Linear Regression", rmse_lr),
    ("Generalized Linear Regression", rmse_glr),
    ("Decision Tree Regression", rmse_dt),
    ("Random Forest Regression", rmse_rf),
    ("Gradient Boosted Tree Regression", rmse_gbt),
]:
    print(f"{model_name} RMSE:", rmse)

25/11/13 14:27:53 WARN Instrumentation: [d7850e50] regParam is zero, which might cause numerical instability and overfitting.
25/11/13 14:27:54 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS
25/11/13 14:27:54 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS
25/11/13 14:27:54 WARN LAPACK: Failed to load implementation from: com.github.fommil.netlib.NativeSystemLAPACK
25/11/13 14:27:54 WARN LAPACK: Failed to load implementation from: com.github.fommil.netlib.NativeRefLAPACK
25/11/13 14:27:55 WARN Instrumentation: [ba900656] regParam is zero, which might cause numerical instability and overfitting.


Linear Regression RMSE: 0.7180544053056171
General Linear Regression RMSE: 0.7180544053056532
Decision Tree Regression RMSE: 0.6867932923910357
Random Forest Regression RMSE: 0.6506224132865478
Gradient Boosted Tree Regression RMSE: 0.5484938188092565


In [9]:
# Peek at the first five linear-regression test predictions, truncating each cell to 60 characters.
predictions_lr.show(n=5, truncate=60)

+------+--------+------------------+------------------+----------+------------------+--------+---------+-----+------------------------------------------------------------+------------------+
|MedInc|HouseAge|          AveRooms|         AveBedrms|Population|          AveOccup|Latitude|Longitude|label|                                                    features|        prediction|
+------+--------+------------------+------------------+----------+------------------+--------+---------+-----+------------------------------------------------------------+------------------+
|0.4999|    46.0|1.7142857142857142|0.5714285714285714|      18.0|2.5714285714285716|   37.81|  -122.29|0.675|[0.4999,46.0,1.7142857142857142,0.5714285714285714,18.0,2...|1.0424034204339918|
|  0.75|    52.0| 2.823529411764706|0.9117647058823529|     191.0| 5.617647058823529|    37.8|  -122.28|1.625|[0.75,52.0,2.823529411764706,0.9117647058823529,191.0,5.6...| 1.335191523858633|
|0.8012|    28.0| 5.283950617283951|1.1666666