### Objective 

#### Starting pyspark 

In [1]:
# Import packages
from pyspark.sql import SQLContext
from pyspark import SparkContext, SparkConf

# Setup pyspark sc
sc =SparkContext()
sqlContext = SQLContext(sc)

### Data

Command used to copy input CSV file into Hadoop File system is as below:

### Loading the CSV data

In [2]:
# data load
df = sqlContext.read.format('com.databricks.spark.csv')\
                    .option('header', 'true')\
                    .option('inferschema', 'true')\
                    .load('Attachment_1635667446.csv')

In [3]:
# Sample data
df.show()

+---+------+------+--------+------+---------+-----------+
|age|   sex|   bmi|children|smoker|   region|    charges|
+---+------+------+--------+------+---------+-----------+
| 19|female|  27.9|       0|   yes|southwest|  16884.924|
| 18|  male| 33.77|       1|    no|southeast|  1725.5523|
| 28|  male|  33.0|       3|    no|southeast|   4449.462|
| 33|  male|22.705|       0|    no|northwest|21984.47061|
| 32|  male| 28.88|       0|    no|northwest|  3866.8552|
| 31|female| 25.74|       0|    no|southeast|  3756.6216|
| 46|female| 33.44|       1|    no|southeast|  8240.5896|
| 37|female| 27.74|       3|    no|northwest|  7281.5056|
| 37|  male| 29.83|       2|    no|northeast|  6406.4107|
| 60|female| 25.84|       0|    no|northwest|28923.13692|
| 25|  male| 26.22|       0|    no|northeast|  2721.3208|
| 62|female| 26.29|       0|   yes|southeast| 27808.7251|
| 23|  male|  34.4|       0|    no|southwest|   1826.843|
| 56|female| 39.82|       0|    no|southeast| 11090.7178|
| 27|  male| 4

### Data pre-processing and transformation 

In [4]:
from collections import defaultdict

data_types = defaultdict(list)
for entry in df.schema.fields:
    data_types[str(entry.dataType)].append(entry.name)

strings = data_types["StringType"]

# fill missing values with column averages
missing_data_fill = {}
for var in strings:
    missing_data_fill[var] = "missing"
df = df.fillna(missing_data_fill)

numericals = data_types["DoubleType"] + data_types["IntegerType"] \
                                      + data_types["LongType"]

mean_dict = { col: 'mean' for col in numericals}
col_avgs = df.agg( mean_dict ).collect()[0].asDict()
col_avgs = { k[4:-1]: v for k,v in col_avgs.items() }
df = df.fillna(col_avgs)

In [5]:
df.show()

+---+------+------+--------+------+---------+-----------+
|age|   sex|   bmi|children|smoker|   region|    charges|
+---+------+------+--------+------+---------+-----------+
| 19|female|  27.9|       0|   yes|southwest|  16884.924|
| 18|  male| 33.77|       1|    no|southeast|  1725.5523|
| 28|  male|  33.0|       3|    no|southeast|   4449.462|
| 33|  male|22.705|       0|    no|northwest|21984.47061|
| 32|  male| 28.88|       0|    no|northwest|  3866.8552|
| 31|female| 25.74|       0|    no|southeast|  3756.6216|
| 46|female| 33.44|       1|    no|southeast|  8240.5896|
| 37|female| 27.74|       3|    no|northwest|  7281.5056|
| 37|  male| 29.83|       2|    no|northeast|  6406.4107|
| 60|female| 25.84|       0|    no|northwest|28923.13692|
| 25|  male| 26.22|       0|    no|northeast|  2721.3208|
| 62|female| 26.29|       0|   yes|southeast| 27808.7251|
| 23|  male|  34.4|       0|    no|southwest|   1826.843|
| 56|female| 39.82|       0|    no|southeast| 11090.7178|
| 27|  male| 4

In [6]:
# schema of the data
df.printSchema()

root
 |-- age: integer (nullable = true)
 |-- sex: string (nullable = false)
 |-- bmi: double (nullable = false)
 |-- children: integer (nullable = true)
 |-- smoker: string (nullable = false)
 |-- region: string (nullable = false)
 |-- charges: double (nullable = false)



In [7]:
# variables list in the data
variable_list_emblem = df.columns
variable_list_emblem

