<a href="https://colab.research.google.com/github/snehakondapalli/MachineLearning_IncomePredictions_Pyspark/blob/main/Income_prediction_using_classifiers_using_PySpark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [3]:
!pip install ucimlrepo
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m3.1 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488491 sha256=c4fdd2bd9c11f97680ec5456337259dd9d0f558d20f8e82537db8bd62149a825
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1



## Data collection

### Load the “adult dataset” from https://archive.ics.uci.edu/ml/datasets/adult.

In [4]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import isnan, when, count, col, lit
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler, Normalizer
from pyspark.sql import Window
from ucimlrepo import fetch_ucirepo

spark = SparkSession.builder.appName('SparkByExamples.com').getOrCreate()

adult = fetch_ucirepo(id=2)

In [21]:
from pyspark.sql.functions import monotonically_increasing_id

X_features = adult.data.features
y_target = adult.data.targets

X_features = spark.createDataFrame(X_features)
Y = spark.createDataFrame(y_target)

X_features = X_features.withColumn("id", monotonically_increasing_id())
Y = Y.withColumn("id", monotonically_increasing_id())


X = X_features.join(Y, X_features.id == Y.id, 'inner').drop(Y.id)

X.show()

+---+----------------+------+------------+-------------+------------------+-----------------+-------------+------------------+------+------------+------------+--------------+--------------+----+------+
|age|       workclass|fnlwgt|   education|education-num|    marital-status|       occupation| relationship|              race|   sex|capital-gain|capital-loss|hours-per-week|native-country|  id|income|
+---+----------------+------+------------+-------------+------------------+-----------------+-------------+------------------+------+------------+------------+--------------+--------------+----+------+
| 19|         Private|168294|     HS-grad|            9|     Never-married|     Craft-repair|    Own-child|             White|  Male|           0|           0|            40| United-States|  26| <=50K|
| 49|         Private|193366|     HS-grad|            9|Married-civ-spouse|     Craft-repair|      Husband|             White|  Male|           0|           0|            40| United-States|  2


## Data cleaning .

### Handle the missing values.

In [22]:
# Check for any nan or null values in the Dataframe
X.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in X.columns]).show()

+---+---------+------+---------+-------------+--------------+----------+------------+----+---+------------+------------+--------------+--------------+---+------+
|age|workclass|fnlwgt|education|education-num|marital-status|occupation|relationship|race|sex|capital-gain|capital-loss|hours-per-week|native-country| id|income|
+---+---------+------+---------+-------------+--------------+----------+------------+----+---+------------+------------+--------------+--------------+---+------+
|  0|      963|     0|        0|            0|             0|       966|           0|   0|  0|           0|           0|             0|           274|  0|     0|
+---+---------+------+---------+-------------+--------------+----------+------------+----+---+------------+------------+--------------+--------------+---+------+



In [18]:
# Replace "?" with unknown
X = X.select([when(col(c)=="?","Unknown").otherwise(col(c)).alias(c) for c in X.columns])

# Replace null values with unknown
X = X.select([when(col(c).isNull(),"Unknown").otherwise(col(c)).alias(c) for c in X.columns])

# Convert <=50K. and >50K. to just <=50K and >50K
X = X.select([when(col(c) == '<=50K.',"<=50K").otherwise(col(c)).alias(c) for c in X.columns])
X = X.select([when(col(c) == '>50K.',">50K").otherwise(col(c)).alias(c) for c in X.columns])

# Check if there are any null values remaining
X.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in X.columns]).show()


+---+---------+------+---------+-------------+--------------+----------+------------+----+---+------------+------------+--------------+--------------+---+------+
|age|workclass|fnlwgt|education|education-num|marital-status|occupation|relationship|race|sex|capital-gain|capital-loss|hours-per-week|native-country| id|income|
+---+---------+------+---------+-------------+--------------+----------+------------+----+---+------------+------------+--------------+--------------+---+------+
|  0|      963|     0|        0|            0|             0|       966|           0|   0|  0|           0|           0|             0|           274|  0|     0|
+---+---------+------+---------+-------------+--------------+----------+------------+----+---+------------+------------+--------------+--------------+---+------+




