In [1]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
.appName("FinalProject").config("spark.driver.bindAddress", "127.0.0.1").getOrCreate()

adult_df = spark.read.csv("adult/adult.data", inferSchema = True)
adult_df2 = spark.read.csv("adult/adult.test", inferSchema = True)


adult_df2.show()

24/12/07 00:39:07 WARN Utils: Your hostname, Zains-Mac.local resolves to a loopback address: 127.0.0.1, but we couldn't find any external IP address!
24/12/07 00:39:07 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/12/07 00:39:07 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


+---+-----------------+--------+-------------+----+-------------------+------------------+--------------+-------------------+-------+------+----+----+--------------+-------+
|_c0|              _c1|     _c2|          _c3| _c4|                _c5|               _c6|           _c7|                _c8|    _c9|  _c10|_c11|_c12|          _c13|   _c14|
+---+-----------------+--------+-------------+----+-------------------+------------------+--------------+-------------------+-------+------+----+----+--------------+-------+
| 25|          Private|226802.0|         11th| 7.0|      Never-married| Machine-op-inspct|     Own-child|              Black|   Male|   0.0| 0.0|40.0| United-States| <=50K.|
| 38|          Private| 89814.0|      HS-grad| 9.0| Married-civ-spouse|   Farming-fishing|       Husband|              White|   Male|   0.0| 0.0|50.0| United-States| <=50K.|
| 28|        Local-gov|336951.0|   Assoc-acdm|12.0| Married-civ-spouse|   Protective-serv|       Husband|              White|   Ma

In [2]:
# column headers
columns = [
    "age", "workclass", "fnlwgt", "education", "education-num",
    "marital-status", "occupation", "relationship", "race", "sex",
    "capital-gain", "capital-loss", "hours-per-week", "native-country",
    "income"
]

# Adding column headers
adult_df = adult_df.toDF(*columns)
adult_df2 = adult_df2.toDF(*columns)

# Combining old data to create a unified set
combined_df = adult_df.union(adult_df2)

adult_df.show()

+---+-----------------+--------+-------------+-------------+--------------------+------------------+--------------+-------------------+-------+------------+------------+--------------+--------------+------+
|age|        workclass|  fnlwgt|    education|education-num|      marital-status|        occupation|  relationship|               race|    sex|capital-gain|capital-loss|hours-per-week|native-country|income|
+---+-----------------+--------+-------------+-------------+--------------------+------------------+--------------+-------------------+-------+------------+------------+--------------+--------------+------+
| 39|        State-gov| 77516.0|    Bachelors|         13.0|       Never-married|      Adm-clerical| Not-in-family|              White|   Male|      2174.0|         0.0|          40.0| United-States| <=50K|
| 50| Self-emp-not-inc| 83311.0|    Bachelors|         13.0|  Married-civ-spouse|   Exec-managerial|       Husband|              White|   Male|         0.0|         0.0|   

In [3]:
count = combined_df.count()
print(f"Total number of rows: {count}")

Total number of rows: 48842


In [4]:
from pyspark.sql.functions import col, regexp_replace

# Dropping rows with missing values
df_cleaned = combined_df.dropna()

# Show the distinct values in label income column to clean data
distinct_values = df_cleaned.select("income").distinct()
print("Income column before cleaning:")
distinct_values.show(truncate=False)

# Remove trailing spaces or periods from the 'income' column
df_cleaned = df_cleaned.withColumn("income", regexp_replace(col("income"), "\\.", ""))

# Show the distinct values after cleaning
distinct_values = df_cleaned.select("income").distinct()
print("Income column after cleaning:")
distinct_values.show(truncate=False)

Income column before cleaning:
+-------+
|income |
+-------+
| >50K  |
| <=50K |
| >50K. |
| <=50K.|
+-------+

Income column after cleaning:
+------+
|income|
+------+
| >50K |
| <=50K|
+------+



In [5]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml import Pipeline

# Distilling the features and labels
feature_columns = [
    "age", "workclass", "fnlwgt", "education", "education-num", 
    "marital-status", "occupation", "relationship", "race", "sex", 
    "capital-gain", "capital-loss", "hours-per-week", "native-country"
]
label_column = "income"

# One-hot encoding categorical features (first indexing then encoding)
categorical_columns = [
    "workclass", "education", "marital-status", "occupation", 
    "relationship", "race", "sex", "native-country"
]
numerical_columns = ["age", "fnlwgt", "education-num", "capital-gain", "capital-loss", "hours-per-week"]

indexer_income = StringIndexer(inputCol=label_column, outputCol="label")
# StringIndexer and OneHotEncoder stages for categorical columns
indexers = [StringIndexer(inputCol=col, outputCol=col + "_index") for col in categorical_columns]
encoders = [OneHotEncoder(inputCols=[col + "_index"], outputCols=[col + "_onehot"]) for col in categorical_columns]

# all features combined into a single vector column
assembler = VectorAssembler(
    inputCols=[col + "_onehot" for col in categorical_columns] + numerical_columns,
    outputCol="features"
)

# pipeline with stages indexer, encoder, assembler
pipeline = Pipeline(stages=[indexer_income] + indexers + encoders + [assembler])

# Fit and transform
model = pipeline.fit(df_cleaned)
processed_data = model.transform(df_cleaned)

# Selecting features and labels
final_data = processed_data.select("features", "label")

final_data.show(5)