['age', 'sex', 'bmi', 'children', 'smoker', 'region', 'charges']

In [8]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler

# One-hot encoding
strings = [var for var in variable_list_emblem if var in data_types["StringType"]]
stage_string = [StringIndexer(inputCol= c, outputCol= c+"_string_encoded") for c in strings]
stage_one_hot = [OneHotEncoder(inputCol= c+"_string_encoded", outputCol= c+ "_one_hot") for c in strings]

ppl = Pipeline(stages= stage_string + stage_one_hot)
df = ppl.fit(df).transform(df)

from pyspark.ml.feature import Normalizer, VectorAssembler, StandardScaler

numericals = [var for var in variable_list_emblem if var not in data_types["StringType"]]
numericals_out = [var+ "_normalized" for var in numericals]

# Vector assembling numerical features
vs = VectorAssembler(inputCols= numericals, outputCol= "numericals")
df = vs.transform(df)

scaler = StandardScaler(inputCol = "numericals", outputCol = "numericals_after_scale")
normalizer = Normalizer(inputCol = "numericals_after_scale", outputCol= "normalized_numericals", p=1.0)

ppl2 = Pipeline(stages= [scaler, normalizer])
df = ppl2.fit(df).transform(df)
df.show(5)

+---+------+------+--------+------+---------+-----------+------------------+---------------------+---------------------+-------------+--------------+--------------+--------------------+----------------------+---------------------+
|age|   sex|   bmi|children|smoker|   region|    charges|sex_string_encoded|smoker_string_encoded|region_string_encoded|  sex_one_hot|smoker_one_hot|region_one_hot|          numericals|numericals_after_scale|normalized_numericals|
+---+------+------+--------+------+---------+-----------+------------------+---------------------+---------------------+-------------+--------------+--------------+--------------------+----------------------+---------------------+
| 19|female|  27.9|       0|   yes|southwest|  16884.924|               1.0|                  1.0|                  2.0|    (1,[],[])|     (1,[],[])| (3,[2],[1.0])|[19.0,27.9,0.0,16...|  [1.35231698077286...| [0.18469880297182...|
| 18|  male| 33.77|       1|    no|southeast|  1725.5523|               0.0|

#### Splitting the data into training and testing sets 

In [9]:
categoricals = [var for var in df.columns if var.endswith("_one_hot")]
num = ["numericals"]
vector_assembler = VectorAssembler(inputCols= categoricals + num, outputCol= "features")
df = vector_assembler.transform(df)

# 70% for training and 30% for testing 
training_set, test_set = df.randomSplit([0.7, 0.3], seed = 2021)

### Model Build and Validation 

In [10]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.regression import GeneralizedLinearRegression, GBTRegressor
from pyspark.ml.evaluation import RegressionEvaluator

#model definiton
charges_medical =  GeneralizedLinearRegression(family="poisson",
                                           link="log",
                                           maxIter=10,
                                           fitIntercept = True,
                                           labelCol = "charges",
                                           regParam=0.3)
# Hyper-paramter tuning the model
para_grid = ParamGridBuilder()\
           .addGrid(charges_medical.regParam, [0.1, 0.3, 0.5, 0.7, 0.9])\
           .build()

# Model training evaluation
evaluator = RegressionEvaluator(labelCol="charges",
                                predictionCol="prediction",
                                metricName="rmse")

cross_val = CrossValidator(estimator = charges_medical,
                           estimatorParamMaps= para_grid,
                           evaluator = evaluator)
# Fitting the model on training set
model_charges_medical = cross_val.fit(training_set)

In [11]:
# Prediction on the test set
df_predictions = model_charges_medical.transform(test_set)

#### Model performance validation on test data 

In [12]:
from pyspark.ml.evaluation import RegressionEvaluator
evaluator = RegressionEvaluator(
    labelCol="charges", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(df_predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)

from sklearn.metrics import r2_score
prediction_to_pandas = df_predictions.select(["prediction","charges"]).toPandas()
r2 = r2_score(prediction_to_pandas.prediction, prediction_to_pandas.charges)
print("R-square on test data = %g" % r2)

Root Mean Squared Error (RMSE) on test data = 6525.15
R-square on test data = 0.821807


### Conclusion 