# Task: Regression with Spark's Machine Learning Library (MLlib)
### Models: Linear Regression, Decision Tree Regression, Random Forest Regression, Gradient-Boosted Tree Regression
### Dataset: insurance.csv

**MLlib is Spark’s machine learning (ML) library. Its goal is to make practical machine learning scalable and easy. At a high level, it provides tools such as:**

<ul>
<li> ML Algorithms: common learning algorithms such as classification, regression, clustering, and collaborative filtering.</li>

<li> Featurization: feature extraction, transformation, dimensionality reduction, and selection.</li>

<li> Pipelines: tools for constructing, evaluating, and tuning ML Pipelines.</li>

<li> Persistence: saving and loading algorithms, models, and Pipelines.</li>

<li> Utilities: linear algebra, statistics, data handling, etc.</li>
</ul>    
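This notebook uses the featurization tools and the regression algorithms. It applies each stage by hand rather than through a Pipeline. The Pipeline idea rests on MLlib's Estimator/Transformer split. An Estimator's `fit()` learns from data and returns a Model. That Model is a Transformer, and its `transform()` appends new columns.

The plain-Python sketch below mimics that contract on lists of dicts. `Indexer`, `IndexerModel`, `SimplePipeline` and `SimplePipelineModel` are hypothetical illustration classes, not Spark's API. For simplicity, the indexer here orders labels alphabetically.

```python
from typing import Dict, List

Row = Dict[str, object]


class IndexerModel:
    """Transformer: appends a column holding the learned index of a string column."""

    def __init__(self, input_col: str, output_col: str, mapping: Dict[str, float]):
        self.input_col, self.output_col, self.mapping = input_col, output_col, mapping

    def transform(self, rows: List[Row]) -> List[Row]:
        # Build new rows; the input rows are not modified.
        return [{**r, self.output_col: self.mapping[str(r[self.input_col])]} for r in rows]


class Indexer:
    """Estimator: fit() learns a label -> index mapping and returns a Model."""

    def __init__(self, input_col: str, output_col: str):
        self.input_col, self.output_col = input_col, output_col

    def fit(self, rows: List[Row]) -> IndexerModel:
        labels = sorted({str(r[self.input_col]) for r in rows})  # alphabetical, for simplicity
        return IndexerModel(self.input_col, self.output_col,
                            {label: float(i) for i, label in enumerate(labels)})


class SimplePipelineModel:
    """Applies every fitted stage in order."""

    def __init__(self, models: List[IndexerModel]):
        self.models = models

    def transform(self, rows: List[Row]) -> List[Row]:
        for model in self.models:
            rows = model.transform(rows)
        return rows


class SimplePipeline:
    """Fits each stage on the output of the previously fitted stages."""

    def __init__(self, stages: List[Indexer]):
        self.stages = stages

    def fit(self, rows: List[Row]) -> SimplePipelineModel:
        models = []
        for stage in self.stages:
            model = stage.fit(rows)
            rows = model.transform(rows)
            models.append(model)
        return SimplePipelineModel(models)


rows = [{"sex": "female", "smoker": "yes"}, {"sex": "male", "smoker": "no"}]
pipeline_model = SimplePipeline([Indexer("smoker", "smokerIndex"),
                                 Indexer("sex", "sexIndex")]).fit(rows)
indexed = pipeline_model.transform(rows)
```

In PySpark, the counterpart is `Pipeline(stages=[...]).fit(df)` from `pyspark.ml`. It also accepts plain Transformers such as `VectorAssembler` as stages.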

In [1]:
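# On a local machine, create (or reuse) a SparkSession
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()
# Later cells use `sc`: managed environments (e.g. IBM Watson) predefine it,
# while locally it has to be taken from the session
sc = spark.sparkContext

# Environment used for the outputs in this notebook (IBM Watson):
# python: 3.6
# spark: 2.3.3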

Waiting for a Spark session to start...
Spark Initialization Done! ApplicationId = app-20200303101851-0000
KERNEL_ID = 2ce083b6-bf23-4ab9-b3a4-d98a882a55d0


In [2]:
sc.version

'2.3.3'

In [3]:
# The following code contains the credentials for a file in your IBM Cloud Object Storage.
# IBM WATSON PYTHON+SPARK ENVIRONMENT
import ibmos2spark
# @hidden_cell
credentials = {
    'endpoint': 'XXXX',
    'service_id': 'XXXX',
    'iam_service_endpoint': 'XXXX',
    'api_key': 'XXXX'
}

configuration_name = 'os_0a7faf2d576d4d0e985305b42aae4ce7_configs'
cos = ibmos2spark.CloudObjectStorage(sc, credentials, configuration_name, 'bluemix_cos')


In [4]:

# @hidden_cell
# The following code contains the credentials for a file in your IBM Cloud Object Storage.
# You might want to remove those credentials before you share your notebook.
credentials_1 = {
    'IAM_SERVICE_ID': 'XXXX',
    'IBM_API_KEY_ID': 'XXXX',
    'ENDPOINT': 'XXXX',
    'IBM_AUTH_ENDPOINT': 'XXXX',
    'BUCKET': 'XXXX',
    'FILE': 'insurance.csv'
}
data_link = cos.url('insurance.csv', 'sparktry-donotdelete-pr-adbnjs9sf1yuav')

In [5]:
df_insurence = spark.read.csv(data_link, header=True, inferSchema=True)

In [6]:
df_insurence.columns

['age', 'sex', 'bmi', 'children', 'smoker', 'region', 'charges']

In [7]:
df_insurence.show(10)

+---+------+------+--------+------+---------+-----------+
|age|   sex|   bmi|children|smoker|   region|    charges|
+---+------+------+--------+------+---------+-----------+
| 19|female|  27.9|       0|   yes|southwest|  16884.924|
| 18|  male| 33.77|       1|    no|southeast|  1725.5523|
| 28|  male|  33.0|       3|    no|southeast|   4449.462|
| 33|  male|22.705|       0|    no|northwest|21984.47061|
| 32|  male| 28.88|       0|    no|northwest|  3866.8552|
| 31|female| 25.74|       0|    no|southeast|  3756.6216|
| 46|female| 33.44|       1|    no|southeast|  8240.5896|
| 37|female| 27.74|       3|    no|northwest|  7281.5056|
| 37|  male| 29.83|       2|    no|northeast|  6406.4107|
| 60|female| 25.84|       0|    no|northwest|28923.13692|
+---+------+------+--------+------+---------+-----------+
only showing top 10 rows



In [8]:
df_insurence.printSchema()

root
 |-- age: integer (nullable = true)
 |-- sex: string (nullable = true)
 |-- bmi: double (nullable = true)
 |-- children: integer (nullable = true)
 |-- smoker: string (nullable = true)
 |-- region: string (nullable = true)
 |-- charges: double (nullable = true)



In [9]:
df_insurence.describe().show()

+-------+------------------+------+------------------+-----------------+------+---------+------------------+
|summary|               age|   sex|               bmi|         children|smoker|   region|           charges|
+-------+------------------+------+------------------+-----------------+------+---------+------------------+
|  count|              1338|  1338|              1338|             1338|  1338|     1338|              1338|
|   mean| 39.20702541106129|  null|30.663396860986538|  1.0949177877429|  null|     null|13270.422265141257|
| stddev|14.049960379216147|  null| 6.098186911679012|1.205492739781914|  null|     null|12110.011236693992|
|    min|                18|female|             15.96|                0|    no|northeast|         1121.8739|
|    max|                64|  male|             53.13|                5|   yes|southwest|       63770.42801|
+-------+------------------+------+------------------+-----------------+------+---------+------------------+
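`describe()` reports count, mean, standard deviation, min and max for each column. For string columns only count, min and max (lexicographic) are meaningful. That is why `sex`, `smoker` and `region` show `null` for mean and stddev.

Spark's `stddev` aggregate is the sample standard deviation, which uses an n − 1 divisor. Below is a plain-Python sketch of the numeric summary. It uses the first five `age` values from the earlier table as toy input.

```python
import math
from typing import Dict, List


def describe(values: List[float]) -> Dict[str, float]:
    """Count, mean, sample standard deviation (n - 1 divisor), min and max."""
    n = len(values)
    mean = sum(values) / n
    # Sample variance: divide by n - 1 rather than n.
    var = sum((v - mean) ** 2 for v in values) / (n - 1)
    return {"count": n, "mean": mean, "stddev": math.sqrt(var),
            "min": min(values), "max": max(values)}


summary = describe([19, 18, 28, 33, 32])  # first five ages from show(10)
```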



In [10]:
# StringIndexer encodes the categorical string columns (smoker, sex) as numeric indices
from pyspark.ml.feature import StringIndexer

In [11]:
indexer = StringIndexer(inputCol="smoker", outputCol="smokerIndex")
df_insurence = indexer.fit(df_insurence).transform(df_insurence)

In [12]:
indexer = StringIndexer(inputCol="sex", outputCol="sexIndex")
df_insurence = indexer.fit(df_insurence).transform(df_insurence)
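By default `StringIndexer` orders labels by descending frequency (`stringOrderType="frequencyDesc"`). The most common value therefore gets index 0.0. This is consistent with `no` and `male` receiving 0.0 in the output below.

A plain-Python sketch of that mapping follows. Breaking ties alphabetically is an assumption of this sketch.

```python
from collections import Counter
from typing import Dict, List


def fit_string_indexer(labels: List[str]) -> Dict[str, float]:
    """Map each distinct label to a float index, most frequent label first."""
    counts = Counter(labels)
    # Sort by descending count; ties are broken alphabetically (an assumption here).
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(i) for i, label in enumerate(ordered)}


smoker_index = fit_string_indexer(["yes", "no", "no", "no", "no"])
encoded_column = [smoker_index[s] for s in ["yes", "no"]]
```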

In [13]:
df_insurence.show(10)

+---+------+------+--------+------+---------+-----------+-----------+--------+
|age|   sex|   bmi|children|smoker|   region|    charges|smokerIndex|sexIndex|
+---+------+------+--------+------+---------+-----------+-----------+--------+
| 19|female|  27.9|       0|   yes|southwest|  16884.924|        1.0|     1.0|
| 18|  male| 33.77|       1|    no|southeast|  1725.5523|        0.0|     0.0|
| 28|  male|  33.0|       3|    no|southeast|   4449.462|        0.0|     0.0|
| 33|  male|22.705|       0|    no|northwest|21984.47061|        0.0|     0.0|
| 32|  male| 28.88|       0|    no|northwest|  3866.8552|        0.0|     0.0|
| 31|female| 25.74|       0|    no|southeast|  3756.6216|        0.0|     1.0|
| 46|female| 33.44|       1|    no|southeast|  8240.5896|        0.0|     1.0|
| 37|female| 27.74|       3|    no|northwest|  7281.5056|        0.0|     1.0|
| 37|  male| 29.83|       2|    no|northeast|  6406.4107|        0.0|     0.0|
| 60|female| 25.84|       0|    no|northwest|28923.13692|        0.0|     1.0|
+---+------+------+--------+------+---------+-----------+-----------+--------+
only showing top 10 rows

In [14]:
# One-hot encode `children` (OneHotEncoderEstimator exists in Spark 2.3/2.4; Spark 3.x calls it OneHotEncoder)
# Note: the result is kept in `encoded` and is not used by the models below
from pyspark.ml.feature import OneHotEncoderEstimator
encoder = OneHotEncoderEstimator(inputCols=["children"], outputCols=["children_enc"])
model = encoder.fit(df_insurence)
encoded = model.transform(df_insurence)
encoded.show(10)

+---+------+------+--------+------+---------+-----------+-----------+--------+-------------+
|age|   sex|   bmi|children|smoker|   region|    charges|smokerIndex|sexIndex| children_enc|
+---+------+------+--------+------+---------+-----------+-----------+--------+-------------+
| 19|female|  27.9|       0|   yes|southwest|  16884.924|        1.0|     1.0|(5,[0],[1.0])|
| 18|  male| 33.77|       1|    no|southeast|  1725.5523|        0.0|     0.0|(5,[1],[1.0])|
| 28|  male|  33.0|       3|    no|southeast|   4449.462|        0.0|     0.0|(5,[3],[1.0])|
| 33|  male|22.705|       0|    no|northwest|21984.47061|        0.0|     0.0|(5,[0],[1.0])|
| 32|  male| 28.88|       0|    no|northwest|  3866.8552|        0.0|     0.0|(5,[0],[1.0])|
| 31|female| 25.74|       0|    no|southeast|  3756.6216|        0.0|     1.0|(5,[0],[1.0])|
| 46|female| 33.44|       1|    no|southeast|  8240.5896|        0.0|     1.0|(5,[1],[1.0])|
| 37|female| 27.74|       3|    no|northwest|  7281.5056|        0.0|     1.0|(5,[3],[1.0])|
| 37|  male| 29.83|       2|    no|northeast|  6406.4107|        0.0|     0.0|(5,[2],[1.0])|
| 60|female| 25.84|       0|    no|northwest|28923.13692|        0.0|     1.0|(5,[0],[1.0])|
+---+------+------+--------+------+---------+-----------+-----------+--------+-------------+
only showing top 10 rows
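`OneHotEncoderEstimator` turns each category index into a sparse vector. With the default `dropLast=True`, the last category is dropped. The six `children` values 0 to 5 therefore produce vectors of size 5. For example, `(5,[0],[1.0])` reads as "size 5, value 1.0 at position 0", and a row with 5 children becomes the all-zeros vector.

Below is a plain-Python sketch of that encoding. It returns `(size, indices, values)`, mirroring Spark's sparse-vector notation. Treating `children` as having six categories is an assumption based on the 0 to 5 range in `describe()`.

```python
from typing import List, Tuple

SparseVector = Tuple[int, List[int], List[float]]


def one_hot(index: int, num_categories: int, drop_last: bool = True) -> SparseVector:
    """Encode a category index as (size, indices, values)."""
    if not 0 <= index < num_categories:
        raise ValueError(f"index {index} outside 0..{num_categories - 1}")
    size = num_categories - 1 if drop_last else num_categories
    if index < size:
        return (size, [index], [1.0])
    # With drop_last, the last category is represented by the all-zeros vector.
    return (size, [], [])


# `children` takes values 0..5, i.e. six categories.
encoded_children = [one_hot(c, 6) for c in [0, 1, 3, 5]]
```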

In [15]:
# df_insurence itself is unchanged: transform() returned a new DataFrame (`encoded`)
df_insurence.show(10)

+---+------+------+--------+------+---------+-----------+-----------+--------+
|age|   sex|   bmi|children|smoker|   region|    charges|smokerIndex|sexIndex|
+---+------+------+--------+------+---------+-----------+-----------+--------+
| 19|female|  27.9|       0|   yes|southwest|  16884.924|        1.0|     1.0|
| 18|  male| 33.77|       1|    no|southeast|  1725.5523|        0.0|     0.0|
| 28|  male|  33.0|       3|    no|southeast|   4449.462|        0.0|     0.0|
| 33|  male|22.705|       0|    no|northwest|21984.47061|        0.0|     0.0|
| 32|  male| 28.88|       0|    no|northwest|  3866.8552|        0.0|     0.0|
| 31|female| 25.74|       0|    no|southeast|  3756.6216|        0.0|     1.0|
| 46|female| 33.44|       1|    no|southeast|  8240.5896|        0.0|     1.0|
| 37|female| 27.74|       3|    no|northwest|  7281.5056|        0.0|     1.0|
| 37|  male| 29.83|       2|    no|northeast|  6406.4107|        0.0|     0.0|
| 60|female| 25.84|       0|    no|northwest|28923.13692|        0.0|     1.0|
+---+------+------+--------+------+---------+-----------+-----------+--------+
only showing top 10 rows

In [16]:
# Drop the original string columns; `region` is dropped without being encoded
df = df_insurence.drop("sex", "smoker", "region")

In [17]:
df.show(10)

+---+------+--------+-----------+-----------+--------+
|age|   bmi|children|    charges|smokerIndex|sexIndex|
+---+------+--------+-----------+-----------+--------+
| 19|  27.9|       0|  16884.924|        1.0|     1.0|
| 18| 33.77|       1|  1725.5523|        0.0|     0.0|
| 28|  33.0|       3|   4449.462|        0.0|     0.0|
| 33|22.705|       0|21984.47061|        0.0|     0.0|
| 32| 28.88|       0|  3866.8552|        0.0|     0.0|
| 31| 25.74|       0|  3756.6216|        0.0|     1.0|
| 46| 33.44|       1|  8240.5896|        0.0|     1.0|
| 37| 27.74|       3|  7281.5056|        0.0|     1.0|
| 37| 29.83|       2|  6406.4107|        0.0|     0.0|
| 60| 25.84|       0|28923.13692|        0.0|     1.0|
+---+------+--------+-----------+-----------+--------+
only showing top 10 rows



### Creating the Feature vector

In [40]:
from pyspark.ml.feature import VectorAssembler
vectorAssembler = VectorAssembler(inputCols = ['age','bmi','smokerIndex','sexIndex'], outputCol = 'features')

In [41]:
vec_df = vectorAssembler.transform(df)

In [42]:
vec_df = vec_df.select(['features', 'charges'])

In [43]:
vec_df.show(10)

+--------------------+-----------+
|            features|    charges|
+--------------------+-----------+
| [19.0,27.9,1.0,1.0]|  16884.924|
|[18.0,33.77,0.0,0.0]|  1725.5523|
| [28.0,33.0,0.0,0.0]|   4449.462|
|[33.0,22.705,0.0,...|21984.47061|
|[32.0,28.88,0.0,0.0]|  3866.8552|
|[31.0,25.74,0.0,1.0]|  3756.6216|
|[46.0,33.44,0.0,1.0]|  8240.5896|
|[37.0,27.74,0.0,1.0]|  7281.5056|
|[37.0,29.83,0.0,0.0]|  6406.4107|
|[60.0,25.84,0.0,1.0]|28923.13692|
+--------------------+-----------+
only showing top 10 rows
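`VectorAssembler` concatenates the input columns into one `features` vector, in the order given by `inputCols`. Here `children` is not part of that vector. A value such as `[33.0,22.705,0.0,...]` is only `show()` truncating a cell longer than 20 characters.

Below is a plain-Python sketch of the assembly step for numeric columns. The sample row is copied from the first row of `df`.

```python
from typing import Dict, List, Sequence

Row = Dict[str, float]


def assemble(row: Row, input_cols: Sequence[str]) -> List[float]:
    """Concatenate the chosen numeric columns, in order, into one feature list."""
    return [float(row[col]) for col in input_cols]


input_cols = ["age", "bmi", "smokerIndex", "sexIndex"]
row = {"age": 19, "bmi": 27.9, "children": 0, "charges": 16884.924,
       "smokerIndex": 1.0, "sexIndex": 1.0}
features = assemble(row, input_cols)  # `children` and `charges` are not included
```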



## Train/test split

In [44]:
# 70/30 random split; no seed is given, so the split (and every metric below) changes between runs
splits = vec_df.randomSplit([0.7, 0.3])
train_df = splits[0]
test_df = splits[1]

In [45]:
train_df.show(5)

+--------------------+-----------+
|            features|    charges|
+--------------------+-----------+
|[18.0,15.96,0.0,0.0]|  1694.7964|
|[18.0,17.29,1.0,0.0]| 12829.4551|
|[18.0,20.79,0.0,1.0]|  1607.5101|
|[18.0,21.47,0.0,0.0]|  1702.4553|
|[18.0,21.565,1.0,...|13747.87235|
+--------------------+-----------+
only showing top 5 rows



In [46]:
test_df.show(5)

+--------------------+----------+
|            features|   charges|
+--------------------+----------+
|[18.0,21.66,1.0,1.0]|14283.4594|
|[18.0,22.99,0.0,0.0]| 1704.5681|
|[18.0,23.32,0.0,0.0]| 1711.0268|
|[18.0,25.08,0.0,1.0]| 2196.4732|
|[18.0,26.315,0.0,...|2198.18985|
+--------------------+----------+
only showing top 5 rows
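`randomSplit([0.7, 0.3])` does not cut the data at exactly 70 %. It normalizes the weights and then assigns each row independently, by comparing a uniform random draw with the cumulative weight boundaries. Split sizes therefore vary slightly. Passing a `seed` (e.g. `randomSplit([0.7, 0.3], seed=42)`) makes the split reproducible.

Spark also sorts rows within each partition before sampling, so that a given seed always gives the same split. That would explain why the rows above appear ordered by their features.

The plain-Python sketch below shows only the per-row assignment and ignores partitioning.

```python
import random
from itertools import accumulate
from typing import List, Sequence, TypeVar

T = TypeVar("T")


def random_split(rows: Sequence[T], weights: Sequence[float], seed: int) -> List[List[T]]:
    """Assign each row to a split by comparing one uniform draw to cumulative bounds."""
    total = sum(weights)
    bounds = list(accumulate(w / total for w in weights))  # e.g. [0.7, 1.0]
    rng = random.Random(seed)
    splits: List[List[T]] = [[] for _ in weights]
    for row in rows:
        u = rng.random()  # uniform in [0, 1)
        for i, upper in enumerate(bounds):
            # The last split also catches any floating-point shortfall in the final bound.
            if u < upper or i == len(bounds) - 1:
                splits[i].append(row)
                break
    return splits


train, test = random_split(list(range(1338)), [0.7, 0.3], seed=42)
```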



# Linear Regression Model

In [47]:
from pyspark.ml.regression import LinearRegression
# Create the estimator: linear regression with an elastic-net penalty
lr_model = LinearRegression(featuresCol='features', labelCol='charges', maxIter=10, regParam=0.3, elasticNetParam=0.8)
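`regParam` (λ) and `elasticNetParam` (α) set the elastic-net penalty that is added to the least-squares loss. α = 0 gives pure ridge (L2), α = 1 gives pure lasso (L1), and the 0.8 used here is mostly L1. Spark's documentation writes this penalty as λ(α‖w‖₁ + (1 − α)/2 ‖w‖₂²).

Below is a sketch of the penalty term alone. By default Spark fits on standardized features (`standardization=True`), so the penalty effectively applies in that scaled space. This sketch ignores standardization.

```python
from typing import Sequence


def elastic_net_penalty(w: Sequence[float], reg_param: float, alpha: float) -> float:
    """reg_param * (alpha * ||w||_1 + (1 - alpha) / 2 * ||w||_2^2)."""
    l1 = sum(abs(x) for x in w)
    l2_sq = sum(x * x for x in w)
    return reg_param * (alpha * l1 + (1.0 - alpha) / 2.0 * l2_sq)


# Toy weights with the notebook's regParam=0.3 and elasticNetParam=0.8.
penalty = elastic_net_penalty([1.0, -2.0], reg_param=0.3, alpha=0.8)
```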

In [48]:
# fitting of lr_model
lr_fit_model = lr_model.fit(train_df)

In [49]:
# Print the coefficients and intercept for linear regression

print("Coefficients: %s" % str(lr_fit_model.coefficients))
print("Intercept: %s" % str(lr_fit_model.intercept))

Coefficients: [271.6507931145969,322.39065982243727,24047.291135294545,269.953461340607]
Intercept: -12271.26211988909
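A fitted linear model predicts `intercept + coefficients · features`. Plugging in the printed coefficients reproduces the first two test-set predictions shown later in the notebook (23918.68 and 30.21), which makes a handy sanity check.

The coefficients also read directly. With the other features held fixed, `smokerIndex = 1.0` (smoker) adds roughly 24,047 to the predicted `charges`.

```python
from typing import Sequence

coefficients = [271.6507931145969, 322.39065982243727, 24047.291135294545, 269.953461340607]
intercept = -12271.26211988909


def predict(features: Sequence[float]) -> float:
    """Linear regression prediction: intercept + dot(coefficients, features)."""
    return intercept + sum(c * x for c, x in zip(coefficients, features))


# Features are [age, bmi, smokerIndex, sexIndex], as assembled by VectorAssembler.
smoker_female = predict([18.0, 21.66, 1.0, 1.0])
non_smoker_male = predict([18.0, 22.99, 0.0, 0.0])
```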


In [50]:
# Summarize the model over the training set and print out some metrics
trainingSummary = lr_fit_model.summary

In [51]:
# Summarize the training run
print("numIterations: %d" % trainingSummary.totalIterations)
print("objectiveHistory: %s" % str(trainingSummary.objectiveHistory))
trainingSummary.residuals.show(5)

numIterations: 10
objectiveHistory: [0.4999999999999998, 0.413297188039175, 0.19470668817238454, 0.14013220654793987, 0.12200415453921501, 0.12194420502663178, 0.12194366377518097, 0.12194365977107377, 0.12194365973062525, 0.12194365973048024]
+------------------+
|         residuals|
+------------------+
| 3930.989313060246|
|-9410.422699798139|
|2016.6026647772692|
|2162.2756774386185|
| -9870.22552053906|
+------------------+
only showing top 5 rows



In [52]:
# Training-set metrics
print("RMSE: %f" % trainingSummary.rootMeanSquaredError)
print("r2: %f" % trainingSummary.r2)

RMSE: 6041.171187
r2: 0.756166


In [53]:
from pyspark.ml.evaluation import RegressionEvaluator
lr_predictions = lr_fit_model.transform(test_df)
lr_predictions.select("prediction","charges","features").show(5)

+------------------+----------+--------------------+
|        prediction|   charges|            features|
+------------------+----------+--------------------+
|  23918.6784445628|14283.4594|[18.0,21.66,1.0,1.0]|
|30.213425491487214| 1704.5681|[18.0,22.99,0.0,0.0]|
|136.60234323289296| 1711.0268|[18.0,23.32,0.0,0.0]|
| 973.9633658609873| 2196.4732|[18.0,25.08,0.0,1.0]|
| 1372.115830741699|2198.18985|[18.0,26.315,0.0,...|
+------------------+----------+--------------------+
only showing top 5 rows



In [54]:
evaluator = RegressionEvaluator(labelCol="charges", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(lr_predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)

Root Mean Squared Error (RMSE) on test data = 6192.31
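`RegressionEvaluator` with `metricName="rmse"` computes √(mean((y − ŷ)²)). With `"r2"` it computes 1 − SS_res/SS_tot. Other supported names include `"mse"` and `"mae"`.

Below is a plain-Python sketch of both metrics. It is applied to the five linear-regression test rows displayed above. Those five rows are a tiny subset, so their RMSE is not the full-test-set figure.

```python
import math
from typing import Sequence


def rmse(labels: Sequence[float], preds: Sequence[float]) -> float:
    """Root mean squared error."""
    n = len(labels)
    return math.sqrt(sum((y - p) ** 2 for y, p in zip(labels, preds)) / n)


def r2(labels: Sequence[float], preds: Sequence[float]) -> float:
    """Coefficient of determination: 1 - SS_res / SS_tot."""
    mean_y = sum(labels) / len(labels)
    ss_res = sum((y - p) ** 2 for y, p in zip(labels, preds))
    ss_tot = sum((y - mean_y) ** 2 for y in labels)
    return 1.0 - ss_res / ss_tot


# The five linear-regression test rows displayed above.
charges = [14283.4594, 1704.5681, 1711.0268, 2196.4732, 2198.18985]
lr_preds = [23918.6784445628, 30.213425491487214, 136.60234323289296,
            973.9633658609873, 1372.115830741699]
subset_rmse = rmse(charges, lr_preds)
subset_r2 = r2(charges, lr_preds)
```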


# Decision Tree Regression

In [55]:
# object creation 
from pyspark.ml.regression import DecisionTreeRegressor
dt_model = DecisionTreeRegressor(featuresCol ='features', labelCol = 'charges')


In [56]:
# Fit the decision tree (stored separately from the estimator, as for the other models)
dt_model_fit = dt_model.fit(train_df)

In [57]:
# Predicting the value
dt_predictions = dt_model_fit.transform(test_df)
dt_predictions.select("prediction","charges","features").show(5)

+------------------+----------+--------------------+
|        prediction|   charges|            features|
+------------------+----------+--------------------+
|13473.944483333333|14283.4594|[18.0,21.66,1.0,1.0]|
| 2438.034821463415| 1704.5681|[18.0,22.99,0.0,0.0]|
| 2438.034821463415| 1711.0268|[18.0,23.32,0.0,0.0]|
| 2438.034821463415| 2196.4732|[18.0,25.08,0.0,1.0]|
| 2438.034821463415|2198.18985|[18.0,26.315,0.0,...|
+------------------+----------+--------------------+
only showing top 5 rows



In [58]:
evaluator = RegressionEvaluator(labelCol="charges", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(dt_predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)

Root Mean Squared Error (RMSE) on test data = 4779.97
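MLlib regression trees choose the splits that most reduce variance; `impurity="variance"` is the only option for regression. Each leaf predicts the mean label of its training rows. That is why several test rows above share the prediction 2438.03.

The plain-Python sketch below finds the best single threshold on one feature. Spark instead evaluates up to `maxBins` (default 32) binned candidate thresholds per feature and recurses down to `maxDepth` (default 5). Treat this as a simplified illustration on toy data.

```python
from typing import Optional, Sequence, Tuple


def sse(ys: Sequence[float]) -> float:
    """Sum of squared deviations from the mean (n times the variance)."""
    if not ys:
        return 0.0
    mean = sum(ys) / len(ys)
    return sum((y - mean) ** 2 for y in ys)


def best_split(xs: Sequence[float], ys: Sequence[float]) -> Optional[Tuple[float, float, float]]:
    """Return (threshold, left_mean, right_mean) minimizing the children's summed SSE."""
    values = sorted(set(xs))
    best: Optional[Tuple[float, float, float]] = None
    best_cost = float("inf")
    # Candidate thresholds: midpoints between consecutive distinct feature values.
    for lo, hi in zip(values, values[1:]):
        t = (lo + hi) / 2.0
        left = [y for x, y in zip(xs, ys) if x <= t]
        right = [y for x, y in zip(xs, ys) if x > t]
        cost = sse(left) + sse(right)  # lower cost == larger variance reduction
        if cost < best_cost:
            best_cost = cost
            best = (t, sum(left) / len(left), sum(right) / len(right))
    return best


# Toy data: a bmi-like feature and a charges-like label that jumps above 30.
threshold, left_mean, right_mean = best_split(
    [22.0, 25.0, 28.0, 31.0, 34.0, 37.0],
    [2000.0, 2200.0, 2100.0, 9000.0, 9500.0, 9400.0])
```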


# Random Forest Regression

In [59]:
# Object creation
from pyspark.ml.regression import RandomForestRegressor
rf_model = RandomForestRegressor(featuresCol="features",labelCol = 'charges',maxDepth=4)

In [61]:
# Fitting the model
rf_model_fit = rf_model.fit(train_df)

In [63]:
# predicting the value
rf_predictions = rf_model_fit.transform(test_df)
rf_predictions.select('prediction', 'charges', 'features').show(5)

+------------------+----------+--------------------+
|        prediction|   charges|            features|
+------------------+----------+--------------------+
|16943.409422856552|14283.4594|[18.0,21.66,1.0,1.0]|
|3897.5943326143934| 1704.5681|[18.0,22.99,0.0,0.0]|
|3897.5943326143934| 1711.0268|[18.0,23.32,0.0,0.0]|
| 4208.915798432345| 2196.4732|[18.0,25.08,0.0,1.0]|
| 4208.915798432345|2198.18985|[18.0,26.315,0.0,...|
+------------------+----------+--------------------+
only showing top 5 rows



In [64]:
# evaluating the model
rf_evaluator = RegressionEvaluator(labelCol="charges", predictionCol="prediction", metricName="rmse")
rmse = rf_evaluator.evaluate(rf_predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)

rf_evaluator1 = RegressionEvaluator(labelCol="charges", predictionCol="prediction", metricName="r2")
r2 = rf_evaluator1.evaluate(rf_predictions)
print("R2 on test data = %g" % r2)

Root Mean Squared Error (RMSE) on test data = 4728.05
R2 on test data = 0.83937
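A random forest trains many trees, each on a bootstrap sample of the training rows, and predicts the average of their outputs. In Spark each split also considers only a random subset of features. `featureSubsetStrategy="auto"` uses one third of the features for regression, and `numTrees` defaults to 20.

Below is a plain-Python sketch of bagging with one-split trees ("stumps") on a single feature. The stump learner, the toy data and the omitted feature subsetting are simplifications, not Spark's implementation.

```python
import random
from typing import Callable, List, Sequence

Model = Callable[[float], float]


def fit_stump(xs: Sequence[float], ys: Sequence[float]) -> Model:
    """One-split regression tree: threshold minimizing the children's squared error."""
    def sse(vals: List[float]) -> float:
        m = sum(vals) / len(vals)
        return sum((v - m) ** 2 for v in vals)

    mean_all = sum(ys) / len(ys)
    best_cost, best = sse(list(ys)), (None, mean_all, mean_all)
    values = sorted(set(xs))
    for lo, hi in zip(values, values[1:]):
        t = (lo + hi) / 2.0
        left = [y for x, y in zip(xs, ys) if x <= t]
        right = [y for x, y in zip(xs, ys) if x > t]
        cost = sse(left) + sse(right)
        if cost < best_cost:
            best_cost, best = cost, (t, sum(left) / len(left), sum(right) / len(right))
    t, left_mean, right_mean = best
    if t is None:  # no useful split in this sample: predict its mean
        return lambda x: mean_all
    return lambda x: left_mean if x <= t else right_mean


def fit_forest(xs: Sequence[float], ys: Sequence[float],
               num_trees: int = 20, seed: int = 0) -> Model:
    """Bagging: fit each stump on a bootstrap sample, then average the predictions."""
    rng = random.Random(seed)
    n = len(xs)
    trees = []
    for _ in range(num_trees):
        idx = [rng.randrange(n) for _ in range(n)]  # sample rows with replacement
        trees.append(fit_stump([xs[i] for i in idx], [ys[i] for i in idx]))
    return lambda x: sum(tree(x) for tree in trees) / len(trees)


forest = fit_forest([1.0, 2.0, 3.0, 4.0, 5.0, 6.0], [1.0, 1.0, 1.0, 10.0, 10.0, 10.0])
```

Averaging many trees fit on different bootstrap samples smooths out the step-like predictions of a single tree.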


# Gradient-boosted Tree Regression

In [65]:
# object creation
from pyspark.ml.regression import GBTRegressor
gbt_model = GBTRegressor(featuresCol = 'features', labelCol = 'charges',maxIter=10)

In [66]:
# fitting the model
gbt_model_fit = gbt_model.fit(train_df)

In [68]:
# predicting the value
gbt_predictions = gbt_model_fit.transform(test_df)
gbt_predictions.select('prediction', 'charges', 'features').show(5)

+------------------+----------+--------------------+
|        prediction|   charges|            features|
+------------------+----------+--------------------+
| 13767.94453120202|14283.4594|[18.0,21.66,1.0,1.0]|
|  2423.42353356794| 1704.5681|[18.0,22.99,0.0,0.0]|
|  2423.42353356794| 1711.0268|[18.0,23.32,0.0,0.0]|
|2574.6189193968994| 2196.4732|[18.0,25.08,0.0,1.0]|
|  3070.10056951776|2198.18985|[18.0,26.315,0.0,...|
+------------------+----------+--------------------+
only showing top 5 rows



In [69]:
# evaluating 
gbt_evaluator = RegressionEvaluator(labelCol="charges", predictionCol="prediction", metricName="rmse")
rmse = gbt_evaluator.evaluate(gbt_predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)

gbt_evaluator1 = RegressionEvaluator(labelCol="charges", predictionCol="prediction", metricName="r2")
r2 = gbt_evaluator1.evaluate(gbt_predictions)
print("R2 on test data = %g" % r2)

Root Mean Squared Error (RMSE) on test data = 4929.58
R2 on test data = 0.825385
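Gradient boosting adds trees one after another. With squared-error loss (`lossType="squared"`, the default for `GBTRegressor`), each new tree is fit to the current residuals. It is then added with a shrinkage factor (`stepSize`, default 0.1). In the notebook, `maxIter=10` caps the number of trees.

Below is a plain-Python sketch of that loop with stumps. Starting from the label mean is the textbook choice and an assumption of this sketch, not necessarily what Spark does for its first tree.

```python
from typing import Callable, List, Sequence

Model = Callable[[float], float]


def fit_stump(xs: Sequence[float], rs: Sequence[float]) -> Model:
    """One-split regression tree fit to targets `rs` by minimizing squared error."""
    def sse(vals: List[float]) -> float:
        m = sum(vals) / len(vals)
        return sum((v - m) ** 2 for v in vals)

    mean_all = sum(rs) / len(rs)
    best_cost, best = sse(list(rs)), (None, mean_all, mean_all)
    values = sorted(set(xs))
    for lo, hi in zip(values, values[1:]):
        t = (lo + hi) / 2.0
        left = [r for x, r in zip(xs, rs) if x <= t]
        right = [r for x, r in zip(xs, rs) if x > t]
        cost = sse(left) + sse(right)
        if cost < best_cost:
            best_cost, best = cost, (t, sum(left) / len(left), sum(right) / len(right))
    t, left_mean, right_mean = best
    if t is None:
        return lambda x: mean_all
    return lambda x: left_mean if x <= t else right_mean


def fit_gbt(xs: Sequence[float], ys: Sequence[float],
            max_iter: int = 10, step_size: float = 0.1) -> Model:
    """Squared-loss boosting: each stump is fit to the residuals y - F(x)."""
    base = sum(ys) / len(ys)
    trees: List[Model] = []

    def predict(x: float) -> float:
        return base + step_size * sum(tree(x) for tree in trees)

    for _ in range(max_iter):
        residuals = [y - predict(x) for x, y in zip(xs, ys)]  # negative gradient of squared loss
        trees.append(fit_stump(xs, residuals))
    return predict


xs = [1.0, 2.0, 3.0, 4.0, 5.0, 6.0]
ys = [1.0, 1.0, 1.0, 10.0, 10.0, 10.0]
model = fit_gbt(xs, ys)
mse_before = sum((y - sum(ys) / len(ys)) ** 2 for y in ys) / len(ys)
mse_after = sum((y - model(x)) ** 2 for x, y in zip(xs, ys)) / len(ys)
```

On this toy data each round shrinks the residuals by a factor of (1 − `step_size`). A small step size therefore needs more iterations, but each tree corrects the ensemble more gently.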


# <center>**Thanks!**</center>