# Analytics Vidhya Big Data Engineering Training Batch
### Spark Assessment, Shrey Marwaha, 18th June 2022

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, when, countDistinct
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml.classification import RandomForestClassifier, LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from time import *

In [2]:
# Create the Spark session for this notebook, or reuse one that already exists
spark = (
    SparkSession.builder
    .appName('Adult Income Range Prediction')
    .getOrCreate()
)

In [3]:
# Uncomment and run the line below to shut down the Spark session when finished.
#spark.stop()

In [4]:
# Display the session object to confirm it was created
spark

### 1. Loading the data in Spark Dataframe

In [5]:
# Read the CSV, treating the first row as the header and letting Spark infer column types
adult_df = spark.read.csv("adult_data.csv", header=True, inferSchema=True)

In [6]:
# Preview the first five rows of the raw data
adult_df.show(5)

+---+---+---------+------+------------+---------------+------------------+-----------------+------------+-----+------+------------+------------+--------------+--------------+------+
|  x|age|workclass|fnlwgt|   education|educational-num|    marital-status|       occupation|relationship| race|gender|capital-gain|capital-loss|hours-per-week|native-country|income|
+---+---+---------+------+------------+---------------+------------------+-----------------+------------+-----+------+------------+------------+--------------+--------------+------+
|  1| 25|  Private|226802|        11th|              7|     Never-married|Machine-op-inspct|   Own-child|Black|  Male|           0|           0|            40| United-States| <=50K|
|  2| 38|  Private| 89814|     HS-grad|              9|Married-civ-spouse|  Farming-fishing|     Husband|White|  Male|           0|           0|            50| United-States| <=50K|
|  3| 28|Local-gov|336951|  Assoc-acdm|             12|Married-civ-spouse|  Protective-ser

In [7]:
# Inspect the column names and the types Spark inferred for them
adult_df.printSchema()

