In [2]:
#set environment
import os
import sys
 
os.environ["SPARK_HOME"] = "/usr/hdp/current/spark2-client"
os.environ["PYLIB"] = os.environ["SPARK_HOME"] + "/python/lib"
# In below two lines, use /usr/bin/python2.7 if you want to use Python 2
os.environ["PYSPARK_PYTHON"] = "/usr/local/anaconda/bin/python" 
os.environ["PYSPARK_DRIVER_PYTHON"] = "/usr/local/anaconda/bin/python"
sys.path.insert(0, os.environ["PYLIB"] +"/py4j-0.10.4-src.zip")
sys.path.insert(0, os.environ["PYLIB"] +"/pyspark.zip")

In [3]:
#import Sparksession driver
from pyspark.sql import SparkSession
spark = SparkSession \
    .builder \
    .appName("Classification of Medical Cost Prediction Dataset") \
    .getOrCreate()

In [4]:
df = spark.read.csv('linear/regressionspark.csv',inferSchema=True,header = True)
df.show()

+----+---+---+------+--------+------+----------------+----------------+----------------+----------------+-----------+
| _c0|age|sex|   bmi|children|smoker|region_southwest|region_southeast|region_northwest|region_northeast|    charges|
+----+---+---+------+--------+------+----------------+----------------+----------------+----------------+-----------+
| 629| 44|  0| 38.95|       0|     1|             0.0|             0.0|             1.0|             0.0| 42983.4585|
|1087| 57|  1| 31.54|       0|     0|             0.0|             0.0|             1.0|             0.0| 11353.2276|
| 283| 55|  0|32.395|       1|     0|             0.0|             0.0|             0.0|             1.0|11879.10405|
| 790| 39|  0|  41.8|       0|     0|             0.0|             1.0|             0.0|             0.0|   5662.225|
| 594| 41|  1| 40.26|       0|     0|             0.0|             1.0|             0.0|             0.0|  5709.1644|
| 579| 25|  0|23.465|       0|     0|             0.0|  

In [5]:
#Check for missing values
for col in df.columns:
    print("no. of cells in column", col, "with null values:", df.filter(df[col].isNull()).count())

no. of cells in column _c0 with null values: 0
no. of cells in column age with null values: 0
no. of cells in column sex with null values: 0
no. of cells in column bmi with null values: 0
no. of cells in column children with null values: 0
no. of cells in column smoker with null values: 0
no. of cells in column region_southwest with null values: 0
no. of cells in column region_southeast with null values: 0
no. of cells in column region_northwest with null values: 0
no. of cells in column region_northeast with null values: 0
no. of cells in column charges with null values: 0


In [7]:
#all the independent variables need to be packed into one column of vector type
from pyspark.ml.feature import VectorAssembler
assembler = VectorAssembler(inputCols=['age','sex','bmi','children','smoker','region_southwest','region_southeast','region_northwest','region_northeast'], 
                            outputCol="features")
feature_vec=assembler.transform(df).select('features','charges')
feature_vec.show(5)

+--------------------+-----------+
|            features|    charges|
+--------------------+-----------+
|(9,[0,2,4,7],[44....| 42983.4585|
|(9,[0,1,2,7],[57....| 11353.2276|
|(9,[0,2,3,8],[55....|11879.10405|
|(9,[0,2,6],[39.0,...|   5662.225|
|(9,[0,1,2,6],[41....|  5709.1644|
+--------------------+-----------+
only showing top 5 rows



In [8]:
#Check for missing values
for col in feature_vec.columns:
    print("no. of cells in column", col, "with null values:", feature_vec.filter(feature_vec[col].isNull()).count())

no. of cells in column features with null values: 0
no. of cells in column charges with null values: 0


In [9]:
# Split the data into train and test sets
train_data, test_data = feature_vec.randomSplit([.75,.25],seed=0)

In [10]:
splits = feature_vec.randomSplit([0.7, 0.3])
train_df = splits[0]
test_df = splits[1]

In [12]:
from pyspark.ml.linalg import Vectors
from pyspark.ml.regression import LinearRegression
lr = LinearRegression(featuresCol = 'features', labelCol='charges', maxIter=10, regParam=0.3, elasticNetParam=0.8)
lr_model = lr.fit(train_df)
print("Coefficients: " + str(lr_model.coefficients))
print("Intercept: " + str(lr_model.intercept))

Coefficients: [253.05498418182162,-155.79514553237914,359.5991811521862,483.3759688907996,23287.34168382452,68.4021813272167,-357.13817518769883,822.0804352816368,1150.334172762263]
Intercept: -13337.467377907313


In [18]:
trainingSummary = lr_model.summary
print(trainingSummary)
print("RMSE: %f" % trainingSummary.rootMeanSquaredError)
print("r2 for Training: %f" % trainingSummary.r2)

<pyspark.ml.regression.LinearRegressionTrainingSummary object at 0x7f1317587978>
RMSE: 6046.109848
r2 for Training: 0.746519


In [19]:
lr_predictions = lr_model.transform(test_df)
lr_predictions

DataFrame[features: vector, charges: double, prediction: double]

In [16]:
lr_predictions = lr_model.transform(test_df)
lr_predictions.select("prediction","charges","features").show(5)
from pyspark.ml.evaluation import RegressionEvaluator
lr_evaluator = RegressionEvaluator(predictionCol="prediction", \
                 labelCol="charges",metricName="r2")
print("R Squared (R2) on test data = %g" % lr_evaluator.evaluate(lr_predictions))

+-------------------+--------+--------------------+
|         prediction| charges|            features|
+-------------------+--------+--------------------+
|-1496.7518558445754|1241.565|(9,[0,1,2,5],[19....|
|-1316.9522652684846| 1242.26|(9,[0,1,2,5],[19....|
|  2314.999464368595|1256.299|(9,[0,1,2,5],[19....|
|  4112.995370129527|1263.249|(9,[0,1,2,5],[19....|
|  6346.516965398954|1682.597|(9,[0,1,2,5],[22....|
+-------------------+--------+--------------------+
only showing top 5 rows

R Squared (R2) on test data = 0.757428


In [17]:
# Get the RMSE
print("RMSE: {0}".format(lr_model.summary.rootMeanSquaredError))
# Get the R2
print("R2: {0}".format(lr_model.summary.r2))

RMSE: 6046.109847952787
R2: 0.7465186054173145
