In [5]:
# Must be included at the beginning of each new notebook. Remember to change the app name.
import findspark
findspark.init('/home/ubuntu/spark-3.2.1-bin-hadoop2.7')
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('nche').getOrCreate()

In [6]:
df_full = spark.read.csv('insurance_dataset.csv', header=True)
df_infor = spark.read.csv('information.csv', header=True)
df_srw = spark.read.csv('insurance_dataset - srw.csv', header=True)

In [7]:
#df_srw = df_srw.drop('exercise_frequency')

In [10]:
from pyspark.sql.functions import col, isnan, when, count
#Convert the string type varibale into integer or float
df_infor = df_infor.withColumn("age", col("age").cast("int"))
df_infor = df_infor.withColumn("children", col("children").cast("int"))
df_infor = df_infor.withColumn("charges", col("charges").cast("int"))
df_srw = df_srw.withColumn("bmi", col("bmi").cast("int"))
df_srw = df_srw.withColumn("charges", col("charges").cast("int"))

In [11]:
df_srw = df_srw.na.drop('any')
df_infor = df_infor.na.drop('any')

In [12]:
#integrate two data set
df_infor = df_infor.drop('charges')
insurance_df = df_infor.join(df_srw,['ID'], how='inner').drop('ID')

In [13]:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

def create_new_feature(bmi, smoker, medical_history):
    if bmi < 25 and smoker == 'no' and medical_history == 'None':
        return '1'
    else:
        return '0'

# Register the UDF with PySpark
create_new_feature_udf = udf(create_new_feature, StringType())

# Apply the UDF to create the new column
merged_df = insurance_df.withColumn('Risk_Level', create_new_feature_udf(insurance_df['bmi'], insurance_df['smoker'], insurance_df['medical_history']))

In [14]:
from pyspark.sql import functions as F

value_counts = merged_df.groupBy('Risk_Level').count().orderBy('Risk_Level')
result = value_counts.collect()


                                                                                

In [15]:
#convert the data from characters into number
gender = {'female':1,'male':0}
smoker = {'yes':1, 'no':0}
region = {'southeast':0, 'northwest':1, 'southwest':2, 'northeast':3}
medical = {'Diabetes':1, 'None':0, 'High blood pressure':2, 'Heart disease':3}
occupation = {'Blue collar':0, 'Unemployed':1, 'Student':2, 'White collar':3}
coverage_level = {'Basic':0, 'Standard':1,'Premium':2}

data = merged_df.rdd.map(lambda x: (gender[x.gender], region[x.region], occupation[x.occupation], smoker[x.smoker],\
                                   medical[x.medical_history], medical[x.family_medical_history],\
                                    coverage_level[x.coverage_level], x.age, x.bmi, x.children, x.Risk_Level, x.charges)).toDF(['gender', 'region','occupation', 'smoker', 'medical_history',\
                                                                          'family_medical_history', 'coverage_level','age','bmi', 'children', 'Risk_Level','charges'])

                                                                                

In [16]:
from pyspark.ml.feature import StringIndexer
indexer = StringIndexer(inputCol='gender', outputCol='gender_I')
data = indexer.fit(data).transform(data)

                                                                                

In [17]:
result_df = data.withColumn("Risk_Level", col("Risk_Level").cast("int"))
#result_df.dtypes

result_df.columns
r_df = result_df

In [18]:
#feature selection
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorAssembler
cols_name = ['gender_I','region','occupation','smoker','medical_history','family_medical_history','coverage_level','Risk_Level',\
                   'age', 'bmi', 'children']

# Assemble the feature vector
assembler = VectorAssembler(inputCols=cols_name, outputCol="features")
assembled_df = assembler.transform(result_df)

In [19]:
lasso = LinearRegression(
    featuresCol="features",
    labelCol='charges', 
    elasticNetParam=1.0,       # This sets L1 regularization 
)

lasso_model = lasso.fit(assembled_df)

23/10/12 20:43:43 WARN Instrumentation: [6b7b5e4b] regParam is zero, which might cause numerical instability and overfitting.
23/10/12 20:43:45 WARN InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
23/10/12 20:43:45 WARN InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.ForeignLinkerBLAS
23/10/12 20:43:52 WARN InstanceBuilder$NativeLAPACK: Failed to load implementation from:dev.ludovic.netlib.lapack.JNILAPACK
                                                                                