root
 |-- x: integer (nullable = true)
 |-- age: integer (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: integer (nullable = true)
 |-- education: string (nullable = true)
 |-- educational-num: integer (nullable = true)
 |-- marital-status: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- capital-gain: integer (nullable = true)
 |-- capital-loss: integer (nullable = true)
 |-- hours-per-week: integer (nullable = true)
 |-- native-country: string (nullable = true)
 |-- income: string (nullable = true)



### 2. Converting relevant continuous columns to appropriate format
- Leaving x, age, educational-num and hours-per-week as integers, because their values are small enough that int is sufficient
- Converting fnlwgt, capital-gain, capital-loss to double 

In [8]:
# Cast the listed numeric columns to double. All other columns pass through
# unchanged, and the original column order is preserved by the select.
columns_to_cast = ['fnlwgt', 'capital-gain', 'capital-loss']
cast_expr = []
for column_name in adult_df.columns:
    expr = col(column_name)
    if column_name in columns_to_cast:
        expr = expr.cast("double")
    cast_expr.append(expr)
adult_df_casted = adult_df.select(cast_expr)
adult_df_casted.printSchema()

root
 |-- x: integer (nullable = true)
 |-- age: integer (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: double (nullable = true)
 |-- education: string (nullable = true)
 |-- educational-num: integer (nullable = true)
 |-- marital-status: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- capital-gain: double (nullable = true)
 |-- capital-loss: double (nullable = true)
 |-- hours-per-week: integer (nullable = true)
 |-- native-country: string (nullable = true)
 |-- income: string (nullable = true)



In [9]:
# Preview the first five rows after the cast
adult_df_casted.show(5)

+---+---+---------+--------+------------+---------------+------------------+-----------------+------------+-----+------+------------+------------+--------------+--------------+------+
|  x|age|workclass|  fnlwgt|   education|educational-num|    marital-status|       occupation|relationship| race|gender|capital-gain|capital-loss|hours-per-week|native-country|income|
+---+---+---------+--------+------------+---------------+------------------+-----------------+------------+-----+------+------------+------------+--------------+--------------+------+
|  1| 25|  Private|226802.0|        11th|              7|     Never-married|Machine-op-inspct|   Own-child|Black|  Male|         0.0|         0.0|            40| United-States| <=50K|
|  2| 38|  Private| 89814.0|     HS-grad|              9|Married-civ-spouse|  Farming-fishing|     Husband|White|  Male|         0.0|         0.0|            50| United-States| <=50K|
|  3| 28|Local-gov|336951.0|  Assoc-acdm|             12|Married-civ-spouse|  Pr

### 3. Counting number of rows by education level

In [10]:
# Number of rows per education level, largest group first
education_counts = adult_df_casted.groupBy("education").count()
education_counts.orderBy(col('count').desc()).show()

+------------+-----+
|   education|count|
+------------+-----+
|     HS-grad|15784|
|Some-college|10878|
|   Bachelors| 8025|
|     Masters| 2657|
|   Assoc-voc| 2061|
|        11th| 1812|
|  Assoc-acdm| 1601|
|        10th| 1389|
|     7th-8th|  955|
| Prof-school|  834|
|         9th|  756|
|        12th|  657|
|   Doctorate|  594|
|     5th-6th|  509|
|     1st-4th|  247|
|   Preschool|   83|
+------------+-----+



### 4. Summary stats of the data

In [11]:
# Summary statistics, transposed so that each column of the data becomes a row
summary_stats = adult_df_casted.describe().toPandas()
summary_stats.T

Unnamed: 0,0,1,2,3,4
summary,count,mean,stddev,min,max
x,48842,24421.5,14099.615260708357,1,48842
age,48842,38.64358543876172,13.710509934443502,17,90
workclass,48842,,,?,Without-pay
fnlwgt,48842,189664.13459727284,105604.02542315757,12285.0,1490400.0
education,48842,,,10th,Some-college
educational-num,48842,10.078088530363212,2.570972755592252,1,16
marital-status,48842,,,Divorced,Widowed
occupation,48842,,,?,Transport-moving
relationship,48842,,,Husband,Wife


### 5. Crosstab of the data for Education vs Income range 

In [12]:
# Contingency table: one row per education value, one column per income value
adult_df_casted.crosstab('education', 'income')

education_income,<=50K,>50K
Some-college,8815,2063
10th,1302,87
1st-4th,239,8
Assoc-voc,1539,522
Preschool,82,1
9th,715,41
HS-grad,13281,2503
5th-6th,482,27
7th-8th,893,62
11th,1720,92


In [13]:
# Repeat the income crosstab for each of the other categorical variables
other_categoricals = [
    'workclass', 'marital-status', 'occupation', 'relationship',
    'race', 'gender', 'native-country',
]
for feature in other_categoricals:
    feature_vs_income = adult_df_casted.crosstab(feature, 'income')
    feature_vs_income.show()

+----------------+-----+----+
|workclass_income|<=50K|>50K|
+----------------+-----+----+
|Self-emp-not-inc| 2785|1077|
|       State-gov| 1451| 530|
|       Local-gov| 2209| 927|
|     Without-pay|   19|   2|
|     Federal-gov|  871| 561|
|               ?| 2534| 265|
|    Never-worked|   10|   0|
|         Private|26519|7387|
|    Self-emp-inc|  757| 938|
+----------------+-----+----+

+---------------------+-----+----+
|marital-status_income|<=50K|>50K|
+---------------------+-----+----+
|            Separated| 1431|  99|
|              Widowed| 1390| 128|
|        Never-married|15384| 733|
|             Divorced| 5962| 671|
|    Married-AF-spouse|   23|  14|
|   Married-civ-spouse|12395|9984|
| Married-spouse-ab...|  570|  58|
+---------------------+-----+----+

+-----------------+-----+----+
|occupation_income|<=50K|>50K|
+-----------------+-----+----+
|            Sales| 4029|1475|
|     Craft-repair| 4729|1383|
|    Other-service| 4719| 204|
|     Tech-support| 1026| 420|
|   Pr

## Looking at the crosstabs, there seems to be junk data with "?" character in some of the features. We will need to handle it at a later point

### 6. Number of people aged > 40 yrs

In [14]:
# Count the rows where age is strictly greater than 40
over_40 = adult_df_casted.filter(col('age') > 40)
over_40.count()

20211

### 7. Feature Engineering/Data Wrangling

#### -- Renaming columns

In [15]:
# Rename 'educational-num' to 'years_of_education'; later cells refer to the new name
adult_df_casted = adult_df_casted.withColumnRenamed('educational-num', 'years_of_education')

#### -- To capture the non-linear relationship between age and income-range, we will add an age-squared column

In [16]:
# Adding an age-squared column to capture the non-linear relationship of age with income-range
age_squared = col('age') ** 2
adult_df_casted = adult_df_casted.withColumn('age-squared', age_squared)

In [17]:
# Confirm the renamed column and the new age-squared column appear in the schema
adult_df_casted.printSchema()

root
 |-- x: integer (nullable = true)
 |-- age: integer (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: double (nullable = true)
 |-- education: string (nullable = true)
 |-- years_of_education: integer (nullable = true)
 |-- marital-status: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- capital-gain: double (nullable = true)
 |-- capital-loss: double (nullable = true)
 |-- hours-per-week: integer (nullable = true)
 |-- native-country: string (nullable = true)
 |-- income: string (nullable = true)
 |-- age-squared: double (nullable = true)



#### -- We also need to check for feature-groups having single observations

In [18]:
# List the group sizes of every categorical variable, smallest group first,
# so that groups with a single observation show up at the top
categoricals_to_check = [
    'workclass', 'marital-status', 'occupation', 'relationship',
    'race', 'gender', 'native-country',
]
for feature in categoricals_to_check:
    group_sizes = adult_df_casted.groupBy(feature).count()
    group_sizes.orderBy("count").show()

+----------------+-----+
|       workclass|count|
+----------------+-----+
|    Never-worked|   10|
|     Without-pay|   21|
|     Federal-gov| 1432|
|    Self-emp-inc| 1695|
|       State-gov| 1981|
|               ?| 2799|
|       Local-gov| 3136|
|Self-emp-not-inc| 3862|
|         Private|33906|
+----------------+-----+

+--------------------+-----+
|      marital-status|count|
+--------------------+-----+
|   Married-AF-spouse|   37|
|Married-spouse-ab...|  628|
|             Widowed| 1518|
|           Separated| 1530|
|            Divorced| 6633|
|       Never-married|16117|
|  Married-civ-spouse|22379|
+--------------------+-----+

+-----------------+-----+
|       occupation|count|
+-----------------+-----+
|     Armed-Forces|   15|
|  Priv-house-serv|  242|
|  Protective-serv|  983|
|     Tech-support| 1446|
|  Farming-fishing| 1490|
|Handlers-cleaners| 2072|
| Transport-moving| 2355|
|                ?| 2809|
|Machine-op-inspct| 3022|
|    Other-service| 4923|
|            Sal

### -- As per the tables above, the "Holand-Netherlands" group in the native-country column has only 1 observation, so we will remove it

In [19]:
# native-country = 'Holand-Netherlands' --> Only a single data point. This row is removed
adult_df_filtered = adult_df_casted.where(adult_df_casted['native-country'] != 'Holand-Netherlands')

In [20]:
# Compare the row count before and after the filter
original_rows = adult_df.count()
final_rows = adult_df_filtered.count()
print("Original number of rows: ", original_rows)
print("Final number of rows: ", final_rows)

Original number of rows:  48842
Final number of rows:  48841


In [21]:
# Checking for junk values ("?") in every column of the dataset.
# The tally is kept in `junk_count` rather than `count`, so that the `count`
# function imported from pyspark.sql.functions is not overwritten by an int.
for column in adult_df_filtered.columns:
    junk_count = adult_df_filtered.filter(adult_df_filtered[column] == '?').count()
    print(f"{column} : {junk_count}")

x : 0
age : 0
workclass : 2799
fnlwgt : 0
education : 0
years_of_education : 0
marital-status : 0
occupation : 2809
relationship : 0
race : 0
gender : 0
capital-gain : 0
capital-loss : 0
hours-per-week : 0
native-country : 857
income : 0
age-squared : 0


### Multiple columns have junk character data. We will try two approaches to tackle this:
1. Remove the rows which have this character in the given columns
2. Replace "?" with the mode of the respective column

In [22]:
# 1. Creating a reduced dataframe that keeps only rows without "?" in the three affected columns
has_no_junk = (
    (col('workclass') != '?')
    & (col('occupation') != '?')
    & (col('native-country') != '?')
)
adult_df_reduced = adult_df_filtered.filter(has_no_junk)
adult_df_reduced.count()

45221

In [23]:
# 2. Creating a cleaned dataframe with junk data replaced with the mode of the column

# 2.1 Build a dict mapping each affected column to its most frequent non-"?" value
bad_columns_dict = dict.fromkeys(['workclass', 'occupation', 'native-country'], '')

for junk_column in list(bad_columns_dict):
    value_counts = (
        adult_df_filtered
        .filter(col(junk_column) != "?")
        .groupBy(junk_column)
        .count()
        .orderBy(col("count").desc())
    )
    # The top row holds the most frequent value in its first field
    mode = value_counts.take(1)[0][0]
    bad_columns_dict[junk_column] = mode

print(bad_columns_dict)

{'workclass': 'Private', 'occupation': 'Prof-specialty', 'native-country': 'United-States'}


In [24]:
# 2.2 Replacing "?" with the mode of the respective column.
# Note: adult_df_filtered is rebound here, so from this cell onwards it holds the imputed data.
for junk_column, mode_value in bad_columns_dict.items():
    is_junk = col(junk_column) == '?'
    imputed = when(is_junk, mode_value).otherwise(col(junk_column))
    adult_df_filtered = adult_df_filtered.withColumn(junk_column, imputed)

In [25]:
# Checking for junk values ("?") in every column of the cleaned dataset.
# The tally is kept in `junk_count` rather than `count`, so that the `count`
# function imported from pyspark.sql.functions is not overwritten by an int.
adult_df_cleaned = adult_df_filtered
for column in adult_df_cleaned.columns:
    junk_count = adult_df_cleaned.filter(adult_df_cleaned[column] == '?').count()
    print(f"{column} : {junk_count}")

x : 0
age : 0
workclass : 0
fnlwgt : 0
education : 0
years_of_education : 0
marital-status : 0
occupation : 0
relationship : 0
race : 0
gender : 0
capital-gain : 0
capital-loss : 0
hours-per-week : 0
native-country : 0
income : 0
age-squared : 0


### Now we have two datasets for two use cases:
1. adult_df_reduced: Rows with junk data have been removed
2. adult_df_cleaned: Junk values have been replaced with the mode of the respective column

### We will create the model pipeline and then run the pipeline to compare performance in both scenarios

The following steps will be followed for both datasets:
1. String Indexing and One Hot Encoding the categorical independent variables
2. String indexing the dependent variable
3. Combining all input features for assembler
4. Assembling all the stages
5. Creating the model and pipeline
6. Creating the training and testing datasets
7. Running the model and evaluating Performance
8. Tuning the hyperparameters
9. Storing the output in Kafka

In [26]:
# 1. String indexing and one-hot encoding the categorical variables

categorical_variables = [
    'workclass', 'education', 'marital-status', 'occupation',
    'relationship', 'race', 'gender', 'native-country',
]

# Each variable contributes two stages: an indexer writing <name>_indexed,
# then an encoder reading that column and writing <name>_encoded.
stages_list = []

for var in categorical_variables:
    indexed_col = var + "_indexed"
    indexer = StringIndexer(inputCol=var, outputCol=indexed_col)
    encoder = OneHotEncoder(inputCols=[indexed_col], outputCols=[var + "_encoded"])
    stages_list.extend([indexer, encoder])
stages_list

[StringIndexer_7298558f21c1,
 OneHotEncoder_5f80fb73de46,
 StringIndexer_8f960a19af07,
 OneHotEncoder_25313e2efb4a,
 StringIndexer_557fedbca5e7,
 OneHotEncoder_4206b640e4b7,
 StringIndexer_e735de562ff4,
 OneHotEncoder_7e17bea1a689,
 StringIndexer_e87e36ef69b6,
 OneHotEncoder_762796f5ba73,
 StringIndexer_5588efd066a0,
 OneHotEncoder_e4c093f098e6,
 StringIndexer_1efabe7983be,
 OneHotEncoder_e04fb7674b49,
 StringIndexer_495be3f1f701,
 OneHotEncoder_f098675f6f9d]

In [27]:
# 2. Index the dependent variable income

label_indexer = StringIndexer(inputCol = 'income', outputCol = "label")
# Remove any label indexer left over from an earlier run of this cell before
# appending, so re-running the cell cannot leave two stages that both write "label".
stages_list[:] = [
    stage for stage in stages_list
    if not (isinstance(stage, StringIndexer) and stage.getOutputCol() == "label")
]
stages_list.append(label_indexer)

In [28]:
# 3. Listing the numerical variables and combining them with the encoded categorical features

numerical_variables = [
    'age', 'fnlwgt', 'years_of_education', 'capital-gain',
    'capital-loss', 'hours-per-week', 'age-squared',
]
encoded_cat_variables = [name + '_encoded' for name in categorical_variables]

# Numerical columns come first, followed by the encoded categorical columns
assembler_input = [*numerical_variables, *encoded_cat_variables]
assembler_input

['age',
 'fnlwgt',
 'years_of_education',
 'capital-gain',
 'capital-loss',
 'hours-per-week',
 'age-squared',
 'workclass_encoded',
 'education_encoded',
 'marital-status_encoded',
 'occupation_encoded',
 'relationship_encoded',
 'race_encoded',
 'gender_encoded',
 'native-country_encoded']

In [29]:
# 4. Assembling all features using vector assembler
assembler = VectorAssembler(inputCols = assembler_input, outputCol = "features")
# Remove any assembler left over from an earlier run of this cell before
# appending, so re-running the cell cannot leave two stages that both write "features".
stages_list[:] = [stage for stage in stages_list if not isinstance(stage, VectorAssembler)]
stages_list.append(assembler)

In [30]:
# 5. A single function that takes an input dataframe, runs the preprocessing pipeline on it,
#    and trains and scores a random forest and a logistic regression classifier

def buildModels(data, stages_list):
    """Preprocess `data`, split it, then train and score two classifiers.

    The pipeline built from `stages_list` is fitted on the whole of `data`
    and used to transform it before the 80/20 train/test split. Progress,
    accuracy and area under the ROC are printed for both classifiers.

    Parameters
    ----------
    data : Spark DataFrame holding the input columns the stages read.
    stages_list : list of pipeline stages that produce 'label' and 'features'.

    Returns
    -------
    tuple
        (rfc, lr, train, test): the RandomForestClassifier and
        LogisticRegression estimator objects (not the fitted models),
        followed by the training and testing DataFrames.
    """
    def fit_and_predict(estimator, fit_message, test_message):
        # Fit on the training split, then score the test split
        print(fit_message)
        fitted_model = estimator.fit(train)
        print(test_message)
        return fitted_model.transform(test)

    def accuracy_of(predictions):
        # Share of rows whose predicted label equals the true label
        matches = predictions.filter(predictions.label == predictions.prediction).count()
        return matches / float(predictions.count())

    # Build and apply the preprocessing pipeline
    print("= > Building the pipeline \n")
    pipelineModel = Pipeline(stages = stages_list).fit(data)
    transformed = pipelineModel.transform(data)

    # Keep the original columns plus the two columns the classifiers need
    df_model = transformed.select(*data.columns, 'label', 'features')
    print("= > Schema of dataset:")
    df_model.printSchema()

    # 80/20 split with a fixed seed
    train, test = df_model.randomSplit([0.8, 0.2], seed=1910)

    # Label distribution in each split
    print("= > Training Data distribution:")
    train.groupBy("label").count().show()
    print("= > Testing Data distribution:")
    test.groupBy("label").count().show()

    # Random forest classifier
    rfc = RandomForestClassifier(featuresCol = 'features', labelCol = 'label')
    predictions_rf = fit_and_predict(
        rfc,
        "= > Running a random forest classifier on the training dataset",
        "= > Testing the random forest model on the test dataset",
    )

    # Logistic regression classifier
    lr = LogisticRegression(featuresCol = 'features', labelCol = 'label')
    predictions_lr = fit_and_predict(
        lr,
        "= > Running a logistic regression classifier on the training dataset",
        "= > Testing the logistic regression model on the test dataset",
    )

    # Metric 1: accuracy
    accuracy_rfc = accuracy_of(predictions_rf)
    print(f"\n----> Accuracy of random forest classifier: {round(accuracy_rfc*100, 3)}%")

    accuracy_lr = accuracy_of(predictions_lr)
    print(f"----> Accuracy of logistic regression classifier: {round(accuracy_lr*100, 3)}%")

    # Metric 2: area under the ROC, using the evaluator's default settings
    bc_evaluator = BinaryClassificationEvaluator()
    auc_rf = bc_evaluator.evaluate(predictions_rf)
    auc_lr = bc_evaluator.evaluate(predictions_lr)
    print(f"\n----> Area under the ROC of random forest classifier : {round(auc_rf*100, 3)}%")
    print(f"----> Area under the ROC of logistic regression classifier : {round(auc_lr*100, 3)}%")

    return (rfc, lr, train, test)

In [31]:
# 6. Testing both datasets (the reduced one and the imputed one) to compare performance

print("-------------- 1. Building the Models on the reduced dataset with less # of rows ------------------")
print("Dataset picked: adult_df_reduced")
reduced_dataset_rfc, reduced_dataset_lr, reduced_train, reduced_test = buildModels(adult_df_reduced, stages_list)
print("\n----------------------------------------- Completed -----------------------------------------------")

print("\n-------------- 2. Building the Models on the cleaned dataset with junk data imputed with mode values ------------------")
# This run uses adult_df_cleaned, so the printed dataset name must say so
print("Dataset picked: adult_df_cleaned")
imputed_dataset_rfc, imputed_dataset_lr, imputed_train, imputed_test = buildModels(adult_df_cleaned, stages_list)
print("\n----------------------------------------- Completed -----------------------------------------------")

-------------- 1. Building the Models on the reduced dataset with less # of rows ------------------
Dataset picked: adult_df_reduced
= > Building the pipeline 

= > Schema of dataset:
root
 |-- x: integer (nullable = true)
 |-- age: integer (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: double (nullable = true)
 |-- education: string (nullable = true)
 |-- years_of_education: integer (nullable = true)
 |-- marital-status: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- capital-gain: double (nullable = true)
 |-- capital-loss: double (nullable = true)
 |-- hours-per-week: integer (nullable = true)
 |-- native-country: string (nullable = true)
 |-- income: string (nullable = true)
 |-- age-squared: double (nullable = true)
 |-- label: double (nullable = false)
 |-- features: vector (nullable = true)

= > Training Data distr

### As we can see from both accuracy and Area under ROC metrics, logistic regression with imputed dataset performs better than the other combinations (Random forest, reduced dataset).
#### - Accuracy: 85.828%
#### - Area under ROC: 91.117%

### So, we will use the Logistic Regression Classifier with the imputed dataset, for further hyperparameter tuning and finalising the model

In [32]:
# List the parameters of the logistic regression estimator, one per entry
imputed_dataset_lr.explainParams().split("\n")

['aggregationDepth: suggested depth for treeAggregate (>= 2). (default: 2)',
 'elasticNetParam: the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty. (default: 0.0)',
 'family: The name of family which is a description of the label distribution to be used in the model. Supported options: auto, binomial, multinomial (default: auto)',
 'featuresCol: features column name. (default: features, current: features)',
 'fitIntercept: whether to fit an intercept term. (default: True)',
 'labelCol: label column name. (default: label, current: label)',
 'lowerBoundsOnCoefficients: The lower bounds on coefficients if fitting under bound constrained optimization. The bound matrix must be compatible with the shape (1, number of features) for binomial regression, or (number of classes, number of features) for multinomial regression. (undefined)',
 'lowerBoundsOnIntercepts: The lower bounds on intercepts if fitting under bound

In [33]:
# We will tune the regularisation parameters and the iteration limit using
# 5-fold cross-validation on the training split (each fold is held out once
# for validation while the other four are used for fitting).

parameter_grid = (ParamGridBuilder()
                  .addGrid(imputed_dataset_lr.regParam, [0, 0.01, 0.5])
                  .addGrid(imputed_dataset_lr.elasticNetParam, [0.0, 0.5, 1.0])
                  .addGrid(imputed_dataset_lr.maxIter, [10, 50, 100])
                  .build())

bc_evaluator = BinaryClassificationEvaluator()

# A fixed seed makes the fold assignment repeatable across runs;
# 1910 is the same value used for the train/test split.
cv = CrossValidator(estimator = imputed_dataset_lr, estimatorParamMaps = parameter_grid, \
                    evaluator = bc_evaluator, numFolds = 5, seed = 1910)

# Time the full grid search
start_time = time()
tuned_lr = cv.fit(imputed_train)
end_time = time()
time_taken = end_time - start_time
print("Time to train model: %.3f seconds" % time_taken)

Time to train model: 467.645 seconds


In [34]:
# Score the tuned model on the held-out test split
tuned_predictions = tuned_lr.transform(imputed_test)

# Accuracy: share of rows whose predicted label equals the true label
correct_rows = tuned_predictions.filter(col('label') == col('prediction')).count()
total_rows = tuned_predictions.count()
accuracy = correct_rows / float(total_rows)
print(f"----> Accuracy of tuned logistic regression classifier: {round(accuracy*100, 3)}%")

# Area under the ROC from the same evaluator used during tuning
auc = bc_evaluator.evaluate(tuned_predictions)
print(f"----> Area under the ROC of tuned logistic regression classifier : {round(auc*100, 3)}%")

----> Accuracy of tuned logistic regression classifier: 85.828%
----> Area under the ROC of tuned logistic regression classifier : 91.117%


In [35]:
# Take the best model found by cross-validation and list its parameters
best_model = tuned_lr.bestModel
best_model.explainParams().split("\n")

['aggregationDepth: suggested depth for treeAggregate (>= 2). (default: 2)',
 'elasticNetParam: the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty. (default: 0.0, current: 0.0)',
 'family: The name of family which is a description of the label distribution to be used in the model. Supported options: auto, binomial, multinomial (default: auto)',
 'featuresCol: features column name. (default: features, current: features)',
 'fitIntercept: whether to fit an intercept term. (default: True)',
 'labelCol: label column name. (default: label, current: label)',
 'lowerBoundsOnCoefficients: The lower bounds on coefficients if fitting under bound constrained optimization. The bound matrix must be compatible with the shape (1, number of features) for binomial regression, or (number of classes, number of features) for multinomial regression. (undefined)',
 'lowerBoundsOnIntercepts: The lower bounds on intercepts if fitti

In [36]:
# Read the tuned values through the model's public param getters
# instead of reaching into the private `_java_obj` wrapper.
print("Best Parameter for regParam: ", best_model.getRegParam())
print("Best Parameter for Elastic Net Param: ", best_model.getElasticNetParam())
print("Best Parameter for Max iter: ", best_model.getMaxIter())

Best Parameter for regParam:  0.0
Best Parameter for Elastic Net Param:  0.0
Best Parameter for Max iter:  100


### Now we have finalised our model and we can persist it in HDFS for future use

In [41]:
# Save the best model, overwriting anything already stored at this path
best_model.write().overwrite().save("AdultIncomePrediction/LogisticRegression.model")