+--------------------+-----+
|            features|label|
+--------------------+-----+
|(100,[4,10,24,32,...|  0.0|
|(100,[1,10,23,31,...|  0.0|
|(100,[0,8,25,38,4...|  0.0|
|(100,[0,13,23,38,...|  0.0|
|(100,[0,10,23,29,...|  0.0|
+--------------------+-----+
only showing top 5 rows



24/12/07 00:39:13 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


In [6]:
# Splitting the dataset 80-20 into training and testing sets
train_data, test_data = final_data.randomSplit([0.8, 0.2], seed=100)

# Training and testing dataset count
print("Number of training samples:", train_data.count())
print("Number of testing samples:", test_data.count())

                                                                                

Number of training samples: 39119
Number of testing samples: 9723


In [7]:
from pyspark.ml.classification import LogisticRegression as MLlibLR, GBTClassifier as MLlibGBT

# Logistic Regression Model
lr = MLlibLR(featuresCol='features', labelCol='label')
print("Training logistic regression model...")
lr_model = lr.fit(train_data)
print("Model trained successfully!")

# Gradient-Boosted Tree Model (GBT)
gbt = MLlibGBT(featuresCol='features', labelCol='label')
print("Training gradient-boosted tree model...")
gbt_model = gbt.fit(train_data)
print("Model trained successfully!")

Training logistic regression model...


24/12/07 00:39:15 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
24/12/07 00:39:15 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.VectorBLAS


Model trained successfully!
Training gradient-boosted tree model...
Model trained successfully!


In [8]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

# Using BinaryClassificationEvaluator to evaluate models
evaluator = BinaryClassificationEvaluator(labelCol='label', metricName="areaUnderROC")

# Logistic Regression Grid Search and grid parameter
lr_param_grid = ParamGridBuilder() \
    .addGrid(lr.regParam, [0.01, 0.1, 0.5]) \
    .addGrid(lr.maxIter, [10, 50, 100]) \
    .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0]) \
    .build()

# Cross-Validator for Logistic Regression
lr_cross_validator = CrossValidator(
    estimator=lr,
    estimatorParamMaps=lr_param_grid,
    evaluator=evaluator,
    numFolds=5  # 5-fold cross-validation
)

# Gradient-Boosted Tree Grid Search and grid parameter
gbt_param_grid = ParamGridBuilder() \
    .addGrid(gbt.maxDepth, [5, 10]) \
    .addGrid(gbt.maxIter, [10, 20]) \
    .addGrid(gbt.stepSize, [0.1, 0.2]) \
    .build()

# Cross-Validator for GBT
gbt_cross_validator = CrossValidator(
    estimator=gbt,
    estimatorParamMaps=gbt_param_grid,
    evaluator=evaluator,
    numFolds=5,  # 5-fold cross-validation
    parallelism=4
)

# Logistic Regression Model with Cross-Validation
print("Training logistic regression model with cross-validation...")
lr_cv_model = lr_cross_validator.fit(train_data)
print("Logistic Regression model trained successfully!")

# best logistic regression model
best_lr_model = lr_cv_model.bestModel
print(f"Best Logistic Regression Model: {best_lr_model}")

# Gradient-Boosted Tree Model with Cross-Validation
print("Training gradient-boosted tree model with cross-validation...")
gbt_cv_model = gbt_cross_validator.fit(train_data)
print("Gradient-Boosted Tree model trained successfully!")

# best GBT model
best_gbt_model = gbt_cv_model.bestModel
print(f"Best Gradient-Boosted Tree Model: {best_gbt_model}")

Training logistic regression model with cross-validation...


24/12/07 00:39:45 WARN BlockManager: Asked to remove block broadcast_3268, which does not exist


Logistic Regression model trained successfully!
Best Logistic Regression Model: LogisticRegressionModel: uid=LogisticRegression_db6a78cdf70e, numClasses=2, numFeatures=100
Training gradient-boosted tree model with cross-validation...


24/12/07 00:40:31 WARN BlockManager: Block rdd_11939_1 already exists on this machine; not re-adding it
24/12/07 00:40:41 WARN DAGScheduler: Broadcasting large task binary with size 1007.7 KiB
24/12/07 00:40:41 WARN DAGScheduler: Broadcasting large task binary with size 1030.6 KiB
24/12/07 00:40:41 WARN DAGScheduler: Broadcasting large task binary with size 1029.6 KiB
24/12/07 00:40:41 WARN DAGScheduler: Broadcasting large task binary with size 1030.1 KiB
24/12/07 00:40:41 WARN DAGScheduler: Broadcasting large task binary with size 1030.7 KiB
24/12/07 00:40:41 WARN DAGScheduler: Broadcasting large task binary with size 1031.8 KiB
24/12/07 00:40:41 WARN DAGScheduler: Broadcasting large task binary with size 1033.9 KiB
24/12/07 00:40:41 WARN DAGScheduler: Broadcasting large task binary with size 1037.4 KiB
24/12/07 00:40:41 WARN DAGScheduler: Broadcasting large task binary with size 1042.8 KiB
24/12/07 00:40:41 WARN DAGScheduler: Broadcasting large task binary with size 1052.8 KiB
24/12/

Gradient-Boosted Tree model trained successfully!
Best Gradient-Boosted Tree Model: GBTClassificationModel: uid = GBTClassifier_0999a31debb4, numTrees=20, numClasses=2, numFeatures=100


In [9]:
# Evaluating the best models on test data
# Logistic Regression
lr_predictions = best_lr_model.transform(test_data)
lr_auc = evaluator.evaluate(lr_predictions)
print(f"Logistic Regression AUC on test data: {lr_auc}")

# Gradient-Boosted Tree
gbt_predictions = best_gbt_model.transform(test_data)
gbt_auc = evaluator.evaluate(gbt_predictions)
print(f"Gradient-Boosted Tree AUC on test data: {gbt_auc}")

Logistic Regression AUC on test data: 0.9043556206948926
Gradient-Boosted Tree AUC on test data: 0.915854418387934