In [20]:
coefficients = lasso_model.coefficients
threshold = 0.0  # Set your own threshold
selected_feature_columns = [feature for i, feature in enumerate(cols_name) if coefficients[i] > threshold]
selected_df = assembled_df.select(selected_feature_columns)

print(selected_df)

DataFrame[gender_I: double, region: bigint, occupation: bigint, smoker: bigint, medical_history: bigint, family_medical_history: bigint, coverage_level: bigint, age: bigint, bmi: bigint, children: bigint]


In [21]:
#balance the model
#input_features = result_df.drop('charges')
input_features = result_df
majority_class = input_features.filter(col("Risk_Level") == 0)
minority_class = input_features.filter(col("Risk_Level") == 1)

# Resample the majority class to match the minority class
majority_count = majority_class.count()
minority_count = minority_class.count()

# Calculate the ratio for resampling
resampling_ratio = minority_count / majority_count

majority_downsampled = majority_class.sample(withReplacement=False, fraction=resampling_ratio, seed=42)

# Combine the resampled majority class with the minority class
balanced_df = minority_class.union(majority_downsampled)

# Verify the balance
balanced_category_counts = balanced_df.groupBy("Risk_Level").count()

# Show the counts for each category
balanced_category_counts.show()

print("Total instances in balanced dataset:", balanced_df.count())

                                                                                

+----------+-----+
|Risk_Level|count|
+----------+-----+
|         1| 3165|
|         0| 3230|
+----------+-----+





Total instances in balanced dataset: 6395


                                                                                

In [22]:
conditions = [
    (balanced_df['charges'] >= 22856),
    (balanced_df['charges'] < 22856)
]

# Define the corresponding categories
categories = [1, 2]

# Add a new column "charges_category" based on the conditions
balanced_df = balanced_df.withColumn(
    "charges_category",
    when(conditions[0], categories[0])
    .when(conditions[1], categories[1])
    .otherwise(None)
)

# Show the resulting DataFrame
balanced_df.show()

[Stage 34:>                                                         (0 + 1) / 1]

+------+------+----------+------+---------------+----------------------+--------------+---+---+--------+----------+-------+--------+----------------+
|gender|region|occupation|smoker|medical_history|family_medical_history|coverage_level|age|bmi|children|Risk_Level|charges|gender_I|charges_category|
+------+------+----------+------+---------------+----------------------+--------------+---+---+--------+----------+-------+--------+----------------+
|     0|     2|         3|     0|              0|                     0|             2| 37| 23|       1|         1|  13782|     1.0|               2|
|     1|     2|         1|     0|              0|                     0|             2| 21| 22|       0|         1|   8904|     0.0|               2|
|     1|     3|         3|     0|              0|                     3|             2| 21| 22|       2|         1|  16560|     0.0|               2|
|     1|     3|         3|     0|              0|                     3|             1| 58| 19|     

                                                                                

In [23]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [24]:
assembler = VectorAssembler(
  inputCols=['gender',
 'region',
 'occupation',
 'smoker',
 'medical_history',
 'family_medical_history',
 'coverage_level',
 'age',
 'bmi',
 'children',
 'Risk_Level',
 'gender_I'],
    outputCol="features")

output = assembler.transform(balanced_df)
train_data,test_data = output.randomSplit([0.8,0.2])
rfc = RandomForestClassifier(labelCol='charges_category',featuresCol='features')
rfc_model = rfc.fit(train_data)
rfc_predictions = rfc_model.transform(test_data)
# Let's import the evaluator.
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Select (prediction, true label) and compute test error. 
acc_evaluator = MulticlassClassificationEvaluator(labelCol="charges_category", predictionCol="prediction", metricName="accuracy")
rfc_acc = acc_evaluator.evaluate(rfc_predictions)
print('A random forest ensemble has an accuracy of: {0:2.2f}%'.format(rfc_acc*100))



A random forest ensemble has an accuracy of: 96.16%


23/10/12 20:47:55 WARN NettyRpcEnv: Ignored failure: java.util.concurrent.TimeoutException: Cannot receive any reply from ip-172-31-26-2.ec2.internal:45019 in 10000 milliseconds