## Feature engineering.

### Distill the features (all columns ex-cept for income) and labels (income). Transform the features into vectors. Use one-hot encoder to process categorical features. Split the dataset into a training and testing set with a ratio of 80% v.s. 20%.

In [13]:
indexed_X = 0;
i = 0
for column_name in X.columns:
    # String Indexer Initialization
    indexer = StringIndexer(inputCol=column_name, outputCol=f"{column_name}_indexed")
    indexerModel = indexer.fit(X if i == 0 else indexed_X)

    # Transform the DataFrame using the fitted StringIndexer model
    indexed_X = indexerModel.transform(X if i == 0 else indexed_X)
    i = i + 1

indexed_X = indexed_X.drop(*X.columns)

input_columns = [a for a in indexed_X.columns if a!='income_indexed']
encoder = OneHotEncoder(inputCols=input_columns, outputCols=[a for a in X.columns if a!='income'])
model = encoder.fit(indexed_X)
encoded = model.transform(indexed_X)
encoded = encoded.drop(*input_columns)

assembler = VectorAssembler(inputCols=X_features.columns, outputCol='features_assembled')

features_vectorized = assembler.transform(encoded)

normalizer = Normalizer(inputCol="features_assembled", outputCol="features", p=1.0)
l1NormData = normalizer.transform(features_vectorized)
l1NormData = l1NormData.withColumnRenamed('income_indexed', 'label')

train, test = l1NormData.randomSplit(weights=[0.8,0.2], seed=200)

# train.display()

train.select('label').distinct().show()

+-----+
|label|
+-----+
|  0.0|
|  1.0|
+-----+




## Training.

### Build a logistic regression and a gradient-boostedtree model to fit the dataset.

In [14]:
# Logistic regression Model
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression, GBTClassifier

lr = LogisticRegression(labelCol="label", featuresCol="features", maxIter=2, regParam=0.001)

lr_model = lr.fit(train)

In [20]:
gbt = GBTClassifier(labelCol="label", featuresCol="features", maxIter=2)

pipeline = Pipeline(stages=[gbt])

gbt_model = pipeline.fit(train)
pipeline

Pipeline_e6212bbfadd9


## Tuning and Evaluation.

### For each model, use grid parameter search and cross validation over 5 folds to find the parameters that yields the highest areaUnderROC on the training set.

In [16]:
# Importing the evaluator
from pyspark.ml.evaluation import BinaryClassificationEvaluator

from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

grid = ParamGridBuilder().addGrid(lr.maxIter, [0, 1]).build()
evaluator = BinaryClassificationEvaluator()

cv = CrossValidator(estimator=lr, estimatorParamMaps=grid, evaluator=evaluator , parallelism=2, numFolds=5)

train_data = l1NormData.drop(*X_features.columns)
train_data = train_data.drop('features_assembled')

cvModel = cv.fit(train_data)

trainingSummary = cvModel.bestModel.summary

print("Total Iterations -> ", trainingSummary.totalIterations)
print("Objective History -> ", trainingSummary.objectiveHistory)

evaluator.evaluate(cvModel.transform(train_data))

Total Iterations ->  1
Objective History ->  [0.550250619060374, 0.18879399589650184]


0.9938453584160494


## Prediction.

### For each model, make predictions on the testing set and display the areaUnderROC.

In [17]:

lr_prediction = lr_model.transform(test)

# Calling the evaluator
res = BinaryClassificationEvaluator(rawPredictionCol='prediction',labelCol='label')

# Evaluating the AUC on Logistic Regression Model
ROC_AUC_LR = res.evaluate(lr_prediction)

print("AUC on Logistic Regression Model : ", ROC_AUC_LR)

gbt_prediction = gbt_model.transform(test)

# Evaluating the AUC on Logistic Regression Model
ROC_AUC_GBT = res.evaluate(gbt_prediction)

print("AUC on Logistic Regression Model : ", ROC_AUC_GBT)


AUC on Logistic Regression Model :  0.7781033904535466
AUC on Logistic Regression Model :  0.7391428977571507
