In [1]:
import findspark

In [2]:
import os

# Locate the Spark installation. Prefer the SPARK_HOME environment variable so the
# notebook runs on machines other than the author's; fall back to the original
# local install path when the variable is unset or empty.
findspark.init(os.environ.get('SPARK_HOME') or '/home/kavya/spark-3.0.0-preview2-bin-hadoop2.7')

In [27]:
from pyspark.sql import SparkSession
from pyspark.ml.regression import LinearRegression
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler
from pyspark.sql import functions as F
from pyspark.sql.functions import *

In [4]:
%%time
# Configure a builder for an application named 'lrex', then obtain the session
# (getOrCreate returns the already-running session if one exists).
session_builder = SparkSession.builder.appName('lrex')
spark = session_builder.getOrCreate()

CPU times: user 37.2 ms, sys: 10.5 ms, total: 47.6 ms
Wall time: 8 s


#### Read data

In [5]:
%%time
# Load the insurance CSV: the first row supplies the column names and Spark
# infers each column's type from the data.
data = spark.read.options(header=True, inferSchema=True).csv('insurance.csv')

CPU times: user 7.2 ms, sys: 722 µs, total: 7.92 ms
Wall time: 9.11 s


In [6]:
data.printSchema()

root
 |-- age: integer (nullable = true)
 |-- sex: string (nullable = true)
 |-- bmi: double (nullable = true)
 |-- children: integer (nullable = true)
 |-- smoker: string (nullable = true)
 |-- region: string (nullable = true)
 |-- charges: double (nullable = true)



In [8]:
data.show(2)

+---+------+-----+--------+------+---------+---------+
|age|   sex|  bmi|children|smoker|   region|  charges|
+---+------+-----+--------+------+---------+---------+
| 19|female| 27.9|       0|   yes|southwest|16884.924|
| 18|  male|33.77|       1|    no|southeast|1725.5523|
+---+------+-----+--------+------+---------+---------+
only showing top 2 rows



#### Converting data into an acceptable format for ML

In [7]:
# Print each field of the first row on its own line to inspect the raw values.
print(*data.head(1)[0], sep='\n')

19
female
27.9
0
yes
southwest
16884.924


In [57]:
# Binary-encode `sex` as `gender` ("male" -> 0, anything else -> 1), then
# discard the original string column.
df = (
    data
    .withColumn('gender', F.when(data['sex'] == "male", 0).otherwise(1))
    .drop('sex')
)

In [58]:
# Binary-encode `smoker` as `smoke`: "no" -> 0, any other value -> 1.
# The condition is built from `df` (the frame being transformed) rather than the
# earlier `data` frame, so the cell does not rely on resolving a column through
# another DataFrame's lineage.
df = df.withColumn('smoke', F.when(df['smoker'] == "no", 0).otherwise(1))
# The string column is no longer needed once the numeric flag exists.
df = df.drop('smoker')

In [59]:
from pyspark.ml.feature import StringIndexer

In [60]:
# Indexer that maps each distinct `region` string to a numeric index stored in
# a new `location` column (frequency-descending order by default per the Spark docs).
# NOTE(review): the resulting index is fed to a linear model as a plain number,
# which imposes an arbitrary ordering on regions — consider one-hot encoding instead.
index = StringIndexer(inputCol='region',outputCol='location')

In [61]:
# Learn the region -> index mapping from the current frame, append the numeric
# `location` column, and remove the original string `region` column.
df = (
    index
    .fit(df)
    .transform(df)
    .drop('region')
)

In [87]:
df.show(5)

+---+------+--------+-----------+------+-----+--------+
|age|   bmi|children|    charges|gender|smoke|location|
+---+------+--------+-----------+------+-----+--------+
| 19|  27.9|       0|  16884.924|     1|    1|     2.0|
| 18| 33.77|       1|  1725.5523|     0|    0|     0.0|
| 28|  33.0|       3|   4449.462|     0|    0|     0.0|
| 33|22.705|       0|21984.47061|     0|    0|     1.0|
| 32| 28.88|       0|  3866.8552|     0|    0|     1.0|
+---+------+--------+-----------+------+-----+--------+
only showing top 5 rows



In [63]:
df.columns

['age', 'bmi', 'children', 'charges', 'gender', 'smoke', 'location']

##### numerical data as features

In [64]:
# Predictor columns: every column of `df` except the `charges` label.
feature_columns = ['age', 'bmi', 'children', 'gender', 'smoke', 'location']
# Combine the predictors into a single vector column named `features`.
assembler = VectorAssembler(inputCols=feature_columns, outputCol='features')

In [66]:
# Append the assembled `features` vector column to every row of `df`.
output = assembler.transform(df)

In [67]:
# Keep only what the regressor needs: the feature vector and the `charges` label.
final_data = output.select('features','charges')

In [68]:
final_data.show(2)

+--------------------+---------+
|            features|  charges|
+--------------------+---------+
|[19.0,27.9,0.0,1....|16884.924|
|[18.0,33.77,1.0,0...|1725.5523|
+--------------------+---------+
only showing top 2 rows



##### train test data split

In [96]:
%%time
# Split into roughly 80% training / 20% test rows. The fixed seed makes the
# split repeatable, so the metrics computed from it can be reproduced on re-run.
train_data,test_data = final_data.randomSplit([0.8,0.2], seed=42)

CPU times: user 0 ns, sys: 4.54 ms, total: 4.54 ms
Wall time: 27.9 ms


In [97]:
final_data.columns

['features', 'charges']

##### model training and testing

In [98]:
# Linear regression estimator predicting `charges` from the `features` vector.
# regParam=0.3 sets the regularization strength; elasticNetParam=0.8 is the
# elastic-net mixing ratio (per the Spark docs, 0 = pure L2 and 1 = pure L1).
lr = LinearRegression(featuresCol='features', labelCol='charges',
                      elasticNetParam=0.8,regParam=0.3)

In [90]:
#from pyspark.ml.regression import GeneralizedLinearRegression

In [99]:
#lr = GeneralizedLinearRegression(featuresCol='features', labelCol='charges',
#                                family='gaussian')

In [100]:
%%time
# Fit the regression on the training split only; the test split stays unseen.
lrModel =lr.fit(train_data)

CPU times: user 19.4 ms, sys: 223 µs, total: 19.6 ms
Wall time: 2.83 s


In [101]:
lrModel.coefficients

DenseVector([257.3287, 323.2647, 523.9775, -11.3125, 23480.06, 219.8928])

In [102]:
lrModel.intercept

-12398.552722409482

In [103]:
# Summary object attached to the fitted model.
# NOTE(review): `training_summary` is not referenced by any later cell shown here.
training_summary = lrModel.summary

In [104]:
# Score the fitted model on the held-out test split; the returned object
# exposes the metrics read in the following cells (rootMeanSquaredError, r2).
test_results = lrModel.evaluate(test_data)

In [105]:
test_results.rootMeanSquaredError

5944.78960846061

In [106]:
test_results.r2

0.7796709044788035

In [107]:
# Summary statistics of the `charges` label, shown to put the test RMSE in
# context (compare the RMSE with the label's mean and standard deviation).
final_data.describe().show()

+-------+------------------+
|summary|           charges|
+-------+------------------+
|  count|              1338|
|   mean|13270.422265141257|
| stddev|12110.011236693992|
|    min|         1121.8739|
|    max|       63770.42801|
+-------+------------------+



In [108]:
# Predict charges for the held-out test features

In [109]:
# Drop the `charges` label so the model sees only the feature vectors.
unlabeled_data = test_data.select('features')

In [110]:
# Apply the fitted model; the result adds a `prediction` column next to `features`.
predictions = lrModel.transform(unlabeled_data)

In [111]:
test_data.show(2)

+--------------------+---------+
|            features|  charges|
+--------------------+---------+
|(6,[0,1],[18.0,41...|1146.7966|
|(6,[0,1],[22.0,26...|1664.9996|
+--------------------+---------+
only showing top 2 rows



In [112]:
predictions.show(2)

+--------------------+------------------+
|            features|        prediction|
+--------------------+------------------+
|(6,[0,1],[18.0,41...|5532.4733273117035|
|(6,[0,1],[22.0,26...|1939.1032236234696|
+--------------------+------------------+
only showing top 2 rows

